socks5 support

Signed-off-by: fufesou <shuanglongchen@yeah.net>
This commit is contained in:
fufesou
2022-01-02 22:55:33 +08:00
parent b17bda9a55
commit 6d506cbb64
11 changed files with 704 additions and 148 deletions

View File

@@ -13,8 +13,8 @@ use hbb_common::{
message_proto::*,
protobuf::Message as _,
rendezvous_proto::*,
socket_client,
sodiumoxide::crypto::{box_, secretbox, sign},
tcp::FramedStream,
timeout,
tokio::time::Duration,
AddrMangle, ResultType, Stream,
@@ -107,10 +107,10 @@ impl Client {
let any_addr = Config::get_any_listen_addr();
let rendezvous_server = crate::get_rendezvous_server(1_000).await;
log::info!("rendezvous server: {}", rendezvous_server);
let mut socket = FramedStream::new(rendezvous_server, any_addr, RENDEZVOUS_TIMEOUT)
.await
.with_context(|| "Failed to connect to rendezvous server")?;
let my_addr = socket.get_ref().local_addr()?;
let mut socket =
socket_client::connect_tcp(rendezvous_server, any_addr, RENDEZVOUS_TIMEOUT).await?;
let my_addr = socket.local_addr();
let mut pk = Vec::new();
let mut relay_server = "".to_owned();
@@ -262,7 +262,8 @@ impl Client {
}
log::info!("peer address: {}, timeout: {}", peer, connect_timeout);
let start = std::time::Instant::now();
let mut conn = FramedStream::new(peer, local_addr, connect_timeout).await;
// NOTICE: Socks5 is be used event in intranet. Which may be not a good way.
let mut conn = socket_client::connect_tcp(peer, local_addr, connect_timeout).await;
let direct = !conn.is_err();
if conn.is_err() {
if !relay_server.is_empty() {
@@ -393,9 +394,11 @@ impl Client {
let mut uuid = "".to_owned();
for i in 1..=3 {
// use different socket due to current hbbs implement requiring different nat address for each attempt
let mut socket = FramedStream::new(rendezvous_server, any_addr, RENDEZVOUS_TIMEOUT)
.await
.with_context(|| "Failed to connect to rendezvous server")?;
let mut socket =
socket_client::connect_tcp(rendezvous_server, any_addr, RENDEZVOUS_TIMEOUT)
.await
.with_context(|| "Failed to connect to rendezvous server")?;
let mut msg_out = RendezvousMessage::new();
uuid = Uuid::new_v4().to_string();
log::info!(
@@ -438,7 +441,7 @@ impl Client {
relay_server: String,
conn_type: ConnType,
) -> ResultType<Stream> {
let mut conn = FramedStream::new(
let mut conn = socket_client::connect_tcp(
crate::check_port(relay_server, RELAY_PORT),
Config::get_any_listen_addr(),
CONNECT_TIMEOUT,

View File

@@ -1,20 +1,29 @@
use std::net::SocketAddr;
pub use arboard::Clipboard as ClipboardContext;
use hbb_common::{
allow_err, bail,
allow_err,
anyhow::bail,
bytes::{Bytes, BytesMut},
compress::{compress as compress_func, decompress},
config::{Config, COMPRESS_LEVEL, RENDEZVOUS_TIMEOUT},
futures_core::Stream,
futures_sink::Sink,
log,
message_proto::*,
protobuf::Message as _,
protobuf::ProtobufEnum,
rendezvous_proto::*,
sleep,
tcp::FramedStream,
tokio, ResultType,
sleep, socket_client, tokio,
udp::FramedSocket,
ResultType,
};
#[cfg(any(target_os = "android", target_os = "ios", feature = "cli"))]
use hbb_common::{config::RENDEZVOUS_PORT, futures::future::join_all};
use std::sync::{Arc, Mutex};
use std::{
future::Future,
sync::{Arc, Mutex},
};
pub const CLIPBOARD_NAME: &'static str = "clipboard";
pub const CLIPBOARD_INTERVAL: u64 = 333;
@@ -259,13 +268,13 @@ async fn test_nat_type_() -> ResultType<bool> {
let mut port2 = 0;
let mut addr = Config::get_any_listen_addr();
for i in 0..2 {
let mut socket = FramedStream::new(
let mut socket = socket_client::connect_tcp(
if i == 0 { &server1 } else { &server2 },
addr,
RENDEZVOUS_TIMEOUT,
)
.await?;
addr = socket.get_ref().local_addr()?;
addr = socket.local_addr();
socket.send(&msg_out).await?;
if let Some(Ok(bytes)) = socket.next_timeout(3000).await {
if let Ok(msg_in) = RendezvousMessage::parse_from_bytes(&bytes) {
@@ -302,12 +311,12 @@ async fn test_nat_type_() -> ResultType<bool> {
}
#[cfg(any(target_os = "android", target_os = "ios"))]
pub async fn get_rendezvous_server(_ms_timeout: u64) -> std::net::SocketAddr {
pub async fn get_rendezvous_server(_ms_timeout: u64) -> SocketAddr {
Config::get_rendezvous_server()
}
#[cfg(not(any(target_os = "android", target_os = "ios")))]
pub async fn get_rendezvous_server(ms_timeout: u64) -> std::net::SocketAddr {
pub async fn get_rendezvous_server(ms_timeout: u64) -> SocketAddr {
crate::ipc::get_rendezvous_server(ms_timeout).await
}
@@ -330,7 +339,7 @@ async fn test_rendezvous_server_() {
for host in servers {
futs.push(tokio::spawn(async move {
let tm = std::time::Instant::now();
if FramedStream::new(
if socket_client::connect_tcp(
&crate::check_port(&host, RENDEZVOUS_PORT),
Config::get_any_listen_addr(),
RENDEZVOUS_TIMEOUT,
@@ -437,8 +446,37 @@ pub fn check_software_update() {
#[tokio::main(flavor = "current_thread")]
async fn _check_software_update() -> hbb_common::ResultType<()> {
sleep(3.).await;
let rendezvous_server = get_rendezvous_server(1_000).await;
let mut socket = hbb_common::udp::FramedSocket::new(Config::get_any_listen_addr()).await?;
let socks5_conf = socket_client::get_socks5_conf();
if socks5_conf.is_some() {
let conn_fn = |bind_addr: SocketAddr| async move {
socket_client::connect_udp_socks5(
rendezvous_server,
bind_addr,
&socks5_conf,
RENDEZVOUS_TIMEOUT,
)
.await
};
_inner_check_software_update(conn_fn, rendezvous_server).await
} else {
_inner_check_software_update(socket_client::connect_udp_socket, rendezvous_server).await
}
}
pub async fn _inner_check_software_update<'a, F, Fut, Frm>(
conn_fn: F,
rendezvous_server: SocketAddr,
) -> ResultType<()>
where
F: FnOnce(SocketAddr) -> Fut,
Fut: Future<Output = ResultType<(FramedSocket<Frm>, Option<SocketAddr>)>>,
Frm: Unpin + Stream<Item = ResultType<(BytesMut, SocketAddr)>> + Sink<(Bytes, SocketAddr)>,
<Frm as Sink<(Bytes, SocketAddr)>>::Error: Sync + Send + std::error::Error + 'static,
{
sleep(3.).await;
let (mut socket, _) = conn_fn(Config::get_any_listen_addr()).await?;
let mut msg_out = RendezvousMessage::new();
msg_out.set_software_update(SoftwareUpdate {
url: crate::VERSION.to_owned(),

View File

@@ -1,13 +1,16 @@
use crate::server::{check_zombie, new as new_server, ServerPtr};
use hbb_common::{
allow_err,
anyhow::bail,
bytes::{Bytes, BytesMut},
config::{Config, RENDEZVOUS_PORT, RENDEZVOUS_TIMEOUT},
futures::future::join_all,
futures_core::Stream,
futures_sink::Sink,
log,
protobuf::Message as _,
rendezvous_proto::*,
sleep,
tcp::FramedStream,
sleep, socket_client,
tokio::{
self, select,
time::{interval, Duration},
@@ -16,6 +19,7 @@ use hbb_common::{
AddrMangle, ResultType,
};
use std::{
future::Future,
net::SocketAddr,
sync::{Arc, Mutex},
time::SystemTime,
@@ -59,7 +63,35 @@ impl RendezvousMediator {
let server = server.clone();
let servers = servers.clone();
futs.push(tokio::spawn(async move {
allow_err!(Self::start(server, host, servers).await);
let socks5_conf = socket_client::get_socks5_conf();
if socks5_conf.is_some() {
let target = format!("{}:{}", host, RENDEZVOUS_PORT);
let conn_fn = |bind_addr: SocketAddr| {
let target = target.clone();
let conf_ref = &socks5_conf;
async move {
socket_client::connect_udp_socks5(
target,
bind_addr,
conf_ref,
RENDEZVOUS_TIMEOUT,
)
.await
}
};
allow_err!(Self::start(server, host, servers, conn_fn, true).await);
} else {
allow_err!(
Self::start(
server,
host,
servers,
socket_client::connect_udp_socket,
false,
)
.await
);
}
}));
}
join_all(futs).await;
@@ -68,11 +100,19 @@ impl RendezvousMediator {
}
}
pub async fn start(
pub async fn start<'a, F, Fut, Frm>(
server: ServerPtr,
host: String,
rendezvous_servers: Vec<String>,
) -> ResultType<()> {
conn_fn: F,
socks5: bool,
) -> ResultType<()>
where
F: Fn(SocketAddr) -> Fut,
Fut: Future<Output = ResultType<(FramedSocket<Frm>, Option<SocketAddr>)>>,
Frm: Unpin + Stream<Item = ResultType<(BytesMut, SocketAddr)>> + Sink<(Bytes, SocketAddr)>,
<Frm as Sink<(Bytes, SocketAddr)>>::Error: Sync + Send + std::error::Error + 'static,
{
log::info!("start rendezvous mediator of {}", host);
let host_prefix: String = host
.split(".")
@@ -93,7 +133,12 @@ impl RendezvousMediator {
last_id_pk_registry: "".to_owned(),
};
allow_err!(rz.dns_check());
let mut socket = FramedSocket::new(Config::get_any_listen_addr()).await?;
let bind_addr = Config::get_any_listen_addr();
let (mut socket, target_addr) = conn_fn(bind_addr).await?;
if let Some(addr) = target_addr {
rz.addr = addr;
}
const TIMER_OUT: Duration = Duration::from_secs(1);
let mut timer = interval(TIMER_OUT);
let mut last_timer = SystemTime::UNIX_EPOCH;
@@ -136,60 +181,68 @@ impl RendezvousMediator {
}
};
select! {
Some(Ok((bytes, _))) = socket.next() => {
if let Ok(msg_in) = Message::parse_from_bytes(&bytes) {
match msg_in.union {
Some(rendezvous_message::Union::register_peer_response(rpr)) => {
update_latency();
if rpr.request_pk {
log::info!("request_pk received from {}", host);
allow_err!(rz.register_pk(&mut socket).await);
continue;
}
}
Some(rendezvous_message::Union::register_pk_response(rpr)) => {
update_latency();
match rpr.result.enum_value_or_default() {
register_pk_response::Result::OK => {
Config::set_key_confirmed(true);
Config::set_host_key_confirmed(&rz.host_prefix, true);
*SOLVING_PK_MISMATCH.lock().unwrap() = "".to_owned();
n = socket.next() => {
match n {
Some(Ok((bytes, _))) => {
if let Ok(msg_in) = Message::parse_from_bytes(&bytes) {
match msg_in.union {
Some(rendezvous_message::Union::register_peer_response(rpr)) => {
update_latency();
if rpr.request_pk {
log::info!("request_pk received from {}", host);
allow_err!(rz.register_pk(&mut socket).await);
continue;
}
}
register_pk_response::Result::UUID_MISMATCH => {
allow_err!(rz.handle_uuid_mismatch(&mut socket).await);
Some(rendezvous_message::Union::register_pk_response(rpr)) => {
update_latency();
match rpr.result.enum_value_or_default() {
register_pk_response::Result::OK => {
Config::set_key_confirmed(true);
Config::set_host_key_confirmed(&rz.host_prefix, true);
*SOLVING_PK_MISMATCH.lock().unwrap() = "".to_owned();
}
register_pk_response::Result::UUID_MISMATCH => {
allow_err!(rz.handle_uuid_mismatch(&mut socket).await);
}
_ => {}
}
}
Some(rendezvous_message::Union::punch_hole(ph)) => {
let rz = rz.clone();
let server = server.clone();
tokio::spawn(async move {
allow_err!(rz.handle_punch_hole(ph, server).await);
});
}
Some(rendezvous_message::Union::request_relay(rr)) => {
let rz = rz.clone();
let server = server.clone();
tokio::spawn(async move {
allow_err!(rz.handle_request_relay(rr, server).await);
});
}
Some(rendezvous_message::Union::fetch_local_addr(fla)) => {
let rz = rz.clone();
let server = server.clone();
tokio::spawn(async move {
allow_err!(rz.handle_intranet(fla, server).await);
});
}
Some(rendezvous_message::Union::configure_update(cu)) => {
Config::set_option("rendezvous-servers".to_owned(), cu.rendezvous_servers.join(","));
Config::set_serial(cu.serial);
}
_ => {}
}
} else {
log::debug!("Non-protobuf message bytes received: {:?}", bytes);
}
Some(rendezvous_message::Union::punch_hole(ph)) => {
let rz = rz.clone();
let server = server.clone();
tokio::spawn(async move {
allow_err!(rz.handle_punch_hole(ph, server).await);
});
}
Some(rendezvous_message::Union::request_relay(rr)) => {
let rz = rz.clone();
let server = server.clone();
tokio::spawn(async move {
allow_err!(rz.handle_request_relay(rr, server).await);
});
}
Some(rendezvous_message::Union::fetch_local_addr(fla)) => {
let rz = rz.clone();
let server = server.clone();
tokio::spawn(async move {
allow_err!(rz.handle_intranet(fla, server).await);
});
}
Some(rendezvous_message::Union::configure_update(cu)) => {
Config::set_option("rendezvous-servers".to_owned(), cu.rendezvous_servers.join(","));
Config::set_serial(cu.serial);
}
_ => {}
}
} else {
log::debug!("Non-protobuf message bytes received: {:?}", bytes);
},
Some(Err(e)) => bail!("Failed to receive next {}", e), // maybe socks5 tcp disconnected
None => {
// unreachable!()
},
}
},
_ = timer.tick() => {
@@ -200,13 +253,21 @@ impl RendezvousMediator {
break;
}
if rz.addr.port() == 0 {
allow_err!(rz.dns_check());
if rz.addr.port() == 0 {
continue;
} else {
// have to do this for osx, to avoid "Can't assign requested address"
// when socket created before OS network ready
socket = FramedSocket::new(Config::get_any_listen_addr()).await?;
// tcp is established to help connecting socks5
if !socks5 {
allow_err!(rz.dns_check());
if rz.addr.port() == 0 {
continue;
} else {
// have to do this for osx, to avoid "Can't assign requested address"
// when socket created before OS network ready
let r = conn_fn(bind_addr).await?;
socket = r.0;
if let Some(addr) = r.1 {
rz.addr = addr;
}
}
}
}
let now = SystemTime::now();
@@ -226,10 +287,18 @@ impl RendezvousMediator {
Config::update_latency(&host, -1);
old_latency = 0;
if now.duration_since(last_dns_check).map(|d| d.as_millis() as i64).unwrap_or(0) > DNS_INTERVAL {
if let Ok(_) = rz.dns_check() {
// tcp is established to help connecting socks5
if !socks5 {
if let Ok(_) = rz.dns_check() {
// in some case of network reconnect (dial IP network),
// old UDP socket not work any more after network recover
socket = FramedSocket::new(Config::get_any_listen_addr()).await?;
let r = conn_fn(bind_addr).await?;
socket = r.0;
if let Some(addr) = r.1 {
rz.addr = addr;
}
}
}
last_dns_check = now;
}
@@ -280,8 +349,14 @@ impl RendezvousMediator {
uuid,
secure,
);
let mut socket =
FramedStream::new(self.addr, Config::get_any_listen_addr(), RENDEZVOUS_TIMEOUT).await?;
let mut socket = socket_client::connect_tcp(
format!("{}:{}", self.host, RENDEZVOUS_PORT),
Config::get_any_listen_addr(),
RENDEZVOUS_TIMEOUT,
)
.await?;
let mut msg_out = Message::new();
let mut rr = RelayResponse {
socket_addr,
@@ -303,15 +378,15 @@ impl RendezvousMediator {
async fn handle_intranet(&self, fla: FetchLocalAddr, server: ServerPtr) -> ResultType<()> {
let peer_addr = AddrMangle::decode(&fla.socket_addr);
log::debug!("Handle intranet from {:?}", peer_addr);
let (mut socket, port) = {
let socket =
FramedStream::new(self.addr, Config::get_any_listen_addr(), RENDEZVOUS_TIMEOUT)
.await?;
let port = socket.get_ref().local_addr()?.port();
(socket, port)
};
let local_addr = socket.get_ref().local_addr()?;
let local_addr: SocketAddr = format!("{}:{}", local_addr.ip(), port).parse()?;
let mut socket = socket_client::connect_tcp(
format!("{}:{}", self.host, RENDEZVOUS_PORT),
Config::get_any_listen_addr(),
RENDEZVOUS_TIMEOUT,
)
.await?;
let local_addr = socket.local_addr();
let local_addr: SocketAddr =
format!("{}:{}", local_addr.ip(), local_addr.port()).parse()?;
let mut msg_out = Message::new();
let mut relay_server = Config::get_option("relay-server");
if relay_server.is_empty() {
@@ -347,10 +422,14 @@ impl RendezvousMediator {
let peer_addr = AddrMangle::decode(&ph.socket_addr);
log::debug!("Punch hole to {:?}", peer_addr);
let mut socket = {
let socket =
FramedStream::new(self.addr, Config::get_any_listen_addr(), RENDEZVOUS_TIMEOUT)
.await?;
allow_err!(FramedStream::new(peer_addr, socket.get_ref().local_addr()?, 300).await);
let socket = socket_client::connect_tcp(
format!("{}:{}", self.host, RENDEZVOUS_PORT),
Config::get_any_listen_addr(),
RENDEZVOUS_TIMEOUT,
)
.await?;
let local_addr = socket.local_addr();
allow_err!(socket_client::connect_tcp(peer_addr, local_addr, 300).await);
socket
};
let mut msg_out = Message::new();
@@ -370,7 +449,11 @@ impl RendezvousMediator {
Ok(())
}
async fn register_pk(&mut self, socket: &mut FramedSocket) -> ResultType<()> {
async fn register_pk<Frm>(&mut self, socket: &mut FramedSocket<Frm>) -> ResultType<()>
where
Frm: Unpin + Stream + Sink<(Bytes, SocketAddr)>,
<Frm as Sink<(Bytes, SocketAddr)>>::Error: Sync + Send + std::error::Error + 'static,
{
let mut msg_out = Message::new();
let pk = Config::get_key_pair().1;
let uuid = if let Ok(id) = machine_uid::get() {
@@ -391,7 +474,11 @@ impl RendezvousMediator {
Ok(())
}
async fn handle_uuid_mismatch(&mut self, socket: &mut FramedSocket) -> ResultType<()> {
async fn handle_uuid_mismatch<Frm>(&mut self, socket: &mut FramedSocket<Frm>) -> ResultType<()>
where
Frm: Unpin + Stream + Sink<(Bytes, SocketAddr)>,
<Frm as Sink<(Bytes, SocketAddr)>>::Error: Sync + Send + std::error::Error + 'static,
{
if self.last_id_pk_registry != Config::get_id() {
return Ok(());
}
@@ -409,7 +496,11 @@ impl RendezvousMediator {
self.register_pk(socket).await
}
async fn register_peer(&mut self, socket: &mut FramedSocket) -> ResultType<()> {
async fn register_peer<Frm>(&mut self, socket: &mut FramedSocket<Frm>) -> ResultType<()>
where
Frm: Unpin + Stream + Sink<(Bytes, SocketAddr)>,
<Frm as Sink<(Bytes, SocketAddr)>>::Error: Sync + Send + std::error::Error + 'static,
{
if !SOLVING_PK_MISMATCH.lock().unwrap().is_empty() {
return Ok(());
}

View File

@@ -1,5 +1,5 @@
use crate::ipc::Data;
pub use connection::*;
use connection::{ConnInner, Connection};
use hbb_common::{
allow_err,
anyhow::{anyhow, Context},
@@ -11,8 +11,8 @@ use hbb_common::{
rendezvous_proto::*,
sleep,
sodiumoxide::crypto::{box_, secretbox, sign},
tcp::FramedStream,
timeout, tokio, ResultType, Stream,
socket_client,
};
use service::{GenericService, Service, ServiceTmpl, Subscriber};
use std::{
@@ -61,7 +61,7 @@ pub fn new() -> ServerPtr {
}
async fn accept_connection_(server: ServerPtr, socket: Stream, secure: bool) -> ResultType<()> {
let local_addr = socket.get_ref().local_addr()?;
let local_addr = socket.local_addr();
drop(socket);
// even we drop socket, below still may fail if not use reuse_addr,
// there is TIME_WAIT before socket really released, so sometimes we
@@ -69,7 +69,8 @@ async fn accept_connection_(server: ServerPtr, socket: Stream, secure: bool) ->
let listener = new_listener(local_addr, true).await?;
log::info!("Server listening on: {}", &listener.local_addr()?);
if let Ok((stream, addr)) = timeout(CONNECT_TIMEOUT, listener.accept()).await? {
create_tcp_connection(server, Stream::from(stream), addr, secure).await?;
let stream_addr = stream.local_addr()?;
create_tcp_connection(server, Stream::from(stream, stream_addr), addr, secure).await?;
}
Ok(())
}
@@ -183,8 +184,8 @@ async fn create_relay_connection_(
peer_addr: SocketAddr,
secure: bool,
) -> ResultType<()> {
let mut stream = FramedStream::new(
&crate::check_port(relay_server, RELAY_PORT),
let mut stream = socket_client::connect_tcp(
crate::check_port(relay_server, RELAY_PORT),
Config::get_any_listen_addr(),
CONNECT_TIMEOUT,
)