diff --git a/Cargo.lock b/Cargo.lock index 86bf2e7ab..1c65370be 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4945,6 +4945,7 @@ dependencies = [ "core-foundation 0.9.3", "core-graphics 0.22.3", "cpal", + "crossbeam-queue", "ctrlc", "dark-light", "dasp", diff --git a/Cargo.toml b/Cargo.toml index 990ccb352..28e55b1a3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -65,6 +65,7 @@ rdev = { git = "https://github.com/fufesou/rdev" } url = { version = "2.1", features = ["serde"] } dlopen = "0.1" hex = "0.4.3" +crossbeam-queue = "0.3" reqwest = { version = "0.11", features = ["blocking", "json", "rustls-tls"], default-features=false } chrono = "0.4.23" diff --git a/src/client.rs b/src/client.rs index 1efd8b18e..39703e55a 100644 --- a/src/client.rs +++ b/src/client.rs @@ -13,6 +13,7 @@ use cpal::{ traits::{DeviceTrait, HostTrait, StreamTrait}, Device, Host, StreamConfig, }; +use crossbeam_queue::ArrayQueue; use magnum_opus::{Channels::*, Decoder as AudioDecoder}; #[cfg(not(any(target_os = "android", target_os = "linux")))] use ringbuf::{ring_buffer::RbBase, Rb}; @@ -67,6 +68,7 @@ pub mod io_loop; pub const MILLI1: Duration = Duration::from_millis(1); pub const SEC30: Duration = Duration::from_secs(30); +pub const VIDEO_QUEUE_SIZE: usize = 120; /// Client of the remote desktop. pub struct Client; @@ -1659,7 +1661,7 @@ impl LoginConfigHandler { /// Media data. pub enum MediaData { - VideoFrame(VideoFrame), + VideoFrame, AudioFrame(AudioFrame), AudioFormat(AudioFormat), Reset, @@ -1674,11 +1676,15 @@ pub type MediaSender = mpsc::Sender; /// # Arguments /// /// * `video_callback` - The callback for video frame. Being called when a video frame is ready. -pub fn start_video_audio_threads(video_callback: F) -> (MediaSender, MediaSender) +pub fn start_video_audio_threads( + video_callback: F, +) -> (MediaSender, MediaSender, Arc>) where F: 'static + FnMut(&mut Vec) + Send, { let (video_sender, video_receiver) = mpsc::channel::(); + let video_queue = Arc::new(ArrayQueue::::new(VIDEO_QUEUE_SIZE)); + let video_queue_cloned = video_queue.clone(); let mut video_callback = video_callback; std::thread::spawn(move || { @@ -1686,9 +1692,11 @@ where loop { if let Ok(data) = video_receiver.recv() { match data { - MediaData::VideoFrame(vf) => { - if let Ok(true) = video_handler.handle_frame(vf) { - video_callback(&mut video_handler.rgb); + MediaData::VideoFrame => { + if let Some(vf) = video_queue.pop() { + if let Ok(true) = video_handler.handle_frame(vf) { + video_callback(&mut video_handler.rgb); + } } } MediaData::Reset => { @@ -1706,7 +1714,7 @@ where log::info!("Video decoder loop exits"); }); let audio_sender = start_audio_thread(); - return (video_sender, audio_sender); + return (video_sender, audio_sender, video_queue_cloned); } /// Start an audio thread diff --git a/src/client/io_loop.rs b/src/client/io_loop.rs index 135e92a75..6b0e139e2 100644 --- a/src/client/io_loop.rs +++ b/src/client/io_loop.rs @@ -9,6 +9,7 @@ use std::sync::{ #[cfg(windows)] use clipboard::{cliprdr::CliprdrClientContext, ContextSend}; +use crossbeam_queue::ArrayQueue; use hbb_common::config::{PeerConfig, TransferSerde}; use hbb_common::fs::{ can_enable_overwrite_detection, get_job, get_string, new_send_confirm, DigestCheckResult, @@ -42,6 +43,7 @@ use crate::{client::Data, client::Interface}; pub struct Remote { handler: Session, + video_queue: Arc>, video_sender: MediaSender, audio_sender: MediaSender, receiver: mpsc::UnboundedReceiver, @@ -68,6 +70,7 @@ pub struct Remote { impl Remote { pub fn new( handler: Session, + video_queue: Arc>, video_sender: MediaSender, audio_sender: MediaSender, receiver: mpsc::UnboundedReceiver, @@ -76,6 +79,7 @@ impl Remote { ) -> Self { Self { handler, + video_queue, video_sender, audio_sender, receiver, @@ -830,7 +834,8 @@ impl Remote { ..Default::default() }) }; - self.video_sender.send(MediaData::VideoFrame(vf)).ok(); + self.video_queue.force_push(vf); + self.video_sender.send(MediaData::VideoFrame).ok(); } Some(message::Union::Hash(hash)) => { self.handler diff --git a/src/ui_session_interface.rs b/src/ui_session_interface.rs index 7ee49a3b7..f89be4879 100644 --- a/src/ui_session_interface.rs +++ b/src/ui_session_interface.rs @@ -1149,13 +1149,15 @@ pub async fn io_loop(handler: Session) { let frame_count = Arc::new(AtomicUsize::new(0)); let frame_count_cl = frame_count.clone(); let ui_handler = handler.ui_handler.clone(); - let (video_sender, audio_sender) = start_video_audio_threads(move |data: &mut Vec| { - frame_count_cl.fetch_add(1, Ordering::Relaxed); - ui_handler.on_rgba(data); - }); + let (video_sender, audio_sender, video_queue) = + start_video_audio_threads(move |data: &mut Vec| { + frame_count_cl.fetch_add(1, Ordering::Relaxed); + ui_handler.on_rgba(data); + }); let mut remote = Remote::new( handler, + video_queue, video_sender, audio_sender, receiver,