flutter_desktop_online_state: refactor connection page

Signed-off-by: fufesou <shuanglongchen@yeah.net>
This commit is contained in:
fufesou
2022-07-27 22:56:28 +08:00
parent b6d56790fe
commit 0ba8b4079b
15 changed files with 1152 additions and 101 deletions

View File

@@ -985,6 +985,21 @@ unsafe extern "C" fn set_by_name(name: *const c_char, value: *const c_char) {
}
}
fn handle_query_onlines(onlines: Vec<String>, offlines: Vec<String>) {
if let Some(s) = flutter::GLOBAL_EVENT_STREAM.read().unwrap().as_ref() {
let data = HashMap::from([
("name", "callback_query_onlines".to_owned()),
("onlines", onlines.join(",")),
("offlines", offlines.join(",")),
]);
s.add(serde_json::ser::to_string(&data).unwrap_or("".to_owned()));
};
}
pub fn query_onlines(ids: Vec<String>) {
crate::rendezvous_mediator::query_online_states(ids, handle_query_onlines)
}
#[cfg(target_os = "android")]
pub mod server_side {
use jni::{

View File

@@ -8,6 +8,7 @@ use hbb_common::{
protobuf::Message as _,
rendezvous_proto::*,
sleep, socket_client,
tcp::FramedStream,
tokio::{
self, select,
time::{interval, Duration},
@@ -637,3 +638,139 @@ pub fn discover() -> ResultType<()> {
config::LanPeers::store(serde_json::to_string(&peers)?);
Ok(())
}
#[tokio::main(flavor = "current_thread")]
pub async fn query_online_states<F: FnOnce(Vec<String>, Vec<String>)>(ids: Vec<String>, f: F) {
let test = false;
if test {
sleep(1.5).await;
let mut onlines = ids;
let offlines = onlines.drain((onlines.len() / 2)..).collect();
f(onlines, offlines)
} else {
let query_begin = Instant::now();
let query_timeout = std::time::Duration::from_millis(3_000);
loop {
if SHOULD_EXIT.load(Ordering::SeqCst) {
break;
}
match query_online_states_(&ids, query_timeout).await {
Ok((onlines, offlines)) => {
f(onlines, offlines);
break;
}
Err(e) => {
log::debug!("{}", &e);
}
}
if query_begin.elapsed() > query_timeout {
log::debug!("query onlines timeout {:?}", query_timeout);
break;
}
sleep(1.5).await;
}
}
}
async fn create_online_stream() -> ResultType<FramedStream> {
let rendezvous_server = crate::get_rendezvous_server(1_000).await;
let tmp: Vec<&str> = rendezvous_server.split(":").collect();
if tmp.len() != 2 {
bail!("Invalid server address: {}", rendezvous_server);
}
let port: u16 = tmp[1].parse()?;
if port == 0 {
bail!("Invalid server address: {}", rendezvous_server);
}
let online_server = format!("{}:{}", tmp[0], port - 1);
let server_addr = socket_client::get_target_addr(&online_server)?;
socket_client::connect_tcp(
server_addr,
Config::get_any_listen_addr(),
RENDEZVOUS_TIMEOUT,
)
.await
}
async fn query_online_states_(
ids: &Vec<String>,
timeout: std::time::Duration,
) -> ResultType<(Vec<String>, Vec<String>)> {
let query_begin = Instant::now();
let mut msg_out = RendezvousMessage::new();
msg_out.set_online_request(OnlineRequest {
id: Config::get_id(),
peers: ids.clone(),
..Default::default()
});
loop {
if SHOULD_EXIT.load(Ordering::SeqCst) {
// No need to care about onlines
return Ok((Vec::new(), Vec::new()));
}
let mut socket = create_online_stream().await?;
socket.send(&msg_out).await?;
match socket.next_timeout(RENDEZVOUS_TIMEOUT).await {
Some(Ok(bytes)) => {
if let Ok(msg_in) = RendezvousMessage::parse_from_bytes(&bytes) {
match msg_in.union {
Some(rendezvous_message::Union::online_response(online_response)) => {
let states = online_response.states;
let mut onlines = Vec::new();
let mut offlines = Vec::new();
for i in 0..ids.len() {
// bytes index from left to right
let bit_value = 0x01 << (7 - i % 8);
if (states[i / 8] & bit_value) == bit_value {
onlines.push(ids[i].clone());
} else {
offlines.push(ids[i].clone());
}
}
return Ok((onlines, offlines));
}
_ => {
// ignore
}
}
}
}
Some(Err(e)) => {
log::error!("Failed to receive {e}");
}
None => {
// TODO: Make sure socket closed?
bail!("Online stream receives None");
}
}
if query_begin.elapsed() > timeout {
bail!("Try query onlines timeout {:?}", &timeout);
}
sleep(300.0).await;
}
}
#[cfg(test)]
mod tests {
#[test]
fn test_query_onlines() {
super::query_online_states(
vec![
"152183996".to_owned(),
"165782066".to_owned(),
"155323351".to_owned(),
"460952777".to_owned(),
],
|onlines: Vec<String>, offlines: Vec<String>| {
println!("onlines: {:?}, offlines: {:?}", &onlines, &offlines);
},
);
}
}