diff --git a/libs/hbb_common/src/fs.rs b/libs/hbb_common/src/fs.rs index 8477c82ff..6de638ce0 100644 --- a/libs/hbb_common/src/fs.rs +++ b/libs/hbb_common/src/fs.rs @@ -572,6 +572,22 @@ impl TransferJob { self.file_skipped() && self.files.len() == 1 } + /// Check whether the job is completed after `read` returns `None` + /// This is a helper function which gives additional lifecycle when the job reads `None`. + /// If returns `true`, it means we can delete the job automatically. `False` otherwise. + /// + /// [`Note`] + /// Conditions: + /// 1. Files are not waiting for comfirmation by peers. + #[inline] + pub fn job_completed(&self) -> bool { + // has no error, Condition 2 + if !self.enable_overwrite_detection || (!self.file_confirmed && !self.file_is_waiting) { + return true; + } + return false; + } + /// Get job error message, useful for getting status when job had finished pub fn job_error(&self) -> Option { if self.job_skipped() { @@ -597,7 +613,6 @@ impl TransferJob { match r.union { Some(file_transfer_send_confirm_request::Union::Skip(s)) => { if s { - log::debug!("skip file id:{}, file_num:{}", r.id, r.file_num); self.set_file_skipped(); } else { self.set_file_confirmed(true); @@ -744,13 +759,16 @@ pub async fn handle_read_jobs( stream.send(&new_block(block)).await?; } Ok(None) => { - if !job.enable_overwrite_detection || (!job.file_confirmed && !job.file_is_waiting) - { - // for getting error detail, we do not remove this job, we will handle it in io loop - if job.job_error().is_none() { - finished.push(job.id()); + if job.job_completed() { + finished.push(job.id()); + let err = job.job_error(); + if err.is_some() { + stream + .send(&new_error(job.id(), err.unwrap(), job.file_num())) + .await?; + } else { + stream.send(&new_done(job.id(), job.file_num())).await?; } - stream.send(&new_done(job.id(), job.file_num())).await?; } else { // waiting confirmation. } @@ -758,8 +776,10 @@ pub async fn handle_read_jobs( } } for id in finished { + log::info!("remove read job {}", id); remove_job(id, jobs); } + // log::info!("read jobs: {:?}", jobs.iter().map(|item| {item.id}).collect::>()); Ok(()) } diff --git a/src/ipc.rs b/src/ipc.rs index eb2d364ae..478094cf2 100644 --- a/src/ipc.rs +++ b/src/ipc.rs @@ -75,6 +75,11 @@ pub enum FS { id: i32, file_num: i32, }, + WriteError { + id: i32, + file_num: i32, + err: String + }, WriteOffset { id: i32, file_num: i32, diff --git a/src/server/connection.rs b/src/server/connection.rs index c45a00af6..fdd0ea77a 100644 --- a/src/server/connection.rs +++ b/src/server/connection.rs @@ -1304,6 +1304,13 @@ impl Connection { last_modified: d.last_modified, is_upload: true, }), + Some(file_response::Union::Error(e)) => { + self.send_fs(ipc::FS::WriteError { + id: e.id, + file_num: e.file_num, + err: e.error, + }); + } _ => {} }, Some(message::Union::Misc(misc)) => match misc.union { diff --git a/src/ui_cm_interface.rs b/src/ui_cm_interface.rs index 26e5e4077..97ae82b8b 100644 --- a/src/ui_cm_interface.rs +++ b/src/ui_cm_interface.rs @@ -596,6 +596,12 @@ async fn handle_fs(fs: ipc::FS, write_jobs: &mut Vec, tx: &Unbo fs::remove_job(id, write_jobs); } } + ipc::FS::WriteError { id, file_num, err } => { + if let Some(job) = fs::get_job(id, write_jobs) { + send_raw(fs::new_error(id, err, file_num), tx); + fs::remove_job(id, write_jobs); + } + } ipc::FS::WriteBlock { id, file_num,