source code

This commit is contained in:
rustdesk
2021-03-29 15:59:14 +08:00
parent 002fce136c
commit d1013487e2
175 changed files with 35074 additions and 2 deletions

350
src/server/audio_service.rs Normal file
View File

@@ -0,0 +1,350 @@
// both soundio and cpal use wasapi on windows and coreaudio on mac, they do not support loopback.
// libpulseaudio support loopback because pulseaudio is a standalone audio service with some
// configuration, but need to install the library and start the service on OS, not a good choice.
// windows: https://docs.microsoft.com/en-us/windows/win32/coreaudio/loopback-recording
// mac: https://github.com/mattingalls/Soundflower
// https://docs.microsoft.com/en-us/windows/win32/api/audioclient/nn-audioclient-iaudioclient
// https://github.com/ExistentialAudio/BlackHole
// if pactl not work, please run
// sudo apt-get --purge --reinstall install pulseaudio
// https://askubuntu.com/questions/403416/how-to-listen-live-sounds-from-input-from-external-sound-card
// https://wiki.debian.org/audio-loopback
// https://github.com/krruzic/pulsectl
use super::*;
use magnum_opus::{Application::*, Channels::*, Encoder};
pub const NAME: &'static str = "audio";
#[cfg(not(target_os = "linux"))]
pub fn new() -> GenericService {
let sp = GenericService::new(NAME, true);
sp.repeat::<cpal_impl::State, _>(33, cpal_impl::run);
sp
}
#[cfg(target_os = "linux")]
pub fn new() -> GenericService {
let sp = GenericService::new(NAME, true);
sp.run(pa_impl::run);
sp
}
#[cfg(target_os = "linux")]
mod pa_impl {
use super::*;
#[tokio::main(basic_scheduler)]
pub async fn run(sp: GenericService) -> ResultType<()> {
if let Ok(mut stream) = crate::ipc::connect(1000, "_pa").await {
let mut encoder =
Encoder::new(crate::platform::linux::PA_SAMPLE_RATE, Stereo, LowDelay)?;
allow_err!(
stream
.send(&crate::ipc::Data::Config((
"audio-input".to_owned(),
Some(Config::get_option("audio-input"))
)))
.await
);
while sp.ok() {
sp.snapshot(|sps| {
sps.send(create_format_msg(crate::platform::linux::PA_SAMPLE_RATE, 2));
Ok(())
})?;
if let Some(data) = stream.next_timeout2(1000).await {
match data? {
Some(crate::ipc::Data::RawMessage(bytes)) => {
let data = unsafe {
std::slice::from_raw_parts::<f32>(
bytes.as_ptr() as _,
bytes.len() / 4,
)
};
send_f32(data, &mut encoder, &sp);
}
_ => {}
}
}
}
}
Ok(())
}
}
#[cfg(not(target_os = "linux"))]
mod cpal_impl {
use super::*;
use cpal::{
traits::{DeviceTrait, HostTrait, StreamTrait},
Device, Host, SupportedStreamConfig,
};
lazy_static::lazy_static! {
static ref HOST: Host = cpal::default_host();
}
#[derive(Default)]
pub struct State {
stream: Option<(Box<dyn StreamTrait>, Arc<Message>)>,
}
impl super::service::Reset for State {
fn reset(&mut self) {
self.stream.take();
}
}
pub fn run(sp: GenericService, state: &mut State) -> ResultType<()> {
sp.snapshot(|sps| {
match &state.stream {
None => {
state.stream = Some(play(&sp)?);
}
_ => {}
}
if let Some((_, format)) = &state.stream {
sps.send_shared(format.clone());
}
Ok(())
})?;
Ok(())
}
fn send(
data: &[f32],
sample_rate0: u32,
sample_rate: u32,
channels: u16,
encoder: &mut Encoder,
sp: &GenericService,
) {
if data.iter().filter(|x| **x != 0.).next().is_none() {
return;
}
let buffer;
let data = if sample_rate0 != sample_rate {
buffer = crate::common::resample_channels(data, sample_rate0, sample_rate, channels);
&buffer
} else {
data
};
send_f32(data, encoder, sp);
}
#[cfg(windows)]
fn get_device() -> ResultType<(Device, SupportedStreamConfig)> {
let audio_input = Config::get_option("audio-input");
if !audio_input.is_empty() {
return get_audio_input(&audio_input);
}
let device = HOST
.default_output_device()
.with_context(|| "Failed to get default output device for loopback")?;
log::info!(
"Default output device: {}",
device.name().unwrap_or("".to_owned())
);
let format = device
.default_output_config()
.map_err(|e| anyhow!(e))
.with_context(|| "Failed to get default output format")?;
log::info!("Default output format: {:?}", format);
Ok((device, format))
}
#[cfg(not(windows))]
fn get_device() -> ResultType<(Device, SupportedStreamConfig)> {
let audio_input = Config::get_option("audio-input");
get_audio_input(&audio_input)
}
fn get_audio_input(audio_input: &str) -> ResultType<(Device, SupportedStreamConfig)> {
if audio_input == "Mute" {
bail!("Mute");
}
let mut device = None;
if !audio_input.is_empty() {
for d in HOST
.devices()
.with_context(|| "Failed to get audio devices")?
{
if d.name().unwrap_or("".to_owned()) == audio_input {
device = Some(d);
break;
}
}
}
if device.is_none() {
device = Some(
HOST.default_input_device()
.with_context(|| "Failed to get default input device for loopback")?,
);
}
let device = device.unwrap();
log::info!("Input device: {}", device.name().unwrap_or("".to_owned()));
let format = device
.default_input_config()
.map_err(|e| anyhow!(e))
.with_context(|| "Failed to get default input format")?;
log::info!("Default input format: {:?}", format);
Ok((device, format))
}
fn play(sp: &GenericService) -> ResultType<(Box<dyn StreamTrait>, Arc<Message>)> {
let (device, config) = get_device()?;
let sp = sp.clone();
let err_fn = move |err| {
log::error!("an error occurred on stream: {}", err);
};
// Sample rate must be one of 8000, 12000, 16000, 24000, or 48000.
let sample_rate_0 = config.sample_rate().0;
let sample_rate = if sample_rate_0 < 12000 {
8000
} else if sample_rate_0 < 16000 {
12000
} else if sample_rate_0 < 24000 {
16000
} else if sample_rate_0 < 48000 {
24000
} else {
48000
};
let mut encoder = Encoder::new(
sample_rate,
if config.channels() > 1 { Stereo } else { Mono },
LowDelay,
)?;
let channels = config.channels();
let stream = match config.sample_format() {
cpal::SampleFormat::F32 => device.build_input_stream(
&config.into(),
move |data, _: &_| {
send(
data,
sample_rate_0,
sample_rate,
channels,
&mut encoder,
&sp,
);
},
err_fn,
)?,
cpal::SampleFormat::I16 => device.build_input_stream(
&config.into(),
move |data: &[i16], _: &_| {
let buffer: Vec<_> = data.iter().map(|s| cpal::Sample::to_f32(s)).collect();
send(
&buffer,
sample_rate_0,
sample_rate,
channels,
&mut encoder,
&sp,
);
},
err_fn,
)?,
cpal::SampleFormat::U16 => device.build_input_stream(
&config.into(),
move |data: &[u16], _: &_| {
let buffer: Vec<_> = data.iter().map(|s| cpal::Sample::to_f32(s)).collect();
send(
&buffer,
sample_rate_0,
sample_rate,
channels,
&mut encoder,
&sp,
);
},
err_fn,
)?,
};
stream.play()?;
Ok((
Box::new(stream),
Arc::new(create_format_msg(sample_rate, channels)),
))
}
}
fn create_format_msg(sample_rate: u32, channels: u16) -> Message {
let format = AudioFormat {
sample_rate,
channels: channels as _,
..Default::default()
};
let mut misc = Misc::new();
misc.set_audio_format(format);
let mut msg = Message::new();
msg.set_misc(misc);
msg
}
fn send_f32(data: &[f32], encoder: &mut Encoder, sp: &GenericService) {
if data.iter().filter(|x| **x != 0.).next().is_some() {
match encoder.encode_vec_float(data, data.len() * 6) {
Ok(data) => {
let mut msg_out = Message::new();
msg_out.set_audio_frame(AudioFrame {
data,
..Default::default()
});
sp.send(msg_out);
}
Err(_) => {}
}
}
}
#[cfg(test)]
mod tests {
#[cfg(target_os = "linux")]
#[test]
fn test_pulse() {
use libpulse_binding as pulse;
use libpulse_simple_binding as psimple;
let spec = pulse::sample::Spec {
format: pulse::sample::SAMPLE_FLOAT32NE,
channels: 2,
rate: 24000,
};
let hspec = hound::WavSpec {
channels: spec.channels as _,
sample_rate: spec.rate as _,
bits_per_sample: (4 * 8) as _,
sample_format: hound::SampleFormat::Float,
};
const PATH: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/recorded.wav");
let mut writer =
hound::WavWriter::create(PATH, hspec).expect("Could not create hsound writer");
let device = crate::platform::linux::get_pa_monitor();
let s = psimple::Simple::new(
None, // Use the default server
"Test", // Our applications name
pulse::stream::Direction::Record, // We want a record stream
Some(&device), // Use the default device
"Test", // Description of our stream
&spec, // Our sample format
None, // Use default channel map
None, // Use default buffering attributes
)
.expect("Could not create simple pulse");
let mut out: Vec<u8> = Vec::with_capacity(1024);
unsafe {
out.set_len(out.capacity());
}
for _ in 0..600 {
s.read(&mut out).expect("Could not read pcm");
let out2 =
unsafe { std::slice::from_raw_parts::<f32>(out.as_ptr() as _, out.len() / 4) };
for v in out2 {
writer.write_sample(*v).ok();
}
}
println!("{:?} {}", device, out.len());
writer.finalize().expect("Could not finalize writer");
}
}

