Merge remote-tracking branch 'rd/master' into feat/x11/clipboard-file/init

Signed-off-by: ClSlaid <cailue@bupt.edu.cn>
This commit is contained in:
ClSlaid
2023-10-16 18:57:45 +08:00
111 changed files with 3845 additions and 1561 deletions

View File

@@ -1,3 +1,4 @@
use std::collections::HashMap;
use hbb_common::{
get_time,
message_proto::{Message, VoiceCallRequest, VoiceCallResponse},
@@ -7,7 +8,7 @@ use scrap::CodecFormat;
#[derive(Debug, Default)]
pub struct QualityStatus {
pub speed: Option<String>,
pub fps: Option<i32>,
pub fps: HashMap<usize, i32>,
pub delay: Option<i32>,
pub target_bitrate: Option<i32>,
pub codec_format: Option<CodecFormat>,

View File

@@ -1,33 +1,41 @@
use std::collections::HashMap;
use std::num::NonZeroI64;
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
use std::{
collections::HashMap,
num::NonZeroI64,
sync::{
atomic::{AtomicUsize, Ordering},
Arc, RwLock,
},
};
#[cfg(any(target_os = "windows", target_os = "linux"))]
use clipboard::ContextSend;
use crossbeam_queue::ArrayQueue;
use hbb_common::config::{PeerConfig, TransferSerde};
use hbb_common::fs::{
can_enable_overwrite_detection, get_job, get_string, new_send_confirm, DigestCheckResult,
RemoveJobMeta,
};
use hbb_common::message_proto::permission_info::Permission;
use hbb_common::protobuf::Message as _;
use hbb_common::rendezvous_proto::ConnType;
#[cfg(not(any(target_os = "android", target_os = "ios")))]
use hbb_common::sleep;
#[cfg(not(target_os = "ios"))]
use hbb_common::tokio::sync::mpsc::error::TryRecvError;
#[cfg(any(target_os = "windows", target_os = "linux"))]
use hbb_common::tokio::sync::Mutex as TokioMutex;
use hbb_common::tokio::{
self,
sync::mpsc,
time::{self, Duration, Instant, Interval},
use hbb_common::{
allow_err,
config::{PeerConfig, TransferSerde},
fs,
fs::{
can_enable_overwrite_detection, get_job, get_string, new_send_confirm, DigestCheckResult,
RemoveJobMeta,
},
get_time, log,
message_proto::permission_info::Permission,
message_proto::*,
protobuf::Message as _,
rendezvous_proto::ConnType,
tokio::{
self,
sync::mpsc,
time::{self, Duration, Instant, Interval},
},
ResultType, Stream,
};
use hbb_common::{allow_err, fs, get_time, log, message_proto::*, ResultType, Stream};
use scrap::CodecFormat;
use crate::client::{
@@ -43,7 +51,7 @@ use crate::{client::Data, client::Interface};
pub struct Remote<T: InvokeUiSession> {
handler: Session<T>,
video_queue: Arc<ArrayQueue<VideoFrame>>,
video_queue_map: Arc<RwLock<HashMap<usize, ArrayQueue<VideoFrame>>>>,
video_sender: MediaSender,
audio_sender: MediaSender,
receiver: mpsc::UnboundedReceiver<Data>,
@@ -61,27 +69,27 @@ pub struct Remote<T: InvokeUiSession> {
#[cfg(any(target_os = "windows", target_os = "linux"))]
client_conn_id: i32, // used for file clipboard
data_count: Arc<AtomicUsize>,
frame_count: Arc<AtomicUsize>,
frame_count_map: Arc<RwLock<HashMap<usize, usize>>>,
video_format: CodecFormat,
elevation_requested: bool,
fps_control: FpsControl,
decode_fps: Arc<AtomicUsize>,
fps_control_map: HashMap<usize, FpsControl>,
decode_fps_map: Arc<RwLock<HashMap<usize, usize>>>,
}
impl<T: InvokeUiSession> Remote<T> {
pub fn new(
handler: Session<T>,
video_queue: Arc<ArrayQueue<VideoFrame>>,
video_queue: Arc<RwLock<HashMap<usize, ArrayQueue<VideoFrame>>>>,
video_sender: MediaSender,
audio_sender: MediaSender,
receiver: mpsc::UnboundedReceiver<Data>,
sender: mpsc::UnboundedSender<Data>,
frame_count: Arc<AtomicUsize>,
decode_fps: Arc<AtomicUsize>,
frame_count_map: Arc<RwLock<HashMap<usize, usize>>>,
decode_fps: Arc<RwLock<HashMap<usize, usize>>>,
) -> Self {
Self {
handler,
video_queue,
video_queue_map: video_queue,
video_sender,
audio_sender,
receiver,
@@ -96,13 +104,13 @@ impl<T: InvokeUiSession> Remote<T> {
#[cfg(any(target_os = "windows", target_os = "linux"))]
client_conn_id: 0,
data_count: Arc::new(AtomicUsize::new(0)),
frame_count,
frame_count_map,
video_format: CodecFormat::Unknown,
stop_voice_call_sender: None,
voice_call_request_timestamp: None,
elevation_requested: false,
fps_control: Default::default(),
decode_fps,
fps_control_map: Default::default(),
decode_fps_map: decode_fps,
}
}
@@ -153,7 +161,7 @@ impl<T: InvokeUiSession> Remote<T> {
if !is_conn_not_default {
log::debug!("get cliprdr client for conn_id {}", self.client_conn_id);
(self.client_conn_id, rx_clip_client_lock) =
clipboard::get_rx_cliprdr_client(&self.handler.session_id);
clipboard::get_rx_cliprdr_client(&self.handler.id);
};
}
#[cfg(any(target_os = "windows", target_os = "linux"))]
@@ -230,12 +238,18 @@ impl<T: InvokeUiSession> Remote<T> {
let mut speed = self.data_count.swap(0, Ordering::Relaxed);
speed = speed * 1000 / elapsed as usize;
let speed = format!("{:.2}kB/s", speed as f32 / 1024 as f32);
let mut fps = self.frame_count.swap(0, Ordering::Relaxed) as _;
// Correcting the inaccuracy of status_timer
fps = fps * 1000 / elapsed as i32;
let mut frame_count_map_write = self.frame_count_map.write().unwrap();
let frame_count_map = frame_count_map_write.clone();
frame_count_map_write.values_mut().for_each(|v| *v = 0);
drop(frame_count_map_write);
let fps = frame_count_map.iter().map(|(k, v)| {
// Correcting the inaccuracy of status_timer
(k.clone(), (*v as i32) * 1000 / elapsed as i32)
}).collect::<HashMap<usize, i32>>();
self.handler.update_quality_status(QualityStatus {
speed:Some(speed),
fps:Some(fps),
speed: Some(speed),
fps,
..Default::default()
});
}
@@ -261,7 +275,7 @@ impl<T: InvokeUiSession> Remote<T> {
#[cfg(not(any(target_os = "android", target_os = "ios")))]
if _set_disconnected_ok {
Client::try_stop_clipboard(&self.handler.session_id);
Client::try_stop_clipboard(&self.handler.id);
}
#[cfg(any(target_os = "windows", target_os = "linux"))]
@@ -763,10 +777,10 @@ impl<T: InvokeUiSession> Remote<T> {
}
}
}
Data::RecordScreen(start, w, h, id) => {
Data::RecordScreen(start, display, w, h, id) => {
let _ = self
.video_sender
.send(MediaData::RecordScreen(start, w, h, id));
.send(MediaData::RecordScreen(start, display, w, h, id));
}
Data::ElevateDirect => {
let mut request = ElevationRequest::new();
@@ -907,89 +921,100 @@ impl<T: InvokeUiSession> Remote<T> {
None => false,
}
}
#[inline]
fn fps_control(&mut self, direct: bool) {
let len = self.video_queue.len();
let ctl = &mut self.fps_control;
// Current full speed decoding fps
let decode_fps = self.decode_fps.load(std::sync::atomic::Ordering::Relaxed);
if decode_fps == 0 {
return;
}
let limited_fps = if direct {
decode_fps * 9 / 10 // 30 got 27
} else {
decode_fps * 4 / 5 // 30 got 24
};
// send full speed fps
let version = self.handler.lc.read().unwrap().version;
let max_encode_speed = 144 * 10 / 9;
if version >= hbb_common::get_version_number("1.2.1")
&& (ctl.last_full_speed_fps.is_none() // First time
|| ((ctl.last_full_speed_fps.unwrap_or_default() - decode_fps as i32).abs() >= 5 // diff 5
&& !(decode_fps > max_encode_speed // already exceed max encoding speed
&& ctl.last_full_speed_fps.unwrap_or_default() > max_encode_speed as i32)))
{
let mut misc = Misc::new();
misc.set_full_speed_fps(decode_fps as _);
let mut msg = Message::new();
msg.set_misc(misc);
self.sender.send(Data::Message(msg)).ok();
ctl.last_full_speed_fps = Some(decode_fps as _);
}
// decrease judgement
let debounce = if decode_fps > 10 { decode_fps / 2 } else { 5 }; // 500ms
let should_decrease = len >= debounce // exceed debounce
&& len > ctl.last_queue_size + 5 // still caching
&& !ctl.last_custom_fps.unwrap_or(i32::MAX) < limited_fps as i32; // NOT already set a smaller one
let decode_fps_read = self.decode_fps_map.read().unwrap();
for (display, decode_fps) in decode_fps_read.iter() {
let video_queue_map_read = self.video_queue_map.read().unwrap();
let Some(video_queue) = video_queue_map_read.get(display) else {
continue;
};
// increase judgement
if len <= 1 {
ctl.idle_counter += 1;
} else {
ctl.idle_counter = 0;
}
let mut should_increase = false;
if let Some(last_custom_fps) = ctl.last_custom_fps {
// ever set
if last_custom_fps + 5 < limited_fps as i32 && ctl.idle_counter > 3 {
// limited_fps is 5 larger than last set, and idle time is more than 3 seconds
should_increase = true;
if !self.fps_control_map.contains_key(display) {
self.fps_control_map.insert(*display, FpsControl::default());
}
}
if should_decrease || should_increase {
// limited_fps to ensure decoding is faster than encoding
let mut custom_fps = limited_fps as i32;
if custom_fps < 1 {
custom_fps = 1;
}
// send custom fps
let mut misc = Misc::new();
if version > hbb_common::get_version_number("1.2.1") {
// avoid confusion with custom image quality fps
misc.set_auto_adjust_fps(custom_fps as _);
let Some(ctl) = self.fps_control_map.get_mut(display) else {
return;
};
let len = video_queue.len();
let decode_fps = *decode_fps;
let limited_fps = if direct {
decode_fps * 9 / 10 // 30 got 27
} else {
misc.set_option(OptionMessage {
custom_fps,
..Default::default()
});
decode_fps * 4 / 5 // 30 got 24
};
// send full speed fps
let version = self.handler.lc.read().unwrap().version;
let max_encode_speed = 144 * 10 / 9;
if version >= hbb_common::get_version_number("1.2.1")
&& (ctl.last_full_speed_fps.is_none() // First time
|| ((ctl.last_full_speed_fps.unwrap_or_default() - decode_fps as i32).abs() >= 5 // diff 5
&& !(decode_fps > max_encode_speed // already exceed max encoding speed
&& ctl.last_full_speed_fps.unwrap_or_default() > max_encode_speed as i32)))
{
let mut misc = Misc::new();
misc.set_full_speed_fps(decode_fps as _);
let mut msg = Message::new();
msg.set_misc(misc);
self.sender.send(Data::Message(msg)).ok();
ctl.last_full_speed_fps = Some(decode_fps as _);
}
// decrease judgement
let debounce = if decode_fps > 10 { decode_fps / 2 } else { 5 }; // 500ms
let should_decrease = len >= debounce // exceed debounce
&& len > ctl.last_queue_size + 5 // still caching
&& !ctl.last_custom_fps.unwrap_or(i32::MAX) < limited_fps as i32; // NOT already set a smaller one
// increase judgement
if len <= 1 {
ctl.idle_counter += 1;
} else {
ctl.idle_counter = 0;
}
let mut should_increase = false;
if let Some(last_custom_fps) = ctl.last_custom_fps {
// ever set
if last_custom_fps + 5 < limited_fps as i32 && ctl.idle_counter > 3 {
// limited_fps is 5 larger than last set, and idle time is more than 3 seconds
should_increase = true;
}
}
if should_decrease || should_increase {
// limited_fps to ensure decoding is faster than encoding
let mut custom_fps = limited_fps as i32;
if custom_fps < 1 {
custom_fps = 1;
}
// send custom fps
let mut misc = Misc::new();
if version > hbb_common::get_version_number("1.2.1") {
// avoid confusion with custom image quality fps
misc.set_auto_adjust_fps(custom_fps as _);
} else {
misc.set_option(OptionMessage {
custom_fps,
..Default::default()
});
}
let mut msg = Message::new();
msg.set_misc(misc);
self.sender.send(Data::Message(msg)).ok();
ctl.last_queue_size = len;
ctl.last_custom_fps = Some(custom_fps);
}
// send refresh
if ctl.refresh_times < 10 // enough
&& (len > video_queue.capacity() / 2
&& (ctl.refresh_times == 0 || ctl.last_refresh_instant.elapsed().as_secs() > 30))
{
// Refresh causes client set_display, left frames cause flickering.
while let Some(_) = video_queue.pop() {}
self.handler.refresh_video(*display as _);
ctl.refresh_times += 1;
ctl.last_refresh_instant = Instant::now();
}
let mut msg = Message::new();
msg.set_misc(misc);
self.sender.send(Data::Message(msg)).ok();
ctl.last_queue_size = len;
ctl.last_custom_fps = Some(custom_fps);
}
// send refresh
if ctl.refresh_times < 10 // enough
&& (len > self.video_queue.capacity() / 2
&& (ctl.refresh_times == 0 || ctl.last_refresh_instant.elapsed().as_secs() > 30))
{
// Refresh causes client set_display, left frames cause flickering.
while let Some(_) = self.video_queue.pop() {}
self.handler.refresh_video();
ctl.refresh_times += 1;
ctl.last_refresh_instant = Instant::now();
}
}
@@ -1011,14 +1036,27 @@ impl<T: InvokeUiSession> Remote<T> {
..Default::default()
})
};
let display = vf.display as usize;
let mut video_queue_write = self.video_queue_map.write().unwrap();
if !video_queue_write.contains_key(&display) {
video_queue_write.insert(
display,
ArrayQueue::<VideoFrame>::new(crate::client::VIDEO_QUEUE_SIZE),
);
}
if Self::contains_key_frame(&vf) {
while let Some(_) = self.video_queue.pop() {}
if let Some(video_queue) = video_queue_write.get_mut(&display) {
while let Some(_) = video_queue.pop() {}
}
self.video_sender
.send(MediaData::VideoFrame(Box::new(vf)))
.ok();
} else {
self.video_queue.force_push(vf);
self.video_sender.send(MediaData::VideoQueue).ok();
if let Some(video_queue) = video_queue_write.get_mut(&display) {
video_queue.force_push(vf);
}
self.video_sender.send(MediaData::VideoQueue(display)).ok();
}
}
Some(message::Union::Hash(hash)) => {
@@ -1300,7 +1338,9 @@ impl<T: InvokeUiSession> Remote<T> {
}
Some(misc::Union::SwitchDisplay(s)) => {
self.handler.handle_peer_switch_display(&s);
self.video_sender.send(MediaData::Reset).ok();
self.video_sender
.send(MediaData::Reset(s.display as _))
.ok();
if s.width > 0 && s.height > 0 {
self.handler.set_display(
s.x,
@@ -1677,7 +1717,7 @@ impl<T: InvokeUiSession> Remote<T> {
#[cfg(feature = "flutter")]
if let Some(hbb_common::message_proto::cliprdr::Union::FormatList(_)) = &clip.union {
if self.client_conn_id
!= clipboard::get_client_conn_id(&crate::flutter::get_cur_session_id()).unwrap_or(0)
!= clipboard::get_client_conn_id(&crate::flutter::get_cur_peer_id()).unwrap_or(0)
{
return;
}