feat: server connections

This commit is contained in:
kingtous
2022-04-26 16:21:34 +08:00
committed by Kingtous
parent f4c6c4f6c4
commit b08877031f
3 changed files with 70 additions and 12 deletions

View File

@@ -1,4 +1,4 @@
use crate::{bail, message_proto::*, ResultType};
use crate::{bail, message_proto::*, ResultType, Stream};
use std::path::{Path, PathBuf};
// https://doc.rust-lang.org/std/os/windows/fs/trait.MetadataExt.html
use crate::{
@@ -198,6 +198,8 @@ pub struct TransferJob {
files: Vec<FileEntry>,
file_num: i32,
file: Option<File>,
file_confirmed: bool,
file_is_waiting: bool,
total_size: u64,
finished_size: u64,
transferred: u64,
@@ -360,7 +362,7 @@ impl TransferJob {
}
}
pub async fn read(&mut self) -> ResultType<Option<FileTransferBlock>> {
pub async fn read(&mut self, stream: &mut Stream) -> ResultType<Option<FileTransferBlock>> {
let file_num = self.file_num as usize;
if file_num >= self.files.len() {
self.file.take();
@@ -371,13 +373,41 @@ impl TransferJob {
match File::open(self.join(&name)).await {
Ok(file) => {
self.file = Some(file);
self.file_confirmed = false;
self.file_is_waiting = false;
}
Err(err) => {
self.file_num += 1;
self.file_confirmed = false;
self.file_is_waiting = false;
return Err(err.into());
}
}
}
if !self.file_confirmed() {
if !self.file_is_waiting() {
let mut msg = Message::new();
let mut resp = FileResponse::new();
let meta = self.file.as_ref().unwrap().metadata().await?;
let last_modified = meta
.modified()?
.duration_since(SystemTime::UNIX_EPOCH)?
.as_secs();
resp.set_digest(FileTransferDigest {
id: self.id,
file_num: self.file_num,
last_edit_timestamp: last_modified,
file_size: meta.len(),
unknown_fields: Default::default(),
cached_size: Default::default(),
});
self.set_file_is_waiting(true);
msg.set_file_response(resp);
stream.send(&msg);
println!("digest message is sent. waiting for confirm.");
}
return Ok(None);
}
const BUF_SIZE: usize = 128 * 1024;
let mut buf: Vec<u8> = Vec::with_capacity(BUF_SIZE);
unsafe {
@@ -390,6 +420,8 @@ impl TransferJob {
Err(err) => {
self.file_num += 1;
self.file = None;
self.file_confirmed = false;
self.file_is_waiting = false;
return Err(err.into());
}
Ok(n) => {
@@ -404,6 +436,8 @@ impl TransferJob {
if offset == 0 {
self.file_num += 1;
self.file = None;
self.file_confirmed = false;
self.file_is_waiting = false;
} else {
self.finished_size += offset as u64;
if !is_compressed_file(name) {
@@ -430,6 +464,30 @@ impl TransferJob {
pub fn default_overwrite_strategy(&self) -> Option<bool> {
self.default_overwrite_strategy
}
pub fn set_file_confirmed(&mut self, file_confirmed: bool) {
self.file_confirmed = file_confirmed;
}
pub fn set_file_is_waiting(&mut self, file_is_waiting: bool) {
self.file_is_waiting = file_is_waiting;
}
pub fn file_is_waiting(&self) -> bool {
self.file_is_waiting
}
pub fn file_confirmed(&self) -> bool {
self.file_confirmed
}
pub fn skip_current_file(&mut self) -> bool {
self.file.take();
self.set_file_confirmed(false);
self.set_file_is_waiting(false);
self.file_num += 1;
true
}
}
#[inline]
@@ -527,7 +585,7 @@ pub async fn handle_read_jobs(
) -> ResultType<()> {
let mut finished = Vec::new();
for job in jobs.iter_mut() {
match job.read().await {
match job.read(stream).await {
Err(err) => {
stream
.send(&new_error(job.id(), err, job.file_num()))
@@ -537,8 +595,10 @@ pub async fn handle_read_jobs(
stream.send(&new_block(block)).await?;
}
Ok(None) => {
finished.push(job.id());
stream.send(&new_done(job.id(), job.file_num())).await?;
if !job.file_confirmed && !job.file_is_waiting {
finished.push(job.id());
stream.send(&new_done(job.id(), job.file_num())).await?;
}
}
}
}