View File

@@ -0,0 +1,53 @@
use super::*;
pub use crate::common::{
check_clipboard, ClipboardContext, CLIPBOARD_INTERVAL as INTERVAL, CLIPBOARD_NAME as NAME,
CONTENT,
};
struct State {
ctx: Option<ClipboardContext>,
initialized: bool,
}
impl Default for State {
fn default() -> Self {
let ctx = match ClipboardContext::new() {
Ok(ctx) => Some(ctx),
Err(err) => {
log::error!("Failed to start {}: {}", NAME, err);
None
}
};
Self {
ctx,
initialized: false,
}
}
}
impl super::service::Reset for State {
fn reset(&mut self) {
*CONTENT.lock().unwrap() = Default::default();
self.initialized = false;
}
}
pub fn new() -> GenericService {
let sp = GenericService::new(NAME, false);
sp.repeat::<State, _>(INTERVAL, run);
sp
}
fn run(sp: GenericService, state: &mut State) -> ResultType<()> {
if let Some(ctx) = state.ctx.as_mut() {
if let Some(msg) = check_clipboard(ctx, None) {
if !state.initialized {
state.initialized = true;
// ignore clipboard update before service start
return Ok(());
}
sp.send(msg);
}
}
Ok(())
}

979
src/server/connection.rs Normal file
View File

