diff --git a/src/client/io_loop.rs b/src/client/io_loop.rs index 2dbf4f7dc..82b87a690 100644 --- a/src/client/io_loop.rs +++ b/src/client/io_loop.rs @@ -30,7 +30,7 @@ use hbb_common::{ tokio::{ self, sync::mpsc, - time::{self, Duration, Instant, Interval}, + time::{self, Duration, Instant}, }, Stream, }; @@ -62,7 +62,7 @@ pub struct Remote { read_jobs: Vec, write_jobs: Vec, remove_jobs: HashMap, - timer: Interval, + timer: crate::RustDeskInterval, last_update_jobs_status: (Instant, HashMap), is_connected: bool, first_frame: bool, @@ -99,7 +99,7 @@ impl Remote { read_jobs: Vec::new(), write_jobs: Vec::new(), remove_jobs: Default::default(), - timer: time::interval(SEC30), + timer: crate::rustdesk_interval(time::interval(SEC30)), last_update_jobs_status: (Instant::now(), Default::default()), is_connected: false, first_frame: false, @@ -170,7 +170,7 @@ impl Remote { #[cfg(any(target_os = "windows", target_os = "linux", target_os = "macos"))] let mut rx_clip_client = rx_clip_client_lock.lock().await; - let mut status_timer = time::interval(Duration::new(1, 0)); + let mut status_timer = crate::rustdesk_interval(time::interval(Duration::new(1, 0))); let mut fps_instant = Instant::now(); loop { @@ -228,7 +228,7 @@ impl Remote { } self.update_jobs_status(); } else { - self.timer = time::interval_at(Instant::now() + SEC30, SEC30); + self.timer = crate::rustdesk_interval(time::interval_at(Instant::now() + SEC30, SEC30)); } } _ = status_timer.tick() => { @@ -537,7 +537,7 @@ impl Remote { } let total_size = job.total_size(); self.read_jobs.push(job); - self.timer = time::interval(MILLI1); + self.timer = crate::rustdesk_interval(time::interval(MILLI1)); allow_err!( peer.send(&fs::new_receive(id, to, file_num, files, total_size)) .await @@ -597,7 +597,7 @@ impl Remote { ); job.is_last_job = true; self.read_jobs.push(job); - self.timer = time::interval(MILLI1); + self.timer = crate::rustdesk_interval(time::interval(MILLI1)); } } } diff --git a/src/common.rs b/src/common.rs index fde8ecfa5..5286072f3 100644 --- a/src/common.rs +++ b/src/common.rs @@ -1,6 +1,7 @@ use std::{ future::Future, sync::{Arc, Mutex, RwLock}, + task::Poll, }; #[derive(Debug, Eq, PartialEq)] @@ -132,15 +133,20 @@ use hbb_common::{ bytes::Bytes, compress::compress as compress_func, config::{self, Config, CONNECT_TIMEOUT, READ_TIMEOUT}, + futures_util::future::poll_fn, get_version_number, log, message_proto::*, - protobuf::Enum, - protobuf::Message as _, + protobuf::{Enum, Message as _}, rendezvous_proto::*, socket_client, sodiumoxide::crypto::{box_, secretbox, sign}, tcp::FramedStream, - timeout, tokio, ResultType, + timeout, + tokio::{ + self, + time::{Duration, Instant, Interval}, + }, + ResultType, }; // #[cfg(any(target_os = "android", target_os = "ios", feature = "cli"))] use hbb_common::{config::RENDEZVOUS_PORT, futures::future::join_all}; @@ -1335,3 +1341,118 @@ pub fn using_public_server() -> bool { && crate::get_custom_rendezvous_server(get_option("custom-rendezvous-server")).is_empty() } +pub struct ThrottledInterval { + interval: Interval, + last_tick: Instant, + min_interval: Duration, +} + +impl ThrottledInterval { + pub fn new(i: Interval) -> ThrottledInterval { + let period = i.period(); + ThrottledInterval { + interval: i, + last_tick: Instant::now() - period * 2, + min_interval: Duration::from_secs_f64(period.as_secs_f64() * 0.9), + } + } + + pub async fn tick(&mut self) -> Instant { + loop { + let instant = poll_fn(|cx| self.poll_tick(cx)); + if let Some(instant) = instant.await { + return instant; + } + } + } + + pub fn poll_tick(&mut self, cx: &mut std::task::Context<'_>) -> Poll> { + match self.interval.poll_tick(cx) { + Poll::Ready(instant) => { + if self.last_tick.elapsed() >= self.min_interval { + self.last_tick = Instant::now(); + Poll::Ready(Some(instant)) + } else { + // This call is required since tokio 1.27 + cx.waker().wake_by_ref(); + Poll::Ready(None) + } + } + Poll::Pending => { + Poll::Pending + }, + } + } +} + +pub type RustDeskInterval = ThrottledInterval; + +#[inline] +pub fn rustdesk_interval(i: Interval) -> ThrottledInterval { + ThrottledInterval::new(i) +} + +#[cfg(test)] +mod tests { + use super::*; + use chrono::{format::StrftimeItems, Local}; + use hbb_common::tokio::{ + self, + time::{interval, sleep, Duration}, + }; + use std::collections::HashSet; + + #[tokio::test] + async fn test_tokio_time_interval() { + let mut timer = interval(Duration::from_secs(1)); + let mut times = Vec::new(); + sleep(Duration::from_secs(3)).await; + loop { + tokio::select! { + _ = timer.tick() => { + let format = StrftimeItems::new("%Y-%m-%d %H:%M:%S"); + times.push(Local::now().format_with_items(format).to_string()); + if times.len() == 5 { + break; + } + } + } + } + let times2: HashSet = HashSet::from_iter(times.clone()); + assert_eq!(times.len(), times2.len() + 3); + } + + #[allow(non_snake_case)] + #[tokio::test] + async fn test_RustDesk_interval() { + let mut timer = rustdesk_interval(interval(Duration::from_secs(1))); + let mut times = Vec::new(); + sleep(Duration::from_secs(3)).await; + loop { + tokio::select! { + _ = timer.tick() => { + let format = StrftimeItems::new("%Y-%m-%d %H:%M:%S"); + times.push(Local::now().format_with_items(format).to_string()); + if times.len() == 5 { + break; + } + } + } + } + let times2: HashSet = HashSet::from_iter(times.clone()); + assert_eq!(times.len(), times2.len()); + } + + #[test] + fn test_duration_multiplication() { + let dur = Duration::from_secs(1); + + assert_eq!(dur * 2, Duration::from_secs(2)); + assert_eq!(Duration::from_secs_f64(dur.as_secs_f64() * 0.9), Duration::from_millis(900)); + assert_eq!(Duration::from_secs_f64(dur.as_secs_f64() * 0.923), Duration::from_millis(923)); + assert_eq!(Duration::from_secs_f64(dur.as_secs_f64() * 0.923 * 1e-3), Duration::from_micros(923)); + assert_eq!(Duration::from_secs_f64(dur.as_secs_f64() * 0.923 * 1e-6), Duration::from_nanos(923)); + assert_eq!(Duration::from_secs_f64(dur.as_secs_f64() * 0.923 * 1e-9), Duration::from_nanos(1)); + assert_eq!(Duration::from_secs_f64(dur.as_secs_f64() * 0.923 * 1e-10), Duration::from_nanos(0)); + } +} diff --git a/src/hbbs_http/sync.rs b/src/hbbs_http/sync.rs index e020c5a68..ac508a963 100644 --- a/src/hbbs_http/sync.rs +++ b/src/hbbs_http/sync.rs @@ -51,7 +51,7 @@ pub struct StrategyOptions { #[cfg(not(any(target_os = "ios")))] #[tokio::main(flavor = "current_thread")] async fn start_hbbs_sync_async() { - let mut interval = tokio::time::interval_at(Instant::now() + TIME_CONN, TIME_CONN); + let mut interval = crate::rustdesk_interval(tokio::time::interval_at(Instant::now() + TIME_CONN, TIME_CONN)); let mut last_sent: Option = None; let mut info_uploaded: (bool, String, Option) = (false, "".to_owned(), None); loop { diff --git a/src/license.rs b/src/license.rs index cb9e72dd5..c3c295678 100644 --- a/src/license.rs +++ b/src/license.rs @@ -116,6 +116,7 @@ mod test { host: "server.example.net".to_owned(), key: "".to_owned(), api: "".to_owned(), + relay: "".to_owned(), } ); assert_eq!( @@ -124,6 +125,7 @@ mod test { host: "server.example.net".to_owned(), key: "".to_owned(), api: "".to_owned(), + relay: "".to_owned(), } ); // key in these tests is "foobar.,2" base64 encoded @@ -136,6 +138,7 @@ mod test { host: "server.example.net".to_owned(), key: "Zm9vYmFyLiwyCg==".to_owned(), api: "abc".to_owned(), + relay: "".to_owned(), } ); assert_eq!( @@ -145,6 +148,7 @@ mod test { host: "server.example.net".to_owned(), key: "Zm9vYmFyLiwyCg==".to_owned(), api: "".to_owned(), + relay: "".to_owned(), } ); } diff --git a/src/rendezvous_mediator.rs b/src/rendezvous_mediator.rs index 7fc6d8927..9ef044fdd 100644 --- a/src/rendezvous_mediator.rs +++ b/src/rendezvous_mediator.rs @@ -152,8 +152,7 @@ impl RendezvousMediator { keep_alive: DEFAULT_KEEP_ALIVE, }; - let mut timer = interval(TIMER_OUT); - let mut last_timer: Option = None; + let mut timer = crate::rustdesk_interval(interval(TIMER_OUT)); const MIN_REG_TIMEOUT: i64 = 3_000; const MAX_REG_TIMEOUT: i64 = 30_000; let mut reg_timeout = MIN_REG_TIMEOUT; @@ -215,11 +214,6 @@ impl RendezvousMediator { break; } let now = Some(Instant::now()); - if last_timer.map(|x| x.elapsed() < TIMER_OUT).unwrap_or(false) { - // a workaround of tokio timer bug - continue; - } - last_timer = now; let expired = last_register_resp.map(|x| x.elapsed().as_millis() as i64 >= REG_INTERVAL).unwrap_or(true); let timeout = last_register_sent.map(|x| x.elapsed().as_millis() as i64 >= reg_timeout).unwrap_or(false); // temporarily disable exponential backoff for android before we add wakeup trigger to force connect in android @@ -342,7 +336,7 @@ impl RendezvousMediator { host_prefix: Self::get_host_prefix(&host), keep_alive: DEFAULT_KEEP_ALIVE, }; - let mut timer = interval(TIMER_OUT); + let mut timer = crate::rustdesk_interval(interval(TIMER_OUT)); let mut last_register_sent: Option = None; let mut last_recv_msg = Instant::now(); // we won't support connecting to multiple rendzvous servers any more, so we can use a global variable here. diff --git a/src/server/connection.rs b/src/server/connection.rs index 4e8a86a50..8d0f1caf7 100644 --- a/src/server/connection.rs +++ b/src/server/connection.rs @@ -41,7 +41,7 @@ use hbb_common::{ tokio::{ net::TcpStream, sync::mpsc, - time::{self, Duration, Instant, Interval}, + time::{self, Duration, Instant}, }, tokio_util::codec::{BytesCodec, Framed}, }; @@ -175,8 +175,8 @@ pub struct Connection { server: super::ServerPtrWeak, hash: Hash, read_jobs: Vec, - timer: Interval, - file_timer: Interval, + timer: crate::RustDeskInterval, + file_timer: crate::RustDeskInterval, file_transfer: Option<(String, bool)>, port_forward_socket: Option>, port_forward_address: String, @@ -327,8 +327,8 @@ impl Connection { server, hash, read_jobs: Vec::new(), - timer: time::interval(SEC30), - file_timer: time::interval(SEC30), + timer: crate::rustdesk_interval(time::interval(SEC30)), + file_timer: crate::rustdesk_interval(time::interval(SEC30)), file_transfer: None, port_forward_socket: None, port_forward_address: "".to_owned(), @@ -419,7 +419,7 @@ impl Connection { if !conn.block_input { conn.send_permission(Permission::BlockInput, false).await; } - let mut test_delay_timer = time::interval(TEST_DELAY_TIMEOUT); + let mut test_delay_timer = crate::rustdesk_interval(time::interval(TEST_DELAY_TIMEOUT)); let mut last_recv_time = Instant::now(); conn.stream.set_send_timeout( @@ -432,7 +432,7 @@ impl Connection { #[cfg(not(any(target_os = "android", target_os = "ios")))] std::thread::spawn(move || Self::handle_input(_rx_input, tx_cloned)); - let mut second_timer = time::interval(Duration::from_secs(1)); + let mut second_timer = crate::rustdesk_interval(time::interval(Duration::from_secs(1))); loop { tokio::select! { @@ -608,7 +608,7 @@ impl Connection { } } } else { - conn.file_timer = time::interval_at(Instant::now() + SEC30, SEC30); + conn.file_timer = crate::rustdesk_interval(time::interval_at(Instant::now() + SEC30, SEC30)); } } Ok(conns) = hbbs_rx.recv() => { @@ -2054,7 +2054,8 @@ impl Connection { job.is_remote = true; job.conn_id = self.inner.id(); self.read_jobs.push(job); - self.file_timer = time::interval(MILLI1); + self.file_timer = + crate::rustdesk_interval(time::interval(MILLI1)); self.post_file_audit( FileAuditType::RemoteSend, &s.path, diff --git a/src/server/portable_service.rs b/src/server/portable_service.rs index 9d8b694fb..8ed581020 100644 --- a/src/server/portable_service.rs +++ b/src/server/portable_service.rs @@ -442,7 +442,8 @@ pub mod server { match ipc::connect(1000, postfix).await { Ok(mut stream) => { - let mut timer = tokio::time::interval(Duration::from_secs(1)); + let mut timer = + crate::rustdesk_interval(tokio::time::interval(Duration::from_secs(1))); let mut nack = 0; loop { tokio::select! { @@ -777,7 +778,7 @@ pub mod client { tokio::spawn(async move { let mut stream = Connection::new(stream); let postfix = postfix.to_owned(); - let mut timer = tokio::time::interval(Duration::from_secs(1)); + let mut timer = crate::rustdesk_interval(tokio::time::interval(Duration::from_secs(1))); let mut nack = 0; let mut rx = rx_clone.lock().await; loop { diff --git a/src/tray.rs b/src/tray.rs index b6772cafd..7be8b7b81 100644 --- a/src/tray.rs +++ b/src/tray.rs @@ -173,7 +173,7 @@ async fn start_query_session_count(sender: std::sync::mpsc::Sender) { let mut last_count = 0; loop { if let Ok(mut c) = crate::ipc::connect(1000, "").await { - let mut timer = tokio::time::interval(Duration::from_secs(1)); + let mut timer = crate::rustdesk_interval(tokio::time::interval(Duration::from_secs(1))); loop { tokio::select! { res = c.next() => { diff --git a/src/ui_interface.rs b/src/ui_interface.rs index dbcf525db..cc41ea247 100644 --- a/src/ui_interface.rs +++ b/src/ui_interface.rs @@ -1038,9 +1038,7 @@ async fn check_connect_status_(reconnect: bool, rx: mpsc::UnboundedReceiver { @@ -1108,11 +1106,6 @@ async fn check_connect_status_(reconnect: bool, rx: mpsc::UnboundedReceiver { - if last_timer.elapsed() < TIMER_OUT { - continue; - } - last_timer = time::Instant::now(); - c.send(&ipc::Data::OnlineStatus(None)).await.ok(); c.send(&ipc::Data::Options(None)).await.ok(); c.send(&ipc::Data::Config(("id".to_owned(), None))).await.ok();