Merge branch 'master' into lan_discovery

This commit is contained in:
RustDesk
2022-01-10 17:34:51 +08:00
committed by GitHub
68 changed files with 3775 additions and 1605 deletions

View File

@@ -36,35 +36,34 @@ mod pa_impl {
use super::*;
#[tokio::main(flavor = "current_thread")]
pub async fn run(sp: GenericService) -> ResultType<()> {
if let Ok(mut stream) = crate::ipc::connect(1000, "_pa").await {
let mut encoder =
Encoder::new(crate::platform::linux::PA_SAMPLE_RATE, Stereo, LowDelay)?;
allow_err!(
stream
.send(&crate::ipc::Data::Config((
"audio-input".to_owned(),
Some(Config::get_option("audio-input"))
)))
.await
);
while sp.ok() {
sp.snapshot(|sps| {
sps.send(create_format_msg(crate::platform::linux::PA_SAMPLE_RATE, 2));
Ok(())
})?;
if let Some(data) = stream.next_timeout2(1000).await {
match data? {
Some(crate::ipc::Data::RawMessage(bytes)) => {
let data = unsafe {
std::slice::from_raw_parts::<f32>(
bytes.as_ptr() as _,
bytes.len() / 4,
)
};
send_f32(data, &mut encoder, &sp);
}
_ => {}
hbb_common::sleep(0.1).await; // one moment to wait for _pa ipc
let mut stream = crate::ipc::connect(1000, "_pa").await?;
unsafe {
AUDIO_ZERO_COUNT = 0;
}
let mut encoder = Encoder::new(crate::platform::linux::PA_SAMPLE_RATE, Stereo, LowDelay)?;
allow_err!(
stream
.send(&crate::ipc::Data::Config((
"audio-input".to_owned(),
Some(Config::get_option("audio-input"))
)))
.await
);
while sp.ok() {
sp.snapshot(|sps| {
sps.send(create_format_msg(crate::platform::linux::PA_SAMPLE_RATE, 2));
Ok(())
})?;
if let Some(data) = stream.next_timeout2(1000).await {
match data? {
Some(crate::ipc::Data::RawMessage(bytes)) => {
let data = unsafe {
std::slice::from_raw_parts::<f32>(bytes.as_ptr() as _, bytes.len() / 4)
};
send_f32(data, &mut encoder, &sp);
}
_ => {}
}
}
}
@@ -119,9 +118,6 @@ mod cpal_impl {
encoder: &mut Encoder,
sp: &GenericService,
) {
if data.iter().filter(|x| **x != 0.).next().is_none() {
return;
}
let buffer;
let data = if sample_rate0 != sample_rate {
buffer = crate::common::resample_channels(data, sample_rate0, sample_rate, channels);
@@ -195,10 +191,10 @@ mod cpal_impl {
let (device, config) = get_device()?;
let sp = sp.clone();
let err_fn = move |err| {
log::error!("an error occurred on stream: {}", err);
// too many UnknownErrno, will improve later
log::trace!("an error occurred on stream: {}", err);
};
// Sample rate must be one of 8000, 12000, 16000, 24000, or 48000.
// Note: somehow 48000 not work
let sample_rate_0 = config.sample_rate().0;
let sample_rate = if sample_rate_0 < 12000 {
8000
@@ -206,9 +202,15 @@ mod cpal_impl {
12000
} else if sample_rate_0 < 24000 {
16000
} else {
} else if sample_rate_0 < 48000 {
24000
} else {
48000
};
log::debug!("Audio sample rate : {}", sample_rate);
unsafe {
AUDIO_ZERO_COUNT = 0;
}
let mut encoder = Encoder::new(
sample_rate,
if config.channels() > 1 { Stereo } else { Mono },
@@ -282,19 +284,39 @@ fn create_format_msg(sample_rate: u32, channels: u16) -> Message {
msg
}
// use AUDIO_ZERO_COUNT for the Noise(Zero) Gate Attack Time
// every audio data length is set to 480
// MAX_AUDIO_ZERO_COUNT=800 is similar as Gate Attack Time 3~5s(Linux) || 6~8s(Windows)
const MAX_AUDIO_ZERO_COUNT: u16 = 800;
static mut AUDIO_ZERO_COUNT: u16 = 0;
fn send_f32(data: &[f32], encoder: &mut Encoder, sp: &GenericService) {
if data.iter().filter(|x| **x != 0.).next().is_some() {
match encoder.encode_vec_float(data, data.len() * 6) {
Ok(data) => {
let mut msg_out = Message::new();
msg_out.set_audio_frame(AudioFrame {
data,
..Default::default()
});
sp.send(msg_out);
}
Err(_) => {}
unsafe {
AUDIO_ZERO_COUNT = 0;
}
} else {
unsafe {
if AUDIO_ZERO_COUNT > MAX_AUDIO_ZERO_COUNT {
if AUDIO_ZERO_COUNT == MAX_AUDIO_ZERO_COUNT + 1 {
log::debug!("Audio Zero Gate Attack");
AUDIO_ZERO_COUNT += 1;
}
return;
}
AUDIO_ZERO_COUNT += 1;
}
}
match encoder.encode_vec_float(data, data.len() * 6) {
Ok(data) => {
let mut msg_out = Message::new();
msg_out.set_audio_frame(AudioFrame {
data,
..Default::default()
});
sp.send(msg_out);
}
Err(_) => {}
}
}

View File

@@ -97,8 +97,12 @@ mod listen {
fn trigger(ctx: &mut ClipboardContext) {
let _ = match ctx.get_text() {
Ok(text) => ctx.set_text(text),
Err(_) => ctx.set_text(Default::default()),
Ok(text) => {
if !text.is_empty() {
ctx.set_text(text).ok();
}
}
Err(_) => {}
};
}
}

View File

@@ -62,15 +62,18 @@ impl Subscriber for ConnInner {
#[inline]
fn send(&mut self, msg: Arc<Message>) {
self.tx.as_mut().map(|tx| {
allow_err!(tx.send((Instant::now(), msg)));
});
}
fn send_video_frame(&mut self, tm: std::time::Instant, msg: Arc<Message>) {
self.tx_video.as_mut().map(|tx| {
allow_err!(tx.send((tm.into(), msg)));
});
match &msg.union {
Some(message::Union::video_frame(_)) => {
self.tx_video.as_mut().map(|tx| {
allow_err!(tx.send((Instant::now(), msg)));
});
}
_ => {
self.tx.as_mut().map(|tx| {
allow_err!(tx.send((Instant::now(), msg)));
});
}
}
}
}
@@ -78,6 +81,8 @@ const TEST_DELAY_TIMEOUT: Duration = Duration::from_secs(3);
const SEC30: Duration = Duration::from_secs(30);
const H1: Duration = Duration::from_secs(3600);
const MILLI1: Duration = Duration::from_millis(1);
const SEND_TIMEOUT_VIDEO: u64 = 12_000;
const SEND_TIMEOUT_OTHER: u64 = SEND_TIMEOUT_VIDEO * 10;
impl Connection {
pub async fn start(
@@ -144,9 +149,17 @@ impl Connection {
time::interval_at(Instant::now() + TEST_DELAY_TIMEOUT, TEST_DELAY_TIMEOUT);
let mut last_recv_time = Instant::now();
conn.stream.set_send_timeout(
if conn.file_transfer.is_some() || conn.port_forward_socket.is_some() {
SEND_TIMEOUT_OTHER
} else {
SEND_TIMEOUT_VIDEO
},
);
loop {
tokio::select! {
biased;
biased; // video has higher priority
Some(data) = rx_from_cm.recv() => {
match data {
@@ -283,73 +296,58 @@ impl Connection {
}
}
}
video_service::notify_video_frame_feched(id, None);
video_service::notify_video_frame_feched(id, None);
super::video_service::update_test_latency(id, 0);
super::video_service::update_image_quality(id, None);
if let Some(forward) = conn.port_forward_socket.as_mut() {
if let Err(err) = conn.try_port_forward_loop(&mut rx_from_cm).await {
conn.on_close(&err.to_string(), false);
}
}
async fn try_port_forward_loop(
&mut self,
rx_from_cm: &mut mpsc::UnboundedReceiver<Data>,
) -> ResultType<()> {
let mut last_recv_time = Instant::now();
if let Some(forward) = self.port_forward_socket.as_mut() {
log::info!("Running port forwarding loop");
conn.stream.set_raw();
self.stream.set_raw();
loop {
tokio::select! {
Some(data) = rx_from_cm.recv() => {
match data {
ipc::Data::Close => {
conn.on_close("Close requested from connection manager", false);
break;
bail!("Close requested from selfection manager");
}
_ => {}
}
}
res = forward.next() => {
if let Some(res) = res {
match res {
Err(err) => {
conn.on_close(&err.to_string(), false);
break;
},
Ok(bytes) => {
last_recv_time = Instant::now();
if let Err(err) = conn.stream.send_bytes(bytes.into()).await {
conn.on_close(&err.to_string(), false);
break;
}
}
}
last_recv_time = Instant::now();
self.stream.send_bytes(res?.into()).await?;
} else {
conn.on_close("Forward reset by the peer", false);
break;
bail!("Forward reset by the peer");
}
},
res = conn.stream.next() => {
res = self.stream.next() => {
if let Some(res) = res {
match res {
Err(err) => {
conn.on_close(&err.to_string(), false);
break;
},
Ok(bytes) => {
last_recv_time = Instant::now();
if let Err(err) = forward.send(bytes.into()).await {
conn.on_close(&err.to_string(), false);
break;
}
}
}
last_recv_time = Instant::now();
timeout(SEND_TIMEOUT_OTHER, forward.send(res?.into())).await??;
} else {
conn.on_close("Stream reset by the peer", false);
break;
bail!("Stream reset by the peer");
}
},
_ = conn.timer.tick() => {
_ = self.timer.tick() => {
if last_recv_time.elapsed() >= H1 {
conn.on_close("Timeout", false);
break;
bail!("Timeout");
}
}
}
}
}
Ok(())
}
async fn send_permission(&mut self, permission: Permission, enabled: bool) {
@@ -591,7 +589,7 @@ impl Connection {
}
_ => {}
}
if lr.username != Config::get_id() {
if !crate::is_ip(&lr.username) && lr.username != Config::get_id() {
self.send_login_error("Offline").await;
} else if lr.password.is_empty() {
self.try_start_cm(lr.my_id, lr.my_name, false).await;
@@ -906,12 +904,10 @@ async fn start_ipc(
mut rx_to_cm: mpsc::UnboundedReceiver<ipc::Data>,
tx_from_cm: mpsc::UnboundedSender<ipc::Data>,
) -> ResultType<()> {
loop {
if !crate::platform::is_prelogin() {
break;
}
sleep(1.).await;
if crate::platform::is_prelogin() {
return Ok(());
}
let mut stream = None;
if let Ok(s) = crate::ipc::connect(1000, "_cm").await {
stream = Some(s);

View File

@@ -79,8 +79,6 @@ impl Subscriber for MouseCursorSub {
self.inner.send(msg);
}
}
fn send_video_frame(&mut self, _: std::time::Instant, _: Arc<Message>) {}
}
pub const NAME_CURSOR: &'static str = "mouse_cursor";
@@ -197,15 +195,20 @@ fn modifier_sleep() {
std::thread::sleep(std::time::Duration::from_nanos(1));
}
#[cfg(not(target_os = "macos"))]
#[inline]
fn get_modifier_state(key: Key, en: &mut Enigo) -> bool {
// on Linux, if RightAlt is down, RightAlt status is false, Alt status is true
// but on Windows, both are true
let x = en.get_key_state(key.clone());
match key {
Key::Shift => x || en.get_key_state(Key::RightShift),
Key::Control => x || en.get_key_state(Key::RightControl),
Key::Alt => x || en.get_key_state(Key::RightAlt),
Key::Meta => x || en.get_key_state(Key::RWin),
Key::RightShift => x || en.get_key_state(Key::Shift),
Key::RightControl => x || en.get_key_state(Key::Control),
Key::RightAlt => x || en.get_key_state(Key::Alt),
Key::RWin => x || en.get_key_state(Key::Meta),
_ => x,
}
}
@@ -264,7 +267,7 @@ fn fix_key_down_timeout(force: bool) {
if let Some(key) = key {
let func = move || {
let mut en = ENIGO.lock().unwrap();
if en.get_key_state(key) {
if get_modifier_state(key, &mut en) {
en.key_up(key);
log::debug!("Fixed {:?} timeout", key);
}
@@ -286,7 +289,7 @@ fn fix_modifier(
key1: Key,
en: &mut Enigo,
) {
if en.get_key_state(key1) && !modifiers.contains(&ProtobufEnumOrUnknown::new(key0)) {
if get_modifier_state(key1, en) && !modifiers.contains(&ProtobufEnumOrUnknown::new(key0)) {
en.key_up(key1);
log::debug!("Fixed {:?}", key1);
}
@@ -577,11 +580,9 @@ fn handle_key_(evt: &KeyEvent) {
}
}
#[cfg(not(target_os = "macos"))]
if crate::common::valid_for_capslock(evt) {
if has_cap != en.get_key_state(Key::CapsLock) {
en.key_down(Key::CapsLock).ok();
en.key_up(Key::CapsLock);
}
if has_cap != en.get_key_state(Key::CapsLock) {
en.key_down(Key::CapsLock).ok();
en.key_up(Key::CapsLock);
}
#[cfg(windows)]
if crate::common::valid_for_numlock(evt) {

View File

@@ -16,7 +16,6 @@ pub trait Service: Send + Sync {
pub trait Subscriber: Default + Send + Sync + 'static {
fn id(&self) -> i32;
fn send(&mut self, msg: Arc<Message>);
fn send_video_frame(&mut self, tm: time::Instant, msg: Arc<Message>);
}
#[derive(Default)]
@@ -145,15 +144,15 @@ impl<T: Subscriber + From<ConnInner>> ServiceTmpl<T> {
}
}
pub fn send_video_frame(&self, tm: time::Instant, msg: Message) -> HashSet<i32> {
self.send_video_frame_shared(tm, Arc::new(msg))
pub fn send_video_frame(&self, msg: Message) -> HashSet<i32> {
self.send_video_frame_shared(Arc::new(msg))
}
pub fn send_video_frame_shared(&self, tm: time::Instant, msg: Arc<Message>) -> HashSet<i32> {
pub fn send_video_frame_shared(&self, msg: Arc<Message>) -> HashSet<i32> {
let mut conn_ids = HashSet::new();
let mut lock = self.0.write().unwrap();
for s in lock.subscribes.values_mut() {
s.send_video_frame(tm, msg.clone());
s.send(msg.clone());
conn_ids.insert(s.id());
}
conn_ids

View File

@@ -108,7 +108,7 @@ impl VideoFrameController {
fetched_conn_ids.insert(id);
// break if all connections have received current frame
if fetched_conn_ids.is_superset(&send_conn_ids) {
if fetched_conn_ids.len() >= send_conn_ids.len() {
break;
}
}
@@ -188,11 +188,7 @@ fn run(sp: GenericService) -> ResultType<()> {
speed,
};
let mut vpx;
let mut n = ((width * height) as f64 / (1920 * 1080) as f64).round() as u32;
if n < 1 {
n = 1;
}
match Encoder::new(&cfg, n) {
match Encoder::new(&cfg, 0) {
Ok(x) => vpx = x,
Err(err) => bail!("Failed to create encoder: {}", err),
}
@@ -220,7 +216,7 @@ fn run(sp: GenericService) -> ResultType<()> {
let start = time::Instant::now();
let mut last_check_displays = time::Instant::now();
#[cfg(windows)]
let mut try_gdi = true;
let mut try_gdi = 1;
#[cfg(windows)]
log::info!("gdi: {}", c.is_gdi());
while sp.ok() {
@@ -257,11 +253,11 @@ fn run(sp: GenericService) -> ResultType<()> {
Ok(frame) => {
let time = now - start;
let ms = (time.as_secs() * 1000 + time.subsec_millis() as u64) as i64;
let send_conn_ids = handle_one_frame(&sp, now, &frame, ms, &mut crc, &mut vpx)?;
let send_conn_ids = handle_one_frame(&sp, &frame, ms, &mut crc, &mut vpx)?;
frame_controller.set_send(now, send_conn_ids);
#[cfg(windows)]
{
try_gdi = false;
try_gdi = 0;
}
}
Err(ref e) if e.kind() == WouldBlock => {
@@ -271,10 +267,13 @@ fn run(sp: GenericService) -> ResultType<()> {
wait = 0
}
#[cfg(windows)]
if try_gdi && !c.is_gdi() {
c.set_gdi();
try_gdi = false;
log::info!("No image, fall back to gdi");
if try_gdi > 0 && !c.is_gdi() {
if try_gdi > 3 {
c.set_gdi();
try_gdi = 0;
log::info!("No image, fall back to gdi");
}
try_gdi += 1;
}
continue;
}
@@ -327,7 +326,6 @@ fn create_frame(frame: &EncodeFrame) -> VP9 {
#[inline]
fn handle_one_frame(
sp: &GenericService,
now: Instant,
frame: &[u8],
ms: i64,
crc: &mut (u32, u32),
@@ -365,7 +363,7 @@ fn handle_one_frame(
// to-do: flush periodically, e.g. 1 second
if frames.len() > 0 {
send_conn_ids = sp.send_video_frame(now, create_msg(frames));
send_conn_ids = sp.send_video_frame(create_msg(frames));
}
}
Ok(send_conn_ids)