mirror of
https://github.com/weyne85/rustdesk.git
synced 2025-10-29 17:00:05 +00:00
refactor to prepare for tcp rendezvous
This commit is contained in:
@@ -9,29 +9,34 @@ use std::{
|
||||
|
||||
use uuid::Uuid;
|
||||
|
||||
use hbb_common::tcp::FramedStream;
|
||||
use hbb_common::{
|
||||
allow_err,
|
||||
anyhow::bail,
|
||||
anyhow::{self, bail},
|
||||
config::{Config, CONNECT_TIMEOUT, READ_TIMEOUT, REG_INTERVAL, RENDEZVOUS_PORT},
|
||||
futures::future::join_all,
|
||||
log,
|
||||
protobuf::Message as _,
|
||||
rendezvous_proto::*,
|
||||
sleep,
|
||||
socket_client::{self, is_ipv4},
|
||||
socket_client::{self, connect_tcp, is_ipv4},
|
||||
tcp::FramedStream,
|
||||
tokio::{
|
||||
self, select,
|
||||
time::{interval, Duration},
|
||||
},
|
||||
udp::FramedSocket,
|
||||
AddrMangle, ResultType,
|
||||
AddrMangle, IntoTargetAddr, ResultType, TargetAddr,
|
||||
};
|
||||
|
||||
use crate::server::{check_zombie, new as new_server, ServerPtr};
|
||||
use crate::{
|
||||
check_port,
|
||||
server::{check_zombie, new as new_server, ServerPtr},
|
||||
};
|
||||
|
||||
type Message = RendezvousMessage;
|
||||
|
||||
const TIMER_OUT: Duration = Duration::from_secs(1);
|
||||
|
||||
lazy_static::lazy_static! {
|
||||
static ref SOLVING_PK_MISMATCH: Arc<Mutex<String>> = Default::default();
|
||||
}
|
||||
@@ -39,7 +44,7 @@ static SHOULD_EXIT: AtomicBool = AtomicBool::new(false);
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct RendezvousMediator {
|
||||
addr: hbb_common::tokio_socks::TargetAddr<'static>,
|
||||
addr: TargetAddr<'static>,
|
||||
host: String,
|
||||
host_prefix: String,
|
||||
last_id_pk_registry: String,
|
||||
@@ -112,8 +117,7 @@ impl RendezvousMediator {
|
||||
// crate::platform::linux_desktop_manager::stop_xdesktop();
|
||||
}
|
||||
|
||||
pub async fn start(server: ServerPtr, host: String) -> ResultType<()> {
|
||||
log::info!("start rendezvous mediator of {}", host);
|
||||
pub async fn start_udp(server: ServerPtr, host: String) -> ResultType<()> {
|
||||
let host_prefix: String = host
|
||||
.split(".")
|
||||
.next()
|
||||
@@ -125,16 +129,15 @@ impl RendezvousMediator {
|
||||
}
|
||||
})
|
||||
.unwrap_or(host.to_owned());
|
||||
let host = crate::check_port(&host, RENDEZVOUS_PORT);
|
||||
let host = check_port(&host, RENDEZVOUS_PORT);
|
||||
let (mut socket, addr) = socket_client::new_udp_for(&host, CONNECT_TIMEOUT).await?;
|
||||
let mut rz = Self {
|
||||
addr: addr,
|
||||
addr: addr.clone(),
|
||||
host: host.clone(),
|
||||
host_prefix,
|
||||
last_id_pk_registry: "".to_owned(),
|
||||
};
|
||||
|
||||
const TIMER_OUT: Duration = Duration::from_secs(1);
|
||||
let mut timer = interval(TIMER_OUT);
|
||||
let mut last_timer: Option<Instant> = None;
|
||||
const REG_TIMEOUT: i64 = 3_000;
|
||||
@@ -177,63 +180,8 @@ impl RendezvousMediator {
|
||||
n = socket.next() => {
|
||||
match n {
|
||||
Some(Ok((bytes, _))) => {
|
||||
if let Ok(msg_in) = Message::parse_from_bytes(&bytes) {
|
||||
match msg_in.union {
|
||||
Some(rendezvous_message::Union::RegisterPeerResponse(rpr)) => {
|
||||
update_latency();
|
||||
if rpr.request_pk {
|
||||
log::info!("request_pk received from {}", host);
|
||||
allow_err!(rz.register_pk(&mut socket).await);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
Some(rendezvous_message::Union::RegisterPkResponse(rpr)) => {
|
||||
update_latency();
|
||||
match rpr.result.enum_value() {
|
||||
Ok(register_pk_response::Result::OK) => {
|
||||
Config::set_key_confirmed(true);
|
||||
Config::set_host_key_confirmed(&rz.host_prefix, true);
|
||||
*SOLVING_PK_MISMATCH.lock().unwrap() = "".to_owned();
|
||||
}
|
||||
Ok(register_pk_response::Result::UUID_MISMATCH) => {
|
||||
allow_err!(rz.handle_uuid_mismatch(&mut socket).await);
|
||||
}
|
||||
_ => {
|
||||
log::error!("unknown RegisterPkResponse");
|
||||
}
|
||||
}
|
||||
}
|
||||
Some(rendezvous_message::Union::PunchHole(ph)) => {
|
||||
let rz = rz.clone();
|
||||
let server = server.clone();
|
||||
tokio::spawn(async move {
|
||||
allow_err!(rz.handle_punch_hole(ph, server).await);
|
||||
});
|
||||
}
|
||||
Some(rendezvous_message::Union::RequestRelay(rr)) => {
|
||||
let rz = rz.clone();
|
||||
let server = server.clone();
|
||||
tokio::spawn(async move {
|
||||
allow_err!(rz.handle_request_relay(rr, server).await);
|
||||
});
|
||||
}
|
||||
Some(rendezvous_message::Union::FetchLocalAddr(fla)) => {
|
||||
let rz = rz.clone();
|
||||
let server = server.clone();
|
||||
tokio::spawn(async move {
|
||||
allow_err!(rz.handle_intranet(fla, server).await);
|
||||
});
|
||||
}
|
||||
Some(rendezvous_message::Union::ConfigureUpdate(cu)) => {
|
||||
let v0 = Config::get_rendezvous_servers();
|
||||
Config::set_option("rendezvous-servers".to_owned(), cu.rendezvous_servers.join(","));
|
||||
Config::set_serial(cu.serial);
|
||||
if v0 != Config::get_rendezvous_servers() {
|
||||
Self::restart();
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
if let Ok(msg) = Message::parse_from_bytes(&bytes) {
|
||||
rz.handle_resp(msg.union, Sink::Framed(&mut socket, &addr), &server, &mut update_latency).await?;
|
||||
} else {
|
||||
log::debug!("Non-protobuf message bytes received: {:?}", bytes);
|
||||
}
|
||||
@@ -257,7 +205,7 @@ impl RendezvousMediator {
|
||||
let elapsed_resp = last_register_resp.map(|x| x.elapsed().as_millis() as i64).unwrap_or(REG_INTERVAL);
|
||||
let timeout = (elapsed_resp - last_register_sent.map(|x| x.elapsed().as_millis() as i64).unwrap_or(REG_INTERVAL)) > REG_TIMEOUT;
|
||||
if timeout || elapsed_resp >= REG_INTERVAL {
|
||||
allow_err!(rz.register_peer(&mut socket).await);
|
||||
allow_err!(rz.register_peer(Sink::Framed(&mut socket, &addr)).await);
|
||||
last_register_sent = now;
|
||||
if timeout {
|
||||
fails += 1;
|
||||
@@ -285,6 +233,113 @@ impl RendezvousMediator {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[inline]
|
||||
async fn handle_resp(
|
||||
&mut self,
|
||||
msg: Option<rendezvous_message::Union>,
|
||||
sink: Sink<'_>,
|
||||
server: &ServerPtr,
|
||||
update_latency: &mut impl FnMut(),
|
||||
) -> ResultType<()> {
|
||||
match msg {
|
||||
Some(rendezvous_message::Union::RegisterPeerResponse(rpr)) => {
|
||||
update_latency();
|
||||
if rpr.request_pk {
|
||||
log::info!("request_pk received from {}", self.host);
|
||||
allow_err!(self.register_pk(sink).await);
|
||||
}
|
||||
}
|
||||
Some(rendezvous_message::Union::RegisterPkResponse(rpr)) => {
|
||||
update_latency();
|
||||
match rpr.result.enum_value() {
|
||||
Ok(register_pk_response::Result::OK) => {
|
||||
Config::set_key_confirmed(true);
|
||||
Config::set_host_key_confirmed(&self.host_prefix, true);
|
||||
*SOLVING_PK_MISMATCH.lock().unwrap() = "".to_owned();
|
||||
}
|
||||
Ok(register_pk_response::Result::UUID_MISMATCH) => {
|
||||
allow_err!(self.handle_uuid_mismatch(sink).await);
|
||||
}
|
||||
_ => {
|
||||
log::error!("unknown RegisterPkResponse");
|
||||
}
|
||||
}
|
||||
}
|
||||
Some(rendezvous_message::Union::PunchHole(ph)) => {
|
||||
let rz = self.clone();
|
||||
let server = server.clone();
|
||||
tokio::spawn(async move {
|
||||
allow_err!(rz.handle_punch_hole(ph, server).await);
|
||||
});
|
||||
}
|
||||
Some(rendezvous_message::Union::RequestRelay(rr)) => {
|
||||
let rz = self.clone();
|
||||
let server = server.clone();
|
||||
tokio::spawn(async move {
|
||||
allow_err!(rz.handle_request_relay(rr, server).await);
|
||||
});
|
||||
}
|
||||
Some(rendezvous_message::Union::FetchLocalAddr(fla)) => {
|
||||
let rz = self.clone();
|
||||
let server = server.clone();
|
||||
tokio::spawn(async move {
|
||||
allow_err!(rz.handle_intranet(fla, server).await);
|
||||
});
|
||||
}
|
||||
Some(rendezvous_message::Union::ConfigureUpdate(cu)) => {
|
||||
let v0 = Config::get_rendezvous_servers();
|
||||
Config::set_option(
|
||||
"rendezvous-servers".to_owned(),
|
||||
cu.rendezvous_servers.join(","),
|
||||
);
|
||||
Config::set_serial(cu.serial);
|
||||
if v0 != Config::get_rendezvous_servers() {
|
||||
Self::restart();
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn start_tcp(server: ServerPtr, host: String) -> ResultType<()> {
|
||||
let mut conn = connect_tcp(check_port(&host, RENDEZVOUS_PORT), CONNECT_TIMEOUT).await?;
|
||||
let key = crate::get_key(true).await;
|
||||
crate::secure_tcp(&mut conn, &key).await?;
|
||||
let mut rz = Self {
|
||||
addr: conn.local_addr().into_target_addr()?,
|
||||
host: host.clone(),
|
||||
host_prefix: host.clone(),
|
||||
last_id_pk_registry: "".to_owned(),
|
||||
};
|
||||
let mut timer = interval(TIMER_OUT);
|
||||
loop {
|
||||
let mut update_latency = || {};
|
||||
select! {
|
||||
res = conn.next() => {
|
||||
let bytes = res.ok_or_else(|| anyhow::anyhow!("rendezvous server disconnected"))??;
|
||||
let msg = Message::parse_from_bytes(&bytes)?;
|
||||
rz.handle_resp(msg.union, Sink::Stream(&mut conn), &server, &mut update_latency).await?
|
||||
}
|
||||
_ = timer.tick() => {
|
||||
if SHOULD_EXIT.load(Ordering::SeqCst) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn start(server: ServerPtr, host: String) -> ResultType<()> {
|
||||
log::info!("start rendezvous mediator of {}", host);
|
||||
if cfg!(debug_assertions) && option_env!("TEST_TCP").is_some() {
|
||||
Self::start_tcp(server, host).await
|
||||
} else {
|
||||
Self::start_udp(server, host).await
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_request_relay(&self, rr: RequestRelay, server: ServerPtr) -> ResultType<()> {
|
||||
self.create_relay(
|
||||
rr.socket_addr.into(),
|
||||
@@ -315,7 +370,7 @@ impl RendezvousMediator {
|
||||
secure,
|
||||
);
|
||||
|
||||
let mut socket = socket_client::connect_tcp(&*self.host, CONNECT_TIMEOUT).await?;
|
||||
let mut socket = connect_tcp(&*self.host, CONNECT_TIMEOUT).await?;
|
||||
|
||||
let mut msg_out = Message::new();
|
||||
let mut rr = RelayResponse {
|
||||
@@ -360,7 +415,7 @@ impl RendezvousMediator {
|
||||
}
|
||||
let peer_addr = AddrMangle::decode(&fla.socket_addr);
|
||||
log::debug!("Handle intranet from {:?}", peer_addr);
|
||||
let mut socket = socket_client::connect_tcp(&*self.host, CONNECT_TIMEOUT).await?;
|
||||
let mut socket = connect_tcp(&*self.host, CONNECT_TIMEOUT).await?;
|
||||
let local_addr = socket.local_addr();
|
||||
let local_addr: SocketAddr =
|
||||
format!("{}:{}", local_addr.ip(), local_addr.port()).parse()?;
|
||||
@@ -399,7 +454,7 @@ impl RendezvousMediator {
|
||||
let peer_addr = AddrMangle::decode(&ph.socket_addr);
|
||||
log::debug!("Punch hole to {:?}", peer_addr);
|
||||
let mut socket = {
|
||||
let socket = socket_client::connect_tcp(&*self.host, CONNECT_TIMEOUT).await?;
|
||||
let socket = connect_tcp(&*self.host, CONNECT_TIMEOUT).await?;
|
||||
let local_addr = socket.local_addr();
|
||||
// key important here for punch hole to tell my gateway incoming peer is safe.
|
||||
// it can not be async here, because local_addr can not be reused, we must close the connection before use it again.
|
||||
@@ -423,7 +478,7 @@ impl RendezvousMediator {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn register_pk(&mut self, socket: &mut FramedSocket) -> ResultType<()> {
|
||||
async fn register_pk(&mut self, socket: Sink<'_>) -> ResultType<()> {
|
||||
let mut msg_out = Message::new();
|
||||
let pk = Config::get_key_pair().1;
|
||||
let uuid = hbb_common::get_uuid();
|
||||
@@ -435,11 +490,11 @@ impl RendezvousMediator {
|
||||
pk: pk.into(),
|
||||
..Default::default()
|
||||
});
|
||||
socket.send(&msg_out, self.addr.to_owned()).await?;
|
||||
socket.send(&msg_out).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_uuid_mismatch(&mut self, socket: &mut FramedSocket) -> ResultType<()> {
|
||||
async fn handle_uuid_mismatch(&mut self, socket: Sink<'_>) -> ResultType<()> {
|
||||
if self.last_id_pk_registry != Config::get_id() {
|
||||
return Ok(());
|
||||
}
|
||||
@@ -457,7 +512,7 @@ impl RendezvousMediator {
|
||||
self.register_pk(socket).await
|
||||
}
|
||||
|
||||
async fn register_peer(&mut self, socket: &mut FramedSocket) -> ResultType<()> {
|
||||
async fn register_peer(&mut self, socket: Sink<'_>) -> ResultType<()> {
|
||||
if !SOLVING_PK_MISMATCH.lock().unwrap().is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
@@ -481,7 +536,7 @@ impl RendezvousMediator {
|
||||
serial,
|
||||
..Default::default()
|
||||
});
|
||||
socket.send(&msg_out, self.addr.to_owned()).await?;
|
||||
socket.send(&msg_out).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -622,7 +677,7 @@ async fn create_online_stream() -> ResultType<FramedStream> {
|
||||
bail!("Invalid server address: {}", rendezvous_server);
|
||||
}
|
||||
let online_server = format!("{}:{}", tmp[0], port - 1);
|
||||
socket_client::connect_tcp(online_server, CONNECT_TIMEOUT).await
|
||||
connect_tcp(online_server, CONNECT_TIMEOUT).await
|
||||
}
|
||||
|
||||
async fn query_online_states_(
|
||||
@@ -680,6 +735,20 @@ async fn query_online_states_(
|
||||
}
|
||||
}
|
||||
|
||||
enum Sink<'a> {
|
||||
Framed(&'a mut FramedSocket, &'a TargetAddr<'a>),
|
||||
Stream(&'a mut FramedStream),
|
||||
}
|
||||
|
||||
impl Sink<'_> {
|
||||
async fn send(self, msg: &Message) -> ResultType<()> {
|
||||
match self {
|
||||
Sink::Framed(socket, addr) => socket.send(msg, addr.to_owned()).await,
|
||||
Sink::Stream(stream) => stream.send(msg).await,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use hbb_common::tokio;
|
||||
|
||||
Reference in New Issue
Block a user