Merge pull request #510 from Kingtous/opt/file_transfer_resume

opt: file transfer can resume when encountering reconnecting or close&open window
This commit is contained in:
RustDesk
2022-05-17 17:22:40 +08:00
committed by GitHub
17 changed files with 950 additions and 98 deletions

View File

@@ -251,6 +251,7 @@ message FileAction {
FileRemoveFile remove_file = 6;
ReadAllFiles all_files = 7;
FileTransferCancel cancel = 8;
FileTransferSendConfirmRequest send_confirm = 9;
}
}
@@ -262,14 +263,24 @@ message FileResponse {
FileTransferBlock block = 2;
FileTransferError error = 3;
FileTransferDone done = 4;
FileTransferDigest digest = 5;
}
}
message FileTransferDigest {
int32 id = 1;
sint32 file_num = 2;
uint64 last_modified = 3;
uint64 file_size = 4;
bool is_upload = 5;
}
message FileTransferBlock {
int32 id = 1;
sint32 file_num = 2;
bytes data = 3;
bool compressed = 4;
uint32 blk_id = 5;
}
message FileTransferError {
@@ -282,6 +293,16 @@ message FileTransferSendRequest {
int32 id = 1;
string path = 2;
bool include_hidden = 3;
int32 file_num = 4;
}
message FileTransferSendConfirmRequest {
int32 id = 1;
sint32 file_num = 2;
oneof union {
bool skip = 3;
uint32 offset_blk = 4;
}
}
message FileTransferDone {
@@ -293,6 +314,7 @@ message FileTransferReceiveRequest {
int32 id = 1;
string path = 2; // path written to
repeated FileEntry files = 3;
int32 file_num = 4;
}
message FileRemoveDir {

View File

@@ -145,6 +145,8 @@ pub struct PeerConfig {
pub options: HashMap<String, String>,
#[serde(default)]
pub info: PeerInfoSerde,
#[serde(default)]
pub transfer: TransferSerde,
}
#[derive(Debug, PartialEq, Default, Serialize, Deserialize, Clone)]
@@ -157,6 +159,14 @@ pub struct PeerInfoSerde {
pub platform: String,
}
#[derive(Debug, Default, Serialize, Deserialize, Clone)]
pub struct TransferSerde {
#[serde(default)]
pub write_jobs: Vec<String>,
#[serde(default)]
pub read_jobs: Vec<String>,
}
fn patch(path: PathBuf) -> PathBuf {
if let Some(_tmp) = path.to_str() {
#[cfg(windows)]
@@ -864,6 +874,7 @@ impl LanPeers {
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_serialize() {
let cfg: Config = Default::default();

View File

@@ -1,13 +1,17 @@
use crate::{bail, message_proto::*, ResultType};
#[cfg(windows)]
use std::os::windows::prelude::*;
use std::path::{Path, PathBuf};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use serde_derive::{Deserialize, Serialize};
use tokio::{fs::File, io::*};
use crate::{bail, get_version_number, message_proto::*, ResultType, Stream};
// https://doc.rust-lang.org/std/os/windows/fs/trait.MetadataExt.html
use crate::{
compress::{compress, decompress},
config::{Config, COMPRESS_LEVEL},
};
#[cfg(windows)]
use std::os::windows::prelude::*;
use tokio::{fs::File, io::*};
pub fn read_dir(path: &PathBuf, include_hidden: bool) -> ResultType<FileDirectory> {
let mut dir = FileDirectory {
@@ -184,16 +188,61 @@ pub fn get_recursive_files(path: &str, include_hidden: bool) -> ResultType<Vec<F
read_dir_recursive(&get_path(path), &get_path(""), include_hidden)
}
#[inline]
pub fn is_file_exists(file_path: &str) -> bool {
return Path::new(file_path).exists();
}
#[inline]
pub fn can_enable_overwrite_detection(version: i64) -> bool {
version >= get_version_number("1.2.0")
}
#[derive(Default)]
pub struct TransferJob {
id: i32,
path: PathBuf,
files: Vec<FileEntry>,
file_num: i32,
pub id: i32,
pub remote: String,
pub path: PathBuf,
pub show_hidden: bool,
pub is_remote: bool,
pub is_last_job: bool,
pub file_num: i32,
pub files: Vec<FileEntry>,
file: Option<File>,
total_size: u64,
finished_size: u64,
transferred: u64,
enable_overwrite_detection: bool,
file_confirmed: bool,
file_is_waiting: bool,
default_overwrite_strategy: Option<bool>,
}
#[derive(Debug, Default, Serialize, Deserialize, Clone)]
pub struct TransferJobMeta {
#[serde(default)]
pub id: i32,
#[serde(default)]
pub remote: String,
#[serde(default)]
pub to: String,
#[serde(default)]
pub show_hidden: bool,
#[serde(default)]
pub file_num: i32,
#[serde(default)]
pub is_remote: bool,
}
#[derive(Debug, Default, Serialize, Deserialize, Clone)]
pub struct RemoveJobMeta {
#[serde(default)]
pub path: String,
#[serde(default)]
pub is_remote: bool,
#[serde(default)]
pub no_confirm: bool,
}
#[inline]
@@ -219,25 +268,54 @@ fn is_compressed_file(name: &str) -> bool {
}
impl TransferJob {
pub fn new_write(id: i32, path: String, files: Vec<FileEntry>) -> Self {
pub fn new_write(
id: i32,
remote: String,
path: String,
file_num: i32,
show_hidden: bool,
is_remote: bool,
files: Vec<FileEntry>,
enable_override_detection: bool,
) -> Self {
log::info!("new write {}", path);
let total_size = files.iter().map(|x| x.size as u64).sum();
Self {
id,
remote,
path: get_path(&path),
file_num,
show_hidden,
is_remote,
files,
total_size,
enable_overwrite_detection: enable_override_detection,
..Default::default()
}
}
pub fn new_read(id: i32, path: String, include_hidden: bool) -> ResultType<Self> {
let files = get_recursive_files(&path, include_hidden)?;
pub fn new_read(
id: i32,
remote: String,
path: String,
file_num: i32,
show_hidden: bool,
is_remote: bool,
enable_override_detection: bool,
) -> ResultType<Self> {
log::info!("new read {}", path);
let files = get_recursive_files(&path, show_hidden)?;
let total_size = files.iter().map(|x| x.size as u64).sum();
Ok(Self {
id,
remote,
path: get_path(&path),
file_num,
show_hidden,
is_remote,
files,
total_size,
enable_overwrite_detection: enable_override_detection,
..Default::default()
})
}
@@ -342,7 +420,7 @@ impl TransferJob {
}
#[inline]
fn join(&self, name: &str) -> PathBuf {
pub fn join(&self, name: &str) -> PathBuf {
if name.is_empty() {
self.path.clone()
} else {
@@ -350,7 +428,7 @@ impl TransferJob {
}
}
pub async fn read(&mut self) -> ResultType<Option<FileTransferBlock>> {
pub async fn read(&mut self, stream: &mut Stream) -> ResultType<Option<FileTransferBlock>> {
let file_num = self.file_num as usize;
if file_num >= self.files.len() {
self.file.take();
@@ -361,13 +439,26 @@ impl TransferJob {
match File::open(self.join(&name)).await {
Ok(file) => {
self.file = Some(file);
self.file_confirmed = false;
self.file_is_waiting = false;
}
Err(err) => {
self.file_num += 1;
self.file_confirmed = false;
self.file_is_waiting = false;
return Err(err.into());
}
}
}
if self.enable_overwrite_detection {
if !self.file_confirmed() {
if !self.file_is_waiting() {
self.send_current_digest(stream).await?;
self.set_file_is_waiting(true);
}
return Ok(None);
}
}
const BUF_SIZE: usize = 128 * 1024;
let mut buf: Vec<u8> = Vec::with_capacity(BUF_SIZE);
unsafe {
@@ -380,6 +471,8 @@ impl TransferJob {
Err(err) => {
self.file_num += 1;
self.file = None;
self.file_confirmed = false;
self.file_is_waiting = false;
return Err(err.into());
}
Ok(n) => {
@@ -394,6 +487,8 @@ impl TransferJob {
if offset == 0 {
self.file_num += 1;
self.file = None;
self.file_confirmed = false;
self.file_is_waiting = false;
} else {
self.finished_size += offset as u64;
if !is_compressed_file(name) {
@@ -413,6 +508,99 @@ impl TransferJob {
..Default::default()
}))
}
async fn send_current_digest(&mut self, stream: &mut Stream) -> ResultType<()> {
let mut msg = Message::new();
let mut resp = FileResponse::new();
let meta = self.file.as_ref().unwrap().metadata().await?;
let last_modified = meta
.modified()?
.duration_since(SystemTime::UNIX_EPOCH)?
.as_secs();
resp.set_digest(FileTransferDigest {
id: self.id,
file_num: self.file_num,
last_modified,
file_size: meta.len(),
..Default::default()
});
msg.set_file_response(resp);
stream.send(&msg).await?;
log::info!(
"id: {}, file_num:{}, digest message is sent. waiting for confirm. msg: {:?}",
self.id,
self.file_num,
msg
);
Ok(())
}
pub fn set_overwrite_strategy(&mut self, overwrite_strategy: Option<bool>) {
self.default_overwrite_strategy = overwrite_strategy;
}
pub fn default_overwrite_strategy(&self) -> Option<bool> {
self.default_overwrite_strategy
}
pub fn set_file_confirmed(&mut self, file_confirmed: bool) {
self.file_confirmed = file_confirmed;
}
pub fn set_file_is_waiting(&mut self, file_is_waiting: bool) {
self.file_is_waiting = file_is_waiting;
}
pub fn file_is_waiting(&self) -> bool {
self.file_is_waiting
}
pub fn file_confirmed(&self) -> bool {
self.file_confirmed
}
pub fn skip_current_file(&mut self) -> bool {
self.file.take();
self.set_file_confirmed(false);
self.set_file_is_waiting(false);
self.file_num += 1;
true
}
pub fn confirm(&mut self, r: &FileTransferSendConfirmRequest) -> bool {
if self.file_num() != r.file_num {
log::info!("file num truncated, ignoring");
} else {
match r.union {
Some(file_transfer_send_confirm_request::Union::skip(s)) => {
if s {
log::debug!("skip file id:{}, file_num:{}", r.id, r.file_num);
self.skip_current_file();
} else {
self.set_file_confirmed(true);
}
}
Some(file_transfer_send_confirm_request::Union::offset_blk(offset)) => {
log::debug!("file confirmed");
self.set_file_confirmed(true);
}
_ => {}
}
}
true
}
#[inline]
pub fn gen_meta(&self) -> TransferJobMeta {
TransferJobMeta {
id: self.id,
remote: self.remote.to_string(),
to: self.path.to_string_lossy().to_string(),
file_num: self.file_num,
show_hidden: self.show_hidden,
is_remote: self.is_remote,
}
}
}
#[inline]
@@ -453,12 +641,22 @@ pub fn new_block(block: FileTransferBlock) -> Message {
}
#[inline]
pub fn new_receive(id: i32, path: String, files: Vec<FileEntry>) -> Message {
pub fn new_send_confirm(r: FileTransferSendConfirmRequest) -> Message {
let mut msg_out = Message::new();
let mut action = FileAction::new();
action.set_send_confirm(r);
msg_out.set_file_action(action);
msg_out
}
#[inline]
pub fn new_receive(id: i32, path: String, file_num: i32, files: Vec<FileEntry>) -> Message {
let mut action = FileAction::new();
action.set_receive(FileTransferReceiveRequest {
id,
path,
files: files.into(),
file_num,
..Default::default()
});
let mut msg_out = Message::new();
@@ -467,12 +665,14 @@ pub fn new_receive(id: i32, path: String, files: Vec<FileEntry>) -> Message {
}
#[inline]
pub fn new_send(id: i32, path: String, include_hidden: bool) -> Message {
pub fn new_send(id: i32, path: String, file_num: i32, include_hidden: bool) -> Message {
log::info!("new send: {},id : {}", path, id);
let mut action = FileAction::new();
action.set_send(FileTransferSendRequest {
id,
path,
include_hidden,
file_num,
..Default::default()
});
let mut msg_out = Message::new();
@@ -509,7 +709,10 @@ pub async fn handle_read_jobs(
) -> ResultType<()> {
let mut finished = Vec::new();
for job in jobs.iter_mut() {
match job.read().await {
if job.is_last_job {
continue;
}
match job.read(stream).await {
Err(err) => {
stream
.send(&new_error(job.id(), err, job.file_num()))
@@ -519,8 +722,13 @@ pub async fn handle_read_jobs(
stream.send(&new_block(block)).await?;
}
Ok(None) => {
finished.push(job.id());
stream.send(&new_done(job.id(), job.file_num())).await?;
if !job.enable_overwrite_detection || (!job.file_confirmed && !job.file_is_waiting)
{
finished.push(job.id());
stream.send(&new_done(job.id(), job.file_num())).await?;
} else {
// waiting confirmation.
}
}
}
}
@@ -558,3 +766,35 @@ pub fn create_dir(dir: &str) -> ResultType<()> {
std::fs::create_dir_all(get_path(dir))?;
Ok(())
}
pub enum DigestCheckResult {
IsSame,
NeedConfirm(FileTransferDigest),
NoSuchFile,
}
#[inline]
pub fn is_write_need_confirmation(
file_path: &str,
digest: &FileTransferDigest,
) -> ResultType<DigestCheckResult> {
let path = Path::new(file_path);
if path.exists() && path.is_file() {
let metadata = std::fs::metadata(path)?;
let modified_time = metadata.modified()?;
let remote_mt = Duration::from_secs(digest.last_modified);
let local_mt = modified_time.duration_since(UNIX_EPOCH)?;
if remote_mt == local_mt && digest.file_size == metadata.len() {
return Ok(DigestCheckResult::IsSame);
}
Ok(DigestCheckResult::NeedConfirm(FileTransferDigest {
id: digest.id,
file_num: digest.file_num,
last_modified: local_mt.as_secs(),
file_size: metadata.len(),
..Default::default()
}))
} else {
Ok(DigestCheckResult::NoSuchFile)
}
}