tokio1, windows not test yet

This commit is contained in:
rustdesk
2021-06-25 19:42:51 +08:00
parent 25a83f6b4a
commit dc3fcda2c9
23 changed files with 169 additions and 154 deletions

View File

@@ -1,16 +1,13 @@
use crate::{bail, bytes_codec::BytesCodec, ResultType};
use bytes::{BufMut, Bytes, BytesMut};
use futures::SinkExt;
use futures::{SinkExt, StreamExt};
use protobuf::Message;
use sodiumoxide::crypto::secretbox::{self, Key, Nonce};
use std::{
io::{Error, ErrorKind},
ops::{Deref, DerefMut},
};
use tokio::{
net::{TcpListener, TcpStream, ToSocketAddrs},
stream::StreamExt,
};
use tokio::net::{lookup_host, TcpListener, TcpSocket, TcpStream, ToSocketAddrs};
use tokio_util::codec::Framed;
pub struct FramedStream(Framed<TcpStream, BytesCodec>, Option<(Key, u64, u64)>);
@@ -29,25 +26,37 @@ impl DerefMut for FramedStream {
}
}
fn new_socket(addr: std::net::SocketAddr, reuse: bool) -> Result<TcpSocket, std::io::Error> {
let socket = match addr {
std::net::SocketAddr::V4(..) => TcpSocket::new_v4()?,
std::net::SocketAddr::V6(..) => TcpSocket::new_v6()?,
};
if reuse {
// windows has no reuse_port, but it's reuse_address
// almost equals to unix's reuse_port + reuse_address,
// though may introduce nondeterministic bahavior
#[cfg(unix)]
socket.set_reuseport(true)?;
socket.set_reuseaddr(true)?;
}
socket.bind(addr)?;
Ok(socket)
}
impl FramedStream {
pub async fn new<T: ToSocketAddrs, T2: ToSocketAddrs>(
remote_addr: T,
local_addr: T2,
ms_timeout: u64,
) -> ResultType<Self> {
for local_addr in local_addr.to_socket_addrs().await? {
for remote_addr in remote_addr.to_socket_addrs().await? {
if let Ok(stream) = super::timeout(
for local_addr in lookup_host(&local_addr).await? {
for remote_addr in lookup_host(&remote_addr).await? {
let stream = super::timeout(
ms_timeout,
TcpStream::connect_std(
super::new_socket(local_addr, true, true)?.into_tcp_stream(),
&remote_addr,
),
new_socket(local_addr, true)?.connect(remote_addr),
)
.await?
{
return Ok(Self(Framed::new(stream, BytesCodec::new()), None));
}
.await??;
return Ok(Self(Framed::new(stream, BytesCodec::new()), None));
}
}
bail!("could not resolve to any address");
@@ -124,22 +133,21 @@ impl FramedStream {
fn get_nonce(seqnum: u64) -> Nonce {
let mut nonce = Nonce([0u8; secretbox::NONCEBYTES]);
nonce.0[..std::mem::size_of_val(&seqnum)].copy_from_slice(&seqnum.to_ne_bytes());
nonce.0[..std::mem::size_of_val(&seqnum)].copy_from_slice(&seqnum.to_le_bytes());
nonce
}
}
const DEFAULT_BACKLOG: i32 = 128;
const DEFAULT_BACKLOG: u32 = 128;
#[allow(clippy::never_loop)]
pub async fn new_listener<T: ToSocketAddrs>(addr: T, reuse: bool) -> ResultType<TcpListener> {
if !reuse {
Ok(TcpListener::bind(addr).await?)
} else {
for addr in addr.to_socket_addrs().await? {
let socket = super::new_socket(addr, true, true)?;
socket.listen(DEFAULT_BACKLOG)?;
return Ok(TcpListener::from_std(socket.into_tcp_listener())?);
for addr in lookup_host(&addr).await? {
let socket = new_socket(addr, true)?;
return Ok(socket.listen(DEFAULT_BACKLOG)?);
}
bail!("could not resolve to any address");
}