file audit

Signed-off-by: 21pages <pages21@163.com>
This commit is contained in:
21pages
2022-12-18 16:17:10 +08:00
parent 8c86a82422
commit b3114d4147
7 changed files with 90 additions and 25 deletions

View File

@@ -93,7 +93,8 @@ pub struct Connection {
tx_input: std_mpsc::Sender<MessageInput>, // handle input messages
video_ack_required: bool,
peer_info: (String, String),
api_server: String,
server_audit_conn: String,
server_audit_file: String,
lr: LoginRequest,
last_recv_time: Arc<Mutex<Instant>>,
chat_unanswered: bool,
@@ -184,7 +185,8 @@ impl Connection {
tx_input,
video_ack_required: false,
peer_info: Default::default(),
api_server: "".to_owned(),
server_audit_conn: "".to_owned(),
server_audit_file: "".to_owned(),
lr: Default::default(),
last_recv_time: Arc::new(Mutex::new(Instant::now())),
chat_unanswered: false,
@@ -384,7 +386,7 @@ impl Connection {
} else {
conn.timer = time::interval_at(Instant::now() + SEC30, SEC30);
}
conn.post_audit(json!({})); // heartbeat
conn.post_conn_audit(json!({})); // heartbeat
},
Some((instant, value)) = rx_video.recv() => {
if !conn.video_ack_required {
@@ -497,7 +499,7 @@ impl Connection {
conn.on_close(&err.to_string(), false).await;
}
conn.post_audit(json!({
conn.post_conn_audit(json!({
"action": "close",
}));
log::info!("#{} connection loop exited", id);
@@ -601,7 +603,7 @@ impl Connection {
if last_recv_time.elapsed() >= H1 {
bail!("Timeout");
}
self.post_audit(json!({})); // heartbeat
self.post_conn_audit(json!({})); // heartbeat
}
}
}
@@ -650,7 +652,7 @@ impl Connection {
msg_out.set_hash(self.hash.clone());
self.send(msg_out).await;
self.get_api_server();
self.post_audit(json!({
self.post_conn_audit(json!({
"ip": addr.ip(),
"action": "new",
}));
@@ -658,17 +660,23 @@ impl Connection {
}
fn get_api_server(&mut self) {
self.api_server = crate::get_audit_server(
self.server_audit_conn = crate::get_audit_server(
Config::get_option("api-server"),
Config::get_option("custom-rendezvous-server"),
"conn".to_owned(),
);
self.server_audit_file = crate::get_audit_server(
Config::get_option("api-server"),
Config::get_option("custom-rendezvous-server"),
"file".to_owned(),
);
}
fn post_audit(&self, v: Value) {
if self.api_server.is_empty() {
fn post_conn_audit(&self, v: Value) {
if self.server_audit_conn.is_empty() {
return;
}
let url = self.api_server.clone();
let url = self.server_audit_conn.clone();
let mut v = v;
v["id"] = json!(Config::get_id());
v["uuid"] = json!(base64::encode(hbb_common::get_uuid()));
@@ -678,6 +686,41 @@ impl Connection {
});
}
fn post_file_audit(&self, action: &str, path: &str, files: Vec<(String, i64)>, info: Value) {
if self.server_audit_file.is_empty() {
return;
}
let url = self.server_audit_file.clone();
let file_num = files.len();
let mut files = files;
files.sort_by(|a, b| b.1.cmp(&a.1));
files.truncate(10);
let is_file = match action {
"send" | "receive" => files.len() == 1 && files[0].0.is_empty(),
"remove_dir" | "create_dir" => false,
"remove_file" => true,
_ => true,
};
let mut info = info;
info["ip"] = json!(self.ip.clone());
info["name"] = json!(self.lr.my_name.clone());
info["num"] = json!(file_num);
info["files"] = json!(files);
let v = json!({
"id":json!(Config::get_id()),
"uuid":json!(base64::encode(hbb_common::get_uuid())),
"Id":json!(self.inner.id),
"peer_id":json!(self.lr.my_id),
"action": action,
"path":path,
"is_file":is_file,
"info":json!(info).to_string(),
});
tokio::spawn(async move {
allow_err!(Self::post_audit_async(url, v).await);
});
}
#[inline]
async fn post_audit_async(url: String, v: Value) -> ResultType<()> {
crate::post_request(url, v.to_string(), "").await?;
@@ -695,7 +738,7 @@ impl Connection {
} else {
0
};
self.post_audit(json!({"peer": self.peer_info, "Type": conn_type}));
self.post_conn_audit(json!({"peer": self.peer_info, "Type": conn_type}));
#[allow(unused_mut)]
let mut username = crate::platform::get_active_username();
let mut res = LoginResponse::new();
@@ -1225,8 +1268,18 @@ impl Connection {
Ok(job) => {
self.send(fs::new_dir(id, path, job.files().to_vec()))
.await;
let mut files = job.files().to_owned();
self.read_jobs.push(job);
self.timer = time::interval(MILLI1);
self.post_file_audit(
"send",
&s.path,
files
.drain(..)
.map(|f| (f.name, f.size as _))
.collect(),
json!({}),
);
}
}
}
@@ -1237,7 +1290,7 @@ impl Connection {
&self.lr.version,
));
self.send_fs(ipc::FS::NewWrite {
path: r.path,
path: r.path.clone(),
id: r.id,
file_num: r.file_num,
files: r
@@ -1248,6 +1301,16 @@ impl Connection {
.collect(),
overwrite_detection: od,
});
self.post_file_audit(
"receive",
&r.path,
r.files
.to_vec()
.drain(..)
.map(|f| (f.name, f.size as _))
.collect(),
json!({}),
);
}
Some(file_action::Union::RemoveDir(d)) => {
self.send_fs(ipc::FS::RemoveDir {