100% open source

This commit is contained in:
rustdesk
2022-05-12 17:35:25 +08:00
parent 9098619162
commit c1bad84a86
58 changed files with 8397 additions and 3292 deletions

View File

@@ -1,381 +1,426 @@
// both soundio and cpal use wasapi on windows and coreaudio on mac, they do not support loopback.
// libpulseaudio support loopback because pulseaudio is a standalone audio service with some
// configuration, but need to install the library and start the service on OS, not a good choice.
// windows: https://docs.microsoft.com/en-us/windows/win32/coreaudio/loopback-recording
// mac: https://github.com/mattingalls/Soundflower
// https://docs.microsoft.com/en-us/windows/win32/api/audioclient/nn-audioclient-iaudioclient
// https://github.com/ExistentialAudio/BlackHole
// if pactl not work, please run
// sudo apt-get --purge --reinstall install pulseaudio
// https://askubuntu.com/questions/403416/how-to-listen-live-sounds-from-input-from-external-sound-card
// https://wiki.debian.org/audio-loopback
// https://github.com/krruzic/pulsectl
use super::*;
use magnum_opus::{Application::*, Channels::*, Encoder};
use std::sync::atomic::{AtomicBool, Ordering};
pub const NAME: &'static str = "audio";
pub const AUDIO_DATA_SIZE_U8: usize = 960 * 4; // 10ms in 48000 stereo
static RESTARTING: AtomicBool = AtomicBool::new(false);
#[cfg(not(target_os = "linux"))]
pub fn new() -> GenericService {
let sp = GenericService::new(NAME, true);
sp.repeat::<cpal_impl::State, _>(33, cpal_impl::run);
sp
}
#[cfg(target_os = "linux")]
pub fn new() -> GenericService {
let sp = GenericService::new(NAME, true);
sp.run(pa_impl::run);
sp
}
pub fn restart() {
log::info!("restart the audio service, freezing now...");
if RESTARTING.load(Ordering::SeqCst) {
return;
}
RESTARTING.store(true, Ordering::SeqCst);
}
#[cfg(target_os = "linux")]
mod pa_impl {
use super::*;
#[tokio::main(flavor = "current_thread")]
pub async fn run(sp: GenericService) -> ResultType<()> {
hbb_common::sleep(0.1).await; // one moment to wait for _pa ipc
RESTARTING.store(false, Ordering::SeqCst);
let mut stream = crate::ipc::connect(1000, "_pa").await?;
unsafe {
AUDIO_ZERO_COUNT = 0;
}
let mut encoder = Encoder::new(crate::platform::linux::PA_SAMPLE_RATE, Stereo, LowDelay)?;
allow_err!(
stream
.send(&crate::ipc::Data::Config((
"audio-input".to_owned(),
Some(Config::get_option("audio-input"))
)))
.await
);
let zero_audio_frame: Vec<f32> = vec![0.; AUDIO_DATA_SIZE_U8 / 4];
while sp.ok() && !RESTARTING.load(Ordering::SeqCst) {
sp.snapshot(|sps| {
sps.send(create_format_msg(crate::platform::linux::PA_SAMPLE_RATE, 2));
Ok(())
})?;
if let Ok(data) = stream.next_raw().await {
if data.len() == 0 {
send_f32(&zero_audio_frame, &mut encoder, &sp);
continue;
}
if data.len() != AUDIO_DATA_SIZE_U8 {
continue;
}
let data = unsafe {
std::slice::from_raw_parts::<f32>(data.as_ptr() as _, data.len() / 4)
};
send_f32(data, &mut encoder, &sp);
}
}
Ok(())
}
}
#[cfg(not(target_os = "linux"))]
mod cpal_impl {
use super::*;
use cpal::{
traits::{DeviceTrait, HostTrait, StreamTrait},
Device, Host, SupportedStreamConfig,
};
lazy_static::lazy_static! {
static ref HOST: Host = cpal::default_host();
}
#[derive(Default)]
pub struct State {
stream: Option<(Box<dyn StreamTrait>, Arc<Message>)>,
}
impl super::service::Reset for State {
fn reset(&mut self) {
self.stream.take();
}
}
pub fn run(sp: GenericService, state: &mut State) -> ResultType<()> {
sp.snapshot(|sps| {
match &state.stream {
None => {
state.stream = Some(play(&sp)?);
}
_ => {}
}
if let Some((_, format)) = &state.stream {
sps.send_shared(format.clone());
}
Ok(())
})?;
Ok(())
}
fn send(
data: &[f32],
sample_rate0: u32,
sample_rate: u32,
channels: u16,
encoder: &mut Encoder,
sp: &GenericService,
) {
let buffer;
let data = if sample_rate0 != sample_rate {
buffer = crate::common::resample_channels(data, sample_rate0, sample_rate, channels);
&buffer
} else {
data
};
send_f32(data, encoder, sp);
}
#[cfg(windows)]
fn get_device() -> ResultType<(Device, SupportedStreamConfig)> {
let audio_input = Config::get_option("audio-input");
if !audio_input.is_empty() {
return get_audio_input(&audio_input);
}
let device = HOST
.default_output_device()
.with_context(|| "Failed to get default output device for loopback")?;
log::info!(
"Default output device: {}",
device.name().unwrap_or("".to_owned())
);
let format = device
.default_output_config()
.map_err(|e| anyhow!(e))
.with_context(|| "Failed to get default output format")?;
log::info!("Default output format: {:?}", format);
Ok((device, format))
}
#[cfg(not(windows))]
fn get_device() -> ResultType<(Device, SupportedStreamConfig)> {
let audio_input = Config::get_option("audio-input");
get_audio_input(&audio_input)
}
fn get_audio_input(audio_input: &str) -> ResultType<(Device, SupportedStreamConfig)> {
let mut device = None;
if !audio_input.is_empty() {
for d in HOST
.devices()
.with_context(|| "Failed to get audio devices")?
{
if d.name().unwrap_or("".to_owned()) == audio_input {
device = Some(d);
break;
}
}
}
if device.is_none() {
device = Some(
HOST.default_input_device()
.with_context(|| "Failed to get default input device for loopback")?,
);
}
let device = device.unwrap();
log::info!("Input device: {}", device.name().unwrap_or("".to_owned()));
let format = device
.default_input_config()
.map_err(|e| anyhow!(e))
.with_context(|| "Failed to get default input format")?;
log::info!("Default input format: {:?}", format);
Ok((device, format))
}
fn play(sp: &GenericService) -> ResultType<(Box<dyn StreamTrait>, Arc<Message>)> {
let (device, config) = get_device()?;
let sp = sp.clone();
let err_fn = move |err| {
// too many UnknownErrno, will improve later
log::trace!("an error occurred on stream: {}", err);
};
// Sample rate must be one of 8000, 12000, 16000, 24000, or 48000.
let sample_rate_0 = config.sample_rate().0;
let sample_rate = if sample_rate_0 < 12000 {
8000
} else if sample_rate_0 < 16000 {
12000
} else if sample_rate_0 < 24000 {
16000
} else if sample_rate_0 < 48000 {
24000
} else {
48000
};
log::debug!("Audio sample rate : {}", sample_rate);
unsafe {
AUDIO_ZERO_COUNT = 0;
}
let mut encoder = Encoder::new(
sample_rate,
if config.channels() > 1 { Stereo } else { Mono },
LowDelay,
)?;
let channels = config.channels();
let stream = match config.sample_format() {
cpal::SampleFormat::F32 => device.build_input_stream(
&config.into(),
move |data, _: &_| {
send(
data,
sample_rate_0,
sample_rate,
channels,
&mut encoder,
&sp,
);
},
err_fn,
)?,
cpal::SampleFormat::I16 => device.build_input_stream(
&config.into(),
move |data: &[i16], _: &_| {
let buffer: Vec<_> = data.iter().map(|s| cpal::Sample::to_f32(s)).collect();
send(
&buffer,
sample_rate_0,
sample_rate,
channels,
&mut encoder,
&sp,
);
},
err_fn,
)?,
cpal::SampleFormat::U16 => device.build_input_stream(
&config.into(),
move |data: &[u16], _: &_| {
let buffer: Vec<_> = data.iter().map(|s| cpal::Sample::to_f32(s)).collect();
send(
&buffer,
sample_rate_0,
sample_rate,
channels,
&mut encoder,
&sp,
);
},
err_fn,
)?,
};
stream.play()?;
Ok((
Box::new(stream),
Arc::new(create_format_msg(sample_rate, channels)),
))
}
}
fn create_format_msg(sample_rate: u32, channels: u16) -> Message {
let format = AudioFormat {
sample_rate,
channels: channels as _,
..Default::default()
};
let mut misc = Misc::new();
misc.set_audio_format(format);
let mut msg = Message::new();
msg.set_misc(misc);
msg
}
// use AUDIO_ZERO_COUNT for the Noise(Zero) Gate Attack Time
// every audio data length is set to 480
// MAX_AUDIO_ZERO_COUNT=800 is similar as Gate Attack Time 3~5s(Linux) || 6~8s(Windows)
const MAX_AUDIO_ZERO_COUNT: u16 = 800;
static mut AUDIO_ZERO_COUNT: u16 = 0;
fn send_f32(data: &[f32], encoder: &mut Encoder, sp: &GenericService) {
if data.iter().filter(|x| **x != 0.).next().is_some() {
unsafe {
AUDIO_ZERO_COUNT = 0;
}
} else {
unsafe {
if AUDIO_ZERO_COUNT > MAX_AUDIO_ZERO_COUNT {
if AUDIO_ZERO_COUNT == MAX_AUDIO_ZERO_COUNT + 1 {
log::debug!("Audio Zero Gate Attack");
AUDIO_ZERO_COUNT += 1;
}
return;
}
AUDIO_ZERO_COUNT += 1;
}
}
match encoder.encode_vec_float(data, data.len() * 6) {
Ok(data) => {
let mut msg_out = Message::new();
msg_out.set_audio_frame(AudioFrame {
data,
..Default::default()
});
sp.send(msg_out);
}
Err(_) => {}
}
}
#[cfg(test)]
mod tests {
#[cfg(target_os = "linux")]
#[test]
fn test_pulse() {
let spec = pulse::sample::Spec {
format: pulse::sample::SAMPLE_FLOAT32NE,
channels: 2,
rate: 24000,
};
let hspec = hound::WavSpec {
channels: spec.channels as _,
sample_rate: spec.rate as _,
bits_per_sample: (4 * 8) as _,
sample_format: hound::SampleFormat::Float,
};
const PATH: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/recorded.wav");
let mut writer =
hound::WavWriter::create(PATH, hspec).expect("Could not create hsound writer");
let device = crate::platform::linux::get_pa_monitor();
let s = psimple::Simple::new(
None, // Use the default server
"Test", // Our applications name
pulse::stream::Direction::Record, // We want a record stream
Some(&device), // Use the default device
"Test", // Description of our stream
&spec, // Our sample format
None, // Use default channel map
None, // Use default buffering attributes
)
.expect("Could not create simple pulse");
let mut out: Vec<u8> = Vec::with_capacity(1024);
unsafe {
out.set_len(out.capacity());
}
for _ in 0..600 {
s.read(&mut out).expect("Could not read pcm");
let out2 =
unsafe { std::slice::from_raw_parts::<f32>(out.as_ptr() as _, out.len() / 4) };
for v in out2 {
writer.write_sample(*v).ok();
}
}
println!("{:?} {}", device, out.len());
writer.finalize().expect("Could not finalize writer");
}
}
// both soundio and cpal use wasapi on windows and coreaudio on mac, they do not support loopback.
// libpulseaudio support loopback because pulseaudio is a standalone audio service with some
// configuration, but need to install the library and start the service on OS, not a good choice.
// windows: https://docs.microsoft.com/en-us/windows/win32/coreaudio/loopback-recording
// mac: https://github.com/mattingalls/Soundflower
// https://docs.microsoft.com/en-us/windows/win32/api/audioclient/nn-audioclient-iaudioclient
// https://github.com/ExistentialAudio/BlackHole
// if pactl not work, please run
// sudo apt-get --purge --reinstall install pulseaudio
// https://askubuntu.com/questions/403416/how-to-listen-live-sounds-from-input-from-external-sound-card
// https://wiki.debian.org/audio-loopback
// https://github.com/krruzic/pulsectl
use super::*;
use magnum_opus::{Application::*, Channels::*, Encoder};
use std::sync::atomic::{AtomicBool, Ordering};
pub const NAME: &'static str = "audio";
pub const AUDIO_DATA_SIZE_U8: usize = 960 * 4; // 10ms in 48000 stereo
static RESTARTING: AtomicBool = AtomicBool::new(false);
#[cfg(not(any(target_os = "linux", target_os = "android")))]
pub fn new() -> GenericService {
let sp = GenericService::new(NAME, true);
sp.repeat::<cpal_impl::State, _>(33, cpal_impl::run);
sp
}
#[cfg(any(target_os = "linux", target_os = "android"))]
pub fn new() -> GenericService {
let sp = GenericService::new(NAME, true);
sp.run(pa_impl::run);
sp
}
pub fn restart() {
log::info!("restart the audio service, freezing now...");
if RESTARTING.load(Ordering::SeqCst) {
return;
}
RESTARTING.store(true, Ordering::SeqCst);
}
#[cfg(any(target_os = "linux", target_os = "android"))]
mod pa_impl {
use super::*;
#[tokio::main(flavor = "current_thread")]
pub async fn run(sp: GenericService) -> ResultType<()> {
hbb_common::sleep(0.1).await; // one moment to wait for _pa ipc
RESTARTING.store(false, Ordering::SeqCst);
#[cfg(target_os = "linux")]
let mut stream = crate::ipc::connect(1000, "_pa").await?;
unsafe {
AUDIO_ZERO_COUNT = 0;
}
let mut encoder = Encoder::new(crate::platform::PA_SAMPLE_RATE, Stereo, LowDelay)?;
#[cfg(target_os = "linux")]
allow_err!(
stream
.send(&crate::ipc::Data::Config((
"audio-input".to_owned(),
Some(Config::get_option("audio-input"))
)))
.await
);
let zero_audio_frame: Vec<f32> = vec![0.; AUDIO_DATA_SIZE_U8 / 4];
while sp.ok() && !RESTARTING.load(Ordering::SeqCst) {
sp.snapshot(|sps| {
sps.send(create_format_msg(crate::platform::PA_SAMPLE_RATE, 2));
Ok(())
})?;
#[cfg(target_os = "linux")]
if let Ok(data) = stream.next_raw().await {
if data.len() == 0 {
send_f32(&zero_audio_frame, &mut encoder, &sp);
continue;
}
if data.len() != AUDIO_DATA_SIZE_U8 {
continue;
}
let data = unsafe {
std::slice::from_raw_parts::<f32>(data.as_ptr() as _, data.len() / 4)
};
send_f32(data, &mut encoder, &sp);
}
#[cfg(target_os = "android")]
if let Some(data) = scrap::android::ffi::get_audio_raw() {
let data = unsafe {
std::slice::from_raw_parts::<f32>(data.as_ptr() as _, data.len() / 4)
};
send_f32(data, &mut encoder, &sp);
} else {
hbb_common::sleep(0.1).await;
}
}
Ok(())
}
}
#[cfg(not(any(target_os = "linux", target_os = "android")))]
mod cpal_impl {
use super::*;
use cpal::{
traits::{DeviceTrait, HostTrait, StreamTrait},
Device, Host, SupportedStreamConfig,
};
lazy_static::lazy_static! {
static ref HOST: Host = cpal::default_host();
}
#[derive(Default)]
pub struct State {
stream: Option<(Box<dyn StreamTrait>, Arc<Message>)>,
}
impl super::service::Reset for State {
fn reset(&mut self) {
self.stream.take();
}
}
pub fn run(sp: GenericService, state: &mut State) -> ResultType<()> {
sp.snapshot(|sps| {
match &state.stream {
None => {
state.stream = Some(play(&sp)?);
}
_ => {}
}
if let Some((_, format)) = &state.stream {
sps.send_shared(format.clone());
}
Ok(())
})?;
Ok(())
}
fn send(
data: &[f32],
sample_rate0: u32,
sample_rate: u32,
channels: u16,
encoder: &mut Encoder,
sp: &GenericService,
) {
let buffer;
let data = if sample_rate0 != sample_rate {
buffer = crate::common::resample_channels(data, sample_rate0, sample_rate, channels);
&buffer
} else {
data
};
send_f32(data, encoder, sp);
}
#[cfg(windows)]
fn get_device() -> ResultType<(Device, SupportedStreamConfig)> {
let audio_input = Config::get_option("audio-input");
if !audio_input.is_empty() {
return get_audio_input(&audio_input);
}
let device = HOST
.default_output_device()
.with_context(|| "Failed to get default output device for loopback")?;
log::info!(
"Default output device: {}",
device.name().unwrap_or("".to_owned())
);
let format = device
.default_output_config()
.map_err(|e| anyhow!(e))
.with_context(|| "Failed to get default output format")?;
log::info!("Default output format: {:?}", format);
Ok((device, format))
}
#[cfg(not(windows))]
fn get_device() -> ResultType<(Device, SupportedStreamConfig)> {
let audio_input = Config::get_option("audio-input");
get_audio_input(&audio_input)
}
fn get_audio_input(audio_input: &str) -> ResultType<(Device, SupportedStreamConfig)> {
let mut device = None;
if !audio_input.is_empty() {
for d in HOST
.devices()
.with_context(|| "Failed to get audio devices")?
{
if d.name().unwrap_or("".to_owned()) == audio_input {
device = Some(d);
break;
}
}
}
if device.is_none() {
device = Some(
HOST.default_input_device()
.with_context(|| "Failed to get default input device for loopback")?,
);
}
let device = device.unwrap();
log::info!("Input device: {}", device.name().unwrap_or("".to_owned()));
let format = device
.default_input_config()
.map_err(|e| anyhow!(e))
.with_context(|| "Failed to get default input format")?;
log::info!("Default input format: {:?}", format);
Ok((device, format))
}
fn play(sp: &GenericService) -> ResultType<(Box<dyn StreamTrait>, Arc<Message>)> {
let (device, config) = get_device()?;
let sp = sp.clone();
let err_fn = move |err| {
// too many UnknownErrno, will improve later
log::trace!("an error occurred on stream: {}", err);
};
// Sample rate must be one of 8000, 12000, 16000, 24000, or 48000.
let sample_rate_0 = config.sample_rate().0;
let sample_rate = if sample_rate_0 < 12000 {
8000
} else if sample_rate_0 < 16000 {
12000
} else if sample_rate_0 < 24000 {
16000
} else if sample_rate_0 < 48000 {
24000
} else {
48000
};
log::debug!("Audio sample rate : {}", sample_rate);
unsafe {
AUDIO_ZERO_COUNT = 0;
}
let mut encoder = Encoder::new(
sample_rate,
if config.channels() > 1 { Stereo } else { Mono },
LowDelay,
)?;
let channels = config.channels();
let stream = match config.sample_format() {
cpal::SampleFormat::F32 => device.build_input_stream(
&config.into(),
move |data, _: &_| {
send(
data,
sample_rate_0,
sample_rate,
channels,
&mut encoder,
&sp,
);
},
err_fn,
)?,
cpal::SampleFormat::I16 => device.build_input_stream(
&config.into(),
move |data: &[i16], _: &_| {
let buffer: Vec<_> = data.iter().map(|s| cpal::Sample::to_f32(s)).collect();
send(
&buffer,
sample_rate_0,
sample_rate,
channels,
&mut encoder,
&sp,
);
},
err_fn,
)?,
cpal::SampleFormat::U16 => device.build_input_stream(
&config.into(),
move |data: &[u16], _: &_| {
let buffer: Vec<_> = data.iter().map(|s| cpal::Sample::to_f32(s)).collect();
send(
&buffer,
sample_rate_0,
sample_rate,
channels,
&mut encoder,
&sp,
);
},
err_fn,
)?,
};
stream.play()?;
Ok((
Box::new(stream),
Arc::new(create_format_msg(sample_rate, channels)),
))
}
}
fn create_format_msg(sample_rate: u32, channels: u16) -> Message {
let format = AudioFormat {
sample_rate,
channels: channels as _,
..Default::default()
};
let mut misc = Misc::new();
misc.set_audio_format(format);
let mut msg = Message::new();
msg.set_misc(misc);
msg
}
// use AUDIO_ZERO_COUNT for the Noise(Zero) Gate Attack Time
// every audio data length is set to 480
// MAX_AUDIO_ZERO_COUNT=800 is similar as Gate Attack Time 3~5s(Linux) || 6~8s(Windows)
const MAX_AUDIO_ZERO_COUNT: u16 = 800;
static mut AUDIO_ZERO_COUNT: u16 = 0;
fn send_f32(data: &[f32], encoder: &mut Encoder, sp: &GenericService) {
if data.iter().filter(|x| **x != 0.).next().is_some() {
unsafe {
AUDIO_ZERO_COUNT = 0;
}
} else {
unsafe {
if AUDIO_ZERO_COUNT > MAX_AUDIO_ZERO_COUNT {
if AUDIO_ZERO_COUNT == MAX_AUDIO_ZERO_COUNT + 1 {
log::debug!("Audio Zero Gate Attack");
AUDIO_ZERO_COUNT += 1;
}
return;
}
AUDIO_ZERO_COUNT += 1;
}
}
#[cfg(target_os = "android")]
{
// the permitted opus data size are 120, 240, 480, 960, 1920, and 2880
// if data size is bigger than BATCH_SIZE, AND is an integer multiple of BATCH_SIZE
// then upload in batches
const BATCH_SIZE: usize = 960;
let input_size = data.len();
if input_size > BATCH_SIZE && input_size % BATCH_SIZE == 0 {
let n = input_size / BATCH_SIZE;
for i in 0..n {
match encoder
.encode_vec_float(&data[i * BATCH_SIZE..(i + 1) * BATCH_SIZE], BATCH_SIZE)
{
Ok(data) => {
let mut msg_out = Message::new();
msg_out.set_audio_frame(AudioFrame {
data,
..Default::default()
});
sp.send(msg_out);
}
Err(_) => {}
}
}
} else {
log::debug!("invalid audio data size:{} ", input_size);
return;
}
}
#[cfg(not(target_os = "android"))]
match encoder.encode_vec_float(data, data.len() * 6) {
Ok(data) => {
let mut msg_out = Message::new();
msg_out.set_audio_frame(AudioFrame {
data,
..Default::default()
});
sp.send(msg_out);
}
Err(_) => {}
}
}
#[cfg(test)]
mod tests {
#[cfg(target_os = "linux")]
#[test]
fn test_pulse() {
use libpulse_binding as pulse;
use libpulse_simple_binding as psimple;
let spec = pulse::sample::Spec {
format: pulse::sample::SAMPLE_FLOAT32NE,
channels: 2,
rate: 24000,
};
let hspec = hound::WavSpec {
channels: spec.channels as _,
sample_rate: spec.rate as _,
bits_per_sample: (4 * 8) as _,
sample_format: hound::SampleFormat::Float,
};
const PATH: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/recorded.wav");
let mut writer =
hound::WavWriter::create(PATH, hspec).expect("Could not create hsound writer");
let device = crate::platform::linux::get_pa_monitor();
let s = psimple::Simple::new(
None, // Use the default server
"Test", // Our applications name
pulse::stream::Direction::Record, // We want a record stream
Some(&device), // Use the default device
"Test", // Description of our stream
&spec, // Our sample format
None, // Use default channel map
None, // Use default buffering attributes
)
.expect("Could not create simple pulse");
let mut out: Vec<u8> = Vec::with_capacity(1024);
unsafe {
out.set_len(out.capacity());
}
for _ in 0..600 {
s.read(&mut out).expect("Could not read pcm");
let out2 =
unsafe { std::slice::from_raw_parts::<f32>(out.as_ptr() as _, out.len() / 4) };
for v in out2 {
writer.write_sample(*v).ok();
}
}
println!("{:?} {}", device, out.len());
writer.finalize().expect("Could not finalize writer");
}
}