@@ -0,0 +1,979 @@
use super::input_service::*;
use super::*;
use crate::common::update_clipboard;
use crate::ipc;
use hbb_common::{
config::Config,
fs,
futures::SinkExt,
sleep, timeout,
tokio::{
net::TcpStream,
stream::StreamExt,
sync::mpsc,
time::{self, Duration, Instant, Interval},
},
tokio_util::codec::{BytesCodec, Framed},
};
use sha2::{Digest, Sha256};
pub type Sender = mpsc::UnboundedSender<(Instant, Arc<Message>)>;
lazy_static::lazy_static! {
static ref CLICK_TIME: Arc::<Mutex<i64>> = Default::default();
static ref LOGIN_FAILURES: Arc::<Mutex<HashMap<String, (i32, i32, i32)>>> = Default::default();
}
#[derive(Clone, Default)]
pub struct ConnInner {
id: i32,
tx: Option<Sender>,
}
pub struct Connection {
inner: ConnInner,
stream: super::Stream,
server: super::ServerPtrWeak,
hash: Hash,
read_jobs: Vec<fs::TransferJob>,
timer: Interval,
file_transfer: Option<(String, bool)>,
port_forward_socket: Option<Framed<TcpStream, BytesCodec>>,
port_forward_address: String,
tx_to_cm: mpsc::UnboundedSender<ipc::Data>,
authorized: bool,
keyboard: bool,
clipboard: bool,
audio: bool,
last_test_delay: i64,
image_quality: i32,
lock_after_session_end: bool,
show_remote_cursor: bool, // by peer
privacy_mode: bool,
ip: String,
disable_clipboard: bool, // by peer
disable_audio: bool, // by peer
}
impl Subscriber for ConnInner {
#[inline]
fn id(&self) -> i32 {
self.id
}
#[inline]
fn send(&mut self, msg: Arc<Message>) {
self.tx.as_mut().map(|tx| {
allow_err!(tx.send((Instant::now(), msg)));
});
}
}
const TEST_DELAY_TIMEOUT: Duration = Duration::from_secs(3);
const SEC30: Duration = Duration::from_secs(30);
const H1: Duration = Duration::from_secs(3600);
const MILLI1: Duration = Duration::from_millis(1);
impl Connection {
pub async fn start(
addr: SocketAddr,
stream: super::Stream,
id: i32,
server: super::ServerPtrWeak,
) {
let hash = Hash {
salt: Config::get_salt(),
challenge: Config::get_auto_password(),
..Default::default()
};
let (tx_from_cm, mut rx_from_cm) = mpsc::unbounded_channel::<ipc::Data>();
let (tx_to_cm, rx_to_cm) = mpsc::unbounded_channel::<ipc::Data>();
let (tx, mut rx) = mpsc::unbounded_channel::<(Instant, Arc<Message>)>();
let mut conn = Self {
inner: ConnInner { id, tx: Some(tx) },
stream,
server,
hash,
read_jobs: Vec::new(),
timer: time::interval(SEC30),
file_transfer: None,
port_forward_socket: None,
port_forward_address: "".to_owned(),
tx_to_cm,
authorized: false,
keyboard: Config::get_option("enable-keyboard").is_empty(),
clipboard: Config::get_option("enable-clipboard").is_empty(),
audio: Config::get_option("audio-input") != "Mute",
last_test_delay: 0,
image_quality: ImageQuality::Balanced.value(),
lock_after_session_end: false,
show_remote_cursor: false,
privacy_mode: false,
ip: "".to_owned(),
disable_audio: false,
disable_clipboard: false,
};
tokio::spawn(async move {
if let Err(err) = start_ipc(rx_to_cm, tx_from_cm).await {
log::error!("ipc to connection manager exit: {}", err);
}
});
if !conn.on_open(addr).await {
return;
}
if !conn.keyboard {
conn.send_permisssion(Permission::Keyboard, false).await;
}
if !conn.clipboard {
conn.send_permisssion(Permission::Clipboard, false).await;
}
if !conn.audio {
conn.send_permisssion(Permission::Audio, false).await;
}
let mut test_delay_timer =
time::interval_at(Instant::now() + TEST_DELAY_TIMEOUT, TEST_DELAY_TIMEOUT);
let mut last_recv_time = Instant::now();
loop {
tokio::select! {
Some(data) = rx_from_cm.recv() => {
match data {
ipc::Data::Authorize => {
conn.send_logon_response().await;
if conn.port_forward_socket.is_some() {
break;
}
}
ipc::Data::Close => {
let mut misc = Misc::new();
misc.set_close_reason("Closed manually by the peer".into());
let mut msg_out = Message::new();
msg_out.set_misc(misc);
conn.send(msg_out).await;
conn.on_close("Close requested from connection manager", false);
break;
}
ipc::Data::ChatMessage{text} => {
let mut misc = Misc::new();
misc.set_chat_message(ChatMessage {
text,
..Default::default()
});
let mut msg_out = Message::new();
msg_out.set_misc(misc);
conn.send(msg_out).await;
}
ipc::Data::SwitchPermission{name, enabled} => {
log::info!("Change permission {} -> {}", name, enabled);
if &name == "keyboard" {
conn.keyboard = enabled;
conn.send_permisssion(Permission::Keyboard, enabled).await;
if let Some(s) = conn.server.upgrade() {
s.write().unwrap().subscribe(
NAME_CURSOR,
conn.inner.clone(), enabled || conn.show_remote_cursor);
}
} else if &name == "clipboard" {
conn.clipboard = enabled;
conn.send_permisssion(Permission::Clipboard, enabled).await;
if let Some(s) = conn.server.upgrade() {
s.write().unwrap().subscribe(
super::clipboard_service::NAME,
conn.inner.clone(), conn.clipboard_enabled() && conn.keyboard);
}
} else if &name == "audio" {
conn.audio = enabled;
conn.send_permisssion(Permission::Audio, enabled).await;
if let Some(s) = conn.server.upgrade() {
s.write().unwrap().subscribe(
super::audio_service::NAME,
conn.inner.clone(), conn.audio_enabled());
}
}
}
ipc::Data::RawMessage(bytes) => {
allow_err!(conn.stream.send_raw(bytes).await);
}
_ => {}
}
}
Some((instant, value)) = rx.recv() => {
let latency = instant.elapsed().as_millis() as i64;
super::video_service::update_internal_latency(id, latency);
let msg: &Message = &value;
if latency > 1000 {
match &msg.union {
Some(message::Union::video_frame(_)) => {
continue;
}
Some(message::Union::audio_frame(_)) => {
continue;
}
_ => {}
}
}
if let Err(err) = conn.stream.send(msg).await {
conn.on_close(&err.to_string(), false);
break;
}
},
res = conn.stream.next() => {
if let Some(res) = res {
match res {
Err(err) => {
conn.on_close(&err.to_string(), true);
break;
},
Ok(bytes) => {
last_recv_time = Instant::now();
if let Ok(msg_in) = Message::parse_from_bytes(&bytes) {
if !conn.on_message(msg_in).await {
break;
}
}
}
}
} else {
conn.on_close("Reset by the peer", true);
break;
}
},
_ = conn.timer.tick() => {
if !conn.read_jobs.is_empty() {
if let Err(err) = fs::handle_read_jobs(&mut conn.read_jobs, &mut conn.stream).await {
conn.on_close(&err.to_string(), false);
break;
}
} else {
conn.timer = time::interval_at(Instant::now() + SEC30, SEC30);
}
}
_ = test_delay_timer.tick() => {
if last_recv_time.elapsed() >= SEC30 {
conn.on_close("Timeout", true);
break;
}
let time = crate::get_time();
if time > 0 && conn.last_test_delay == 0 {
conn.last_test_delay = time;
let mut msg_out = Message::new();
msg_out.set_test_delay(TestDelay{
time,
..Default::default()
});
conn.inner.send(msg_out.into());
}
}
}
}
super::video_service::update_internal_latency(id, 0);
super::video_service::update_test_latency(id, 0);
super::video_service::update_image_quality(id, None);
if let Some(forward) = conn.port_forward_socket.as_mut() {
log::info!("Running port forwarding loop");
conn.stream.set_raw();
loop {
tokio::select! {
Some(data) = rx_from_cm.recv() => {
match data {
ipc::Data::Close => {
conn.on_close("Close requested from connection manager", false);
break;
}
_ => {}
}
}
res = forward.next() => {
if let Some(res) = res {
match res {
Err(err) => {
conn.on_close(&err.to_string(), false);
break;
},
Ok(bytes) => {
last_recv_time = Instant::now();
if let Err(err) = conn.stream.send_bytes(bytes.into()).await {
conn.on_close(&err.to_string(), false);
break;
}
}
}
} else {
conn.on_close("Forward reset by the peer", false);
break;
}
},
res = conn.stream.next() => {
if let Some(res) = res {
match res {
Err(err) => {
conn.on_close(&err.to_string(), false);
break;
},
Ok(bytes) => {
last_recv_time = Instant::now();
if let Err(err) = forward.send(bytes.into()).await {
conn.on_close(&err.to_string(), false);
break;
}
}
}
} else {
conn.on_close("Stream reset by the peer", false);
break;
}
},
_ = conn.timer.tick() => {
if last_recv_time.elapsed() >= H1 {
conn.on_close("Timeout", false);
break;
}
}
}
}
}
}
async fn send_permisssion(&mut self, permission: Permission, enabled: bool) {
let mut misc = Misc::new();
misc.set_permission_info(PermissionInfo {
permission: permission.into(),
enabled,
..Default::default()
});
let mut msg_out = Message::new();
msg_out.set_misc(misc);
self.send(msg_out).await;
}
async fn on_open(&mut self, addr: SocketAddr) -> bool {
log::debug!("#{} Connection opened from {}.", self.inner.id, addr);
let whitelist: Vec<String> = Config::get_option("whitelist")
.split(",")
.filter(|x| !x.is_empty())
.map(|x| x.to_owned())
.collect();
if !whitelist.is_empty()
&& whitelist
.iter()
.filter(|x| x == &"0.0.0.0")
.next()
.is_none()
&& whitelist
.iter()
.filter(|x| x.parse() == Ok(addr.ip()))
.next()
.is_none()
{
self.send_login_error("Your ip is blocked by the peer")
.await;
sleep(1.).await;
return false;
}
self.ip = addr.ip().to_string();
let mut msg_out = Message::new();
msg_out.set_hash(self.hash.clone());
self.send(msg_out).await;
true
}
async fn send_logon_response(&mut self) {
if self.authorized {
return;
}
#[allow(unused_mut)]
let mut username = crate::platform::get_active_username();
let mut res = LoginResponse::new();
if self.port_forward_socket.is_some() {
let mut msg_out = Message::new();
res.set_peer_info(PeerInfo {
hostname: whoami::hostname(),
username,
platform: whoami::platform().to_string(),
version: crate::VERSION.to_owned(),
..Default::default()
});
msg_out.set_login_response(res);
self.send(msg_out).await;
return;
}
#[cfg(target_os = "linux")]
if !self.file_transfer.is_some() {
let dtype = crate::platform::linux::get_display_server();
if dtype != "x11" {
res.set_error(format!(
"Unsupported display server type {}, x11 expected",
dtype
));
let mut msg_out = Message::new();
msg_out.set_login_response(res);
self.send(msg_out).await;
return;
}
}
#[allow(unused_mut)]
let mut sas_enabled = false;
#[cfg(windows)]
if crate::platform::is_root() {
sas_enabled = true;
}
if self.file_transfer.is_some() {
if crate::platform::is_prelogin() || self.tx_to_cm.send(ipc::Data::Test).is_err() {
username = "".to_owned();
}
}
self.authorized = true;
let mut pi = PeerInfo {
hostname: whoami::hostname(),
username,
platform: whoami::platform().to_string(),
version: crate::VERSION.to_owned(),
sas_enabled,
..Default::default()
};
let mut sub_service = false;
if self.file_transfer.is_some() {
res.set_peer_info(pi);
} else {
try_activate_screen();
match super::video_service::get_displays() {
Err(err) => {
res.set_error(err.to_string());
}
Ok((current, displays)) => {
pi.displays = displays.into();
pi.current_display = current as _;
res.set_peer_info(pi);
sub_service = true;
}
}
}
let mut msg_out = Message::new();
msg_out.set_login_response(res);
self.send(msg_out).await;
if let Some((dir, show_hidden)) = self.file_transfer.clone() {
let dir = if !dir.is_empty() && std::path::Path::new(&dir).is_dir() {
&dir
} else {
""
};
self.read_dir(dir, show_hidden);
} else if sub_service {
if let Some(s) = self.server.upgrade() {
let mut noperms = Vec::new();
if !self.keyboard && !self.show_remote_cursor {
noperms.push(NAME_CURSOR);
}
if !self.show_remote_cursor {
noperms.push(NAME_POS);
}
if !self.clipboard_enabled() || !self.keyboard {
noperms.push(super::clipboard_service::NAME);
}
if !self.audio_enabled() {
noperms.push(super::audio_service::NAME);
}
s.write()
.unwrap()
.add_connection(self.inner.clone(), &noperms);
}
}
}
fn clipboard_enabled(&self) -> bool {
self.clipboard && !self.disable_clipboard
}
fn audio_enabled(&self) -> bool {
self.audio && !self.disable_audio
}
async fn try_start_cm(&mut self, peer_id: String, name: String, authorized: bool) {
self.send_to_cm(ipc::Data::Login {
id: self.inner.id(),
is_file_transfer: self.file_transfer.is_some(),
port_forward: self.port_forward_address.clone(),
peer_id,
name,
authorized,
keyboard: self.keyboard,
clipboard: self.clipboard,
audio: self.audio,
});
}
#[inline]
fn send_to_cm(&mut self, data: ipc::Data) {
self.tx_to_cm.send(data).ok();
}
#[inline]
fn send_fs(&mut self, data: ipc::FS) {
self.send_to_cm(ipc::Data::FS(data));
}
async fn send_login_error<T: std::string::ToString>(&mut self, err: T) {
let mut msg_out = Message::new();
let mut res = LoginResponse::new();
res.set_error(err.to_string());
msg_out.set_login_response(res);
self.send(msg_out).await;
}
async fn on_message(&mut self, msg: Message) -> bool {
if let Some(message::Union::login_request(lr)) = msg.union {
if let Some(o) = lr.option.as_ref() {
self.update_option(o);
}
if self.authorized {
return true;
}
match lr.union {
Some(login_request::Union::file_transfer(ft)) => {
if !Config::get_option("enable-file-transfer").is_empty() {
self.send_login_error("No permission of file transfer")
.await;
sleep(1.).await;
return false;
}
self.file_transfer = Some((ft.dir, ft.show_hidden));
}
Some(login_request::Union::port_forward(mut pf)) => {
if !Config::get_option("enable-tunnel").is_empty() {
self.send_login_error("No permission of IP tunneling").await;
sleep(1.).await;
return false;
}
let mut is_rdp = false;
if pf.host == "RDP" && pf.port == 0 {
pf.host = "localhost".to_owned();
pf.port = 3389;
is_rdp = true;
}
if pf.host.is_empty() {
pf.host = "localhost".to_owned();
}
let mut addr = format!("{}:{}", pf.host, pf.port);
self.port_forward_address = addr.clone();
match timeout(3000, TcpStream::connect(&addr)).await {
Ok(Ok(sock)) => {
self.port_forward_socket = Some(Framed::new(sock, BytesCodec::new()));
}
_ => {
if is_rdp {
addr = "RDP".to_owned();
}
self.send_login_error(format!(
"Failed to access remote {}, please make sure if it is open",
addr
))
.await;
}
}
}
_ => {}
}
if lr.username != Config::get_id() {
self.send_login_error("Offline").await;
} else if lr.password.is_empty() {
self.try_start_cm(lr.my_id, lr.my_name, false).await;
} else {
let mut hasher = Sha256::new();
hasher.update(&Config::get_password());
hasher.update(&self.hash.salt);
let mut hasher2 = Sha256::new();
hasher2.update(&hasher.finalize()[..]);
hasher2.update(&self.hash.challenge);
let mut failure = LOGIN_FAILURES
.lock()
.unwrap()
.get(&self.ip)
.map(|x| x.clone())
.unwrap_or((0, 0, 0));
let time = (crate::get_time() / 60_000) as i32;
if failure.2 > 30 {
self.send_login_error("Too many wrong password attempts")
.await;
} else if time == failure.0 && failure.1 > 6 {
self.send_login_error("Please try 1 minute later").await;
} else if hasher2.finalize()[..] != lr.password[..] {
if failure.0 == time {
failure.1 += 1;
failure.2 += 1;
} else {
failure.0 = time;
failure.1 = 1;
failure.2 += 1;
}
LOGIN_FAILURES
.lock()
.unwrap()
.insert(self.ip.clone(), failure);
self.send_login_error("Wrong Password").await;
self.try_start_cm(lr.my_id, lr.my_name, false).await;
} else {
if failure.0 != 0 {
LOGIN_FAILURES.lock().unwrap().remove(&self.ip);
}
self.send_logon_response().await;
self.try_start_cm(lr.my_id, lr.my_name, true).await;
if self.port_forward_socket.is_some() {
return false;
}
}
}
} else if let Some(message::Union::test_delay(t)) = msg.union {
if t.from_client {
let mut msg_out = Message::new();
msg_out.set_test_delay(t);
self.inner.send(msg_out.into());
} else {
self.last_test_delay = 0;
let latency = crate::get_time() - t.time;
if latency > 0 {
super::video_service::update_test_latency(self.inner.id(), latency);
}
}
} else if self.authorized {
match msg.union {
Some(message::Union::mouse_event(me)) => {
if self.keyboard {
handle_mouse(&me, self.inner.id());
if is_left_up(&me) {
*CLICK_TIME.lock().unwrap() = crate::get_time();
}
}
}
Some(message::Union::key_event(mut me)) => {
if self.keyboard {
if me.press {
if let Some(key_event::Union::unicode(_)) = me.union {
handle_key(&me);
} else if let Some(key_event::Union::seq(_)) = me.union {
handle_key(&me);
} else {
me.down = true;
handle_key(&me);
me.down = false;
handle_key(&me);
}
} else {
handle_key(&me);
}
if is_enter(&me) {
*CLICK_TIME.lock().unwrap() = crate::get_time();
}
}
}
Some(message::Union::clipboard(cb)) => {
if self.clipboard {
update_clipboard(cb, None);
}
}
Some(message::Union::file_action(fa)) => {
if self.file_transfer.is_some() {
match fa.union {
Some(file_action::Union::read_dir(rd)) => {
self.read_dir(&rd.path, rd.include_hidden);
}
Some(file_action::Union::all_files(f)) => {
match fs::get_recursive_files(&f.path, f.include_hidden) {
Err(err) => {
self.send(fs::new_error(f.id, err, -1)).await;
}
Ok(files) => {
self.send(fs::new_dir(f.id, files)).await;
}
}
}
Some(file_action::Union::send(s)) => {
let id = s.id;
match fs::TransferJob::new_read(id, s.path, s.include_hidden) {
Err(err) => {
self.send(fs::new_error(id, err, 0)).await;
}
Ok(job) => {
self.send(fs::new_dir(id, job.files().to_vec())).await;
self.read_jobs.push(job);
self.timer = time::interval(MILLI1);
}
}
}
Some(file_action::Union::receive(r)) => {
self.send_fs(ipc::FS::NewWrite {
path: r.path,
id: r.id,
files: r
.files
.to_vec()
.drain(..)
.map(|f| (f.name, f.modified_time))
.collect(),
});
}
Some(file_action::Union::remove_dir(d)) => {
self.send_fs(ipc::FS::RemoveDir {
path: d.path,
id: d.id,
recursive: d.recursive,
});
}
Some(file_action::Union::remove_file(f)) => {
self.send_fs(ipc::FS::RemoveFile {
path: f.path,
id: f.id,
file_num: f.file_num,
});
}
Some(file_action::Union::create(c)) => {
self.send_fs(ipc::FS::CreateDir {
path: c.path,
id: c.id,
});
}
Some(file_action::Union::cancel(c)) => {
self.send_fs(ipc::FS::CancelWrite { id: c.id });
fs::remove_job(c.id, &mut self.read_jobs);
}
_ => {}
}
}
}
Some(message::Union::file_response(fr)) => match fr.union {
Some(file_response::Union::block(block)) => {
self.send_fs(ipc::FS::WriteBlock {
id: block.id,
file_num: block.file_num,
data: block.data,
compressed: block.compressed,
});
}
Some(file_response::Union::done(d)) => {
self.send_fs(ipc::FS::WriteDone {
id: d.id,
file_num: d.file_num,
});
}
_ => {}
},
Some(message::Union::misc(misc)) => match misc.union {
Some(misc::Union::switch_display(s)) => {
super::video_service::switch_display(s.display);
}
Some(misc::Union::chat_message(c)) => {
self.send_to_cm(ipc::Data::ChatMessage { text: c.text });
}
Some(misc::Union::option(o)) => {
self.update_option(&o);
}
Some(misc::Union::refresh_video(r)) => {
if r {
super::video_service::refresh();
}
}
_ => {}
},
_ => {}
}
}
true
}
fn update_option(&mut self, o: &OptionMessage) {
log::info!("Option update: {:?}", o);
if let Ok(q) = o.image_quality.enum_value() {
self.image_quality = q.value();
super::video_service::update_image_quality(self.inner.id(), Some(q.value()));
}
let q = o.custom_image_quality;
if q > 0 {
self.image_quality = q;
super::video_service::update_image_quality(self.inner.id(), Some(q));
}
if let Ok(q) = o.lock_after_session_end.enum_value() {
if q != BoolOption::NotSet {
self.lock_after_session_end = q == BoolOption::Yes;
}
}
if let Ok(q) = o.show_remote_cursor.enum_value() {
if q != BoolOption::NotSet {
self.show_remote_cursor = q == BoolOption::Yes;
if let Some(s) = self.server.upgrade() {
s.write().unwrap().subscribe(
NAME_CURSOR,
self.inner.clone(),
self.keyboard || self.show_remote_cursor,
);
s.write().unwrap().subscribe(
NAME_POS,
self.inner.clone(),
self.show_remote_cursor,
);
}
}
}
if let Ok(q) = o.disable_audio.enum_value() {
if q != BoolOption::NotSet {
self.disable_audio = q == BoolOption::Yes;
if let Some(s) = self.server.upgrade() {
s.write().unwrap().subscribe(
NAME_CURSOR,
self.inner.clone(),
self.audio_enabled(),
);
}
}
}
if let Ok(q) = o.disable_clipboard.enum_value() {
if q != BoolOption::NotSet {
self.disable_clipboard = q == BoolOption::Yes;
if let Some(s) = self.server.upgrade() {
s.write().unwrap().subscribe(
NAME_CURSOR,
self.inner.clone(),
self.clipboard_enabled() && self.keyboard,
);
}
}
}
if let Ok(q) = o.privacy_mode.enum_value() {
if q != BoolOption::NotSet {
self.privacy_mode = q == BoolOption::Yes;
if self.privacy_mode && self.keyboard {
crate::platform::toggle_privacy_mode(true);
}
}
}
if self.keyboard {
if let Ok(q) = o.block_input.enum_value() {
if q != BoolOption::NotSet {
crate::platform::block_input(q == BoolOption::Yes);
}
}
}
}
fn on_close(&mut self, reason: &str, lock: bool) {
if let Some(s) = self.server.upgrade() {
s.write().unwrap().remove_connection(&self.inner);
}
log::info!("#{} Connection closed: {}", self.inner.id(), reason);
if lock && self.lock_after_session_end && self.keyboard {
crate::platform::lock_screen();
super::video_service::switch_to_primary();
}
if self.privacy_mode {
crate::platform::toggle_privacy_mode(false);
}
self.port_forward_socket.take();
}
fn read_dir(&mut self, dir: &str, include_hidden: bool) {
let dir = dir.to_string();
self.send_fs(ipc::FS::ReadDir {
dir,
include_hidden,
});
}
#[inline]
async fn send(&mut self, msg: Message) {
allow_err!(self.stream.send(&msg).await);
}
}
async fn start_ipc(
mut rx_to_cm: mpsc::UnboundedReceiver<ipc::Data>,
tx_from_cm: mpsc::UnboundedSender<ipc::Data>,
) -> ResultType<()> {
loop {
if !crate::platform::is_prelogin() {
break;
}
sleep(1.).await;
}
let mut stream = None;
if let Ok(s) = crate::ipc::connect(1000, "_cm").await {
stream = Some(s);
} else {
let run_done;
if crate::platform::is_root() {
let mut res = Ok(None);
for _ in 0..10 {
res = crate::platform::run_as_user("--cm");
if res.is_ok() {
break;
}
sleep(1.).await;
}
if let Some(task) = res? {
super::CHILD_PROCESS.lock().unwrap().push(task);
}
run_done = true;
} else {
run_done = false;
}
if !run_done {
super::CHILD_PROCESS
.lock()
.unwrap()
.push(crate::run_me(vec!["--cm"])?);
}
for _ in 0..10 {
sleep(0.3).await;
if let Ok(s) = crate::ipc::connect(1000, "_cm").await {
stream = Some(s);
break;
}
}
if stream.is_none() {
bail!("Failed to connect to connection manager");
}
}
let mut stream = stream.unwrap();
loop {
tokio::select! {
res = stream.next() => {
match res {
Err(err) => {
return Err(err.into());
}
Ok(Some(data)) => {
match data {
ipc::Data::ClickTime(_)=> {
unsafe {
let ct = *CLICK_TIME.lock().unwrap();
let data = ipc::Data::ClickTime(ct);
stream.send(&data).await?;
}
}
_ => {
tx_from_cm.send(data)?;
}
}
}
_ => {}
}
}
res = rx_to_cm.recv() => {
match res {
Some(data) => {
stream.send(&data).await?;
}
None => {
bail!("expected");
}
}
}
}
}
}
// in case screen is sleep and blank, here to activate it
fn try_activate_screen() {
#[cfg(windows)]
std::thread::spawn(|| {
mouse_move_relative(-6, -6);
std::thread::sleep(std::time::Duration::from_millis(30));
mouse_move_relative(6, 6);
});
}

