mirror of
https://github.com/weyne85/rustdesk.git
synced 2025-10-29 17:00:05 +00:00
upgrade tokio to 3.17 for a windows named pipe race condition,
https://github.com/tokio-rs/mio/pull/1760, https://github.com/tokio-rs/tokio/issues/6369
This commit is contained in:
@@ -9,15 +9,14 @@ edition = "2018"
|
||||
[dependencies]
|
||||
flexi_logger = { version = "0.27", features = ["async"] }
|
||||
protobuf = { version = "3.4", features = ["with-bytes"] }
|
||||
tokio = { version = "1.36", features = ["full"] }
|
||||
tokio = { version = "1.37", features = ["full"] }
|
||||
tokio-util = { version = "0.7", features = ["full"] }
|
||||
futures = "0.3"
|
||||
bytes = { version = "1.4", features = ["serde"] }
|
||||
bytes = { version = "1.6", features = ["serde"] }
|
||||
log = "0.4"
|
||||
env_logger = "0.10"
|
||||
socket2 = { version = "0.3", features = ["reuseport"] }
|
||||
zstd = "0.13"
|
||||
quinn = {version = "0.9", optional = true }
|
||||
anyhow = "1.0"
|
||||
futures-util = "0.3"
|
||||
directories-next = "2.0"
|
||||
@@ -26,12 +25,12 @@ serde_derive = "1.0"
|
||||
serde = "1.0"
|
||||
serde_json = "1.0"
|
||||
lazy_static = "1.4"
|
||||
confy = { git = "https://github.com/open-trade/confy" }
|
||||
confy = { git = "https://github.com/rustdesk-org/confy" }
|
||||
dirs-next = "2.0"
|
||||
filetime = "0.2"
|
||||
sodiumoxide = "0.2"
|
||||
regex = "1.8"
|
||||
tokio-socks = { git = "https://github.com/open-trade/tokio-socks" }
|
||||
tokio-socks = { git = "https://github.com/rustdesk-org/tokio-socks" }
|
||||
chrono = "0.4"
|
||||
backtrace = "0.3"
|
||||
libc = "0.2"
|
||||
@@ -55,9 +54,6 @@ rustls-pki-types = "1.4"
|
||||
[target.'cfg(any(target_os = "macos", target_os = "windows"))'.dependencies]
|
||||
tokio-native-tls ="0.3"
|
||||
|
||||
[features]
|
||||
quic = []
|
||||
|
||||
[build-dependencies]
|
||||
protobuf-codegen = { version = "3.4" }
|
||||
|
||||
|
||||
@@ -23,8 +23,6 @@ pub mod udp;
|
||||
pub use env_logger;
|
||||
pub use log;
|
||||
pub mod bytes_codec;
|
||||
#[cfg(feature = "quic")]
|
||||
pub mod quic;
|
||||
pub use anyhow::{self, bail};
|
||||
pub use futures_util;
|
||||
pub mod config;
|
||||
@@ -55,9 +53,6 @@ pub use uuid;
|
||||
pub use base64;
|
||||
pub use thiserror;
|
||||
|
||||
#[cfg(feature = "quic")]
|
||||
pub type Stream = quic::Connection;
|
||||
#[cfg(not(feature = "quic"))]
|
||||
pub type Stream = tcp::FramedStream;
|
||||
pub type SessionID = uuid::Uuid;
|
||||
|
||||
|
||||
@@ -1,135 +0,0 @@
|
||||
use crate::{allow_err, anyhow::anyhow, ResultType};
|
||||
use protobuf::Message;
|
||||
use std::{net::SocketAddr, sync::Arc};
|
||||
use tokio::{self, stream::StreamExt, sync::mpsc};
|
||||
|
||||
const QUIC_HBB: &[&[u8]] = &[b"hbb"];
|
||||
const SERVER_NAME: &str = "hbb";
|
||||
|
||||
type Sender = mpsc::UnboundedSender<Value>;
|
||||
type Receiver = mpsc::UnboundedReceiver<Value>;
|
||||
|
||||
pub fn new_server(socket: std::net::UdpSocket) -> ResultType<(Server, SocketAddr)> {
|
||||
let mut transport_config = quinn::TransportConfig::default();
|
||||
transport_config.stream_window_uni(0);
|
||||
let mut server_config = quinn::ServerConfig::default();
|
||||
server_config.transport = Arc::new(transport_config);
|
||||
let mut server_config = quinn::ServerConfigBuilder::new(server_config);
|
||||
server_config.protocols(QUIC_HBB);
|
||||
// server_config.enable_keylog();
|
||||
// server_config.use_stateless_retry(true);
|
||||
let mut endpoint = quinn::Endpoint::builder();
|
||||
endpoint.listen(server_config.build());
|
||||
let (end, incoming) = endpoint.with_socket(socket)?;
|
||||
Ok((Server { incoming }, end.local_addr()?))
|
||||
}
|
||||
|
||||
pub async fn new_client(local_addr: &SocketAddr, peer: &SocketAddr) -> ResultType<Connection> {
|
||||
let mut endpoint = quinn::Endpoint::builder();
|
||||
let mut client_config = quinn::ClientConfigBuilder::default();
|
||||
client_config.protocols(QUIC_HBB);
|
||||
//client_config.enable_keylog();
|
||||
endpoint.default_client_config(client_config.build());
|
||||
let (endpoint, _) = endpoint.bind(local_addr)?;
|
||||
let new_conn = endpoint.connect(peer, SERVER_NAME)?.await?;
|
||||
Connection::new_for_client(new_conn.connection).await
|
||||
}
|
||||
|
||||
pub struct Server {
|
||||
incoming: quinn::Incoming,
|
||||
}
|
||||
|
||||
impl Server {
|
||||
#[inline]
|
||||
pub async fn next(&mut self) -> ResultType<Option<Connection>> {
|
||||
Connection::new_for_server(&mut self.incoming).await
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Connection {
|
||||
conn: quinn::Connection,
|
||||
tx: quinn::SendStream,
|
||||
rx: Receiver,
|
||||
}
|
||||
|
||||
type Value = ResultType<Vec<u8>>;
|
||||
|
||||
impl Connection {
|
||||
async fn new_for_server(incoming: &mut quinn::Incoming) -> ResultType<Option<Self>> {
|
||||
if let Some(conn) = incoming.next().await {
|
||||
let quinn::NewConnection {
|
||||
connection: conn,
|
||||
// uni_streams,
|
||||
mut bi_streams,
|
||||
..
|
||||
} = conn.await?;
|
||||
let (tx, rx) = mpsc::unbounded_channel::<Value>();
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
let stream = bi_streams.next().await;
|
||||
if let Some(stream) = stream {
|
||||
let stream = match stream {
|
||||
Err(e) => {
|
||||
tx.send(Err(e.into())).ok();
|
||||
break;
|
||||
}
|
||||
Ok(s) => s,
|
||||
};
|
||||
let cloned = tx.clone();
|
||||
tokio::spawn(async move {
|
||||
allow_err!(handle_request(stream.1, cloned).await);
|
||||
});
|
||||
} else {
|
||||
tx.send(Err(anyhow!("Reset by the peer"))).ok();
|
||||
break;
|
||||
}
|
||||
}
|
||||
log::info!("Exit connection outer loop");
|
||||
});
|
||||
let tx = conn.open_uni().await?;
|
||||
Ok(Some(Self { conn, tx, rx }))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
async fn new_for_client(conn: quinn::Connection) -> ResultType<Self> {
|
||||
let (tx, rx_quic) = conn.open_bi().await?;
|
||||
let (tx_mpsc, rx) = mpsc::unbounded_channel::<Value>();
|
||||
tokio::spawn(async move {
|
||||
allow_err!(handle_request(rx_quic, tx_mpsc).await);
|
||||
});
|
||||
Ok(Self { conn, tx, rx })
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub async fn next(&mut self) -> Option<Value> {
|
||||
// None is returned when all Sender halves have dropped,
|
||||
// indicating that no further values can be sent on the channel.
|
||||
self.rx.recv().await
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn remote_address(&self) -> SocketAddr {
|
||||
self.conn.remote_address()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub async fn send_raw(&mut self, bytes: &[u8]) -> ResultType<()> {
|
||||
self.tx.write_all(bytes).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub async fn send(&mut self, msg: &dyn Message) -> ResultType<()> {
|
||||
match msg.write_to_bytes() {
|
||||
Ok(bytes) => self.send_raw(&bytes).await?,
|
||||
err => allow_err!(err),
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_request(rx: quinn::RecvStream, tx: Sender) -> ResultType<()> {
|
||||
Ok(())
|
||||
}
|
||||
Reference in New Issue
Block a user