rustdesk
2021-12-24 19:13:11 +08:00
parent 885d8a4586
commit 4703a7d332
7 changed files with 52 additions and 49 deletions

View File

@@ -81,6 +81,8 @@ const TEST_DELAY_TIMEOUT: Duration = Duration::from_secs(3);
const SEC30: Duration = Duration::from_secs(30);
const H1: Duration = Duration::from_secs(3600);
const MILLI1: Duration = Duration::from_millis(1);
const SEND_TIMEOUT_VIDEO: u64 = 12_000;
const SEND_TIMEOUT_OTHER: u64 = SEND_TIMEOUT_VIDEO * 10;
impl Connection {
pub async fn start(
@@ -147,6 +149,14 @@ impl Connection {
time::interval_at(Instant::now() + TEST_DELAY_TIMEOUT, TEST_DELAY_TIMEOUT);
let mut last_recv_time = Instant::now();
conn.stream.set_send_timeout(
if conn.file_transfer.is_some() || conn.port_forward_socket.is_some() {
SEND_TIMEOUT_OTHER
} else {
SEND_TIMEOUT_VIDEO
},
);
loop {
tokio::select! {
biased; // video has higher priority
@@ -286,73 +296,58 @@ impl Connection {
}
}
}
video_service::notify_video_frame_feched(id, None);
video_service::notify_video_frame_feched(id, None);
super::video_service::update_test_latency(id, 0);
super::video_service::update_image_quality(id, None);
if let Some(forward) = conn.port_forward_socket.as_mut() {
if let Err(err) = conn.try_port_forward_loop(&mut rx_from_cm).await {
conn.on_close(&err.to_string(), false);
}
}
async fn try_port_forward_loop(
&mut self,
rx_from_cm: &mut mpsc::UnboundedReceiver<Data>,
) -> ResultType<()> {
let mut last_recv_time = Instant::now();
if let Some(forward) = self.port_forward_socket.as_mut() {
log::info!("Running port forwarding loop");
conn.stream.set_raw();
self.stream.set_raw();
loop {
tokio::select! {
Some(data) = rx_from_cm.recv() => {
match data {
ipc::Data::Close => {
conn.on_close("Close requested from connection manager", false);
break;
bail!("Close requested from selfection manager");
}
_ => {}
}
}
res = forward.next() => {
if let Some(res) = res {
match res {
Err(err) => {
conn.on_close(&err.to_string(), false);
break;
},
Ok(bytes) => {
last_recv_time = Instant::now();
if let Err(err) = conn.stream.send_bytes(bytes.into()).await {
conn.on_close(&err.to_string(), false);
break;
}
}
}
last_recv_time = Instant::now();
self.stream.send_bytes(res?.into()).await?;
} else {
conn.on_close("Forward reset by the peer", false);
break;
bail!("Forward reset by the peer");
}
},
res = conn.stream.next() => {
res = self.stream.next() => {
if let Some(res) = res {
match res {
Err(err) => {
conn.on_close(&err.to_string(), false);
break;
},
Ok(bytes) => {
last_recv_time = Instant::now();
if let Err(err) = forward.send(bytes.into()).await {
conn.on_close(&err.to_string(), false);
break;
}
}
}
last_recv_time = Instant::now();
timeout(SEND_TIMEOUT_OTHER, forward.send(res?.into())).await??;
} else {
conn.on_close("Stream reset by the peer", false);
break;
bail!("Stream reset by the peer");
}
},
_ = conn.timer.tick() => {
_ = self.timer.tick() => {
if last_recv_time.elapsed() >= H1 {
conn.on_close("Timeout", false);
break;
bail!("Timeout");
}
}
}
}
}
Ok(())
}
async fn send_permission(&mut self, permission: Permission, enabled: bool) {