diff --git a/Cargo.lock b/Cargo.lock index 50aedf974..1acf25279 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3899,7 +3899,7 @@ dependencies = [ [[package]] name = "rustdesk" -version = "1.1.9" +version = "1.2.0" dependencies = [ "android_logger 0.11.0", "arboard", diff --git a/Cargo.toml b/Cargo.toml index 334d67e9d..019f2b6db 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rustdesk" -version = "1.1.9" +version = "1.2.0" authors = ["rustdesk "] edition = "2021" build= "build.rs" @@ -9,7 +9,7 @@ default-run = "rustdesk" [lib] name = "librustdesk" -crate-type = ["cdylib", "staticlib", "rlib"] +crate-type = ["cdylib", "staticlib", "rlib"] [[bin]] name = "naming" diff --git a/libs/hbb_common/protos/message.proto b/libs/hbb_common/protos/message.proto index 9fba78a92..a8edb9563 100644 --- a/libs/hbb_common/protos/message.proto +++ b/libs/hbb_common/protos/message.proto @@ -251,6 +251,7 @@ message FileAction { FileRemoveFile remove_file = 6; ReadAllFiles all_files = 7; FileTransferCancel cancel = 8; + FileTransferSendConfirmRequest send_confirm = 9; } } @@ -262,14 +263,24 @@ message FileResponse { FileTransferBlock block = 2; FileTransferError error = 3; FileTransferDone done = 4; + FileTransferDigest digest = 5; } } +message FileTransferDigest { + int32 id = 1; + sint32 file_num = 2; + uint64 last_modified = 3; + uint64 file_size = 4; + bool is_upload = 5; +} + message FileTransferBlock { int32 id = 1; sint32 file_num = 2; bytes data = 3; bool compressed = 4; + uint32 blk_id = 5; } message FileTransferError { @@ -282,6 +293,16 @@ message FileTransferSendRequest { int32 id = 1; string path = 2; bool include_hidden = 3; + int32 file_num = 4; +} + +message FileTransferSendConfirmRequest { + int32 id = 1; + sint32 file_num = 2; + oneof union { + bool skip = 3; + uint32 offset_blk = 4; + } } message FileTransferDone { @@ -293,6 +314,7 @@ message FileTransferReceiveRequest { int32 id = 1; string path = 2; // path written to repeated FileEntry files = 3; + int32 file_num = 4; } message FileRemoveDir { diff --git a/libs/hbb_common/src/config.rs b/libs/hbb_common/src/config.rs index a7c1bc634..ce0fc509a 100644 --- a/libs/hbb_common/src/config.rs +++ b/libs/hbb_common/src/config.rs @@ -145,6 +145,8 @@ pub struct PeerConfig { pub options: HashMap, #[serde(default)] pub info: PeerInfoSerde, + #[serde(default)] + pub transfer: TransferSerde, } #[derive(Debug, PartialEq, Default, Serialize, Deserialize, Clone)] @@ -157,6 +159,14 @@ pub struct PeerInfoSerde { pub platform: String, } +#[derive(Debug, Default, Serialize, Deserialize, Clone)] +pub struct TransferSerde { + #[serde(default)] + pub write_jobs: Vec, + #[serde(default)] + pub read_jobs: Vec, +} + fn patch(path: PathBuf) -> PathBuf { if let Some(_tmp) = path.to_str() { #[cfg(windows)] @@ -864,6 +874,7 @@ impl LanPeers { #[cfg(test)] mod tests { use super::*; + #[test] fn test_serialize() { let cfg: Config = Default::default(); diff --git a/libs/hbb_common/src/fs.rs b/libs/hbb_common/src/fs.rs index 475f4dfc6..4ba132f57 100644 --- a/libs/hbb_common/src/fs.rs +++ b/libs/hbb_common/src/fs.rs @@ -1,13 +1,17 @@ -use crate::{bail, message_proto::*, ResultType}; +#[cfg(windows)] +use std::os::windows::prelude::*; use std::path::{Path, PathBuf}; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; + +use serde_derive::{Deserialize, Serialize}; +use tokio::{fs::File, io::*}; + +use crate::{bail, get_version_number, message_proto::*, ResultType, Stream}; // https://doc.rust-lang.org/std/os/windows/fs/trait.MetadataExt.html use crate::{ compress::{compress, decompress}, config::{Config, COMPRESS_LEVEL}, }; -#[cfg(windows)] -use std::os::windows::prelude::*; -use tokio::{fs::File, io::*}; pub fn read_dir(path: &PathBuf, include_hidden: bool) -> ResultType { let mut dir = FileDirectory { @@ -184,16 +188,61 @@ pub fn get_recursive_files(path: &str, include_hidden: bool) -> ResultType bool { + return Path::new(file_path).exists(); +} + +#[inline] +pub fn can_enable_overwrite_detection(version: i64) -> bool { + version >= get_version_number("1.2.0") +} + #[derive(Default)] pub struct TransferJob { - id: i32, - path: PathBuf, - files: Vec, - file_num: i32, + pub id: i32, + pub remote: String, + pub path: PathBuf, + pub show_hidden: bool, + pub is_remote: bool, + pub is_last_job: bool, + pub file_num: i32, + pub files: Vec, + file: Option, total_size: u64, finished_size: u64, transferred: u64, + enable_overwrite_detection: bool, + file_confirmed: bool, + file_is_waiting: bool, + default_overwrite_strategy: Option, +} + +#[derive(Debug, Default, Serialize, Deserialize, Clone)] +pub struct TransferJobMeta { + #[serde(default)] + pub id: i32, + #[serde(default)] + pub remote: String, + #[serde(default)] + pub to: String, + #[serde(default)] + pub show_hidden: bool, + #[serde(default)] + pub file_num: i32, + #[serde(default)] + pub is_remote: bool, +} + +#[derive(Debug, Default, Serialize, Deserialize, Clone)] +pub struct RemoveJobMeta { + #[serde(default)] + pub path: String, + #[serde(default)] + pub is_remote: bool, + #[serde(default)] + pub no_confirm: bool, } #[inline] @@ -219,25 +268,54 @@ fn is_compressed_file(name: &str) -> bool { } impl TransferJob { - pub fn new_write(id: i32, path: String, files: Vec) -> Self { + pub fn new_write( + id: i32, + remote: String, + path: String, + file_num: i32, + show_hidden: bool, + is_remote: bool, + files: Vec, + enable_override_detection: bool, + ) -> Self { + log::info!("new write {}", path); let total_size = files.iter().map(|x| x.size as u64).sum(); Self { id, + remote, path: get_path(&path), + file_num, + show_hidden, + is_remote, files, total_size, + enable_overwrite_detection: enable_override_detection, ..Default::default() } } - pub fn new_read(id: i32, path: String, include_hidden: bool) -> ResultType { - let files = get_recursive_files(&path, include_hidden)?; + pub fn new_read( + id: i32, + remote: String, + path: String, + file_num: i32, + show_hidden: bool, + is_remote: bool, + enable_override_detection: bool, + ) -> ResultType { + log::info!("new read {}", path); + let files = get_recursive_files(&path, show_hidden)?; let total_size = files.iter().map(|x| x.size as u64).sum(); Ok(Self { id, + remote, path: get_path(&path), + file_num, + show_hidden, + is_remote, files, total_size, + enable_overwrite_detection: enable_override_detection, ..Default::default() }) } @@ -342,7 +420,7 @@ impl TransferJob { } #[inline] - fn join(&self, name: &str) -> PathBuf { + pub fn join(&self, name: &str) -> PathBuf { if name.is_empty() { self.path.clone() } else { @@ -350,7 +428,7 @@ impl TransferJob { } } - pub async fn read(&mut self) -> ResultType> { + pub async fn read(&mut self, stream: &mut Stream) -> ResultType> { let file_num = self.file_num as usize; if file_num >= self.files.len() { self.file.take(); @@ -361,13 +439,26 @@ impl TransferJob { match File::open(self.join(&name)).await { Ok(file) => { self.file = Some(file); + self.file_confirmed = false; + self.file_is_waiting = false; } Err(err) => { self.file_num += 1; + self.file_confirmed = false; + self.file_is_waiting = false; return Err(err.into()); } } } + if self.enable_overwrite_detection { + if !self.file_confirmed() { + if !self.file_is_waiting() { + self.send_current_digest(stream).await?; + self.set_file_is_waiting(true); + } + return Ok(None); + } + } const BUF_SIZE: usize = 128 * 1024; let mut buf: Vec = Vec::with_capacity(BUF_SIZE); unsafe { @@ -380,6 +471,8 @@ impl TransferJob { Err(err) => { self.file_num += 1; self.file = None; + self.file_confirmed = false; + self.file_is_waiting = false; return Err(err.into()); } Ok(n) => { @@ -394,6 +487,8 @@ impl TransferJob { if offset == 0 { self.file_num += 1; self.file = None; + self.file_confirmed = false; + self.file_is_waiting = false; } else { self.finished_size += offset as u64; if !is_compressed_file(name) { @@ -413,6 +508,99 @@ impl TransferJob { ..Default::default() })) } + + async fn send_current_digest(&mut self, stream: &mut Stream) -> ResultType<()> { + let mut msg = Message::new(); + let mut resp = FileResponse::new(); + let meta = self.file.as_ref().unwrap().metadata().await?; + let last_modified = meta + .modified()? + .duration_since(SystemTime::UNIX_EPOCH)? + .as_secs(); + resp.set_digest(FileTransferDigest { + id: self.id, + file_num: self.file_num, + last_modified, + file_size: meta.len(), + ..Default::default() + }); + msg.set_file_response(resp); + stream.send(&msg).await?; + log::info!( + "id: {}, file_num:{}, digest message is sent. waiting for confirm. msg: {:?}", + self.id, + self.file_num, + msg + ); + Ok(()) + } + + pub fn set_overwrite_strategy(&mut self, overwrite_strategy: Option) { + self.default_overwrite_strategy = overwrite_strategy; + } + + pub fn default_overwrite_strategy(&self) -> Option { + self.default_overwrite_strategy + } + + pub fn set_file_confirmed(&mut self, file_confirmed: bool) { + self.file_confirmed = file_confirmed; + } + + pub fn set_file_is_waiting(&mut self, file_is_waiting: bool) { + self.file_is_waiting = file_is_waiting; + } + + pub fn file_is_waiting(&self) -> bool { + self.file_is_waiting + } + + pub fn file_confirmed(&self) -> bool { + self.file_confirmed + } + + pub fn skip_current_file(&mut self) -> bool { + self.file.take(); + self.set_file_confirmed(false); + self.set_file_is_waiting(false); + self.file_num += 1; + true + } + + pub fn confirm(&mut self, r: &FileTransferSendConfirmRequest) -> bool { + if self.file_num() != r.file_num { + log::info!("file num truncated, ignoring"); + } else { + match r.union { + Some(file_transfer_send_confirm_request::Union::skip(s)) => { + if s { + log::debug!("skip file id:{}, file_num:{}", r.id, r.file_num); + self.skip_current_file(); + } else { + self.set_file_confirmed(true); + } + } + Some(file_transfer_send_confirm_request::Union::offset_blk(offset)) => { + log::debug!("file confirmed"); + self.set_file_confirmed(true); + } + _ => {} + } + } + true + } + + #[inline] + pub fn gen_meta(&self) -> TransferJobMeta { + TransferJobMeta { + id: self.id, + remote: self.remote.to_string(), + to: self.path.to_string_lossy().to_string(), + file_num: self.file_num, + show_hidden: self.show_hidden, + is_remote: self.is_remote, + } + } } #[inline] @@ -453,12 +641,22 @@ pub fn new_block(block: FileTransferBlock) -> Message { } #[inline] -pub fn new_receive(id: i32, path: String, files: Vec) -> Message { +pub fn new_send_confirm(r: FileTransferSendConfirmRequest) -> Message { + let mut msg_out = Message::new(); + let mut action = FileAction::new(); + action.set_send_confirm(r); + msg_out.set_file_action(action); + msg_out +} + +#[inline] +pub fn new_receive(id: i32, path: String, file_num: i32, files: Vec) -> Message { let mut action = FileAction::new(); action.set_receive(FileTransferReceiveRequest { id, path, files: files.into(), + file_num, ..Default::default() }); let mut msg_out = Message::new(); @@ -467,12 +665,14 @@ pub fn new_receive(id: i32, path: String, files: Vec) -> Message { } #[inline] -pub fn new_send(id: i32, path: String, include_hidden: bool) -> Message { +pub fn new_send(id: i32, path: String, file_num: i32, include_hidden: bool) -> Message { + log::info!("new send: {},id : {}", path, id); let mut action = FileAction::new(); action.set_send(FileTransferSendRequest { id, path, include_hidden, + file_num, ..Default::default() }); let mut msg_out = Message::new(); @@ -509,7 +709,10 @@ pub async fn handle_read_jobs( ) -> ResultType<()> { let mut finished = Vec::new(); for job in jobs.iter_mut() { - match job.read().await { + if job.is_last_job { + continue; + } + match job.read(stream).await { Err(err) => { stream .send(&new_error(job.id(), err, job.file_num())) @@ -519,8 +722,13 @@ pub async fn handle_read_jobs( stream.send(&new_block(block)).await?; } Ok(None) => { - finished.push(job.id()); - stream.send(&new_done(job.id(), job.file_num())).await?; + if !job.enable_overwrite_detection || (!job.file_confirmed && !job.file_is_waiting) + { + finished.push(job.id()); + stream.send(&new_done(job.id(), job.file_num())).await?; + } else { + // waiting confirmation. + } } } } @@ -558,3 +766,35 @@ pub fn create_dir(dir: &str) -> ResultType<()> { std::fs::create_dir_all(get_path(dir))?; Ok(()) } + +pub enum DigestCheckResult { + IsSame, + NeedConfirm(FileTransferDigest), + NoSuchFile, +} + +#[inline] +pub fn is_write_need_confirmation( + file_path: &str, + digest: &FileTransferDigest, +) -> ResultType { + let path = Path::new(file_path); + if path.exists() && path.is_file() { + let metadata = std::fs::metadata(path)?; + let modified_time = metadata.modified()?; + let remote_mt = Duration::from_secs(digest.last_modified); + let local_mt = modified_time.duration_since(UNIX_EPOCH)?; + if remote_mt == local_mt && digest.file_size == metadata.len() { + return Ok(DigestCheckResult::IsSame); + } + Ok(DigestCheckResult::NeedConfirm(FileTransferDigest { + id: digest.id, + file_num: digest.file_num, + last_modified: local_mt.as_secs(), + file_size: metadata.len(), + ..Default::default() + })) + } else { + Ok(DigestCheckResult::NoSuchFile) + } +} diff --git a/src/client.rs b/src/client.rs index cfe02c20c..e191df80f 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,9 +1,20 @@ +use std::{ + collections::HashMap, + net::SocketAddr, + ops::Deref, + sync::{mpsc, Arc, RwLock}, +}; + pub use async_trait::async_trait; #[cfg(not(any(target_os = "android", target_os = "linux")))] use cpal::{ traits::{DeviceTrait, HostTrait, StreamTrait}, Device, Host, StreamConfig, }; +use magnum_opus::{Channels::*, Decoder as AudioDecoder}; +use sha2::{Digest, Sha256}; +use uuid::Uuid; + use hbb_common::{ allow_err, anyhow::{anyhow, Context}, @@ -19,24 +30,15 @@ use hbb_common::{ tokio::time::Duration, AddrMangle, ResultType, Stream, }; -use magnum_opus::{Channels::*, Decoder as AudioDecoder}; use scrap::{Decoder, Image, VideoCodecId}; -use sha2::{Digest, Sha256}; -use std::{ - collections::HashMap, - net::SocketAddr, - ops::Deref, - sync::{mpsc, Arc, RwLock}, -}; -use uuid::Uuid; + +pub use super::lang::*; pub mod file_trait; pub use file_trait::FileManager; pub const SEC30: Duration = Duration::from_secs(30); pub struct Client; -pub use super::lang::*; - #[cfg(not(any(target_os = "android", target_os = "linux")))] lazy_static::lazy_static! { static ref AUDIO_HOST: Host = cpal::default_host(); @@ -1311,7 +1313,7 @@ pub enum Data { Close, Login((String, bool)), Message(Message), - SendFiles((i32, String, String, bool, bool)), + SendFiles((i32, String, String, i32, bool, bool)), RemoveDirAll((i32, String, bool)), ConfirmDeleteFiles((i32, i32)), SetNoConfirm(i32), @@ -1323,6 +1325,9 @@ pub enum Data { AddPortForward((i32, String, i32)), ToggleClipboardFile, NewRDP, + SetConfirmOverrideFile((i32, i32, bool, bool, bool)), + AddJob((i32, String, String, i32, bool, bool)), + ResumeJob((i32, bool)), } #[derive(Clone)] diff --git a/src/client/file_trait.rs b/src/client/file_trait.rs index 0e6e97e42..be790b035 100644 --- a/src/client/file_trait.rs +++ b/src/client/file_trait.rs @@ -80,9 +80,26 @@ pub trait FileManager: Interface { id: i32, path: String, to: String, + file_num: i32, include_hidden: bool, is_remote: bool, ) { - self.send(Data::SendFiles((id, path, to, include_hidden, is_remote))); + self.send(Data::SendFiles((id, path, to, file_num, include_hidden, is_remote))); + } + + fn add_job( + &mut self, + id: i32, + path: String, + to: String, + file_num: i32, + include_hidden: bool, + is_remote: bool, + ) { + self.send(Data::AddJob((id, path, to, file_num, include_hidden, is_remote))); + } + + fn resume_job(&mut self, id: i32, is_remote: bool){ + self.send(Data::ResumeJob((id,is_remote))); } } diff --git a/src/ipc.rs b/src/ipc.rs index f09367a1a..2388a7d9c 100644 --- a/src/ipc.rs +++ b/src/ipc.rs @@ -44,6 +44,7 @@ pub enum FS { NewWrite { path: String, id: i32, + file_num: i32, files: Vec<(String, u64)>, }, CancelWrite { @@ -59,6 +60,18 @@ pub enum FS { id: i32, file_num: i32, }, + WriteOffset { + id: i32, + file_num: i32, + offset_blk: u32 + }, + CheckDigest { + id: i32, + file_num: i32, + file_size: u64, + last_modified: u64, + is_upload: bool, + }, } #[derive(Debug, Serialize, Deserialize, Clone)] diff --git a/src/lang/cn.rs b/src/lang/cn.rs index c7849b11b..b2a3dc9a4 100644 --- a/src/lang/cn.rs +++ b/src/lang/cn.rs @@ -266,6 +266,8 @@ pub static ref T: std::collections::HashMap<&'static str, &'static str> = ("android_start_service_tip", "点击 [启动服务] 或打开 [屏幕录制] 权限开启手机屏幕共享服务。"), ("Account", "账号"), ("Quit", "退出"), + ("Overwrite", "覆盖"), + ("This file exists, skip or overwrite this file?", "这个文件/文件夹已存在,跳过/覆盖?"), ("doc_mac_permission", "https://rustdesk.com/docs/zh-cn/manual/mac/#启用权限"), ("Help", "帮助"), ].iter().cloned().collect(); diff --git a/src/lang/ru.rs b/src/lang/ru.rs index 0249b4eee..e3cf96dae 100644 --- a/src/lang/ru.rs +++ b/src/lang/ru.rs @@ -266,6 +266,8 @@ pub static ref T: std::collections::HashMap<&'static str, &'static str> = ("android_start_service_tip", "Нажмите [Запуск промежуточного сервера] или ОТКРЫТЬ разрешение [Скриншот], чтобы запустить службу демонстрации экрана."), ("Account", "Аккаунт"), ("Quit", "Выйти"), + ("Overwrite", "крышка"), + ("This file exists, skip or overwrite this file?", "Этот файл существует, пропустить или перезаписать этот файл?"), ("doc_mac_permission", "https://rustdesk.com/docs/en/manual/mac/#enable-permissions"), ("Help", "Помощь"), ].iter().cloned().collect(); diff --git a/src/lang/template.rs b/src/lang/template.rs index 9cfed1283..84be6ed8d 100644 --- a/src/lang/template.rs +++ b/src/lang/template.rs @@ -265,6 +265,8 @@ pub static ref T: std::collections::HashMap<&'static str, &'static str> = ("android_version_audio_tip", ""), ("android_start_service_tip", ""), ("Account", ""), + ("Overwrite", ""), + ("This file exists, skip or overwrite this file?", "") ("Quit", ""), ("doc_mac_permission", ""), ("Help", ""), diff --git a/src/lang/tw.rs b/src/lang/tw.rs index 6d9e2814a..5ce7713d9 100644 --- a/src/lang/tw.rs +++ b/src/lang/tw.rs @@ -266,6 +266,8 @@ pub static ref T: std::collections::HashMap<&'static str, &'static str> = ("android_start_service_tip", "點擊 [啟動服務] 或打開 [屏幕錄製] 權限開啟手機屏幕共享服務。"), ("Account", "帳戶"), ("Quit", "退出"), + ("Overwrite", "覆蓋"), + ("This file exists, skip or overwrite this file?", "這個文件/文件夾已存在,跳過/覆蓋?"), ("doc_mac_permission", "https://rustdesk.com/docs/zh-tw/manual/mac/#啟用權限"), ("Help", "幫助"), ].iter().cloned().collect(); diff --git a/src/server/connection.rs b/src/server/connection.rs index e726ba5b0..f4780ea4d 100644 --- a/src/server/connection.rs +++ b/src/server/connection.rs @@ -3,13 +3,17 @@ use super::{input_service::*, *}; use crate::clipboard_file::*; #[cfg(not(any(target_os = "android", target_os = "ios")))] use crate::common::update_clipboard; -use crate::ipc; #[cfg(any(target_os = "android", target_os = "ios"))] use crate::{common::MOBILE_INFO2, mobile::connection_manager::start_channel}; +use crate::{ipc, VERSION}; +use hbb_common::fs::can_enable_overwrite_detection; +use hbb_common::log::debug; +use hbb_common::message_proto::file_transfer_send_confirm_request::Union; use hbb_common::{ config::Config, fs, futures::{SinkExt, StreamExt}, + get_version_number, message_proto::{option_message::BoolOption, permission_info::Permission}, sleep, timeout, tokio::{ @@ -19,6 +23,7 @@ use hbb_common::{ }, tokio_util::codec::{BytesCodec, Framed}, }; +use libc::{printf, send}; #[cfg(any(target_os = "android", target_os = "ios"))] use scrap::android::call_input_service_mouse_input; use serde_json::{json, value::Value}; @@ -968,9 +973,18 @@ impl Connection { } Some(file_action::Union::send(s)) => { let id = s.id; - let path = s.path; - match fs::TransferJob::new_read(id, path.clone(), s.include_hidden) - { + let od = + can_enable_overwrite_detection(get_version_number(VERSION)); + let path = s.path.clone(); + match fs::TransferJob::new_read( + id, + "".to_string(), + path.clone(), + s.file_num, + s.include_hidden, + false, + od, + ) { Err(err) => { self.send(fs::new_error(id, err, 0)).await; } @@ -986,6 +1000,7 @@ impl Connection { self.send_fs(ipc::FS::NewWrite { path: r.path, id: r.id, + file_num: r.file_num, files: r .files .to_vec() @@ -1018,6 +1033,11 @@ impl Connection { self.send_fs(ipc::FS::CancelWrite { id: c.id }); fs::remove_job(c.id, &mut self.read_jobs); } + Some(file_action::Union::send_confirm(r)) => { + if let Some(job) = fs::get_job(r.id, &mut self.read_jobs) { + job.confirm(&r); + } + } _ => {} } } @@ -1037,6 +1057,13 @@ impl Connection { file_num: d.file_num, }); } + Some(file_response::Union::digest(d)) => self.send_fs(ipc::FS::CheckDigest { + id: d.id, + file_num: d.file_num, + file_size: d.file_size, + last_modified: d.last_modified, + is_upload: true, + }), _ => {} }, Some(message::Union::misc(misc)) => match misc.union { diff --git a/src/ui/cm.rs b/src/ui/cm.rs index 1fafedde7..421750200 100644 --- a/src/ui/cm.rs +++ b/src/ui/cm.rs @@ -1,15 +1,22 @@ use crate::ipc::{self, new_listener, Connection, Data}; +use crate::VERSION; #[cfg(windows)] use clipboard::{ create_cliprdr_context, empty_clipboard, get_rx_clip_client, server_clip_file, set_conn_enabled, }; +use hbb_common::fs::{ + can_enable_overwrite_detection, get_string, is_write_need_confirmation, new_send_confirm, + DigestCheckResult, +}; +use hbb_common::log::log; use hbb_common::{ allow_err, config::Config, - fs, log, + fs, get_version_number, log, message_proto::*, protobuf::Message as _, tokio::{self, sync::mpsc, task::spawn_blocking}, + ResultType, }; use sciter::{make_args, Element, Value, HELEMENT}; use std::{ @@ -151,11 +158,19 @@ impl ConnectionManager { ipc::FS::NewWrite { path, id, + file_num, mut files, } => { + let od = can_enable_overwrite_detection(get_version_number(VERSION)); + // cm has no show_hidden context + // dummy remote, show_hidden, is_remote write_jobs.push(fs::TransferJob::new_write( id, + "".to_string(), path, + file_num, + false, + false, files .drain(..) .map(|f| FileEntry { @@ -164,6 +179,7 @@ impl ConnectionManager { ..Default::default() }) .collect(), + od, )); } ipc::FS::CancelWrite { id } => { @@ -179,6 +195,59 @@ impl ConnectionManager { fs::remove_job(id, write_jobs); } } + ipc::FS::CheckDigest { + id, + file_num, + file_size, + last_modified, + is_upload, + } => { + if let Some(job) = fs::get_job(id, write_jobs) { + let mut req = FileTransferSendConfirmRequest { + id, + file_num, + union: Some(file_transfer_send_confirm_request::Union::offset_blk(0)), + ..Default::default() + }; + let digest = FileTransferDigest { + id, + file_num, + last_modified, + file_size, + ..Default::default() + }; + if let Some(file) = job.files().get(file_num as usize) { + let path = get_string(&job.join(&file.name)); + match is_write_need_confirmation(&path, &digest) { + Ok(digest_result) => { + match digest_result { + DigestCheckResult::IsSame => { + req.set_skip(true); + let msg_out = new_send_confirm(req); + Self::send(msg_out, conn).await; + } + DigestCheckResult::NeedConfirm(mut digest) => { + // upload to server, but server has the same file, request + digest.is_upload = is_upload; + let mut msg_out = Message::new(); + let mut fr = FileResponse::new(); + fr.set_digest(digest); + msg_out.set_file_response(fr); + Self::send(msg_out, conn).await; + } + DigestCheckResult::NoSuchFile => { + let msg_out = new_send_confirm(req); + Self::send(msg_out, conn).await; + } + } + } + Err(err) => { + Self::send(fs::new_error(id, err, file_num), conn).await; + } + } + } + } + } ipc::FS::WriteBlock { id, file_num, @@ -208,6 +277,11 @@ impl ConnectionManager { } } } + ipc::FS::WriteOffset { + id, + file_num, + offset_blk, + } => {} }, #[cfg(windows)] Data::ClipbaordFile(_clip) => { diff --git a/src/ui/file_transfer.css b/src/ui/file_transfer.css index d1e1a4072..9b45ea2b7 100644 --- a/src/ui/file_transfer.css +++ b/src/ui/file_transfer.css @@ -220,6 +220,10 @@ table.job-table tr.is_remote svg { transform: scale(-1, 1); } +table.job-table tr.is_remote div.svg_continue svg { + transform: scale(1, 1); +} + table.job-table tr td div.text { width: *; overflow-x: hidden; diff --git a/src/ui/file_transfer.tis b/src/ui/file_transfer.tis index d66a8688e..7d50bdf7a 100644 --- a/src/ui/file_transfer.tis +++ b/src/ui/file_transfer.tis @@ -19,6 +19,7 @@ var svg_refresh = ; var svg_cancel = ; +var svg_continue = ; var svg_computer = @@ -100,6 +101,19 @@ class JobTable: Reactor.Component { refreshDir(is_remote); } + event click $(svg.continue) (_, me) { + var job = this.jobs[me.parent.parent.parent.index]; + var id = job.id; + this.continueJob(id); + this.update(); + } + + function clearAllJobs() { + this.jobs = []; + this.job_map = {}; + this.update(); + } + function send(path, is_remote) { var to; var show_hidden; @@ -117,13 +131,36 @@ class JobTable: Reactor.Component { this.jobs.push({ type: "transfer", id: id, path: path, to: to, include_hidden: show_hidden, - is_remote: is_remote }); + is_remote: is_remote, + is_last: false + }); this.job_map[id] = this.jobs[this.jobs.length - 1]; - handler.send_files(id, path, to, show_hidden, is_remote); + handler.send_files(id, path, to, 0, show_hidden, is_remote); var self = this; self.timer(30ms, function() { self.update(); }); } + function addJob(id, path, to, file_num, show_hidden, is_remote) { + var job = { type: "transfer", + id: id, path: path, to: to, + include_hidden: show_hidden, + is_remote: is_remote, is_last: true, file_num: file_num }; + this.jobs.push(job); + this.job_map[id] = this.jobs[this.jobs.length - 1]; + jobIdCounter = id + 1; + handler.add_job(id, path, to, file_num, show_hidden, is_remote); + stdout.println(JSON.stringify(job)); + } + + function continueJob(id) { + var job = this.job_map[id]; + if (job == null || !job.is_last){ + return; + } + job.is_last = false; + handler.resume_job(job.id, job.is_remote); + } + function addDelDir(path, is_remote) { var id = jobIdCounter; jobIdCounter += 1; @@ -258,6 +295,9 @@ class JobTable: Reactor.Component {
{job.path}
{this.getStatus(job)}
+
+ {svg_continue} +
{svg_cancel} ; } @@ -603,6 +643,7 @@ function initializeFileTransfer() } handler.updateFolderFiles = function(fd) { + // stdout.println("update folder files: " + JSON.stringify(fd)); fd.entries = fd.entries || []; if (fd.id > 0) { var jt = file_transfer.job_table; @@ -649,6 +690,19 @@ handler.jobError = function(id, err, file_num = -1) { file_transfer.job_table.updateJobStatus(id, file_num, err); } +handler.clearAllJobs = function() { + file_transfer.job_table.clearAllJobs(); +} + +handler.addJob = function (id, path, to, file_num, show_hidden, is_remote) { + // stdout.println("restore job: " + is_remote); + file_transfer.job_table.addJob(id,path,to,file_num,show_hidden,is_remote); +} + +handler.updateTransferList = function () { + file_transfer.job_table.update(); +} + function refreshDir(is_remote) { if (is_remote) file_transfer.remote_folder_view.refreshDir(); else file_transfer.local_folder_view.refreshDir(); @@ -716,6 +770,33 @@ handler.confirmDeleteFiles = function(id, i, name) { }); } +handler.overrideFileConfirm = function(id, file_num, to, is_upload) { + var jt = file_transfer.job_table; + msgbox("custom-skip", "Confirm Write Strategy", "
\ +
" + translate('Overwrite') + translate('files') + ".
\ +
" + translate('This file exists, skip or overwrite this file?') + "
\ + " + to + "
\ +
" + translate('Do this for all conflicts') + "
\ + ", function(res=null) { + if (!res) { + jt.updateJobStatus(id, -1, "cancel"); + handler.cancel_job(id); + } else if (res.skip) { + if (res.remember){ + handler.set_write_override(id,file_num,false,true, is_upload); // + } else { + handler.set_write_override(id,file_num,false,false,is_upload); // + } + } else { + if (res.remember){ + handler.set_write_override(id,file_num,true,true,is_upload); // + } else { + handler.set_write_override(id,file_num,true,false,is_upload); // + } + } + }); +} + function save_file_transfer_close_state() { var local_dir = file_transfer.local_folder_view.fd.path || ""; var local_show_hidden = file_transfer.local_folder_view.show_hidden ? "Y" : ""; diff --git a/src/ui/remote.rs b/src/ui/remote.rs index de9ca032c..34db3d507 100644 --- a/src/ui/remote.rs +++ b/src/ui/remote.rs @@ -1,19 +1,34 @@ -#[cfg(windows)] -use crate::clipboard_file::*; -use crate::{ - client::*, - common::{self, check_clipboard, update_clipboard, ClipboardContext, CLIPBOARD_INTERVAL}, +use std::{ + collections::HashMap, + ops::Deref, + sync::{Arc, Mutex, RwLock}, }; + +use sciter::{ + dom::{ + event::{EventReason, BEHAVIOR_EVENTS, EVENT_GROUPS, PHASE_MASK}, + Element, HELEMENT, + }, + make_args, + video::{video_destination, AssetPtr, COLOR_SPACE}, + Value, +}; + #[cfg(windows)] use clipboard::{ cliprdr::CliprdrClientContext, create_cliprdr_context as create_clipboard_file_context, get_rx_clip_client, server_clip_file, }; use enigo::{self, Enigo, KeyboardControllable}; +use hbb_common::fs::{ + can_enable_overwrite_detection, get_string, is_file_exists, new_send_confirm, + DigestCheckResult, RemoveJobMeta, get_job, +}; +use hbb_common::log::log; use hbb_common::{ allow_err, - config::{Config, LocalConfig, PeerConfig}, - fs, log, + config::{self, Config, LocalConfig, PeerConfig}, + fs, get_version_number, log, message_proto::{permission_info::Permission, *}, protobuf::Message as _, rendezvous_proto::ConnType, @@ -25,19 +40,14 @@ use hbb_common::{ }, Stream, }; -use sciter::{ - dom::{ - event::{EventReason, BEHAVIOR_EVENTS, EVENT_GROUPS, PHASE_MASK}, - Element, HELEMENT, - }, - make_args, - video::{video_destination, AssetPtr, COLOR_SPACE}, - Value, -}; -use std::{ - collections::HashMap, - ops::Deref, - sync::{Arc, Mutex, RwLock}, +use hbb_common::{config::TransferSerde, fs::TransferJobMeta}; + +#[cfg(windows)] +use crate::clipboard_file::*; +use crate::{ + client::*, + common::{self, check_clipboard, update_clipboard, ClipboardContext, CLIPBOARD_INTERVAL}, + VERSION, }; type Video = AssetPtr; @@ -194,7 +204,9 @@ impl sciter::EventHandler for Handler { fn confirm_delete_files(i32, i32); fn set_no_confirm(i32); fn cancel_job(i32); - fn send_files(i32, String, String, bool, bool); + fn send_files(i32, String, String, i32, bool, bool); + fn add_job(i32, String, String, i32, bool, bool); + fn resume_job(i32, bool); fn get_platform(bool); fn get_path_sep(bool); fn get_icon_path(i32, String); @@ -216,6 +228,7 @@ impl sciter::EventHandler for Handler { fn toggle_option(String); fn get_remember(); fn peer_platform(); + fn set_write_override(i32, i32, bool, bool, bool); } } @@ -536,6 +549,24 @@ impl Handler { self.lc.read().unwrap().remember } + fn set_write_override( + &mut self, + job_id: i32, + file_num: i32, + is_override: bool, + remember: bool, + is_upload: bool, + ) -> bool { + self.send(Data::SetConfirmOverrideFile(( + job_id, + file_num, + is_override, + remember, + is_upload, + ))); + true + } + fn t(&self, name: String) -> String { crate::client::translate(name) } @@ -744,6 +775,7 @@ impl Handler { } fn reconnect(&mut self) { + println!("reconnecting"); let cloned = self.clone(); let mut lock = self.write().unwrap(); lock.thread.take().map(|t| t.join()); @@ -1289,6 +1321,7 @@ async fn io_loop(handler: Handler) { clipboard_file_context: None, }; remote.io_loop(&key, &token).await; + remote.sync_jobs_status_to_local().await; } struct RemoveJob { @@ -1311,6 +1344,14 @@ impl RemoveJob { last_update_job_status: Instant::now(), } } + + pub fn gen_meta(&self) -> RemoveJobMeta { + RemoveJobMeta { + path: self.path.clone(), + is_remote: self.is_remote, + no_confirm: self.no_confirm, + } + } } struct Remote { @@ -1497,7 +1538,57 @@ impl Remote { Some(tx) } + async fn load_last_jobs(&mut self) { + log::info!("start load last jobs"); + self.handler.call("clearAllJobs", &make_args!()); + let pc = self.handler.load_config(); + if pc.transfer.write_jobs.is_empty() && pc.transfer.read_jobs.is_empty() { + // no last jobs + return; + } + // TODO: can add a confirm dialog + let mut cnt = 1; + for job_str in pc.transfer.read_jobs.iter() { + let job: Result = serde_json::from_str(&job_str); + if let Ok(job) = job { + self.handler.call( + "addJob", + &make_args!( + cnt, + job.to.clone(), + job.remote.clone(), + job.file_num, + job.show_hidden, + false + ), + ); + cnt += 1; + println!("restore read_job: {:?}", job); + } + } + for job_str in pc.transfer.write_jobs.iter() { + let job: Result = serde_json::from_str(&job_str); + if let Ok(job) = job { + self.handler.call( + "addJob", + &make_args!( + cnt, + job.remote.clone(), + job.to.clone(), + job.file_num, + job.show_hidden, + true + ), + ); + cnt += 1; + println!("restore write_job: {:?}", job); + } + } + self.handler.call("updateTransferList", &make_args!()); + } + async fn handle_msg_from_ui(&mut self, data: Data, peer: &mut Stream) -> bool { + // log::info!("new msg from ui, {}",data); match data { Data::Close => { return false; @@ -1513,14 +1604,35 @@ impl Remote { Data::Message(msg) => { allow_err!(peer.send(&msg).await); } - Data::SendFiles((id, path, to, include_hidden, is_remote)) => { + Data::SendFiles((id, path, to, file_num, include_hidden, is_remote)) => { + log::info!("send files, is remote {}", is_remote); + let od = can_enable_overwrite_detection(self.handler.lc.read().unwrap().version); if is_remote { log::debug!("New job {}, write to {} from remote {}", id, to, path); - self.write_jobs - .push(fs::TransferJob::new_write(id, to, Vec::new())); - allow_err!(peer.send(&fs::new_send(id, path, include_hidden)).await); + self.write_jobs.push(fs::TransferJob::new_write( + id, + path.clone(), + to, + file_num, + include_hidden, + is_remote, + Vec::new(), + od, + )); + allow_err!( + peer.send(&fs::new_send(id, path, file_num, include_hidden)) + .await + ); } else { - match fs::TransferJob::new_read(id, path.clone(), include_hidden) { + match fs::TransferJob::new_read( + id, + to.clone(), + path.clone(), + file_num, + include_hidden, + is_remote, + od, + ) { Err(err) => { self.handle_job_status(id, -1, Some(err.to_string())); } @@ -1537,11 +1649,74 @@ impl Remote { let files = job.files().clone(); self.read_jobs.push(job); self.timer = time::interval(MILLI1); - allow_err!(peer.send(&fs::new_receive(id, to, files)).await); + allow_err!(peer.send(&fs::new_receive(id, to, file_num, files)).await); } } } } + Data::AddJob((id, path, to, file_num, include_hidden, is_remote)) => { + let od = can_enable_overwrite_detection(self.handler.lc.read().unwrap().version); + if is_remote { + log::debug!("new write waiting job {}, write to {} from remote {}", id, to, path); + let mut job = fs::TransferJob::new_write( + id, + path.clone(), + to, + file_num, + include_hidden, + is_remote, + Vec::new(), + od, + ); + job.is_last_job = true; + self.write_jobs.push(job); + } else { + match fs::TransferJob::new_read( + id, + to.clone(), + path.clone(), + file_num, + include_hidden, + is_remote, + od, + ) { + Err(err) => { + self.handle_job_status(id, -1, Some(err.to_string())); + } + Ok(mut job) => { + log::debug!( + "new read waiting job {}, read {} to remote {}, {} files", + id, + path, + to, + job.files().len() + ); + let m = make_fd(job.id(), job.files(), true); + self.handler.call("updateFolderFiles", &make_args!(m)); + job.is_last_job = true; + self.read_jobs.push(job); + self.timer = time::interval(MILLI1); + } + } + } + } + Data::ResumeJob((id, is_remote)) => { + if is_remote { + if let Some(job) = get_job(id, &mut self.write_jobs) { + job.is_last_job = false; + allow_err!( + peer.send(&fs::new_send(id, job.remote.clone(), job.file_num, job.show_hidden)) + .await + ); + } + } else { + if let Some(job) = get_job(id, &mut self.read_jobs) { + job.is_last_job = false; + allow_err!(peer.send(&fs::new_receive(id, job.path.to_string_lossy().to_string(), + job.file_num, job.files.clone())).await); + } + } + } Data::SetNoConfirm(id) => { if let Some(job) = self.remove_jobs.get_mut(&id) { job.no_confirm = true; @@ -1558,6 +1733,45 @@ impl Remote { } } } + Data::SetConfirmOverrideFile((id, file_num, need_override, remember, is_upload)) => { + if is_upload { + if let Some(job) = fs::get_job(id, &mut self.read_jobs) { + if remember { + job.set_overwrite_strategy(Some(need_override)); + } + job.confirm(&FileTransferSendConfirmRequest { + id, + file_num, + union: if need_override { + Some(file_transfer_send_confirm_request::Union::offset_blk(0)) + } else { + Some(file_transfer_send_confirm_request::Union::skip(true)) + }, + ..Default::default() + }); + } + } else { + if let Some(job) = fs::get_job(id, &mut self.write_jobs) { + if remember { + job.set_overwrite_strategy(Some(need_override)); + } + let mut msg = Message::new(); + let mut file_action = FileAction::new(); + file_action.set_send_confirm(FileTransferSendConfirmRequest { + id, + file_num, + union: if need_override { + Some(file_transfer_send_confirm_request::Union::offset_blk(0)) + } else { + Some(file_transfer_send_confirm_request::Union::skip(true)) + }, + ..Default::default() + }); + msg.set_file_action(file_action); + allow_err!(peer.send(&msg).await); + } + } + } Data::RemoveDirAll((id, path, is_remote)) => { let sep = self.handler.get_path_sep(is_remote); if is_remote { @@ -1715,6 +1929,24 @@ impl Remote { } } + async fn sync_jobs_status_to_local(&mut self) -> bool { + log::info!("sync transfer job status"); + let mut config: PeerConfig = self.handler.load_config(); + let mut transfer_metas = TransferSerde::default(); + for job in self.read_jobs.iter() { + let json_str = serde_json::to_string(&job.gen_meta()).unwrap(); + transfer_metas.read_jobs.push(json_str); + } + for job in self.write_jobs.iter() { + let json_str = serde_json::to_string(&job.gen_meta()).unwrap(); + transfer_metas.write_jobs.push(json_str); + } + log::info!("meta: {:?}",transfer_metas); + config.transfer = transfer_metas; + self.handler.save_config(config); + true + } + async fn handle_msg_from_peer(&mut self, data: &[u8], peer: &mut Stream) -> bool { if let Ok(msg_in) = Message::parse_from_bytes(&data) { match msg_in.union { @@ -1755,6 +1987,10 @@ impl Remote { }); } } + + if self.handler.is_file_transfer() { + self.load_last_jobs().await; + } } _ => {} }, @@ -1782,40 +2018,146 @@ impl Remote { } } } - Some(message::Union::file_response(fr)) => match fr.union { - Some(file_response::Union::dir(fd)) => { - let entries = fd.entries.to_vec(); - let mut m = make_fd(fd.id, &entries, fd.id > 0); - if fd.id <= 0 { - m.set_item("path", fd.path); - } - self.handler.call("updateFolderFiles", &make_args!(m)); - if let Some(job) = fs::get_job(fd.id, &mut self.write_jobs) { - job.set_files(entries); - } else if let Some(job) = self.remove_jobs.get_mut(&fd.id) { - job.files = entries; - } - } - Some(file_response::Union::block(block)) => { - if let Some(job) = fs::get_job(block.id, &mut self.write_jobs) { - if let Err(_err) = job.write(block, None).await { - // to-do: add "skip" for writing job + Some(message::Union::file_response(fr)) => { + match fr.union { + Some(file_response::Union::dir(fd)) => { + let entries = fd.entries.to_vec(); + let mut m = make_fd(fd.id, &entries, fd.id > 0); + if fd.id <= 0 { + m.set_item("path", fd.path); + } + self.handler.call("updateFolderFiles", &make_args!(m)); + if let Some(job) = fs::get_job(fd.id, &mut self.write_jobs) { + log::info!("job set_files: {:?}", entries); + job.set_files(entries); + } else if let Some(job) = self.remove_jobs.get_mut(&fd.id) { + job.files = entries; } - self.update_jobs_status(); } - } - Some(file_response::Union::done(d)) => { - if let Some(job) = fs::get_job(d.id, &mut self.write_jobs) { - job.modify_time(); - fs::remove_job(d.id, &mut self.write_jobs); + Some(file_response::Union::digest(digest)) => { + if digest.is_upload { + if let Some(job) = fs::get_job(digest.id, &mut self.read_jobs) { + if let Some(file) = job.files().get(digest.file_num as usize) { + let read_path = get_string(&job.join(&file.name)); + let overwrite_strategy = job.default_overwrite_strategy(); + if let Some(overwrite) = overwrite_strategy { + let req = FileTransferSendConfirmRequest { + id: digest.id, + file_num: digest.file_num, + union: Some(if overwrite { + file_transfer_send_confirm_request::Union::offset_blk(0) + } else { + file_transfer_send_confirm_request::Union::skip( + true, + ) + }), + ..Default::default() + }; + job.confirm(&req); + let msg = new_send_confirm(req); + allow_err!(peer.send(&msg).await); + } else { + self.handler.call( + "overrideFileConfirm", + &make_args!( + digest.id, + digest.file_num, + read_path, + true + ), + ); + } + } + } + } else { + if let Some(job) = fs::get_job(digest.id, &mut self.write_jobs) { + if let Some(file) = job.files().get(digest.file_num as usize) { + let write_path = get_string(&job.join(&file.name)); + let overwrite_strategy = job.default_overwrite_strategy(); + match fs::is_write_need_confirmation(&write_path, &digest) { + Ok(res) => match res { + DigestCheckResult::IsSame => { + let msg= new_send_confirm(FileTransferSendConfirmRequest { + id: digest.id, + file_num: digest.file_num, + union: Some(file_transfer_send_confirm_request::Union::skip(true)), + ..Default::default() + }); + allow_err!(peer.send(&msg).await); + } + DigestCheckResult::NeedConfirm(digest) => { + if let Some(overwrite) = overwrite_strategy { + let msg = new_send_confirm( + FileTransferSendConfirmRequest { + id: digest.id, + file_num: digest.file_num, + union: Some(if overwrite { + file_transfer_send_confirm_request::Union::offset_blk(0) + } else { + file_transfer_send_confirm_request::Union::skip(true) + }), + ..Default::default() + }, + ); + allow_err!(peer.send(&msg).await); + } else { + self.handler.call( + "overrideFileConfirm", + &make_args!( + digest.id, + digest.file_num, + write_path, + false + ), + ); + } + } + DigestCheckResult::NoSuchFile => { + let msg = new_send_confirm( + FileTransferSendConfirmRequest { + id: digest.id, + file_num: digest.file_num, + union: Some(file_transfer_send_confirm_request::Union::offset_blk(0)), + ..Default::default() + }, + ); + allow_err!(peer.send(&msg).await); + } + }, + Err(err) => { + println!("error recving digest: {}", err); + } + } + } + } + } } - self.handle_job_status(d.id, d.file_num, None); + Some(file_response::Union::block(block)) => { + log::info!( + "file response block, file id:{}, file num: {}", + block.id, + block.file_num + ); + if let Some(job) = fs::get_job(block.id, &mut self.write_jobs) { + if let Err(_err) = job.write(block, None).await { + // to-do: add "skip" for writing job + } + self.update_jobs_status(); + } + } + Some(file_response::Union::done(d)) => { + if let Some(job) = fs::get_job(d.id, &mut self.write_jobs) { + job.modify_time(); + fs::remove_job(d.id, &mut self.write_jobs); + } + self.handle_job_status(d.id, d.file_num, None); + } + Some(file_response::Union::error(e)) => { + self.handle_job_status(e.id, e.file_num, Some(e.error)); + } + _ => {} } - Some(file_response::Union::error(e)) => { - self.handle_job_status(e.id, e.file_num, Some(e.error)); - } - _ => {} - }, + } Some(message::Union::misc(misc)) => match misc.union { Some(misc::Union::audio_format(f)) => { self.audio_sender.send(MediaData::AudioFormat(f)).ok(); @@ -1891,6 +2233,14 @@ impl Remote { self.audio_sender.send(MediaData::AudioFrame(frame)).ok(); } } + Some(message::Union::file_action(action)) => match action.union { + Some(file_action::Union::send_confirm(c)) => { + if let Some(job) = fs::get_job(c.id, &mut self.read_jobs) { + job.confirm(&c); + } + } + _ => {} + }, _ => {} } }