diff --git a/src/client.rs b/src/client.rs index 665560d62..b687c8a84 100644 --- a/src/client.rs +++ b/src/client.rs @@ -3407,3 +3407,153 @@ async fn hc_connection_( } Ok(()) } + +pub mod peer_online { + use hbb_common::{ + anyhow::bail, + config::{Config, CONNECT_TIMEOUT, READ_TIMEOUT}, + log, + rendezvous_proto::*, + sleep, + socket_client::connect_tcp, + tcp::FramedStream, + ResultType, + }; + use std::time::Instant; + + pub async fn query_online_states, Vec)>(ids: Vec, f: F) { + let test = false; + if test { + sleep(1.5).await; + let mut onlines = ids; + let offlines = onlines.drain((onlines.len() / 2)..).collect(); + f(onlines, offlines) + } else { + let query_begin = Instant::now(); + let query_timeout = std::time::Duration::from_millis(3_000); + loop { + match query_online_states_(&ids, query_timeout).await { + Ok((onlines, offlines)) => { + f(onlines, offlines); + break; + } + Err(e) => { + log::debug!("{}", &e); + } + } + + if query_begin.elapsed() > query_timeout { + log::debug!( + "query onlines timeout {:?} ({:?})", + query_begin.elapsed(), + query_timeout + ); + break; + } + + sleep(1.5).await; + } + } + } + + async fn create_online_stream() -> ResultType { + let (rendezvous_server, _servers, _contained) = + crate::get_rendezvous_server(READ_TIMEOUT).await; + let tmp: Vec<&str> = rendezvous_server.split(":").collect(); + if tmp.len() != 2 { + bail!("Invalid server address: {}", rendezvous_server); + } + let port: u16 = tmp[1].parse()?; + if port == 0 { + bail!("Invalid server address: {}", rendezvous_server); + } + let online_server = format!("{}:{}", tmp[0], port - 1); + connect_tcp(online_server, CONNECT_TIMEOUT).await + } + + async fn query_online_states_( + ids: &Vec, + timeout: std::time::Duration, + ) -> ResultType<(Vec, Vec)> { + let query_begin = Instant::now(); + + let mut msg_out = RendezvousMessage::new(); + msg_out.set_online_request(OnlineRequest { + id: Config::get_id(), + peers: ids.clone(), + ..Default::default() + }); + + loop { + let mut socket = match create_online_stream().await { + Ok(s) => s, + Err(e) => { + log::debug!("Failed to create peers online stream, {e}"); + return Ok((vec![], ids.clone())); + } + }; + // TODO: Use long connections to avoid socket creation + // If we use a Arc>> to hold and reuse the previous socket, + // we may face the following error: + // An established connection was aborted by the software in your host machine. (os error 10053) + if let Err(e) = socket.send(&msg_out).await { + log::debug!("Failed to send peers online states query, {e}"); + return Ok((vec![], ids.clone())); + } + if let Some(msg_in) = + crate::common::get_next_nonkeyexchange_msg(&mut socket, None).await + { + match msg_in.union { + Some(rendezvous_message::Union::OnlineResponse(online_response)) => { + let states = online_response.states; + let mut onlines = Vec::new(); + let mut offlines = Vec::new(); + for i in 0..ids.len() { + // bytes index from left to right + let bit_value = 0x01 << (7 - i % 8); + if (states[i / 8] & bit_value) == bit_value { + onlines.push(ids[i].clone()); + } else { + offlines.push(ids[i].clone()); + } + } + return Ok((onlines, offlines)); + } + _ => { + // ignore + } + } + } else { + // TODO: Make sure socket closed? + bail!("Online stream receives None"); + } + + if query_begin.elapsed() > timeout { + bail!("Try query onlines timeout {:?}", &timeout); + } + + sleep(300.0).await; + } + } + + #[cfg(test)] + mod tests { + use hbb_common::tokio; + + #[tokio::test] + async fn test_query_onlines() { + super::query_online_states( + vec![ + "152183996".to_owned(), + "165782066".to_owned(), + "155323351".to_owned(), + "460952777".to_owned(), + ], + |onlines: Vec, offlines: Vec| { + println!("onlines: {:?}, offlines: {:?}", &onlines, &offlines); + }, + ) + .await; + } + } +} diff --git a/src/flutter.rs b/src/flutter.rs index d408202a9..03b5a5750 100644 --- a/src/flutter.rs +++ b/src/flutter.rs @@ -2093,8 +2093,7 @@ pub(super) mod async_tasks { ids = rx_onlines.recv() => { match ids { Some(_ids) => { - #[cfg(not(any(target_os = "ios")))] - crate::rendezvous_mediator::query_online_states(_ids, handle_query_onlines).await + crate::client::peer_online::query_online_states(_ids, handle_query_onlines).await } None => { break; diff --git a/src/rendezvous_mediator.rs b/src/rendezvous_mediator.rs index 4ae222966..bd628ec66 100644 --- a/src/rendezvous_mediator.rs +++ b/src/rendezvous_mediator.rs @@ -12,10 +12,7 @@ use uuid::Uuid; use hbb_common::{ allow_err, anyhow::{self, bail}, - config::{ - self, keys::*, option2bool, Config, CONNECT_TIMEOUT, READ_TIMEOUT, REG_INTERVAL, - RENDEZVOUS_PORT, - }, + config::{self, keys::*, option2bool, Config, CONNECT_TIMEOUT, REG_INTERVAL, RENDEZVOUS_PORT}, futures::future::join_all, log, protobuf::Message as _, @@ -703,123 +700,6 @@ async fn direct_server(server: ServerPtr) { } } -pub async fn query_online_states, Vec)>(ids: Vec, f: F) { - let test = false; - if test { - sleep(1.5).await; - let mut onlines = ids; - let offlines = onlines.drain((onlines.len() / 2)..).collect(); - f(onlines, offlines) - } else { - let query_begin = Instant::now(); - let query_timeout = std::time::Duration::from_millis(3_000); - loop { - if SHOULD_EXIT.load(Ordering::SeqCst) { - break; - } - match query_online_states_(&ids, query_timeout).await { - Ok((onlines, offlines)) => { - f(onlines, offlines); - break; - } - Err(e) => { - log::debug!("{}", &e); - } - } - - if query_begin.elapsed() > query_timeout { - log::debug!( - "query onlines timeout {:?} ({:?})", - query_begin.elapsed(), - query_timeout - ); - break; - } - - sleep(1.5).await; - } - } -} - -async fn create_online_stream() -> ResultType { - let (rendezvous_server, _servers, _contained) = - crate::get_rendezvous_server(READ_TIMEOUT).await; - let tmp: Vec<&str> = rendezvous_server.split(":").collect(); - if tmp.len() != 2 { - bail!("Invalid server address: {}", rendezvous_server); - } - let port: u16 = tmp[1].parse()?; - if port == 0 { - bail!("Invalid server address: {}", rendezvous_server); - } - let online_server = format!("{}:{}", tmp[0], port - 1); - connect_tcp(online_server, CONNECT_TIMEOUT).await -} - -async fn query_online_states_( - ids: &Vec, - timeout: std::time::Duration, -) -> ResultType<(Vec, Vec)> { - let query_begin = Instant::now(); - - let mut msg_out = RendezvousMessage::new(); - msg_out.set_online_request(OnlineRequest { - id: Config::get_id(), - peers: ids.clone(), - ..Default::default() - }); - - loop { - if SHOULD_EXIT.load(Ordering::SeqCst) { - // No need to care about onlines - return Ok((Vec::new(), Vec::new())); - } - - let mut socket = match create_online_stream().await { - Ok(s) => s, - Err(e) => { - log::debug!("Failed to create peers online stream, {e}"); - return Ok((vec![], ids.clone())); - } - }; - if let Err(e) = socket.send(&msg_out).await { - log::debug!("Failed to send peers online states query, {e}"); - return Ok((vec![], ids.clone())); - } - if let Some(msg_in) = crate::common::get_next_nonkeyexchange_msg(&mut socket, None).await { - match msg_in.union { - Some(rendezvous_message::Union::OnlineResponse(online_response)) => { - let states = online_response.states; - let mut onlines = Vec::new(); - let mut offlines = Vec::new(); - for i in 0..ids.len() { - // bytes index from left to right - let bit_value = 0x01 << (7 - i % 8); - if (states[i / 8] & bit_value) == bit_value { - onlines.push(ids[i].clone()); - } else { - offlines.push(ids[i].clone()); - } - } - return Ok((onlines, offlines)); - } - _ => { - // ignore - } - } - } else { - // TODO: Make sure socket closed? - bail!("Online stream receives None"); - } - - if query_begin.elapsed() > timeout { - bail!("Try query onlines timeout {:?}", &timeout); - } - - sleep(300.0).await; - } -} - enum Sink<'a> { Framed(&'a mut FramedSocket, &'a TargetAddr<'a>), Stream(&'a mut FramedStream), @@ -833,24 +713,3 @@ impl Sink<'_> { } } } - -#[cfg(test)] -mod tests { - use hbb_common::tokio; - - #[tokio::test] - async fn test_query_onlines() { - super::query_online_states( - vec![ - "152183996".to_owned(), - "165782066".to_owned(), - "155323351".to_owned(), - "460952777".to_owned(), - ], - |onlines: Vec, offlines: Vec| { - println!("onlines: {:?}, offlines: {:?}", &onlines, &offlines); - }, - ) - .await; - } -}