Merge remote-tracking branch 'origin/master' into feat/x11/clipboard-file/init

Signed-off-by: 蔡略 <cailue@bupt.edu.cn>
This commit is contained in:
蔡略
2023-09-08 20:09:57 +08:00
302 changed files with 14257 additions and 6606 deletions

View File

@@ -56,6 +56,7 @@ pub struct Remote<T: InvokeUiSession> {
remove_jobs: HashMap<i32, RemoveJob>,
timer: Interval,
last_update_jobs_status: (Instant, HashMap<i32, u64>),
is_connected: bool,
first_frame: bool,
#[cfg(any(target_os = "windows", target_os = "linux"))]
client_conn_id: i32, // used for file clipboard
@@ -90,6 +91,7 @@ impl<T: InvokeUiSession> Remote<T> {
remove_jobs: Default::default(),
timer: time::interval(SEC30),
last_update_jobs_status: (Instant::now(), Default::default()),
is_connected: false,
first_frame: false,
#[cfg(any(target_os = "windows", target_os = "linux"))]
client_conn_id: 0,
@@ -124,7 +126,7 @@ impl<T: InvokeUiSession> Remote<T> {
{
Ok((mut peer, direct, pk)) => {
self.handler.set_connection_type(peer.is_secured(), direct); // flutter -> connection_ready
self.handler.set_connection_info(direct, false);
self.handler.update_direct(Some(direct));
if conn_type == ConnType::DEFAULT_CONN {
self.handler
.set_fingerprint(crate::common::pk_to_fingerprint(pk.unwrap_or_default()));
@@ -160,24 +162,14 @@ impl<T: InvokeUiSession> Remote<T> {
if let Some(res) = res {
match res {
Err(err) => {
log::error!("Connection closed: {}", err);
self.handler.set_force_relay(direct, received);
let msgtype = "error";
let title = "Connection Error";
let text = err.to_string();
let show_relay_hint = self.handler.show_relay_hint(last_recv_time, msgtype, title, &text);
if show_relay_hint{
self.handler.msgbox("relay-hint", title, &text, "");
} else {
self.handler.msgbox(msgtype, title, &text, "");
}
self.handler.on_establish_connection_error(err.to_string());
break;
}
Ok(ref bytes) => {
last_recv_time = Instant::now();
if !received {
received = true;
self.handler.set_connection_info(direct, true);
self.handler.update_received(true);
}
self.data_count.fetch_add(bytes.len(), Ordering::Relaxed);
if !self.handle_msg_from_peer(bytes, &mut peer).await {
@@ -205,28 +197,7 @@ impl<T: InvokeUiSession> Remote<T> {
}
_msg = rx_clip_client.recv() => {
#[cfg(any(target_os="windows", target_os="linux"))]
match _msg {
Some(clip) => match clip {
clipboard::ClipboardFile::NotifyCallback{r#type, title, text} => {
self.handler.msgbox(&r#type, &title, &text, "");
}
_ => {
let is_stopping_allowed = clip.is_stopping_allowed();
let server_file_transfer_enabled = *self.handler.server_file_transfer_enabled.read().unwrap();
let file_transfer_enabled = self.handler.lc.read().unwrap().enable_file_transfer.v;
let stop = is_stopping_allowed && !(server_file_transfer_enabled && file_transfer_enabled);
log::debug!("Process clipboard message from system, stop: {}, is_stopping_allowed: {}, server_file_transfer_enabled: {}, file_transfer_enabled: {}", stop, is_stopping_allowed, server_file_transfer_enabled, file_transfer_enabled);
if stop {
ContextSend::set_is_stopped();
} else {
allow_err!(peer.send(&crate::clipboard_file::clip_2_msg(clip)).await);
}
}
}
None => {
// unreachable!()
}
}
self.handle_local_clipboard_msg(&mut peer, _msg).await;
}
_ = self.timer.tick() => {
if last_recv_time.elapsed() >= SEC30 {
@@ -244,7 +215,7 @@ impl<T: InvokeUiSession> Remote<T> {
}
}
_ = status_timer.tick() => {
self.fps_control();
self.fps_control(direct);
let elapsed = fps_instant.elapsed().as_millis();
if elapsed < 1000 {
continue;
@@ -271,8 +242,7 @@ impl<T: InvokeUiSession> Remote<T> {
}
}
Err(err) => {
self.handler
.msgbox("error", "Connection Error", &err.to_string(), "");
self.handler.on_establish_connection_error(err.to_string());
}
}
#[cfg(not(any(target_os = "android", target_os = "ios")))]
@@ -288,6 +258,44 @@ impl<T: InvokeUiSession> Remote<T> {
}
}
#[cfg(any(target_os = "windows", target_os = "linux"))]
async fn handle_local_clipboard_msg(
&self,
peer: &mut crate::client::FramedStream,
msg: Option<clipboard::ClipboardFile>,
) {
match msg {
Some(clip) => match clip {
clipboard::ClipboardFile::NotifyCallback {
r#type,
title,
text,
} => {
self.handler.msgbox(&r#type, &title, &text, "");
}
_ => {
let is_stopping_allowed = clip.is_stopping_allowed();
let server_file_transfer_enabled =
*self.handler.server_file_transfer_enabled.read().unwrap();
let file_transfer_enabled =
self.handler.lc.read().unwrap().enable_file_transfer.v;
let stop = is_stopping_allowed
&& (!self.is_connected
|| !(server_file_transfer_enabled && file_transfer_enabled));
log::debug!("Process clipboard message from system, stop: {}, is_stopping_allowed: {}, server_file_transfer_enabled: {}, file_transfer_enabled: {}", stop, is_stopping_allowed, server_file_transfer_enabled, file_transfer_enabled);
if stop {
ContextSend::set_is_stopped();
} else {
allow_err!(peer.send(&crate::clipboard_file::clip_2_msg(clip)).await);
}
}
},
None => {
// unreachable!()
}
}
}
fn handle_job_status(&mut self, id: i32, file_num: i32, err: Option<String>) {
if let Some(job) = self.remove_jobs.get_mut(&id) {
if job.no_confirm {
@@ -475,9 +483,13 @@ impl<T: InvokeUiSession> Remote<T> {
// peer is not windows, need transform \ to /
fs::transform_windows_path(&mut files);
}
let total_size = job.total_size();
self.read_jobs.push(job);
self.timer = time::interval(MILLI1);
allow_err!(peer.send(&fs::new_receive(id, to, file_num, files)).await);
allow_err!(
peer.send(&fs::new_receive(id, to, file_num, files, total_size))
.await
);
}
}
}
@@ -560,7 +572,8 @@ impl<T: InvokeUiSession> Remote<T> {
id,
job.path.to_string_lossy().to_string(),
job.file_num,
job.files.clone()
job.files.clone(),
job.total_size(),
))
.await
);
@@ -843,8 +856,10 @@ impl<T: InvokeUiSession> Remote<T> {
transfer_metas.write_jobs.push(json_str);
}
log::info!("meta: {:?}", transfer_metas);
config.transfer = transfer_metas;
self.handler.save_config(config);
if config.transfer != transfer_metas {
config.transfer = transfer_metas;
self.handler.save_config(config);
}
true
}
@@ -875,38 +890,77 @@ impl<T: InvokeUiSession> Remote<T> {
}
}
#[inline]
fn fps_control(&mut self) {
fn fps_control(&mut self, direct: bool) {
let len = self.video_queue.len();
let ctl = &mut self.fps_control;
// Current full speed decoding fps
let decode_fps = self.decode_fps.load(std::sync::atomic::Ordering::Relaxed);
// 500ms
let debounce = if decode_fps > 10 { decode_fps / 2 } else { 5 };
if len < debounce || decode_fps == 0 {
if decode_fps == 0 {
return;
}
// First setting , or the length of the queue still increases after setting, or exceed the size of the last setting again
if ctl.set_times < 10 // enough
&& (ctl.set_times == 0
|| (len > ctl.last_queue_size && ctl.last_set_instant.elapsed().as_secs() > 30))
let limited_fps = if direct {
decode_fps * 9 / 10 // 30 got 27
} else {
decode_fps * 4 / 5 // 30 got 24
};
// send full speed fps
let version = self.handler.lc.read().unwrap().version;
let max_encode_speed = 144 * 10 / 9;
if version >= hbb_common::get_version_number("1.2.1")
&& (ctl.last_full_speed_fps.is_none() // First time
|| ((ctl.last_full_speed_fps.unwrap_or_default() - decode_fps as i32).abs() >= 5 // diff 5
&& !(decode_fps > max_encode_speed // already exceed max encoding speed
&& ctl.last_full_speed_fps.unwrap_or_default() > max_encode_speed as i32)))
{
// 80% fps to ensure decoding is faster than encoding
let mut custom_fps = decode_fps as i32 * 4 / 5;
let mut misc = Misc::new();
misc.set_full_speed_fps(decode_fps as _);
let mut msg = Message::new();
msg.set_misc(misc);
self.sender.send(Data::Message(msg)).ok();
ctl.last_full_speed_fps = Some(decode_fps as _);
}
// decrease judgement
let debounce = if decode_fps > 10 { decode_fps / 2 } else { 5 }; // 500ms
let should_decrease = len >= debounce // exceed debounce
&& len > ctl.last_queue_size + 5 // still caching
&& !ctl.last_custom_fps.unwrap_or(i32::MAX) < limited_fps as i32; // NOT already set a smaller one
// increase judgement
if len <= 1 {
ctl.idle_counter += 1;
} else {
ctl.idle_counter = 0;
}
let mut should_increase = false;
if let Some(last_custom_fps) = ctl.last_custom_fps {
// ever set
if last_custom_fps + 5 < limited_fps as i32 && ctl.idle_counter > 3 {
// limited_fps is 5 larger than last set, and idle time is more than 3 seconds
should_increase = true;
}
}
if should_decrease || should_increase {
// limited_fps to ensure decoding is faster than encoding
let mut custom_fps = limited_fps as i32;
if custom_fps < 1 {
custom_fps = 1;
}
// send custom fps
let mut misc = Misc::new();
misc.set_option(OptionMessage {
custom_fps,
..Default::default()
});
if version > hbb_common::get_version_number("1.2.1") {
// avoid confusion with custom image quality fps
misc.set_auto_adjust_fps(custom_fps as _);
} else {
misc.set_option(OptionMessage {
custom_fps,
..Default::default()
});
}
let mut msg = Message::new();
msg.set_misc(misc);
self.sender.send(Data::Message(msg)).ok();
ctl.last_queue_size = len;
ctl.set_times += 1;
ctl.last_set_instant = Instant::now();
ctl.last_custom_fps = Some(custom_fps);
}
// send refresh
if ctl.refresh_times < 10 // enough
@@ -1002,6 +1056,8 @@ impl<T: InvokeUiSession> Remote<T> {
if self.handler.is_file_transfer() {
self.handler.load_last_jobs();
}
self.is_connected = true;
}
_ => {}
},
@@ -1415,12 +1471,9 @@ impl<T: InvokeUiSession> Remote<T> {
}
}
}
Some(message::Union::PeerInfo(pi)) => match pi.conn_id {
crate::SYNC_PEER_INFO_DISPLAYS => {
self.handler.set_displays(&pi.displays);
}
_ => {}
},
Some(message::Union::PeerInfo(pi)) => {
self.handler.set_displays(&pi.displays);
}
_ => {}
}
}
@@ -1620,20 +1673,22 @@ impl RemoveJob {
struct FpsControl {
last_queue_size: usize,
set_times: usize,
refresh_times: usize,
last_set_instant: Instant,
last_refresh_instant: Instant,
last_full_speed_fps: Option<i32>,
last_custom_fps: Option<i32>,
idle_counter: usize,
}
impl Default for FpsControl {
fn default() -> Self {
Self {
last_queue_size: Default::default(),
set_times: Default::default(),
refresh_times: Default::default(),
last_set_instant: Instant::now(),
last_refresh_instant: Instant::now(),
last_full_speed_fps: None,
last_custom_fps: None,
idle_counter: 0,
}
}
}