diff --git a/Cargo.lock b/Cargo.lock index e20d0b3d2..a0815f8e0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5091,6 +5091,7 @@ dependencies = [ "core-foundation 0.9.3", "core-graphics 0.22.3", "cpal", + "crossbeam-queue", "ctrlc", "dark-light", "dasp", diff --git a/Cargo.toml b/Cargo.toml index 105c7da28..a01415837 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -64,6 +64,7 @@ errno = "0.3" rdev = { git = "https://github.com/fufesou/rdev" } url = { version = "2.1", features = ["serde"] } dlopen = "0.1" +crossbeam-queue = "0.3" hex = "0.4" reqwest = { version = "0.11", features = ["blocking", "json", "rustls-tls"], default-features=false } chrono = "0.4" diff --git a/src/client.rs b/src/client.rs index 1efd8b18e..2f745b70c 100644 --- a/src/client.rs +++ b/src/client.rs @@ -13,6 +13,7 @@ use cpal::{ traits::{DeviceTrait, HostTrait, StreamTrait}, Device, Host, StreamConfig, }; +use crossbeam_queue::ArrayQueue; use magnum_opus::{Channels::*, Decoder as AudioDecoder}; #[cfg(not(any(target_os = "android", target_os = "linux")))] use ringbuf::{ring_buffer::RbBase, Rb}; @@ -67,6 +68,7 @@ pub mod io_loop; pub const MILLI1: Duration = Duration::from_millis(1); pub const SEC30: Duration = Duration::from_secs(30); +pub const VIDEO_QUEUE_SIZE: usize = 120; /// Client of the remote desktop. pub struct Client; @@ -1659,8 +1661,9 @@ impl LoginConfigHandler { /// Media data. pub enum MediaData { - VideoFrame(VideoFrame), - AudioFrame(AudioFrame), + VideoQueue, + VideoFrame(Box), + AudioFrame(Box), AudioFormat(AudioFormat), Reset, RecordScreen(bool, i32, i32, String), @@ -1674,11 +1677,15 @@ pub type MediaSender = mpsc::Sender; /// # Arguments /// /// * `video_callback` - The callback for video frame. Being called when a video frame is ready. -pub fn start_video_audio_threads(video_callback: F) -> (MediaSender, MediaSender) +pub fn start_video_audio_threads( + video_callback: F, +) -> (MediaSender, MediaSender, Arc>) where F: 'static + FnMut(&mut Vec) + Send, { let (video_sender, video_receiver) = mpsc::channel::(); + let video_queue = Arc::new(ArrayQueue::::new(VIDEO_QUEUE_SIZE)); + let video_queue_cloned = video_queue.clone(); let mut video_callback = video_callback; std::thread::spawn(move || { @@ -1687,10 +1694,17 @@ where if let Ok(data) = video_receiver.recv() { match data { MediaData::VideoFrame(vf) => { - if let Ok(true) = video_handler.handle_frame(vf) { + if let Ok(true) = video_handler.handle_frame(*vf) { video_callback(&mut video_handler.rgb); } } + MediaData::VideoQueue => { + if let Some(vf) = video_queue.pop() { + if let Ok(true) = video_handler.handle_frame(vf) { + video_callback(&mut video_handler.rgb); + } + } + } MediaData::Reset => { video_handler.reset(); } @@ -1706,7 +1720,7 @@ where log::info!("Video decoder loop exits"); }); let audio_sender = start_audio_thread(); - return (video_sender, audio_sender); + return (video_sender, audio_sender, video_queue_cloned); } /// Start an audio thread @@ -1719,7 +1733,7 @@ pub fn start_audio_thread() -> MediaSender { if let Ok(data) = audio_receiver.recv() { match data { MediaData::AudioFrame(af) => { - audio_handler.handle_frame(af); + audio_handler.handle_frame(*af); } MediaData::AudioFormat(f) => { log::debug!("recved audio format, sample rate={}", f.sample_rate); diff --git a/src/client/io_loop.rs b/src/client/io_loop.rs index 135e92a75..b0bddc82e 100644 --- a/src/client/io_loop.rs +++ b/src/client/io_loop.rs @@ -9,6 +9,7 @@ use std::sync::{ #[cfg(windows)] use clipboard::{cliprdr::CliprdrClientContext, ContextSend}; +use crossbeam_queue::ArrayQueue; use hbb_common::config::{PeerConfig, TransferSerde}; use hbb_common::fs::{ can_enable_overwrite_detection, get_job, get_string, new_send_confirm, DigestCheckResult, @@ -42,6 +43,7 @@ use crate::{client::Data, client::Interface}; pub struct Remote { handler: Session, + video_queue: Arc>, video_sender: MediaSender, audio_sender: MediaSender, receiver: mpsc::UnboundedReceiver, @@ -68,6 +70,7 @@ pub struct Remote { impl Remote { pub fn new( handler: Session, + video_queue: Arc>, video_sender: MediaSender, audio_sender: MediaSender, receiver: mpsc::UnboundedReceiver, @@ -76,6 +79,7 @@ impl Remote { ) -> Self { Self { handler, + video_queue, video_sender, audio_sender, receiver, @@ -812,6 +816,18 @@ impl Remote { } } + fn contains_key_frame(vf: &VideoFrame) -> bool { + match &vf.union { + Some(vf) => match vf { + video_frame::Union::Vp9s(f) => f.frames.iter().any(|e| e.key), + video_frame::Union::H264s(f) => f.frames.iter().any(|e| e.key), + video_frame::Union::H265s(f) => f.frames.iter().any(|e| e.key), + _ => false, + }, + None => false, + } + } + async fn handle_msg_from_peer(&mut self, data: &[u8], peer: &mut Stream) -> bool { if let Ok(msg_in) = Message::parse_from_bytes(&data) { match msg_in.union { @@ -830,7 +846,15 @@ impl Remote { ..Default::default() }) }; - self.video_sender.send(MediaData::VideoFrame(vf)).ok(); + if Self::contains_key_frame(&vf) { + while let Some(_) = self.video_queue.pop() {} + self.video_sender + .send(MediaData::VideoFrame(Box::new(vf))) + .ok(); + } else { + self.video_queue.force_push(vf); + self.video_sender.send(MediaData::VideoQueue).ok(); + } } Some(message::Union::Hash(hash)) => { self.handler @@ -1217,7 +1241,9 @@ impl Remote { } Some(message::Union::AudioFrame(frame)) => { if !self.handler.lc.read().unwrap().disable_audio.v { - self.audio_sender.send(MediaData::AudioFrame(frame)).ok(); + self.audio_sender + .send(MediaData::AudioFrame(Box::new(frame))) + .ok(); } } Some(message::Union::FileAction(action)) => match action.union { diff --git a/src/server/connection.rs b/src/server/connection.rs index 6b0cbefc3..0c17fd176 100644 --- a/src/server/connection.rs +++ b/src/server/connection.rs @@ -1669,7 +1669,7 @@ impl Connection { Some(message::Union::AudioFrame(frame)) => { if !self.disable_audio { if let Some(sender) = &self.audio_sender { - allow_err!(sender.send(MediaData::AudioFrame(frame))); + allow_err!(sender.send(MediaData::AudioFrame(Box::new(frame)))); } else { log::warn!( "Processing audio frame without the voice call audio sender." diff --git a/src/ui_session_interface.rs b/src/ui_session_interface.rs index 7ee49a3b7..f89be4879 100644 --- a/src/ui_session_interface.rs +++ b/src/ui_session_interface.rs @@ -1149,13 +1149,15 @@ pub async fn io_loop(handler: Session) { let frame_count = Arc::new(AtomicUsize::new(0)); let frame_count_cl = frame_count.clone(); let ui_handler = handler.ui_handler.clone(); - let (video_sender, audio_sender) = start_video_audio_threads(move |data: &mut Vec| { - frame_count_cl.fetch_add(1, Ordering::Relaxed); - ui_handler.on_rgba(data); - }); + let (video_sender, audio_sender, video_queue) = + start_video_audio_threads(move |data: &mut Vec| { + frame_count_cl.fetch_add(1, Ordering::Relaxed); + ui_handler.on_rgba(data); + }); let mut remote = Remote::new( handler, + video_queue, video_sender, audio_sender, receiver,