499
src/server/input_service.rs Normal file
View File

@@ -0,0 +1,499 @@
use super::*;
#[cfg(target_os = "macos")]
use dispatch::Queue;
use enigo::{Enigo, KeyboardControllable, MouseButton, MouseControllable};
use hbb_common::config::COMPRESS_LEVEL;
use std::convert::TryFrom;
#[derive(Default)]
struct StateCursor {
hcursor: u64,
cursor_data: Arc<Message>,
cached_cursor_data: HashMap<u64, Arc<Message>>,
}
impl super::service::Reset for StateCursor {
fn reset(&mut self) {
*self = Default::default();
crate::platform::reset_input_cache();
}
}
#[derive(Default)]
struct StatePos {
cursor_pos: (i32, i32),
}
impl super::service::Reset for StatePos {
fn reset(&mut self) {
self.cursor_pos = (0, 0);
}
}
#[derive(Default)]
struct Input {
conn: i32,
time: i64,
}
static mut LATEST_INPUT: Input = Input { conn: 0, time: 0 };
#[derive(Clone, Default)]
pub struct MouseCursorSub {
inner: ConnInner,
cached: HashMap<u64, Arc<Message>>,
}
impl From<ConnInner> for MouseCursorSub {
fn from(inner: ConnInner) -> Self {
Self {
inner,
cached: HashMap::new(),
}
}
}
impl Subscriber for MouseCursorSub {
#[inline]
fn id(&self) -> i32 {
self.inner.id()
}
#[inline]
fn send(&mut self, msg: Arc<Message>) {
if let Some(message::Union::cursor_data(cd)) = &msg.union {
if let Some(msg) = self.cached.get(&cd.id) {
self.inner.send(msg.clone());
} else {
self.inner.send(msg.clone());
let mut tmp = Message::new();
// only send id out, require client side cache also
tmp.set_cursor_id(cd.id);
self.cached.insert(cd.id, Arc::new(tmp));
}
} else {
self.inner.send(msg);
}
}
}
pub const NAME_CURSOR: &'static str = "mouse_cursor";
pub const NAME_POS: &'static str = "mouse_pos";
pub type MouseCursorService = ServiceTmpl<MouseCursorSub>;
pub fn new_cursor() -> MouseCursorService {
let sp = MouseCursorService::new(NAME_CURSOR, true);
sp.repeat::<StateCursor, _>(33, run_cursor);
sp
}
pub fn new_pos() -> GenericService {
let sp = GenericService::new(NAME_POS, false);
sp.repeat::<StatePos, _>(33, run_pos);
sp
}
fn run_pos(sp: GenericService, state: &mut StatePos) -> ResultType<()> {
if let Some((x, y)) = crate::get_cursor_pos() {
if state.cursor_pos.0 != x || state.cursor_pos.1 != y {
state.cursor_pos = (x, y);
let mut msg_out = Message::new();
msg_out.set_cursor_position(CursorPosition {
x,
y,
..Default::default()
});
let exclude = unsafe {
if crate::get_time() - LATEST_INPUT.time < 300 {
LATEST_INPUT.conn
} else {
0
}
};
sp.send_without(msg_out, exclude);
}
}
sp.snapshot(|sps| {
let mut msg_out = Message::new();
msg_out.set_cursor_position(CursorPosition {
x: state.cursor_pos.0,
y: state.cursor_pos.1,
..Default::default()
});
sps.send(msg_out);
Ok(())
})?;
Ok(())
}
fn run_cursor(sp: MouseCursorService, state: &mut StateCursor) -> ResultType<()> {
if let Some(hcursor) = crate::get_cursor()? {
if hcursor != state.hcursor {
let msg;
if let Some(cached) = state.cached_cursor_data.get(&hcursor) {
super::log::trace!("Cursor data cached, hcursor: {}", hcursor);
msg = cached.clone();
} else {
let mut data = crate::get_cursor_data(hcursor)?;
data.colors = hbb_common::compress::compress(&data.colors[..], COMPRESS_LEVEL);
let mut tmp = Message::new();
tmp.set_cursor_data(data);
msg = Arc::new(tmp);
state.cached_cursor_data.insert(hcursor, msg.clone());
super::log::trace!("Cursor data updated, hcursor: {}", hcursor);
}
state.hcursor = hcursor;
sp.send_shared(msg.clone());
state.cursor_data = msg;
}
}
sp.snapshot(|sps| {
sps.send_shared(state.cursor_data.clone());
Ok(())
})?;
Ok(())
}
lazy_static::lazy_static! {
static ref ENIGO: Arc<Mutex<Enigo>> = Arc::new(Mutex::new(Enigo::new()));
}
// mac key input must be run in main thread, otherwise crash on >= osx 10.15
#[cfg(target_os = "macos")]
lazy_static::lazy_static! {
static ref QUEUE: Queue = Queue::main();
static ref IS_SERVER: bool = std::env::args().nth(1) == Some("--server".to_owned());
}
pub fn is_left_up(evt: &MouseEvent) -> bool {
let buttons = evt.mask >> 3;
let evt_type = evt.mask & 0x7;
return buttons == 1 && evt_type == 2;
}
#[cfg(windows)]
pub fn mouse_move_relative(x: i32, y: i32) {
#[cfg(windows)]
crate::platform::windows::try_change_desktop();
let mut en = ENIGO.lock().unwrap();
en.mouse_move_relative(x, y);
}
#[cfg(not(target_os = "macos"))]
fn modifier_sleep() {
// sleep for a while, this is only for keying in rdp in peer so far
#[cfg(windows)]
std::thread::sleep(std::time::Duration::from_nanos(1));
}
pub fn handle_mouse(evt: &MouseEvent, conn: i32) {
#[cfg(windows)]
crate::platform::windows::try_change_desktop();
let buttons = evt.mask >> 3;
let evt_type = evt.mask & 0x7;
if evt_type == 0 {
unsafe {
let time = crate::get_time();
LATEST_INPUT = Input { time, conn };
}
}
let mut en = ENIGO.lock().unwrap();
#[cfg(not(target_os = "macos"))]
let mut to_release = Vec::new();
#[cfg(target_os = "macos")]
en.reset_flag();
for ref ck in evt.modifiers.iter() {
if let Some(key) = KEY_MAP.get(&ck.value()) {
if evt_type == 1 || evt_type == 2 {
#[cfg(target_os = "macos")]
en.add_flag(key);
#[cfg(not(target_os = "macos"))]
if key != &enigo::Key::CapsLock && key != &enigo::Key::NumLock {
if !en.get_key_state(key.clone()) {
en.key_down(key.clone()).ok();
modifier_sleep();
to_release.push(key);
}
}
}
}
}
match evt_type {
0 => {
en.mouse_move_to(evt.x, evt.y);
}
1 => match buttons {
1 => {
allow_err!(en.mouse_down(MouseButton::Left));
}
2 => {
allow_err!(en.mouse_down(MouseButton::Right));
}
4 => {
allow_err!(en.mouse_down(MouseButton::Middle));
}
_ => {}
},
2 => match buttons {
1 => {
en.mouse_up(MouseButton::Left);
}
2 => {
en.mouse_up(MouseButton::Right);
}
4 => {
en.mouse_up(MouseButton::Middle);
}
_ => {}
},
3 => {
#[allow(unused_mut)]
let mut x = evt.x;
#[allow(unused_mut)]
let mut y = evt.y;
#[cfg(not(windows))]
{
x = -x;
y = -y;
}
if x != 0 {
en.mouse_scroll_x(x);
}
if y != 0 {
en.mouse_scroll_y(y);
}
}
_ => {}
}
#[cfg(not(target_os = "macos"))]
for key in to_release {
en.key_up(key.clone());
}
}
pub fn is_enter(evt: &KeyEvent) -> bool {
if let Some(key_event::Union::control_key(ck)) = evt.union {
if ck.value() == ControlKey::Return.value() || ck.value() == ControlKey::NumpadEnter.value()
{
return true;
}
}
return false;
}
lazy_static::lazy_static! {
static ref KEY_MAP: HashMap<i32, enigo::Key> =
[
(ControlKey::Alt, enigo::Key::Alt),
(ControlKey::Backspace, enigo::Key::Backspace),
(ControlKey::CapsLock, enigo::Key::CapsLock),
(ControlKey::Control, enigo::Key::Control),
(ControlKey::Delete, enigo::Key::Delete),
(ControlKey::DownArrow, enigo::Key::DownArrow),
(ControlKey::End, enigo::Key::End),
(ControlKey::Escape, enigo::Key::Escape),
(ControlKey::F1, enigo::Key::F1),
(ControlKey::F10, enigo::Key::F10),
(ControlKey::F11, enigo::Key::F11),
(ControlKey::F12, enigo::Key::F12),
(ControlKey::F2, enigo::Key::F2),
(ControlKey::F3, enigo::Key::F3),
(ControlKey::F4, enigo::Key::F4),
(ControlKey::F5, enigo::Key::F5),
(ControlKey::F6, enigo::Key::F6),
(ControlKey::F7, enigo::Key::F7),
(ControlKey::F8, enigo::Key::F8),
(ControlKey::F9, enigo::Key::F9),
(ControlKey::Home, enigo::Key::Home),
(ControlKey::LeftArrow, enigo::Key::LeftArrow),
(ControlKey::Meta, enigo::Key::Meta),
(ControlKey::Option, enigo::Key::Option),
(ControlKey::PageDown, enigo::Key::PageDown),
(ControlKey::PageUp, enigo::Key::PageUp),
(ControlKey::Return, enigo::Key::Return),
(ControlKey::RightArrow, enigo::Key::RightArrow),
(ControlKey::Shift, enigo::Key::Shift),
(ControlKey::Space, enigo::Key::Space),
(ControlKey::Tab, enigo::Key::Tab),
(ControlKey::UpArrow, enigo::Key::UpArrow),
(ControlKey::Numpad0, enigo::Key::Numpad0),
(ControlKey::Numpad1, enigo::Key::Numpad1),
(ControlKey::Numpad2, enigo::Key::Numpad2),
(ControlKey::Numpad3, enigo::Key::Numpad3),
(ControlKey::Numpad4, enigo::Key::Numpad4),
(ControlKey::Numpad5, enigo::Key::Numpad5),
(ControlKey::Numpad6, enigo::Key::Numpad6),
(ControlKey::Numpad7, enigo::Key::Numpad7),
(ControlKey::Numpad8, enigo::Key::Numpad8),
(ControlKey::Numpad9, enigo::Key::Numpad9),
(ControlKey::Cancel, enigo::Key::Cancel),
(ControlKey::Clear, enigo::Key::Clear),
(ControlKey::Menu, enigo::Key::Menu),
(ControlKey::Pause, enigo::Key::Pause),
(ControlKey::Kana, enigo::Key::Kana),
(ControlKey::Hangul, enigo::Key::Hangul),
(ControlKey::Junja, enigo::Key::Junja),
(ControlKey::Final, enigo::Key::Final),
(ControlKey::Hanja, enigo::Key::Hanja),
(ControlKey::Kanji, enigo::Key::Kanji),
(ControlKey::Convert, enigo::Key::Convert),
(ControlKey::Select, enigo::Key::Select),
(ControlKey::Print, enigo::Key::Print),
(ControlKey::Execute, enigo::Key::Execute),
(ControlKey::Snapshot, enigo::Key::Snapshot),
(ControlKey::Insert, enigo::Key::Insert),
(ControlKey::Help, enigo::Key::Help),
(ControlKey::Sleep, enigo::Key::Sleep),
(ControlKey::Separator, enigo::Key::Separator),
(ControlKey::Scroll, enigo::Key::Scroll),
(ControlKey::NumLock, enigo::Key::NumLock),
(ControlKey::RWin, enigo::Key::RWin),
(ControlKey::Apps, enigo::Key::Apps),
(ControlKey::Multiply, enigo::Key::Multiply),
(ControlKey::Add, enigo::Key::Add),
(ControlKey::Subtract, enigo::Key::Subtract),
(ControlKey::Decimal, enigo::Key::Decimal),
(ControlKey::Divide, enigo::Key::Divide),
(ControlKey::Equals, enigo::Key::Equals),
(ControlKey::NumpadEnter, enigo::Key::NumpadEnter),
].iter().map(|(a, b)| (a.value(), b.clone())).collect();
static ref NUMPAD_KEY_MAP: HashMap<i32, bool> =
[
(ControlKey::Home, true),
(ControlKey::UpArrow, true),
(ControlKey::PageUp, true),
(ControlKey::LeftArrow, true),
(ControlKey::RightArrow, true),
(ControlKey::End, true),
(ControlKey::DownArrow, true),
(ControlKey::PageDown, true),
(ControlKey::Insert, true),
(ControlKey::Delete, true),
].iter().map(|(a, b)| (a.value(), b.clone())).collect();
}
pub fn handle_key(evt: &KeyEvent) {
#[cfg(target_os = "macos")]
if !*IS_SERVER {
// having GUI, run main GUI thread, otherwise crash
let evt = evt.clone();
QUEUE.exec_async(move || handle_key_(&evt));
return;
}
handle_key_(evt);
}
fn handle_key_(evt: &KeyEvent) {
#[cfg(windows)]
crate::platform::windows::try_change_desktop();
let mut en = ENIGO.lock().unwrap();
// disable numlock if press home etc when numlock is on,
// because we will get numpad value (7,8,9 etc) if not
#[cfg(windows)]
let mut disable_numlock = false;
#[cfg(target_os = "macos")]
en.reset_flag();
#[cfg(not(target_os = "macos"))]
let mut to_release = Vec::new();
#[cfg(not(target_os = "macos"))]
let mut has_cap = false;
#[cfg(windows)]
let mut has_numlock = false;
for ref ck in evt.modifiers.iter() {
if let Some(key) = KEY_MAP.get(&ck.value()) {
#[cfg(target_os = "macos")]
en.add_flag(key);
#[cfg(not(target_os = "macos"))]
{
if key == &enigo::Key::CapsLock {
has_cap = true;
} else if key == &enigo::Key::NumLock {
#[cfg(windows)]
{
has_numlock = true;
}
} else {
if !en.get_key_state(key.clone()) {
en.key_down(key.clone()).ok();
modifier_sleep();
to_release.push(key);
}
}
}
}
}
#[cfg(not(target_os = "macos"))]
if crate::common::valid_for_capslock(evt) {
if has_cap != en.get_key_state(enigo::Key::CapsLock) {
en.key_down(enigo::Key::CapsLock).ok();
en.key_up(enigo::Key::CapsLock);
}
}
#[cfg(windows)]
if crate::common::valid_for_numlock(evt) {
if has_numlock != en.get_key_state(enigo::Key::NumLock) {
en.key_down(enigo::Key::NumLock).ok();
en.key_up(enigo::Key::NumLock);
}
}
match evt.union {
Some(key_event::Union::control_key(ck)) => {
if let Some(key) = KEY_MAP.get(&ck.value()) {
#[cfg(windows)]
if let Some(_) = NUMPAD_KEY_MAP.get(&ck.value()) {
disable_numlock = en.get_key_state(enigo::Key::NumLock);
if disable_numlock {
en.key_down(enigo::Key::NumLock).ok();
en.key_up(enigo::Key::NumLock);
}
}
if evt.down {
allow_err!(en.key_down(key.clone()));
} else {
en.key_up(key.clone());
}
} else if ck.value() == ControlKey::CtrlAltDel.value() {
// have to spawn new thread because send_sas is tokio_main, the caller can not be tokio_main.
std::thread::spawn(|| {
allow_err!(send_sas());
});
} else if ck.value() == ControlKey::LockScreen.value() {
crate::platform::lock_screen();
super::video_service::switch_to_primary();
}
}
Some(key_event::Union::chr(chr)) => {
if evt.down {
allow_err!(en.key_down(enigo::Key::Layout(chr as u8 as _)));
} else {
en.key_up(enigo::Key::Layout(chr as u8 as _));
}
}
Some(key_event::Union::unicode(chr)) => {
if let Ok(chr) = char::try_from(chr) {
en.key_sequence(&chr.to_string());
}
}
Some(key_event::Union::seq(ref seq)) => {
en.key_sequence(&seq);
}
_ => {}
}
#[cfg(not(target_os = "macos"))]
for key in to_release {
en.key_up(key.clone());
}
#[cfg(windows)]
if disable_numlock {
en.key_down(enigo::Key::NumLock).ok();
en.key_up(enigo::Key::NumLock);
}
}
#[tokio::main(basic_scheduler)]
async fn send_sas() -> ResultType<()> {
let mut stream = crate::ipc::connect(1000, crate::POSTFIX_SERVICE).await?;
timeout(1000, stream.send(&crate::ipc::Data::SAS)).await??;
Ok(())
}

