This commit is contained in:
rustdesk
2024-07-14 05:47:42 +08:00
parent bed214bd37
commit d0a54a6cc6
6 changed files with 144 additions and 54 deletions

View File

@@ -22,12 +22,9 @@ use sha2::{Digest, Sha256};
use uuid::Uuid;
pub use file_trait::FileManager;
#[cfg(windows)]
use hbb_common::tokio;
#[cfg(not(feature = "flutter"))]
#[cfg(not(any(target_os = "android", target_os = "ios")))]
use hbb_common::tokio::sync::mpsc::UnboundedSender;
#[cfg(not(any(target_os = "android", target_os = "ios")))]
use hbb_common::tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
use hbb_common::{
allow_err,
@@ -42,11 +39,14 @@ use hbb_common::{
protobuf::{Message as _, MessageField},
rand,
rendezvous_proto::*,
socket_client,
socket_client::{connect_tcp, connect_tcp_local, ipv4_to_ipv6},
sodiumoxide::{base64, crypto::sign},
tcp::FramedStream,
timeout,
tokio::time::Duration,
tokio::{
self,
time::{interval, Duration, Instant},
},
AddrMangle, ResultType, Stream,
};
pub use helper::*;
@@ -226,7 +226,7 @@ impl Client {
token: &str,
conn_type: ConnType,
interface: impl Interface,
) -> ResultType<(Stream, bool, Option<Vec<u8>>)> {
) -> ResultType<((Stream, bool, Option<Vec<u8>>), (i32, String))> {
debug_assert!(peer == interface.get_id());
interface.update_direct(None);
interface.update_received(false);
@@ -250,25 +250,26 @@ impl Client {
token: &str,
conn_type: ConnType,
interface: impl Interface,
) -> ResultType<(Stream, bool, Option<Vec<u8>>)> {
) -> ResultType<((Stream, bool, Option<Vec<u8>>), (i32, String))> {
if config::is_incoming_only() {
bail!("Incoming only mode");
}
// to-do: remember the port for each peer, so that we can retry easier
if hbb_common::is_ip_str(peer) {
return Ok((
socket_client::connect_tcp(check_port(peer, RELAY_PORT + 1), CONNECT_TIMEOUT)
.await?,
true,
None,
(
connect_tcp(check_port(peer, RELAY_PORT + 1), CONNECT_TIMEOUT).await?,
true,
None,
),
(0, "".to_owned()),
));
}
// Allow connect to {domain}:{port}
if hbb_common::is_domain_port_str(peer) {
return Ok((
socket_client::connect_tcp(peer, CONNECT_TIMEOUT).await?,
true,
None,
(connect_tcp(peer, CONNECT_TIMEOUT).await?, true, None),
(0, "".to_owned()),
));
}
@@ -295,13 +296,13 @@ impl Client {
}
};
let mut socket = socket_client::connect_tcp(&*rendezvous_server, CONNECT_TIMEOUT).await;
let mut socket = connect_tcp(&*rendezvous_server, CONNECT_TIMEOUT).await;
debug_assert!(!servers.contains(&rendezvous_server));
if socket.is_err() && !servers.is_empty() {
log::info!("try the other servers: {:?}", servers);
for server in servers {
let server = check_port(server, RENDEZVOUS_PORT);
socket = socket_client::connect_tcp(&*server, CONNECT_TIMEOUT).await;
socket = connect_tcp(&*server, CONNECT_TIMEOUT).await;
if socket.is_ok() {
rendezvous_server = server;
break;
@@ -327,6 +328,7 @@ impl Client {
let mut peer_nat_type = NatType::UNKNOWN_NAT;
let my_nat_type = crate::get_nat_type(100).await;
let mut is_local = false;
let mut feedback = 0;
for i in 1..=3 {
log::info!("#{} punch attempt with {}, id: {}", i, my_addr, peer);
let mut msg_out = RendezvousMessage::new();
@@ -342,9 +344,11 @@ impl Client {
nat_type: nat_type.into(),
licence_key: key.to_owned(),
conn_type: conn_type.into(),
version: crate::VERSION.to_owned(),
..Default::default()
});
socket.send(&msg_out).await?;
// below timeout should not bigger than hbbs's connection timeout.
if let Some(msg_in) =
crate::get_next_nonkeyexchange_msg(&mut socket, Some(i * 6000)).await
{
@@ -375,6 +379,7 @@ impl Client {
signed_id_pk = ph.pk.into();
relay_server = ph.relay_server;
peer_addr = AddrMangle::decode(&ph.socket_addr);
feedback = ph.feedback;
log::info!("Hole Punched {} = {}", peer, peer_addr);
break;
}
@@ -395,9 +400,10 @@ impl Client {
my_addr.is_ipv4(),
)
.await?;
feedback = rr.feedback;
let pk =
Self::secure_connection(peer, signed_id_pk, key, &mut conn).await?;
return Ok((conn, false, pk));
return Ok(((conn, false, pk), (feedback, rendezvous_server)));
}
_ => {
log::error!("Unexpected protobuf msg received: {:?}", msg_in);
@@ -420,23 +426,26 @@ impl Client {
format!("nat_type: {:?}", peer_nat_type)
}
);
Self::connect(
my_addr,
peer_addr,
peer,
signed_id_pk,
&relay_server,
&rendezvous_server,
time_used,
peer_nat_type,
my_nat_type,
is_local,
key,
token,
conn_type,
interface,
)
.await
Ok((
Self::connect(
my_addr,
peer_addr,
peer,
signed_id_pk,
&relay_server,
&rendezvous_server,
time_used,
peer_nat_type,
my_nat_type,
is_local,
key,
token,
conn_type,
interface,
)
.await?,
(feedback, rendezvous_server),
))
}
/// Connect to the peer.
@@ -491,8 +500,7 @@ impl Client {
log::info!("peer address: {}, timeout: {}", peer, connect_timeout);
let start = std::time::Instant::now();
// NOTICE: Socks5 is be used event in intranet. Which may be not a good way.
let mut conn =
socket_client::connect_tcp_local(peer, Some(local_addr), connect_timeout).await;
let mut conn = connect_tcp_local(peer, Some(local_addr), connect_timeout).await;
let mut direct = !conn.is_err();
interface.update_direct(Some(direct));
if interface.is_force_relay() || conn.is_err() {
@@ -622,7 +630,7 @@ impl Client {
for i in 1..=3 {
// use different socket due to current hbbs implementation requiring different nat address for each attempt
let mut socket = socket_client::connect_tcp(rendezvous_server, CONNECT_TIMEOUT)
let mut socket = connect_tcp(rendezvous_server, CONNECT_TIMEOUT)
.await
.with_context(|| "Failed to connect to rendezvous server")?;
@@ -679,8 +687,8 @@ impl Client {
conn_type: ConnType,
ipv4: bool,
) -> ResultType<Stream> {
let mut conn = socket_client::connect_tcp(
socket_client::ipv4_to_ipv6(check_port(relay_server, RELAY_PORT), ipv4),
let mut conn = connect_tcp(
ipv4_to_ipv6(check_port(relay_server, RELAY_PORT), ipv4),
CONNECT_TIMEOUT,
)
.await
@@ -3171,3 +3179,75 @@ pub fn check_if_retry(msgtype: &str, title: &str, text: &str, retry_for_relay: b
&& !text.to_lowercase().contains("manually")
&& !text.to_lowercase().contains("not allowed")))
}
pub async fn hc_connection(
feedback: i32,
rendezvous_server: String,
token: &str,
) -> Option<tokio::sync::mpsc::UnboundedSender<()>> {
if feedback == 0 || rendezvous_server.is_empty() || token.is_empty() {
return None;
}
let (tx, rx) = unbounded_channel::<()>();
let token = token.to_owned();
tokio::spawn(async move {
allow_err!(hc_connection_(rendezvous_server, rx, token).await);
});
Some(tx)
}
async fn hc_connection_(
rendezvous_server: String,
mut rx: UnboundedReceiver<()>,
token: String,
) -> ResultType<()> {
let mut timer = crate::rustdesk_interval(interval(crate::TIMER_OUT));
let mut last_recv_msg = Instant::now();
let mut keep_alive = crate::DEFAULT_KEEP_ALIVE;
let host = check_port(&rendezvous_server, RENDEZVOUS_PORT);
let mut conn = connect_tcp(host.clone(), CONNECT_TIMEOUT).await?;
let key = crate::get_key(true).await;
crate::secure_tcp(&mut conn, &key).await?;
let mut msg_out = RendezvousMessage::new();
msg_out.set_hc(HealthCheck {
token,
..Default::default()
});
conn.send(&msg_out).await?;
loop {
tokio::select! {
res = rx.recv() => {
if res.is_none() {
log::debug!("HC connection is closed as controlling connection exits");
break;
}
}
res = conn.next() => {
last_recv_msg = Instant::now();
let bytes = res.ok_or_else(|| anyhow!("Rendezvous connection is reset by the peer"))??;
if bytes.is_empty() {
conn.send_bytes(bytes::Bytes::new()).await?;
continue; // heartbeat
}
let msg = RendezvousMessage::parse_from_bytes(&bytes)?;
match msg.union {
Some(rendezvous_message::Union::RegisterPkResponse(rpr)) => {
if rpr.keep_alive > 0 {
keep_alive = rpr.keep_alive * 1000;
log::info!("keep_alive: {}ms", keep_alive);
}
}
_ => {}
}
}
_ = timer.tick() => {
// https://www.emqx.com/en/blog/mqtt-keep-alive
if last_recv_msg.elapsed().as_millis() as u64 > keep_alive as u64 * 3 / 2 {
bail!("HC connection is timeout");
}
}
}
}
Ok(())
}