View File

@@ -3,119 +3,49 @@ pub use crate::common::{
check_clipboard, ClipboardContext, CLIPBOARD_INTERVAL as INTERVAL, CLIPBOARD_NAME as NAME,
CONTENT,
};
use clipboard_master::{CallbackResult, ClipboardHandler, Master};
use hbb_common::{anyhow, ResultType};
use std::{
io, sync,
sync::{
atomic::{AtomicBool, Ordering},
mpsc::SyncSender,
},
time::Duration,
};
struct State {
ctx: Option<ClipboardContext>,
}
impl Default for State {
fn default() -> Self {
let ctx = match ClipboardContext::new() {
Ok(ctx) => Some(ctx),
Err(err) => {
log::error!("Failed to start {}: {}", NAME, err);
None
}
};
Self { ctx }
}
}
impl super::service::Reset for State {
fn reset(&mut self) {
*CONTENT.lock().unwrap() = Default::default();
}
}
pub fn new() -> GenericService {
let sp = GenericService::new(NAME, true);
sp.run::<_>(listen::run);
sp.repeat::<State, _>(INTERVAL, run);
sp
}
mod listen {
use super::*;
static RUNNING: AtomicBool = AtomicBool::new(true);
static WAIT: Duration = Duration::from_millis(33);
struct ClipHandle {
tx: SyncSender<()>,
}
impl ClipboardHandler for ClipHandle {
fn on_clipboard_change(&mut self) -> CallbackResult {
if !RUNNING.load(Ordering::SeqCst) {
return CallbackResult::Stop;
}
let _ = self.tx.send(());
CallbackResult::Next
fn run(sp: GenericService, state: &mut State) -> ResultType<()> {
if let Some(ctx) = state.ctx.as_mut() {
if let Some(msg) = check_clipboard(ctx, None) {
sp.send(msg);
}
fn on_clipboard_error(&mut self, error: io::Error) -> CallbackResult {
if !RUNNING.load(Ordering::SeqCst) {
CallbackResult::Stop
} else {
CallbackResult::StopWithError(error)
sp.snapshot(|sps| {
let txt = crate::CONTENT.lock().unwrap().clone();
if !txt.is_empty() {
let msg_out = crate::create_clipboard_msg(txt);
sps.send_shared(Arc::new(msg_out));
}
}
}
#[tokio::main]
pub async fn run(sp: GenericService) -> ResultType<()> {
let mut ctx = match ClipboardContext::new() {
Ok(ctx) => ctx,
Err(err) => {
log::error!("Failed to start {}: {}", NAME, err);
return Err(anyhow::Error::from(err));
}
};
if !RUNNING.load(Ordering::SeqCst) {
RUNNING.store(true, Ordering::SeqCst);
}
let (tx, rx) = sync::mpsc::sync_channel(12);
let listener = tokio::spawn(async {
log::info!("Clipboard listener running!");
let _ = Master::new(ClipHandle { tx }).run();
});
check_clipboard(&mut ctx, None); // initialize CONTENT for snapshot
while sp.ok() {
let mut update = None;
sp.snapshot(|sps| {
if sps.has_subscribes() {
update = check_clipboard(&mut ctx, None);
}
// if there is update, msg will be later together,
// otherwise it will be only sent to new subscriber,
// but old subscribers ignored
if update.is_none() {
let txt = crate::CONTENT.lock().unwrap().clone();
if !txt.is_empty() {
let msg_out = crate::create_clipboard_msg(txt);
sps.send_shared(Arc::new(msg_out));
}
}
Ok(())
})?;
if let Some(msg) = update {
sp.send(msg);
}
if let Ok(_) = rx.recv_timeout(WAIT) {
if let Some(msg) = check_clipboard(&mut ctx, None) {
sp.send(msg);
}
}
}
RUNNING.store(false, Ordering::SeqCst);
trigger(&mut ctx);
let _ = listener.await;
log::info!("Clipboard listener stopped!");
*CONTENT.lock().unwrap() = Default::default();
Ok(())
}
fn trigger(ctx: &mut ClipboardContext) {
let mut old_text = "".to_owned();
let _ = match ctx.get_text() {
Ok(text) => {
old_text = text;
}
Err(_) => {}
};
ctx.set_text(old_text).ok();
Ok(())
})?;
}
Ok(())
}

View File

@@ -1,7 +1,11 @@
use super::{input_service::*, *};
#[cfg(windows)]
use crate::clipboard_file::*;
use crate::{common::update_clipboard, ipc};
#[cfg(not(any(target_os = "android", target_os = "ios")))]
use crate::common::update_clipboard;
use crate::ipc;
#[cfg(any(target_os = "android", target_os = "ios"))]
use crate::{common::MOBILE_INFO2, mobile::connection_manager::start_channel};
use hbb_common::{
config::Config,
fs,
@@ -15,14 +19,22 @@ use hbb_common::{
},
tokio_util::codec::{BytesCodec, Framed},
};
#[cfg(any(target_os = "android", target_os = "ios"))]
use scrap::android::call_input_service_mouse_input;
use serde_json::{json, value::Value};
use sha2::{Digest, Sha256};
use std::sync::mpsc as std_mpsc;
use std::sync::{
atomic::{AtomicI64, Ordering},
mpsc as std_mpsc,
};
pub type Sender = mpsc::UnboundedSender<(Instant, Arc<Message>)>;
lazy_static::lazy_static! {
static ref LOGIN_FAILURES: Arc::<Mutex<HashMap<String, (i32, i32, i32)>>> = Default::default();
}
pub static CLICK_TIME: AtomicI64 = AtomicI64::new(0);
pub static MOUSE_MOVE_TIME: AtomicI64 = AtomicI64::new(0);
#[derive(Clone, Default)]
pub struct ConnInner {
@@ -67,6 +79,8 @@ pub struct Connection {
enable_file_transfer: bool, // by peer
tx_input: std_mpsc::Sender<MessageInput>, // handle input messages
video_ack_required: bool,
peer_info: (String, String),
api_server: String,
}
impl Subscriber for ConnInner {
@@ -151,12 +165,18 @@ impl Connection {
disable_clipboard: false,
tx_input,
video_ack_required: false,
peer_info: Default::default(),
api_server: "".to_owned(),
};
#[cfg(not(any(target_os = "android", target_os = "ios")))]
tokio::spawn(async move {
if let Err(err) = start_ipc(rx_to_cm, tx_from_cm).await {
log::error!("ipc to connection manager exit: {}", err);
}
});
#[cfg(target_os = "android")]
start_channel(rx_to_cm, tx_from_cm);
if !conn.on_open(addr).await {
return;
}
@@ -184,6 +204,7 @@ impl Connection {
},
);
#[cfg(not(any(target_os = "android", target_os = "ios")))]
std::thread::spawn(move || Self::handle_input(rx_input, tx_cloned));
loop {
@@ -252,9 +273,9 @@ impl Connection {
ipc::Data::RawMessage(bytes) => {
allow_err!(conn.stream.send_raw(bytes).await);
}
#[cfg(windows)]
ipc::Data::ClipbaordFile(_clip) => {
if conn.file_transfer_enabled() {
#[cfg(windows)]
allow_err!(conn.stream.send(&clip_2_msg(_clip)).await);
}
}
@@ -291,6 +312,7 @@ impl Connection {
} else {
conn.timer = time::interval_at(Instant::now() + SEC30, SEC30);
}
conn.post_audit(json!({})); // heartbeat
},
Some((instant, value)) = rx_video.recv() => {
if !conn.video_ack_required {
@@ -345,9 +367,13 @@ impl Connection {
conn.on_close(&err.to_string(), false);
}
conn.post_audit(json!({
"action": "close",
}));
log::info!("#{} connection loop exited", id);
}
#[cfg(not(any(target_os = "android", target_os = "ios")))]
fn handle_input(receiver: std_mpsc::Receiver<MessageInput>, tx: Sender) {
let mut block_input_mode = false;
let (tx_blank, rx_blank) = std_mpsc::channel();
@@ -398,6 +424,7 @@ impl Connection {
}
},
Err(err) => {
#[cfg(not(any(target_os = "android", target_os = "ios")))]
if block_input_mode {
let _ = crate::platform::block_input(true);
}
@@ -410,6 +437,7 @@ impl Connection {
log::info!("Input thread exited");
}
#[cfg(not(any(target_os = "android", target_os = "ios")))]
fn handle_blank(receiver: std_mpsc::Receiver<MessageInput>) {
let mut last_privacy = false;
loop {
@@ -443,7 +471,7 @@ impl Connection {
rx_from_cm: &mut mpsc::UnboundedReceiver<Data>,
) -> ResultType<()> {
let mut last_recv_time = Instant::now();
if let Some(forward) = self.port_forward_socket.as_mut() {
if let Some(mut forward) = self.port_forward_socket.take() {
log::info!("Running port forwarding loop");
self.stream.set_raw();
loop {
@@ -476,6 +504,7 @@ impl Connection {
if last_recv_time.elapsed() >= H1 {
bail!("Timeout");
}
self.post_audit(json!({})); // heartbeat
}
}
}
@@ -523,25 +552,78 @@ impl Connection {
let mut msg_out = Message::new();
msg_out.set_hash(self.hash.clone());
self.send(msg_out).await;
self.get_api_server();
self.post_audit(json!({
"ip": addr.ip(),
"action": "new",
}));
true
}
fn get_api_server(&mut self) {
self.api_server = crate::get_audit_server(
Config::get_option("api-server"),
Config::get_option("custom-rendezvous-server"),
);
}
fn post_audit(&self, v: Value) {
if self.api_server.is_empty() {
return;
}
let url = self.api_server.clone();
let mut v = v;
v["id"] = json!(Config::get_id());
v["uuid"] = json!(base64::encode(crate::get_uuid()));
v["Id"] = json!(self.inner.id);
tokio::spawn(async move {
allow_err!(Self::post_audit_async(url, v).await);
});
}
#[inline]
async fn post_audit_async(url: String, v: Value) -> ResultType<()> {
crate::post_request(url, v.to_string(), "").await?;
Ok(())
}
async fn send_logon_response(&mut self) {
if self.authorized {
return;
}
let conn_type = if self.file_transfer.is_some() {
1
} else if self.port_forward_socket.is_some() {
2
} else {
0
};
self.post_audit(json!({"peer": self.peer_info, "Type": conn_type}));
#[allow(unused_mut)]
let mut username = crate::platform::get_active_username();
let mut res = LoginResponse::new();
let mut pi = PeerInfo {
username: username.clone(),
conn_id: self.inner.id,
version: crate::VERSION.to_owned(),
..Default::default()
};
#[cfg(not(target_os = "android"))]
{
pi.hostname = whoami::hostname();
pi.platform = whoami::platform().to_string();
}
#[cfg(target_os = "android")]
{
pi.hostname = MOBILE_INFO2.lock().unwrap().clone();
pi.platform = "Android".into();
}
if self.port_forward_socket.is_some() {
let mut msg_out = Message::new();
res.set_peer_info(PeerInfo {
hostname: whoami::hostname(),
username,
platform: whoami::platform().to_string(),
version: crate::VERSION.to_owned(),
..Default::default()
});
res.set_peer_info(pi);
msg_out.set_login_response(res);
self.send(msg_out).await;
return;
@@ -566,20 +648,15 @@ impl Connection {
if crate::platform::is_root() {
sas_enabled = true;
}
#[cfg(not(any(target_os = "android", target_os = "ios")))]
if self.file_transfer.is_some() {
if crate::platform::is_prelogin() || self.tx_to_cm.send(ipc::Data::Test).is_err() {
username = "".to_owned();
}
}
self.authorized = true;
let mut pi = PeerInfo {
hostname: whoami::hostname(),
username,
platform: whoami::platform().to_string(),
version: crate::VERSION.to_owned(),
sas_enabled,
..Default::default()
};
pi.username = username;
pi.sas_enabled = sas_enabled;
let mut sub_service = false;
if self.file_transfer.is_some() {
res.set_peer_info(pi);
@@ -641,7 +718,8 @@ impl Connection {
self.file && self.enable_file_transfer
}
async fn try_start_cm(&mut self, peer_id: String, name: String, authorized: bool) {
fn try_start_cm(&mut self, peer_id: String, name: String, authorized: bool) {
self.peer_info = (peer_id.clone(), name.clone());
self.send_to_cm(ipc::Data::Login {
id: self.inner.id(),
is_file_transfer: self.file_transfer.is_some(),
@@ -753,7 +831,7 @@ impl Connection {
if !crate::is_ip(&lr.username) && lr.username != Config::get_id() {
self.send_login_error("Offline").await;
} else if lr.password.is_empty() {
self.try_start_cm(lr.my_id, lr.my_name, false).await;
self.try_start_cm(lr.my_id, lr.my_name, false);
} else {
let mut hasher = Sha256::new();
hasher.update(&Config::get_password());
@@ -787,13 +865,13 @@ impl Connection {
.unwrap()
.insert(self.ip.clone(), failure);
self.send_login_error("Wrong Password").await;
self.try_start_cm(lr.my_id, lr.my_name, false).await;
self.try_start_cm(lr.my_id, lr.my_name, false);
} else {
if failure.0 != 0 {
LOGIN_FAILURES.lock().unwrap().remove(&self.ip);
}
self.try_start_cm(lr.my_id, lr.my_name, true);
self.send_logon_response().await;
self.try_start_cm(lr.my_id, lr.my_name, true).await;
if self.port_forward_socket.is_some() {
return false;
}
@@ -814,12 +892,26 @@ impl Connection {
} else if self.authorized {
match msg.union {
Some(message::Union::mouse_event(me)) => {
#[cfg(any(target_os = "android", target_os = "ios"))]
if let Err(e) = call_input_service_mouse_input(me.mask, me.x, me.y) {
log::debug!("call_input_service_mouse_input fail:{}", e);
}
#[cfg(not(any(target_os = "android", target_os = "ios")))]
if self.keyboard {
if is_left_up(&me) {
CLICK_TIME.store(crate::get_time(), Ordering::SeqCst);
} else {
MOUSE_MOVE_TIME.store(crate::get_time(), Ordering::SeqCst);
}
self.input_mouse(me, self.inner.id());
}
}
Some(message::Union::key_event(me)) => {
#[cfg(not(any(target_os = "android", target_os = "ios")))]
if self.keyboard {
if is_enter(&me) {
CLICK_TIME.store(crate::get_time(), Ordering::SeqCst);
}
// handle all down as press
// fix unexpected repeating key on remote linux, seems also fix abnormal alt/shift, which
// make sure all key are released
@@ -843,7 +935,9 @@ impl Connection {
}
}
}
Some(message::Union::clipboard(cb)) => {
Some(message::Union::clipboard(cb)) =>
{
#[cfg(not(any(target_os = "android", target_os = "ios")))]
if self.clipboard {
update_clipboard(cb, None);
}
@@ -868,18 +962,21 @@ impl Connection {
self.send(fs::new_error(f.id, err, -1)).await;
}
Ok(files) => {
self.send(fs::new_dir(f.id, files)).await;
self.send(fs::new_dir(f.id, f.path, files)).await;
}
}
}
Some(file_action::Union::send(s)) => {
let id = s.id;
match fs::TransferJob::new_read(id, s.path, s.include_hidden) {
let path = s.path;
match fs::TransferJob::new_read(id, path.clone(), s.include_hidden)
{
Err(err) => {
self.send(fs::new_error(id, err, 0)).await;
}
Ok(job) => {
self.send(fs::new_dir(id, job.files().to_vec())).await;
self.send(fs::new_dir(id, path, job.files().to_vec()))
.await;
self.read_jobs.push(job);
self.timer = time::interval(MILLI1);
}
@@ -1020,7 +1117,9 @@ impl Connection {
if let Ok(q) = o.enable_file_transfer.enum_value() {
if q != BoolOption::NotSet {
self.enable_file_transfer = q == BoolOption::Yes;
self.send_to_cm(ipc::Data::ClipboardFileEnabled(self.file_transfer_enabled()));
self.send_to_cm(ipc::Data::ClipboardFileEnabled(
self.file_transfer_enabled(),
));
}
}
if let Ok(q) = o.disable_clipboard.enum_value() {
@@ -1071,6 +1170,7 @@ impl Connection {
}
log::info!("#{} Connection closed: {}", self.inner.id(), reason);
if lock && self.lock_after_session_end && self.keyboard {
#[cfg(not(any(target_os = "android", target_os = "ios")))]
lock_screen();
}
self.tx_to_cm.send(ipc::Data::Close).ok();
@@ -1091,6 +1191,7 @@ impl Connection {
}
}
#[cfg(not(any(target_os = "android", target_os = "ios")))]
async fn start_ipc(
mut rx_to_cm: mpsc::UnboundedReceiver<ipc::Data>,
tx_from_cm: mpsc::UnboundedSender<ipc::Data>,
@@ -1148,7 +1249,16 @@ async fn start_ipc(
return Err(err.into());
}
Ok(Some(data)) => {
tx_from_cm.send(data)?;
match data {
ipc::Data::ClickTime(_)=> {
let ct = CLICK_TIME.load(Ordering::SeqCst);
let data = ipc::Data::ClickTime(ct);
stream.send(&data).await?;
}
_ => {
tx_from_cm.send(data)?;
}
}
}
_ => {}
}

View File

@@ -1,275 +1,275 @@
use super::*;
use std::{
collections::HashSet,
thread::{self, JoinHandle},
time,
};
pub trait Service: Send + Sync {
fn name(&self) -> &'static str;
fn on_subscribe(&self, sub: ConnInner);
fn on_unsubscribe(&self, id: i32);
fn is_subed(&self, id: i32) -> bool;
fn join(&self);
}
pub trait Subscriber: Default + Send + Sync + 'static {
fn id(&self) -> i32;
fn send(&mut self, msg: Arc<Message>);
}
#[derive(Default)]
pub struct ServiceInner<T: Subscriber + From<ConnInner>> {
name: &'static str,
handle: Option<JoinHandle<()>>,
subscribes: HashMap<i32, T>,
new_subscribes: HashMap<i32, T>,
active: bool,
need_snapshot: bool,
}
pub trait Reset {
fn reset(&mut self);
}
pub struct ServiceTmpl<T: Subscriber + From<ConnInner>>(Arc<RwLock<ServiceInner<T>>>);
pub struct ServiceSwap<T: Subscriber + From<ConnInner>>(ServiceTmpl<T>);
pub type GenericService = ServiceTmpl<ConnInner>;
pub const HIBERNATE_TIMEOUT: u64 = 30;
pub const MAX_ERROR_TIMEOUT: u64 = 1_000;
impl<T: Subscriber + From<ConnInner>> ServiceInner<T> {
fn send_new_subscribes(&mut self, msg: Arc<Message>) {
for s in self.new_subscribes.values_mut() {
s.send(msg.clone());
}
}
fn swap_new_subscribes(&mut self) {
for (_, s) in self.new_subscribes.drain() {
self.subscribes.insert(s.id(), s);
}
assert!(self.new_subscribes.is_empty());
}
#[inline]
fn has_subscribes(&self) -> bool {
self.subscribes.len() > 0 || self.new_subscribes.len() > 0
}
}
impl<T: Subscriber + From<ConnInner>> Service for ServiceTmpl<T> {
#[inline]
fn name(&self) -> &'static str {
self.0.read().unwrap().name
}
fn is_subed(&self, id: i32) -> bool {
self.0.read().unwrap().subscribes.get(&id).is_some()
}
fn on_subscribe(&self, sub: ConnInner) {
let mut lock = self.0.write().unwrap();
if lock.subscribes.get(&sub.id()).is_some() {
return;
}
if lock.need_snapshot {
lock.new_subscribes.insert(sub.id(), sub.into());
} else {
lock.subscribes.insert(sub.id(), sub.into());
}
}
fn on_unsubscribe(&self, id: i32) {
let mut lock = self.0.write().unwrap();
if let None = lock.subscribes.remove(&id) {
lock.new_subscribes.remove(&id);
}
}
fn join(&self) {
self.0.write().unwrap().active = false;
self.0.write().unwrap().handle.take().map(JoinHandle::join);
}
}
impl<T: Subscriber + From<ConnInner>> Clone for ServiceTmpl<T> {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
impl<T: Subscriber + From<ConnInner>> ServiceTmpl<T> {
pub fn new(name: &'static str, need_snapshot: bool) -> Self {
Self(Arc::new(RwLock::new(ServiceInner::<T> {
name,
active: true,
need_snapshot,
..Default::default()
})))
}
#[inline]
pub fn has_subscribes(&self) -> bool {
self.0.read().unwrap().has_subscribes()
}
#[inline]
pub fn ok(&self) -> bool {
let lock = self.0.read().unwrap();
lock.active && lock.has_subscribes()
}
pub fn snapshot<F>(&self, callback: F) -> ResultType<()>
where
F: FnMut(ServiceSwap<T>) -> ResultType<()>,
{
if self.0.read().unwrap().new_subscribes.len() > 0 {
log::info!("Call snapshot of {} service", self.name());
let mut callback = callback;
callback(ServiceSwap::<T>(self.clone()))?;
}
Ok(())
}
#[inline]
pub fn send(&self, msg: Message) {
self.send_shared(Arc::new(msg));
}
pub fn send_to(&self, msg: Message, id: i32) {
if let Some(s) = self.0.write().unwrap().subscribes.get_mut(&id) {
s.send(Arc::new(msg));
}
}
pub fn send_shared(&self, msg: Arc<Message>) {
let mut lock = self.0.write().unwrap();
for s in lock.subscribes.values_mut() {
s.send(msg.clone());
}
}
pub fn send_video_frame(&self, msg: Message) -> HashSet<i32> {
self.send_video_frame_shared(Arc::new(msg))
}
pub fn send_video_frame_shared(&self, msg: Arc<Message>) -> HashSet<i32> {
let mut conn_ids = HashSet::new();
let mut lock = self.0.write().unwrap();
for s in lock.subscribes.values_mut() {
s.send(msg.clone());
conn_ids.insert(s.id());
}
conn_ids
}
pub fn send_without(&self, msg: Message, sub: i32) {
let mut lock = self.0.write().unwrap();
let msg = Arc::new(msg);
for s in lock.subscribes.values_mut() {
if sub != s.id() {
s.send(msg.clone());
}
}
}
pub fn repeat<S, F>(&self, interval_ms: u64, callback: F)
where
F: 'static + FnMut(Self, &mut S) -> ResultType<()> + Send,
S: 'static + Default + Reset,
{
let interval = time::Duration::from_millis(interval_ms);
let mut callback = callback;
let sp = self.clone();
let thread = thread::spawn(move || {
let mut state = S::default();
let mut may_reset = false;
while sp.active() {
let now = time::Instant::now();
if sp.has_subscribes() {
if let Err(err) = callback(sp.clone(), &mut state) {
log::error!("Error of {} service: {}", sp.name(), err);
thread::sleep(time::Duration::from_millis(MAX_ERROR_TIMEOUT));
#[cfg(windows)]
crate::platform::windows::try_change_desktop();
}
if !may_reset {
may_reset = true;
}
} else if may_reset {
state.reset();
may_reset = false;
}
let elapsed = now.elapsed();
if elapsed < interval {
thread::sleep(interval - elapsed);
}
}
});
self.0.write().unwrap().handle = Some(thread);
}
pub fn run<F>(&self, callback: F)
where
F: 'static + FnMut(Self) -> ResultType<()> + Send,
{
let sp = self.clone();
let mut callback = callback;
let thread = thread::spawn(move || {
let mut error_timeout = HIBERNATE_TIMEOUT;
while sp.active() {
if sp.has_subscribes() {
log::debug!("Enter {} service inner loop", sp.name());
let tm = time::Instant::now();
if let Err(err) = callback(sp.clone()) {
log::error!("Error of {} service: {}", sp.name(), err);
if tm.elapsed() > time::Duration::from_millis(MAX_ERROR_TIMEOUT) {
error_timeout = HIBERNATE_TIMEOUT;
} else {
error_timeout *= 2;
}
if error_timeout > MAX_ERROR_TIMEOUT {
error_timeout = MAX_ERROR_TIMEOUT;
}
thread::sleep(time::Duration::from_millis(error_timeout));
#[cfg(windows)]
crate::platform::windows::try_change_desktop();
} else {
log::debug!("Exit {} service inner loop", sp.name());
}
}
thread::sleep(time::Duration::from_millis(HIBERNATE_TIMEOUT));
}
});
self.0.write().unwrap().handle = Some(thread);
}
#[inline]
pub fn active(&self) -> bool {
self.0.read().unwrap().active
}
}
impl<T: Subscriber + From<ConnInner>> ServiceSwap<T> {
#[inline]
pub fn send(&self, msg: Message) {
self.send_shared(Arc::new(msg));
}
#[inline]
pub fn send_shared(&self, msg: Arc<Message>) {
(self.0).0.write().unwrap().send_new_subscribes(msg);
}
#[inline]
pub fn has_subscribes(&self) -> bool {
(self.0).0.read().unwrap().subscribes.len() > 0
}
}
impl<T: Subscriber + From<ConnInner>> Drop for ServiceSwap<T> {
fn drop(&mut self) {
(self.0).0.write().unwrap().swap_new_subscribes();
}
}
use super::*;
use std::{
collections::HashSet,
thread::{self, JoinHandle},
time,
};
pub trait Service: Send + Sync {
fn name(&self) -> &'static str;
fn on_subscribe(&self, sub: ConnInner);
fn on_unsubscribe(&self, id: i32);
fn is_subed(&self, id: i32) -> bool;
fn join(&self);
}
pub trait Subscriber: Default + Send + Sync + 'static {
fn id(&self) -> i32;
fn send(&mut self, msg: Arc<Message>);
}
#[derive(Default)]
pub struct ServiceInner<T: Subscriber + From<ConnInner>> {
name: &'static str,
handle: Option<JoinHandle<()>>,
subscribes: HashMap<i32, T>,
new_subscribes: HashMap<i32, T>,
active: bool,
need_snapshot: bool,
}
pub trait Reset {
fn reset(&mut self);
}
pub struct ServiceTmpl<T: Subscriber + From<ConnInner>>(Arc<RwLock<ServiceInner<T>>>);
pub struct ServiceSwap<T: Subscriber + From<ConnInner>>(ServiceTmpl<T>);
pub type GenericService = ServiceTmpl<ConnInner>;
pub const HIBERNATE_TIMEOUT: u64 = 30;
pub const MAX_ERROR_TIMEOUT: u64 = 1_000;
impl<T: Subscriber + From<ConnInner>> ServiceInner<T> {
fn send_new_subscribes(&mut self, msg: Arc<Message>) {
for s in self.new_subscribes.values_mut() {
s.send(msg.clone());
}
}
fn swap_new_subscribes(&mut self) {
for (_, s) in self.new_subscribes.drain() {
self.subscribes.insert(s.id(), s);
}
assert!(self.new_subscribes.is_empty());
}
#[inline]
fn has_subscribes(&self) -> bool {
self.subscribes.len() > 0 || self.new_subscribes.len() > 0
}
}
impl<T: Subscriber + From<ConnInner>> Service for ServiceTmpl<T> {
#[inline]
fn name(&self) -> &'static str {
self.0.read().unwrap().name
}
fn is_subed(&self, id: i32) -> bool {
self.0.read().unwrap().subscribes.get(&id).is_some()
}
fn on_subscribe(&self, sub: ConnInner) {
let mut lock = self.0.write().unwrap();
if lock.subscribes.get(&sub.id()).is_some() {
return;
}
if lock.need_snapshot {
lock.new_subscribes.insert(sub.id(), sub.into());
} else {
lock.subscribes.insert(sub.id(), sub.into());
}
}
fn on_unsubscribe(&self, id: i32) {
let mut lock = self.0.write().unwrap();
if let None = lock.subscribes.remove(&id) {
lock.new_subscribes.remove(&id);
}
}
fn join(&self) {
self.0.write().unwrap().active = false;
self.0.write().unwrap().handle.take().map(JoinHandle::join);
}
}
impl<T: Subscriber + From<ConnInner>> Clone for ServiceTmpl<T> {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
impl<T: Subscriber + From<ConnInner>> ServiceTmpl<T> {
pub fn new(name: &'static str, need_snapshot: bool) -> Self {
Self(Arc::new(RwLock::new(ServiceInner::<T> {
name,
active: true,
need_snapshot,
..Default::default()
})))
}
#[inline]
pub fn has_subscribes(&self) -> bool {
self.0.read().unwrap().has_subscribes()
}
#[inline]
pub fn ok(&self) -> bool {
let lock = self.0.read().unwrap();
lock.active && lock.has_subscribes()
}
pub fn snapshot<F>(&self, callback: F) -> ResultType<()>
where
F: FnMut(ServiceSwap<T>) -> ResultType<()>,
{
if self.0.read().unwrap().new_subscribes.len() > 0 {
log::info!("Call snapshot of {} service", self.name());
let mut callback = callback;
callback(ServiceSwap::<T>(self.clone()))?;
}
Ok(())
}
#[inline]
pub fn send(&self, msg: Message) {
self.send_shared(Arc::new(msg));
}
pub fn send_to(&self, msg: Message, id: i32) {
if let Some(s) = self.0.write().unwrap().subscribes.get_mut(&id) {
s.send(Arc::new(msg));
}
}
pub fn send_shared(&self, msg: Arc<Message>) {
let mut lock = self.0.write().unwrap();
for s in lock.subscribes.values_mut() {
s.send(msg.clone());
}
}
pub fn send_video_frame(&self, msg: Message) -> HashSet<i32> {
self.send_video_frame_shared(Arc::new(msg))
}
pub fn send_video_frame_shared(&self, msg: Arc<Message>) -> HashSet<i32> {
let mut conn_ids = HashSet::new();
let mut lock = self.0.write().unwrap();
for s in lock.subscribes.values_mut() {
s.send(msg.clone());
conn_ids.insert(s.id());
}
conn_ids
}
pub fn send_without(&self, msg: Message, sub: i32) {
let mut lock = self.0.write().unwrap();
let msg = Arc::new(msg);
for s in lock.subscribes.values_mut() {
if sub != s.id() {
s.send(msg.clone());
}
}
}
pub fn repeat<S, F>(&self, interval_ms: u64, callback: F)
where
F: 'static + FnMut(Self, &mut S) -> ResultType<()> + Send,
S: 'static + Default + Reset,
{
let interval = time::Duration::from_millis(interval_ms);
let mut callback = callback;
let sp = self.clone();
let thread = thread::spawn(move || {
let mut state = S::default();
let mut may_reset = false;
while sp.active() {
let now = time::Instant::now();
if sp.has_subscribes() {
if let Err(err) = callback(sp.clone(), &mut state) {
log::error!("Error of {} service: {}", sp.name(), err);
thread::sleep(time::Duration::from_millis(MAX_ERROR_TIMEOUT));
#[cfg(windows)]
crate::platform::windows::try_change_desktop();
}
if !may_reset {
may_reset = true;
}
} else if may_reset {
state.reset();
may_reset = false;
}
let elapsed = now.elapsed();
if elapsed < interval {
thread::sleep(interval - elapsed);
}
}
});
self.0.write().unwrap().handle = Some(thread);
}
pub fn run<F>(&self, callback: F)
where
F: 'static + FnMut(Self) -> ResultType<()> + Send,
{
let sp = self.clone();
let mut callback = callback;
let thread = thread::spawn(move || {
let mut error_timeout = HIBERNATE_TIMEOUT;
while sp.active() {
if sp.has_subscribes() {
log::debug!("Enter {} service inner loop", sp.name());
let tm = time::Instant::now();
if let Err(err) = callback(sp.clone()) {
log::error!("Error of {} service: {}", sp.name(), err);
if tm.elapsed() > time::Duration::from_millis(MAX_ERROR_TIMEOUT) {
error_timeout = HIBERNATE_TIMEOUT;
} else {
error_timeout *= 2;
}
if error_timeout > MAX_ERROR_TIMEOUT {
error_timeout = MAX_ERROR_TIMEOUT;
}
thread::sleep(time::Duration::from_millis(error_timeout));
#[cfg(windows)]
crate::platform::windows::try_change_desktop();
} else {
log::debug!("Exit {} service inner loop", sp.name());
}
}
thread::sleep(time::Duration::from_millis(HIBERNATE_TIMEOUT));
}
});
self.0.write().unwrap().handle = Some(thread);
}
#[inline]
pub fn active(&self) -> bool {
self.0.read().unwrap().active
}
}
impl<T: Subscriber + From<ConnInner>> ServiceSwap<T> {
#[inline]
pub fn send(&self, msg: Message) {
self.send_shared(Arc::new(msg));
}
#[inline]
pub fn send_shared(&self, msg: Arc<Message>) {
(self.0).0.write().unwrap().send_new_subscribes(msg);
}
#[inline]
pub fn has_subscribes(&self) -> bool {
(self.0).0.read().unwrap().subscribes.len() > 0
}
}
impl<T: Subscriber + From<ConnInner>> Drop for ServiceSwap<T> {
fn drop(&mut self) {
(self.0).0.write().unwrap().swap_new_subscribes();
}
}

View File

@@ -32,7 +32,6 @@ use std::{
io::ErrorKind::WouldBlock,
time::{self, Duration, Instant},
};
use virtual_display;
pub const NAME: &'static str = "video";
@@ -133,7 +132,7 @@ fn check_display_changed(
last_width: usize,
last_hegiht: usize,
) -> bool {
let displays = match try_get_displays() {
let displays = match Display::all() {
Ok(d) => d,
_ => return false,
};
@@ -158,30 +157,20 @@ fn check_display_changed(
}
fn run(sp: GenericService) -> ResultType<()> {
let num_displays = Display::all()?.len();
if num_displays == 0 {
// Device may sometimes be uninstalled by user in "Device Manager" Window.
// Closing device will clear the instance data.
virtual_display::close_device();
} else if num_displays > 1 {
// Try close device, if display device changed.
if virtual_display::is_device_created() {
virtual_display::close_device();
}
}
let fps = 30;
let wait = 1000 / fps;
let spf = time::Duration::from_secs_f32(1. / (fps as f32));
let (ndisplay, current, display) = get_current_display()?;
let (origin, width, height) = (display.origin(), display.width(), display.height());
log::debug!(
"#displays={}, current={}, origin: {:?}, width={}, height={}",
"#displays={}, current={}, origin: {:?}, width={}, height={}, cpus={}/{}",
ndisplay,
current,
&origin,
width,
height
height,
num_cpus::get_physical(),
num_cpus::get(),
);
// Capturer object is expensive, avoiding to create it frequently.
let mut c = Capturer::new(display, true).with_context(|| "Failed to create capturer")?;
@@ -260,7 +249,31 @@ fn run(sp: GenericService) -> ResultType<()> {
frame_controller.reset();
match c.frame(wait as _) {
#[cfg(any(target_os = "android", target_os = "ios"))]
let res = match c.frame(wait as _) {
Ok(frame) => {
let time = now - start;
let ms = (time.as_secs() * 1000 + time.subsec_millis() as u64) as i64;
match frame {
scrap::Frame::VP9(data) => {
let send_conn_ids = handle_one_frame_encoded(&sp, data, ms)?;
frame_controller.set_send(now, send_conn_ids);
}
scrap::Frame::RAW(data) => {
if (data.len() != 0) {
let send_conn_ids = handle_one_frame(&sp, data, ms, &mut vpx)?;
frame_controller.set_send(now, send_conn_ids);
}
}
_ => {}
};
Ok(())
}
Err(err) => Err(err),
};
#[cfg(not(any(target_os = "android", target_os = "ios")))]
let res = match c.frame(wait as _) {
Ok(frame) => {
let time = now - start;
let ms = (time.as_secs() * 1000 + time.subsec_millis() as u64) as i64;
@@ -270,8 +283,14 @@ fn run(sp: GenericService) -> ResultType<()> {
{
try_gdi = 0;
}
Ok(())
}
Err(ref e) if e.kind() == WouldBlock => {
Err(err) => Err(err),
};
match res {
Err(ref e) if e.kind() == WouldBlock =>
{
#[cfg(windows)]
if try_gdi > 0 && !c.is_gdi() {
if try_gdi > 3 {
@@ -298,6 +317,7 @@ fn run(sp: GenericService) -> ResultType<()> {
return Err(err.into());
}
_ => {}
}
// i love 3, 6, 8
@@ -310,7 +330,6 @@ fn run(sp: GenericService) -> ResultType<()> {
std::thread::sleep(spf - elapsed);
}
}
Ok(())
}
@@ -370,8 +389,33 @@ fn handle_one_frame(
Ok(send_conn_ids)
}
#[inline]
#[cfg(any(target_os = "android", target_os = "ios"))]
pub fn handle_one_frame_encoded(
sp: &GenericService,
frame: &[u8],
ms: i64,
) -> ResultType<HashSet<i32>> {
sp.snapshot(|sps| {
// so that new sub and old sub share the same encoder after switch
if sps.has_subscribes() {
bail!("SWITCH");
}
Ok(())
})?;
let mut send_conn_ids: HashSet<i32> = Default::default();
let vp9_frame = VP9 {
data: frame.to_vec(),
key: true,
pts: ms,
..Default::default()
};
send_conn_ids = sp.send_video_frame(create_msg(vec![vp9_frame]));
Ok(send_conn_ids)
}
fn get_display_num() -> usize {
if let Ok(d) = try_get_displays() {
if let Ok(d) = Display::all() {
d.len()
} else {
0
@@ -385,7 +429,7 @@ pub fn get_displays() -> ResultType<(usize, Vec<DisplayInfo>)> {
}
let mut displays = Vec::new();
let mut primary = 0;
for (i, d) in try_get_displays()?.iter().enumerate() {
for (i, d) in Display::all()?.iter().enumerate() {
if d.is_primary() {
primary = i;
}
@@ -416,11 +460,13 @@ pub fn switch_display(i: i32) {
}
pub fn refresh() {
#[cfg(target_os = "android")]
Display::refresh_size();
*SWITCH.lock().unwrap() = true;
}
fn get_primary() -> usize {
if let Ok(all) = try_get_displays() {
if let Ok(all) = Display::all() {
for (i, d) in all.iter().enumerate() {
if d.is_primary() {
return i;
@@ -434,42 +480,12 @@ pub fn switch_to_primary() {
switch_display(get_primary() as _);
}
fn try_get_displays() -> ResultType<Vec<Display>> {
let mut displays = Display::all()?;
if displays.len() == 0 {
log::debug!("no displays, create virtual display");
// Try plugin monitor
if !virtual_display::is_device_created() {
if let Err(e) = virtual_display::create_device() {
log::debug!("Create device failed {}", e);
}
}
if virtual_display::is_device_created() {
if let Err(e) = virtual_display::plug_in_monitor() {
log::debug!("Plug in monitor failed {}", e);
} else {
if let Err(e) = virtual_display::update_monitor_modes() {
log::debug!("Update monitor modes failed {}", e);
}
}
}
displays = Display::all()?;
} else if displays.len() > 1 {
// If more than one displays exists, close RustDeskVirtualDisplay
if virtual_display::is_device_created() {
virtual_display::close_device()
}
}
Ok(displays)
}
fn get_current_display() -> ResultType<(usize, usize, Display)> {
let mut current = *CURRENT_DISPLAY.lock().unwrap() as usize;
let mut displays = try_get_displays()?;
let mut displays = Display::all()?;
if displays.len() == 0 {
bail!("No displays");
}
let n = displays.len();
if current >= n {
current = 0;