refactor udp framed

Signed-off-by: fufesou <shuanglongchen@yeah.net>
This commit is contained in:
fufesou
2022-01-04 00:44:50 +08:00
parent 6d506cbb64
commit 875570e040
8 changed files with 140 additions and 301 deletions

View File

@@ -1,27 +1,20 @@
use std::net::SocketAddr;
pub use arboard::Clipboard as ClipboardContext;
use hbb_common::{
allow_err,
anyhow::bail,
bytes::{Bytes, BytesMut},
compress::{compress as compress_func, decompress},
config::{Config, COMPRESS_LEVEL, RENDEZVOUS_TIMEOUT},
futures_core::Stream,
futures_sink::Sink,
config::{Config, NetworkType, COMPRESS_LEVEL, RENDEZVOUS_TIMEOUT},
log,
message_proto::*,
protobuf::Message as _,
protobuf::ProtobufEnum,
rendezvous_proto::*,
sleep, socket_client, tokio,
udp::FramedSocket,
ResultType,
sleep, socket_client, tokio, ResultType,
};
#[cfg(any(target_os = "android", target_os = "ios", feature = "cli"))]
use hbb_common::{config::RENDEZVOUS_PORT, futures::future::join_all};
use std::{
future::Future,
net::SocketAddr,
sync::{Arc, Mutex},
};
@@ -274,7 +267,9 @@ async fn test_nat_type_() -> ResultType<bool> {
RENDEZVOUS_TIMEOUT,
)
.await?;
addr = socket.local_addr();
if Config::get_network_type() == NetworkType::Direct {
addr = socket.local_addr();
}
socket.send(&msg_out).await?;
if let Some(Ok(bytes)) = socket.next_timeout(3000).await {
if let Ok(msg_in) = RendezvousMessage::parse_from_bytes(&bytes) {
@@ -448,35 +443,12 @@ async fn _check_software_update() -> hbb_common::ResultType<()> {
sleep(3.).await;
let rendezvous_server = get_rendezvous_server(1_000).await;
let socks5_conf = socket_client::get_socks5_conf();
if socks5_conf.is_some() {
let conn_fn = |bind_addr: SocketAddr| async move {
socket_client::connect_udp_socks5(
rendezvous_server,
bind_addr,
&socks5_conf,
RENDEZVOUS_TIMEOUT,
)
.await
};
_inner_check_software_update(conn_fn, rendezvous_server).await
} else {
_inner_check_software_update(socket_client::connect_udp_socket, rendezvous_server).await
}
}
pub async fn _inner_check_software_update<'a, F, Fut, Frm>(
conn_fn: F,
rendezvous_server: SocketAddr,
) -> ResultType<()>
where
F: FnOnce(SocketAddr) -> Fut,
Fut: Future<Output = ResultType<(FramedSocket<Frm>, Option<SocketAddr>)>>,
Frm: Unpin + Stream<Item = ResultType<(BytesMut, SocketAddr)>> + Sink<(Bytes, SocketAddr)>,
<Frm as Sink<(Bytes, SocketAddr)>>::Error: Sync + Send + std::error::Error + 'static,
{
sleep(3.).await;
let (mut socket, _) = conn_fn(Config::get_any_listen_addr()).await?;
let (mut socket, _) = socket_client::connect_udp(
rendezvous_server,
Config::get_any_listen_addr(),
RENDEZVOUS_TIMEOUT,
)
.await?;
let mut msg_out = RendezvousMessage::new();
msg_out.set_software_update(SoftwareUpdate {
url: crate::VERSION.to_owned(),

View File

@@ -2,11 +2,8 @@ use crate::server::{check_zombie, new as new_server, ServerPtr};
use hbb_common::{
allow_err,
anyhow::bail,
bytes::{Bytes, BytesMut},
config::{Config, RENDEZVOUS_PORT, RENDEZVOUS_TIMEOUT},
futures::future::join_all,
futures_core::Stream,
futures_sink::Sink,
log,
protobuf::Message as _,
rendezvous_proto::*,
@@ -19,7 +16,6 @@ use hbb_common::{
AddrMangle, ResultType,
};
use std::{
future::Future,
net::SocketAddr,
sync::{Arc, Mutex},
time::SystemTime,
@@ -63,35 +59,36 @@ impl RendezvousMediator {
let server = server.clone();
let servers = servers.clone();
futs.push(tokio::spawn(async move {
let socks5_conf = socket_client::get_socks5_conf();
if socks5_conf.is_some() {
let target = format!("{}:{}", host, RENDEZVOUS_PORT);
let conn_fn = |bind_addr: SocketAddr| {
let target = target.clone();
let conf_ref = &socks5_conf;
async move {
socket_client::connect_udp_socks5(
target,
bind_addr,
conf_ref,
RENDEZVOUS_TIMEOUT,
)
.await
}
};
allow_err!(Self::start(server, host, servers, conn_fn, true).await);
} else {
allow_err!(
Self::start(
server,
host,
servers,
socket_client::connect_udp_socket,
false,
)
.await
);
}
allow_err!(Self::start(server, host, servers).await);
// let socks5_conf = socket_client::get_socks5_conf();
// if socks5_conf.is_some() {
// let target = format!("{}:{}", host, RENDEZVOUS_PORT);
// let conn_fn = |bind_addr: SocketAddr| {
// let target = target.clone();
// let conf_ref = &socks5_conf;
// async move {
// socket_client::connect_udp_socks5(
// target,
// bind_addr,
// conf_ref,
// RENDEZVOUS_TIMEOUT,
// )
// .await
// }
// };
// allow_err!(Self::start(server, host, servers, conn_fn, true).await);
// } else {
// allow_err!(
// Self::start(
// server,
// host,
// servers,
// socket_client::connect_udp_socket,
// false,
// )
// .await
// );
// }
}));
}
join_all(futs).await;
@@ -100,19 +97,11 @@ impl RendezvousMediator {
}
}
pub async fn start<'a, F, Fut, Frm>(
pub async fn start(
server: ServerPtr,
host: String,
rendezvous_servers: Vec<String>,
conn_fn: F,
socks5: bool,
) -> ResultType<()>
where
F: Fn(SocketAddr) -> Fut,
Fut: Future<Output = ResultType<(FramedSocket<Frm>, Option<SocketAddr>)>>,
Frm: Unpin + Stream<Item = ResultType<(BytesMut, SocketAddr)>> + Sink<(Bytes, SocketAddr)>,
<Frm as Sink<(Bytes, SocketAddr)>>::Error: Sync + Send + std::error::Error + 'static,
{
) -> ResultType<()> {
log::info!("start rendezvous mediator of {}", host);
let host_prefix: String = host
.split(".")
@@ -132,12 +121,17 @@ impl RendezvousMediator {
rendezvous_servers,
last_id_pk_registry: "".to_owned(),
};
allow_err!(rz.dns_check());
let mut host_addr = rz.addr;
allow_err!(rz.dns_check(&mut host_addr));
let bind_addr = Config::get_any_listen_addr();
let (mut socket, target_addr) = conn_fn(bind_addr).await?;
let target = format!("{}:{}", host, RENDEZVOUS_PORT);
let (mut socket, target_addr) =
socket_client::connect_udp(target, bind_addr, RENDEZVOUS_TIMEOUT).await?;
if let Some(addr) = target_addr {
rz.addr = addr;
} else {
rz.addr = host_addr;
}
const TIMER_OUT: Duration = Duration::from_secs(1);
let mut timer = interval(TIMER_OUT);
@@ -254,20 +248,16 @@ impl RendezvousMediator {
}
if rz.addr.port() == 0 {
// tcp is established to help connecting socks5
if !socks5 {
allow_err!(rz.dns_check());
if rz.addr.port() == 0 {
continue;
} else {
// have to do this for osx, to avoid "Can't assign requested address"
// when socket created before OS network ready
let r = conn_fn(bind_addr).await?;
socket = r.0;
if let Some(addr) = r.1 {
rz.addr = addr;
}
}
allow_err!(rz.dns_check(&mut host_addr));
if host_addr.port() == 0 {
continue;
} else {
// have to do this for osx, to avoid "Can't assign requested address"
// when socket created before OS network ready
if let Some(s) = socket_client::reconnect_udp(bind_addr).await? {
socket = s;
rz.addr = host_addr;
};
}
}
let now = SystemTime::now();
@@ -287,18 +277,13 @@ impl RendezvousMediator {
Config::update_latency(&host, -1);
old_latency = 0;
if now.duration_since(last_dns_check).map(|d| d.as_millis() as i64).unwrap_or(0) > DNS_INTERVAL {
// tcp is established to help connecting socks5
if !socks5 {
if let Ok(_) = rz.dns_check() {
if let Ok(_) = rz.dns_check(&mut host_addr) {
// in some case of network reconnect (dial IP network),
// old UDP socket not work any more after network recover
let r = conn_fn(bind_addr).await?;
socket = r.0;
if let Some(addr) = r.1 {
rz.addr = addr;
}
}
if let Some(s) = socket_client::reconnect_udp(bind_addr).await? {
socket = s;
rz.addr = host_addr;
};
}
last_dns_check = now;
}
@@ -314,8 +299,8 @@ impl RendezvousMediator {
Ok(())
}
fn dns_check(&mut self) -> ResultType<()> {
self.addr = hbb_common::to_socket_addr(&crate::check_port(&self.host, RENDEZVOUS_PORT))?;
fn dns_check(&self, addr: &mut SocketAddr) -> ResultType<()> {
*addr = hbb_common::to_socket_addr(&crate::check_port(&self.host, RENDEZVOUS_PORT))?;
log::debug!("Lookup dns of {}", self.host);
Ok(())
}
@@ -449,11 +434,7 @@ impl RendezvousMediator {
Ok(())
}
async fn register_pk<Frm>(&mut self, socket: &mut FramedSocket<Frm>) -> ResultType<()>
where
Frm: Unpin + Stream + Sink<(Bytes, SocketAddr)>,
<Frm as Sink<(Bytes, SocketAddr)>>::Error: Sync + Send + std::error::Error + 'static,
{
async fn register_pk(&mut self, socket: &mut FramedSocket) -> ResultType<()> {
let mut msg_out = Message::new();
let pk = Config::get_key_pair().1;
let uuid = if let Ok(id) = machine_uid::get() {
@@ -474,11 +455,7 @@ impl RendezvousMediator {
Ok(())
}
async fn handle_uuid_mismatch<Frm>(&mut self, socket: &mut FramedSocket<Frm>) -> ResultType<()>
where
Frm: Unpin + Stream + Sink<(Bytes, SocketAddr)>,
<Frm as Sink<(Bytes, SocketAddr)>>::Error: Sync + Send + std::error::Error + 'static,
{
async fn handle_uuid_mismatch(&mut self, socket: &mut FramedSocket) -> ResultType<()> {
if self.last_id_pk_registry != Config::get_id() {
return Ok(());
}
@@ -496,11 +473,7 @@ impl RendezvousMediator {
self.register_pk(socket).await
}
async fn register_peer<Frm>(&mut self, socket: &mut FramedSocket<Frm>) -> ResultType<()>
where
Frm: Unpin + Stream + Sink<(Bytes, SocketAddr)>,
<Frm as Sink<(Bytes, SocketAddr)>>::Error: Sync + Send + std::error::Error + 'static,
{
async fn register_peer(&mut self, socket: &mut FramedSocket) -> ResultType<()> {
if !SOLVING_PK_MISMATCH.lock().unwrap().is_empty() {
return Ok(());
}