From d0a54a6cc67cd5721fb9d95197cc46c18ccd5e27 Mon Sep 17 00:00:00 2001 From: rustdesk Date: Sun, 14 Jul 2024 05:47:42 +0800 Subject: [PATCH] hc --- libs/hbb_common/protos/rendezvous.proto | 8 ++ src/client.rs | 158 ++++++++++++++++++------ src/client/io_loop.rs | 7 +- src/common.rs | 3 + src/port_forward.rs | 5 +- src/rendezvous_mediator.rs | 17 +-- 6 files changed, 144 insertions(+), 54 deletions(-) diff --git a/libs/hbb_common/protos/rendezvous.proto b/libs/hbb_common/protos/rendezvous.proto index fac9aa433..49d737c69 100644 --- a/libs/hbb_common/protos/rendezvous.proto +++ b/libs/hbb_common/protos/rendezvous.proto @@ -21,6 +21,7 @@ message PunchHoleRequest { string licence_key = 3; ConnType conn_type = 4; string token = 5; + string version = 6; } message PunchHole { @@ -62,6 +63,10 @@ message RegisterPk { string old_id = 4; } +message HealthCheck { + string token = 1; +} + message RegisterPkResponse { enum Result { OK = 0; @@ -92,6 +97,7 @@ message PunchHoleResponse { bool is_local = 6; } string other_failure = 7; + int32 feedback = 8; } message ConfigUpdate { @@ -122,6 +128,7 @@ message RelayResponse { string refuse_reason = 6; string version = 7; string request_region = 8; + int32 feedback = 9; } message SoftwareUpdate { string url = 1; } @@ -190,5 +197,6 @@ message RendezvousMessage { OnlineRequest online_request = 23; OnlineResponse online_response = 24; KeyExchange key_exchange = 25; + HealthCheck hc = 26; } } diff --git a/src/client.rs b/src/client.rs index 4c3f53b03..d0c0c6635 100644 --- a/src/client.rs +++ b/src/client.rs @@ -22,12 +22,9 @@ use sha2::{Digest, Sha256}; use uuid::Uuid; pub use file_trait::FileManager; -#[cfg(windows)] -use hbb_common::tokio; #[cfg(not(feature = "flutter"))] #[cfg(not(any(target_os = "android", target_os = "ios")))] use hbb_common::tokio::sync::mpsc::UnboundedSender; -#[cfg(not(any(target_os = "android", target_os = "ios")))] use hbb_common::tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver}; use hbb_common::{ allow_err, @@ -42,11 +39,14 @@ use hbb_common::{ protobuf::{Message as _, MessageField}, rand, rendezvous_proto::*, - socket_client, + socket_client::{connect_tcp, connect_tcp_local, ipv4_to_ipv6}, sodiumoxide::{base64, crypto::sign}, tcp::FramedStream, timeout, - tokio::time::Duration, + tokio::{ + self, + time::{interval, Duration, Instant}, + }, AddrMangle, ResultType, Stream, }; pub use helper::*; @@ -226,7 +226,7 @@ impl Client { token: &str, conn_type: ConnType, interface: impl Interface, - ) -> ResultType<(Stream, bool, Option>)> { + ) -> ResultType<((Stream, bool, Option>), (i32, String))> { debug_assert!(peer == interface.get_id()); interface.update_direct(None); interface.update_received(false); @@ -250,25 +250,26 @@ impl Client { token: &str, conn_type: ConnType, interface: impl Interface, - ) -> ResultType<(Stream, bool, Option>)> { + ) -> ResultType<((Stream, bool, Option>), (i32, String))> { if config::is_incoming_only() { bail!("Incoming only mode"); } // to-do: remember the port for each peer, so that we can retry easier if hbb_common::is_ip_str(peer) { return Ok(( - socket_client::connect_tcp(check_port(peer, RELAY_PORT + 1), CONNECT_TIMEOUT) - .await?, - true, - None, + ( + connect_tcp(check_port(peer, RELAY_PORT + 1), CONNECT_TIMEOUT).await?, + true, + None, + ), + (0, "".to_owned()), )); } // Allow connect to {domain}:{port} if hbb_common::is_domain_port_str(peer) { return Ok(( - socket_client::connect_tcp(peer, CONNECT_TIMEOUT).await?, - true, - None, + (connect_tcp(peer, CONNECT_TIMEOUT).await?, true, None), + (0, "".to_owned()), )); } @@ -295,13 +296,13 @@ impl Client { } }; - let mut socket = socket_client::connect_tcp(&*rendezvous_server, CONNECT_TIMEOUT).await; + let mut socket = connect_tcp(&*rendezvous_server, CONNECT_TIMEOUT).await; debug_assert!(!servers.contains(&rendezvous_server)); if socket.is_err() && !servers.is_empty() { log::info!("try the other servers: {:?}", servers); for server in servers { let server = check_port(server, RENDEZVOUS_PORT); - socket = socket_client::connect_tcp(&*server, CONNECT_TIMEOUT).await; + socket = connect_tcp(&*server, CONNECT_TIMEOUT).await; if socket.is_ok() { rendezvous_server = server; break; @@ -327,6 +328,7 @@ impl Client { let mut peer_nat_type = NatType::UNKNOWN_NAT; let my_nat_type = crate::get_nat_type(100).await; let mut is_local = false; + let mut feedback = 0; for i in 1..=3 { log::info!("#{} punch attempt with {}, id: {}", i, my_addr, peer); let mut msg_out = RendezvousMessage::new(); @@ -342,9 +344,11 @@ impl Client { nat_type: nat_type.into(), licence_key: key.to_owned(), conn_type: conn_type.into(), + version: crate::VERSION.to_owned(), ..Default::default() }); socket.send(&msg_out).await?; + // below timeout should not bigger than hbbs's connection timeout. if let Some(msg_in) = crate::get_next_nonkeyexchange_msg(&mut socket, Some(i * 6000)).await { @@ -375,6 +379,7 @@ impl Client { signed_id_pk = ph.pk.into(); relay_server = ph.relay_server; peer_addr = AddrMangle::decode(&ph.socket_addr); + feedback = ph.feedback; log::info!("Hole Punched {} = {}", peer, peer_addr); break; } @@ -395,9 +400,10 @@ impl Client { my_addr.is_ipv4(), ) .await?; + feedback = rr.feedback; let pk = Self::secure_connection(peer, signed_id_pk, key, &mut conn).await?; - return Ok((conn, false, pk)); + return Ok(((conn, false, pk), (feedback, rendezvous_server))); } _ => { log::error!("Unexpected protobuf msg received: {:?}", msg_in); @@ -420,23 +426,26 @@ impl Client { format!("nat_type: {:?}", peer_nat_type) } ); - Self::connect( - my_addr, - peer_addr, - peer, - signed_id_pk, - &relay_server, - &rendezvous_server, - time_used, - peer_nat_type, - my_nat_type, - is_local, - key, - token, - conn_type, - interface, - ) - .await + Ok(( + Self::connect( + my_addr, + peer_addr, + peer, + signed_id_pk, + &relay_server, + &rendezvous_server, + time_used, + peer_nat_type, + my_nat_type, + is_local, + key, + token, + conn_type, + interface, + ) + .await?, + (feedback, rendezvous_server), + )) } /// Connect to the peer. @@ -491,8 +500,7 @@ impl Client { log::info!("peer address: {}, timeout: {}", peer, connect_timeout); let start = std::time::Instant::now(); // NOTICE: Socks5 is be used event in intranet. Which may be not a good way. - let mut conn = - socket_client::connect_tcp_local(peer, Some(local_addr), connect_timeout).await; + let mut conn = connect_tcp_local(peer, Some(local_addr), connect_timeout).await; let mut direct = !conn.is_err(); interface.update_direct(Some(direct)); if interface.is_force_relay() || conn.is_err() { @@ -622,7 +630,7 @@ impl Client { for i in 1..=3 { // use different socket due to current hbbs implementation requiring different nat address for each attempt - let mut socket = socket_client::connect_tcp(rendezvous_server, CONNECT_TIMEOUT) + let mut socket = connect_tcp(rendezvous_server, CONNECT_TIMEOUT) .await .with_context(|| "Failed to connect to rendezvous server")?; @@ -679,8 +687,8 @@ impl Client { conn_type: ConnType, ipv4: bool, ) -> ResultType { - let mut conn = socket_client::connect_tcp( - socket_client::ipv4_to_ipv6(check_port(relay_server, RELAY_PORT), ipv4), + let mut conn = connect_tcp( + ipv4_to_ipv6(check_port(relay_server, RELAY_PORT), ipv4), CONNECT_TIMEOUT, ) .await @@ -3171,3 +3179,75 @@ pub fn check_if_retry(msgtype: &str, title: &str, text: &str, retry_for_relay: b && !text.to_lowercase().contains("manually") && !text.to_lowercase().contains("not allowed"))) } + +pub async fn hc_connection( + feedback: i32, + rendezvous_server: String, + token: &str, +) -> Option> { + if feedback == 0 || rendezvous_server.is_empty() || token.is_empty() { + return None; + } + let (tx, rx) = unbounded_channel::<()>(); + let token = token.to_owned(); + tokio::spawn(async move { + allow_err!(hc_connection_(rendezvous_server, rx, token).await); + }); + Some(tx) +} + +async fn hc_connection_( + rendezvous_server: String, + mut rx: UnboundedReceiver<()>, + token: String, +) -> ResultType<()> { + let mut timer = crate::rustdesk_interval(interval(crate::TIMER_OUT)); + let mut last_recv_msg = Instant::now(); + let mut keep_alive = crate::DEFAULT_KEEP_ALIVE; + + let host = check_port(&rendezvous_server, RENDEZVOUS_PORT); + let mut conn = connect_tcp(host.clone(), CONNECT_TIMEOUT).await?; + let key = crate::get_key(true).await; + crate::secure_tcp(&mut conn, &key).await?; + let mut msg_out = RendezvousMessage::new(); + msg_out.set_hc(HealthCheck { + token, + ..Default::default() + }); + conn.send(&msg_out).await?; + loop { + tokio::select! { + res = rx.recv() => { + if res.is_none() { + log::debug!("HC connection is closed as controlling connection exits"); + break; + } + } + res = conn.next() => { + last_recv_msg = Instant::now(); + let bytes = res.ok_or_else(|| anyhow!("Rendezvous connection is reset by the peer"))??; + if bytes.is_empty() { + conn.send_bytes(bytes::Bytes::new()).await?; + continue; // heartbeat + } + let msg = RendezvousMessage::parse_from_bytes(&bytes)?; + match msg.union { + Some(rendezvous_message::Union::RegisterPkResponse(rpr)) => { + if rpr.keep_alive > 0 { + keep_alive = rpr.keep_alive * 1000; + log::info!("keep_alive: {}ms", keep_alive); + } + } + _ => {} + } + } + _ = timer.tick() => { + // https://www.emqx.com/en/blog/mqtt-keep-alive + if last_recv_msg.elapsed().as_millis() as u64 > keep_alive as u64 * 3 / 2 { + bail!("HC connection is timeout"); + } + } + } + } + Ok(()) +} diff --git a/src/client/io_loop.rs b/src/client/io_loop.rs index 15a779109..27ecc7a91 100644 --- a/src/client/io_loop.rs +++ b/src/client/io_loop.rs @@ -38,7 +38,7 @@ use hbb_common::{tokio::sync::Mutex as TokioMutex, ResultType}; use scrap::CodecFormat; use crate::client::{ - new_voice_call_request, Client, MediaData, MediaSender, QualityStatus, MILLI1, SEC30, + self, new_voice_call_request, Client, MediaData, MediaSender, QualityStatus, MILLI1, SEC30, }; #[cfg(not(any(target_os = "android", target_os = "ios")))] use crate::clipboard::{update_clipboard, CLIPBOARD_INTERVAL}; @@ -134,7 +134,7 @@ impl Remote { ) .await { - Ok((mut peer, direct, pk)) => { + Ok(((mut peer, direct, pk), (feedback, rendezvous_server))) => { self.handler .connection_round_state .lock() @@ -173,6 +173,9 @@ impl Remote { crate::rustdesk_interval(time::interval(Duration::new(1, 0))); let mut fps_instant = Instant::now(); + let _keep_it = + client::hc_connection(feedback, rendezvous_server, token).await; + loop { tokio::select! { res = peer.next() => { diff --git a/src/common.rs b/src/common.rs index fcc65834b..27135b568 100644 --- a/src/common.rs +++ b/src/common.rs @@ -53,6 +53,9 @@ pub const PLATFORM_LINUX: &str = "Linux"; pub const PLATFORM_MACOS: &str = "Mac OS"; pub const PLATFORM_ANDROID: &str = "Android"; +pub const TIMER_OUT: Duration = Duration::from_secs(1); +pub const DEFAULT_KEEP_ALIVE: i32 = 60_000; + const MIN_VER_MULTI_UI_SESSION: &str = "1.2.4"; pub mod input { diff --git a/src/port_forward.rs b/src/port_forward.rs index f9d38f4c4..28ac624cd 100644 --- a/src/port_forward.rs +++ b/src/port_forward.rs @@ -118,11 +118,14 @@ async fn connect_and_login( } else { ConnType::PORT_FORWARD }; - let (mut stream, direct, _pk) = + let ((mut stream, direct, _pk), (feedback, rendezvous_server)) = Client::start(id, key, token, conn_type, interface.clone()).await?; interface.update_direct(Some(direct)); let mut buffer = Vec::new(); let mut received = false; + + let _keep_it = hc_connection(feedback, rendezvous_server, token).await; + loop { tokio::select! { res = timeout(READ_TIMEOUT, stream.next()) => match res { diff --git a/src/rendezvous_mediator.rs b/src/rendezvous_mediator.rs index ceb4057c7..94fe128be 100644 --- a/src/rendezvous_mediator.rs +++ b/src/rendezvous_mediator.rs @@ -21,11 +21,7 @@ use hbb_common::{ sleep, socket_client::{self, connect_tcp, is_ipv4}, tcp::FramedStream, - tokio::{ - self, select, - sync::Mutex, - time::{interval, Duration}, - }, + tokio::{self, select, sync::Mutex, time::interval}, udp::FramedSocket, AddrMangle, IntoTargetAddr, ResultType, TargetAddr, }; @@ -37,9 +33,6 @@ use crate::{ type Message = RendezvousMessage; -const TIMER_OUT: Duration = Duration::from_secs(1); -const DEFAULT_KEEP_ALIVE: i32 = 60_000; - lazy_static::lazy_static! { static ref SOLVING_PK_MISMATCH: Arc> = Default::default(); } @@ -151,10 +144,10 @@ impl RendezvousMediator { addr: addr.clone(), host: host.clone(), host_prefix: Self::get_host_prefix(&host), - keep_alive: DEFAULT_KEEP_ALIVE, + keep_alive: crate::DEFAULT_KEEP_ALIVE, }; - let mut timer = crate::rustdesk_interval(interval(TIMER_OUT)); + let mut timer = crate::rustdesk_interval(interval(crate::TIMER_OUT)); const MIN_REG_TIMEOUT: i64 = 3_000; const MAX_REG_TIMEOUT: i64 = 30_000; let mut reg_timeout = MIN_REG_TIMEOUT; @@ -337,9 +330,9 @@ impl RendezvousMediator { addr: conn.local_addr().into_target_addr()?, host: host.clone(), host_prefix: Self::get_host_prefix(&host), - keep_alive: DEFAULT_KEEP_ALIVE, + keep_alive: crate::DEFAULT_KEEP_ALIVE, }; - let mut timer = crate::rustdesk_interval(interval(TIMER_OUT)); + let mut timer = crate::rustdesk_interval(interval(crate::TIMER_OUT)); let mut last_register_sent: Option = None; let mut last_recv_msg = Instant::now(); // we won't support connecting to multiple rendzvous servers any more, so we can use a global variable here.