249
src/server/service.rs Normal file
View File

@@ -0,0 +1,249 @@
use super::*;
use std::{
thread::{self, JoinHandle},
time,
};
pub trait Service: Send + Sync {
fn name(&self) -> &'static str;
fn on_subscribe(&self, sub: ConnInner);
fn on_unsubscribe(&self, id: i32);
fn is_subed(&self, id: i32) -> bool;
fn join(&self);
}
pub trait Subscriber: Default + Send + Sync + 'static {
fn id(&self) -> i32;
fn send(&mut self, msg: Arc<Message>);
}
#[derive(Default)]
pub struct ServiceInner<T: Subscriber + From<ConnInner>> {
name: &'static str,
handle: Option<JoinHandle<()>>,
subscribes: HashMap<i32, T>,
new_subscribes: HashMap<i32, T>,
active: bool,
need_snapshot: bool,
}
pub trait Reset {
fn reset(&mut self);
}
pub struct ServiceTmpl<T: Subscriber + From<ConnInner>>(Arc<RwLock<ServiceInner<T>>>);
pub struct ServiceSwap<T: Subscriber + From<ConnInner>>(ServiceTmpl<T>);
pub type GenericService = ServiceTmpl<ConnInner>;
pub const HIBERATE_TIMEOUT: u64 = 30;
pub const MAX_ERROR_TIMEOUT: u64 = 1_000;
impl<T: Subscriber + From<ConnInner>> ServiceInner<T> {
fn send_new_subscribes(&mut self, msg: Arc<Message>) {
for s in self.new_subscribes.values_mut() {
s.send(msg.clone());
}
}
fn swap_new_subscribes(&mut self) {
for (_, s) in self.new_subscribes.drain() {
self.subscribes.insert(s.id(), s);
}
assert!(self.new_subscribes.is_empty());
}
#[inline]
fn has_subscribes(&self) -> bool {
self.subscribes.len() > 0 || self.new_subscribes.len() > 0
}
}
impl<T: Subscriber + From<ConnInner>> Service for ServiceTmpl<T> {
#[inline]
fn name(&self) -> &'static str {
self.0.read().unwrap().name
}
fn is_subed(&self, id: i32) -> bool {
self.0.read().unwrap().subscribes.get(&id).is_some()
}
fn on_subscribe(&self, sub: ConnInner) {
let mut lock = self.0.write().unwrap();
if lock.subscribes.get(&sub.id()).is_some() {
return;
}
if lock.need_snapshot {
lock.new_subscribes.insert(sub.id(), sub.into());
} else {
lock.subscribes.insert(sub.id(), sub.into());
}
}
fn on_unsubscribe(&self, id: i32) {
let mut lock = self.0.write().unwrap();
if let None = lock.subscribes.remove(&id) {
lock.new_subscribes.remove(&id);
}
}
fn join(&self) {
self.0.write().unwrap().active = false;
self.0.write().unwrap().handle.take().map(JoinHandle::join);
}
}
impl<T: Subscriber + From<ConnInner>> Clone for ServiceTmpl<T> {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
impl<T: Subscriber + From<ConnInner>> ServiceTmpl<T> {
pub fn new(name: &'static str, need_snapshot: bool) -> Self {
Self(Arc::new(RwLock::new(ServiceInner::<T> {
name,
active: true,
need_snapshot,
..Default::default()
})))
}
#[inline]
pub fn has_subscribes(&self) -> bool {
self.0.read().unwrap().has_subscribes()
}
#[inline]
pub fn ok(&self) -> bool {
let lock = self.0.read().unwrap();
lock.active && lock.has_subscribes()
}
pub fn snapshot<F>(&self, callback: F) -> ResultType<()>
where
F: FnMut(ServiceSwap<T>) -> ResultType<()>,
{
if self.0.read().unwrap().new_subscribes.len() > 0 {
log::info!("Call snapshot of {} service", self.name());
let mut callback = callback;
callback(ServiceSwap::<T>(self.clone()))?;
}
Ok(())
}
#[inline]
pub fn send(&self, msg: Message) {
self.send_shared(Arc::new(msg));
}
pub fn send_shared(&self, msg: Arc<Message>) {
let mut lock = self.0.write().unwrap();
for s in lock.subscribes.values_mut() {
s.send(msg.clone());
}
}
pub fn send_without(&self, msg: Message, sub: i32) {
let mut lock = self.0.write().unwrap();
let msg = Arc::new(msg);
for s in lock.subscribes.values_mut() {
if sub != s.id() {
s.send(msg.clone());
}
}
}
pub fn repeat<S, F>(&self, interval_ms: u64, callback: F)
where
F: 'static + FnMut(Self, &mut S) -> ResultType<()> + Send,
S: 'static + Default + Reset,
{
let interval = time::Duration::from_millis(interval_ms);
let mut callback = callback;
let sp = self.clone();
let thread = thread::spawn(move || {
let mut state = S::default();
while sp.active() {
let now = time::Instant::now();
if sp.has_subscribes() {
if let Err(err) = callback(sp.clone(), &mut state) {
log::error!("Error of {} service: {}", sp.name(), err);
thread::sleep(time::Duration::from_millis(MAX_ERROR_TIMEOUT));
#[cfg(windows)]
crate::platform::windows::try_change_desktop();
}
} else {
state.reset();
}
let elapsed = now.elapsed();
if elapsed < interval {
thread::sleep(interval - elapsed);
}
}
});
self.0.write().unwrap().handle = Some(thread);
}
pub fn run<F>(&self, callback: F)
where
F: 'static + FnMut(Self) -> ResultType<()> + Send,
{
let sp = self.clone();
let mut callback = callback;
let thread = thread::spawn(move || {
let mut error_timeout = HIBERATE_TIMEOUT;
while sp.active() {
if sp.has_subscribes() {
log::debug!("Enter {} service inner loop", sp.name());
let tm = time::Instant::now();
if let Err(err) = callback(sp.clone()) {
log::error!("Error of {} service: {}", sp.name(), err);
if tm.elapsed() > time::Duration::from_millis(MAX_ERROR_TIMEOUT) {
error_timeout = HIBERATE_TIMEOUT;
} else {
error_timeout *= 2;
}
if error_timeout > MAX_ERROR_TIMEOUT {
error_timeout = MAX_ERROR_TIMEOUT;
}
thread::sleep(time::Duration::from_millis(error_timeout));
#[cfg(windows)]
crate::platform::windows::try_change_desktop();
} else {
log::debug!("Exit {} service inner loop", sp.name());
}
}
thread::sleep(time::Duration::from_millis(HIBERATE_TIMEOUT));
}
});
self.0.write().unwrap().handle = Some(thread);
}
#[inline]
pub fn active(&self) -> bool {
self.0.read().unwrap().active
}
}
impl<T: Subscriber + From<ConnInner>> ServiceSwap<T> {
#[inline]
pub fn send(&self, msg: Message) {
self.send_shared(Arc::new(msg));
}
#[inline]
pub fn send_shared(&self, msg: Arc<Message>) {
(self.0).0.write().unwrap().send_new_subscribes(msg);
}
#[inline]
pub fn has_subscribes(&self) -> bool {
(self.0).0.read().unwrap().subscribes.len() > 0
}
}
impl<T: Subscriber + From<ConnInner>> Drop for ServiceSwap<T> {
fn drop(&mut self) {
(self.0).0.write().unwrap().swap_new_subscribes();
}
}

384
src/server/video_service.rs Normal file
View File

@@ -0,0 +1,384 @@
// 24FPS (actually 23.976FPS) is what video professionals ages ago determined to be the
// slowest playback rate that still looks smooth enough to feel real.
// Our eyes can see a slight difference and even though 30FPS actually shows
// more information and is more realistic.
// 60FPS is commonly used in game, teamviewer 12 support this for video editing user.
// how to capture with mouse cursor:
// https://docs.microsoft.com/zh-cn/windows/win32/direct3ddxgi/desktop-dup-api?redirectedfrom=MSDN
// 实现了硬件编解码和音频抓取,还绘制了鼠标
// https://github.com/PHZ76/DesktopSharing
// dxgi memory leak issue
// https://stackoverflow.com/questions/47801238/memory-leak-in-creating-direct2d-device
// but per my test, it is more related to AcquireNextFrame,
// https://forums.developer.nvidia.com/t/dxgi-outputduplication-memory-leak-when-using-nv-but-not-amd-drivers/108582
// to-do:
// https://slhck.info/video/2017/03/01/rate-control.html
use super::*;
use scrap::{Capturer, Config, Display, EncodeFrame, Encoder, VideoCodecId, STRIDE_ALIGN};
use std::{
io::ErrorKind::WouldBlock,
time::{self, Instant},
};
const WAIT_BASE: i32 = 17;
pub const NAME: &'static str = "video";
lazy_static::lazy_static! {
static ref CURRENT_DISPLAY: Arc<Mutex<usize>> = Arc::new(Mutex::new(usize::MAX));
static ref LAST_ACTIVE: Arc<Mutex<Instant>> = Arc::new(Mutex::new(Instant::now()));
static ref SWITCH: Arc<Mutex<bool>> = Default::default();
static ref INTERNAL_LATENCIES: Arc<Mutex<HashMap<i32, i64>>> = Default::default();
static ref TEST_LATENCIES: Arc<Mutex<HashMap<i32, i64>>> = Default::default();
static ref IMAGE_QUALITIES: Arc<Mutex<HashMap<i32, i32>>> = Default::default();
}
pub fn new() -> GenericService {
let sp = GenericService::new(NAME, true);
sp.run(run);
sp
}
fn run(sp: GenericService) -> ResultType<()> {
let fps = 30;
let spf = time::Duration::from_secs_f32(1. / (fps as f32));
let (n, current, display) = get_current_display()?;
let (origin, width, height) = (display.origin(), display.width(), display.height());
log::debug!(
"#displays={}, current={}, origin: {:?}, width={}, height={}",
n,
current,
&origin,
width,
height
);
// Capturer object is expensive, avoiding to create it frequently.
let mut c = Capturer::new(display, true).with_context(|| "Failed to create capturer")?;
let q = get_image_quality();
let (bitrate, rc_min_quantizer, rc_max_quantizer, speed) = get_quality(width, height, q);
log::info!("bitrate={}, rc_min_quantizer={}", bitrate, rc_min_quantizer);
let mut wait = WAIT_BASE;
let cfg = Config {
width: width as _,
height: height as _,
timebase: [1, 1000], // Output timestamp precision
bitrate,
codec: VideoCodecId::VP9,
rc_min_quantizer,
rc_max_quantizer,
speed,
};
let mut vpx = Encoder::new(&cfg, 1).with_context(|| "Failed to create encoder")?;
if *SWITCH.lock().unwrap() {
log::debug!("Broadcasting display switch");
let mut misc = Misc::new();
misc.set_switch_display(SwitchDisplay {
display: current as _,
x: origin.0 as _,
y: origin.1 as _,
width: width as _,
height: height as _,
..Default::default()
});
let mut msg_out = Message::new();
msg_out.set_misc(misc);
*SWITCH.lock().unwrap() = false;
sp.send(msg_out);
}
#[cfg(windows)]
if !c.is_gdi() {
// dxgi duplicateoutput has no output if display no change, so we use gdi this as workaround
if c.set_gdi() {
// dgx capture release has memory leak somehow, so use gdi always before fixing it, just sacrificing some cpu
/*
if let Ok(frame) = c.frame(wait as _) {
handle_one_frame(&sp, &frame, 0, &mut vpx)?;
}
c.cancel_gdi();
*/
}
}
let start = time::Instant::now();
let mut crc = (0, 0);
let mut last_sent = time::Instant::now();
while sp.ok() {
if *SWITCH.lock().unwrap() {
bail!("SWITCH");
}
if current != *CURRENT_DISPLAY.lock().unwrap() {
*SWITCH.lock().unwrap() = true;
bail!("SWITCH");
}
if get_image_quality() != q {
bail!("SWITCH");
}
#[cfg(windows)]
{
if crate::platform::windows::desktop_changed() {
bail!("Desktop changed");
}
}
let now = time::Instant::now();
*LAST_ACTIVE.lock().unwrap() = now;
if get_latency() < 1000 || last_sent.elapsed().as_millis() > 1000 {
match c.frame(wait as _) {
Ok(frame) => {
let time = now - start;
let ms = (time.as_secs() * 1000 + time.subsec_millis() as u64) as i64;
handle_one_frame(&sp, &frame, ms, &mut crc, &mut vpx)?;
last_sent = now;
}
Err(ref e) if e.kind() == WouldBlock => {
// https://github.com/NVIDIA/video-sdk-samples/tree/master/nvEncDXGIOutputDuplicationSample
wait = WAIT_BASE - now.elapsed().as_millis() as i32;
if wait < 0 {
wait = 0
}
continue;
}
Err(err) => {
return Err(err.into());
}
}
}
let elapsed = now.elapsed();
// may need to enable frame(timeout)
log::trace!("{:?} {:?}", time::Instant::now(), elapsed);
if elapsed < spf {
std::thread::sleep(spf - elapsed);
}
}
Ok(())
}
#[inline]
fn create_msg(vp9s: Vec<VP9>) -> Message {
let mut msg_out = Message::new();
let mut vf = VideoFrame::new();
vf.set_vp9s(VP9s {
frames: vp9s.into(),
..Default::default()
});
msg_out.set_video_frame(vf);
msg_out
}
#[inline]
fn create_frame(frame: &EncodeFrame) -> VP9 {
VP9 {
data: frame.data.to_vec(),
key: frame.key,
pts: frame.pts,
..Default::default()
}
}
#[inline]
fn handle_one_frame(
sp: &GenericService,
frame: &[u8],
ms: i64,
crc: &mut (u32, u32),
vpx: &mut Encoder,
) -> ResultType<()> {
sp.snapshot(|sps| {
// so that new sub and old sub share the same encoder after switch
if sps.has_subscribes() {
bail!("SWITCH");
}
Ok(())
})?;
let mut hasher = crc32fast::Hasher::new();
hasher.update(frame);
let checksum = hasher.finalize();
if checksum != crc.0 {
crc.0 = checksum;
crc.1 = 0;
} else {
crc.1 += 1;
}
if crc.1 <= 180 && crc.1 % 5 == 0 {
let mut frames = Vec::new();
for ref frame in vpx
.encode(ms, frame, STRIDE_ALIGN)
.with_context(|| "Failed to encode")?
{
frames.push(create_frame(frame));
}
for ref frame in vpx.flush().with_context(|| "Failed to flush")? {
frames.push(create_frame(frame));
}
// to-do: flush periodically, e.g. 1 second
if frames.len() > 0 {
sp.send(create_msg(frames));
}
}
Ok(())
}
pub fn get_displays() -> ResultType<(usize, Vec<DisplayInfo>)> {
// switch to primary display if long time (30 seconds) no users
if LAST_ACTIVE.lock().unwrap().elapsed().as_secs() >= 30 {
*CURRENT_DISPLAY.lock().unwrap() = usize::MAX;
}
let mut displays = Vec::new();
let mut primary = 0;
for (i, d) in Display::all()?.iter().enumerate() {
if d.is_primary() {
primary = i;
}
displays.push(DisplayInfo {
x: d.origin().0 as _,
y: d.origin().1 as _,
width: d.width() as _,
height: d.height() as _,
name: d.name(),
online: d.is_online(),
..Default::default()
});
}
let mut lock = CURRENT_DISPLAY.lock().unwrap();
if *lock >= displays.len() {
*lock = primary
}
Ok((*lock, displays))
}
pub fn switch_display(i: i32) {
let i = i as usize;
if let Ok((_, displays)) = get_displays() {
if i < displays.len() {
*CURRENT_DISPLAY.lock().unwrap() = i;
}
}
}
pub fn refresh() {
*SWITCH.lock().unwrap() = true;
}
fn get_primary() -> usize {
if let Ok(all) = Display::all() {
for (i, d) in all.iter().enumerate() {
if d.is_primary() {
return i;
}
}
}
0
}
pub fn switch_to_primary() {
switch_display(get_primary() as _);
}
fn get_current_display() -> ResultType<(usize, usize, Display)> {
let mut current = *CURRENT_DISPLAY.lock().unwrap() as usize;
let mut displays = Display::all()?;
if displays.len() == 0 {
bail!("No displays");
}
let n = displays.len();
if current >= n {
current = 0;
for (i, d) in displays.iter().enumerate() {
if d.is_primary() {
current = i;
break;
}
}
*CURRENT_DISPLAY.lock().unwrap() = current;
}
return Ok((n, current, displays.remove(current)));
}
#[inline]
fn update_latency(id: i32, latency: i64, latencies: &mut HashMap<i32, i64>) {
if latency <= 0 {
latencies.remove(&id);
} else {
latencies.insert(id, latency);
}
}
pub fn update_test_latency(id: i32, latency: i64) {
update_latency(id, latency, &mut *TEST_LATENCIES.lock().unwrap());
}
pub fn update_internal_latency(id: i32, latency: i64) {
update_latency(id, latency, &mut *INTERNAL_LATENCIES.lock().unwrap());
}
pub fn get_latency() -> i64 {
INTERNAL_LATENCIES
.lock()
.unwrap()
.values()
.max()
.unwrap_or(&0)
.clone()
}
fn convert_quality(q: i32) -> i32 {
let q = {
if q == ImageQuality::Balanced.value() {
(100 * 2 / 3, 12)
} else if q == ImageQuality::Low.value() {
(100 / 2, 18)
} else if q == ImageQuality::Best.value() {
(100, 12)
} else {
let bitrate = q >> 8 & 0xFF;
let quantizer = q & 0xFF;
(bitrate * 2, (100 - quantizer) * 36 / 100)
}
};
if q.0 <= 0 {
0
} else {
q.0 << 8 | q.1
}
}
pub fn update_image_quality(id: i32, q: Option<i32>) {
match q {
Some(q) => {
let q = convert_quality(q);
if q > 0 {
IMAGE_QUALITIES.lock().unwrap().insert(id, q);
} else {
IMAGE_QUALITIES.lock().unwrap().remove(&id);
}
}
None => {
IMAGE_QUALITIES.lock().unwrap().remove(&id);
}
}
}
fn get_image_quality() -> i32 {
IMAGE_QUALITIES
.lock()
.unwrap()
.values()
.min()
.unwrap_or(&convert_quality(ImageQuality::Balanced.value()))
.clone()
}
#[inline]
fn get_quality(w: usize, h: usize, q: i32) -> (u32, u32, u32, i32) {
// https://www.nvidia.com/en-us/geforce/guides/broadcasting-guide/
let bitrate = q >> 8 & 0xFF;
let quantizer = q & 0xFF;
let b = ((w * h) / 1000) as u32;
(bitrate as u32 * b / 100, quantizer as _, 56, 7)
}