revert back from lan discovery merge

This commit is contained in:
open-trade
2022-01-10 18:05:42 +08:00
parent b400e4305f
commit 9a92b6ac4a
19 changed files with 26 additions and 701 deletions

View File

@@ -2,7 +2,7 @@ fn main() {
std::fs::create_dir_all("src/protos").unwrap();
protobuf_codegen_pure::Codegen::new()
.out_dir("src/protos")
.inputs(&["protos/rendezvous.proto", "protos/base_proto.proto", "protos/message.proto", "protos/discovery.proto"])
.inputs(&["protos/rendezvous.proto", "protos/message.proto"])
.include("protos")
.run()
.expect("Codegen failed.");

View File

@@ -1,21 +0,0 @@
syntax = "proto3";
package base;
message DisplayInfo {
sint32 x = 1;
sint32 y = 2;
int32 width = 3;
int32 height = 4;
string name = 5;
bool online = 6;
}
message PeerInfo {
string username = 1;
string hostname = 2;
string platform = 3;
repeated DisplayInfo displays = 4;
int32 current_display = 5;
bool sas_enabled = 6;
string version = 7;
}

View File

@@ -1,16 +0,0 @@
syntax = "proto3";
package discovery;
import "base_proto.proto";
message Discovery {
string id = 1;
base.PeerInfo peer = 2;
/// response port for current listening port(udp for now)
int32 port = 3;
}
message DiscoveryBack {
string id = 1;
base.PeerInfo peer = 2;
}

View File

@@ -1,8 +1,6 @@
syntax = "proto3";
package hbb;
import "base_proto.proto";
message VP9 {
bytes data = 1;
bool key = 2;
@@ -27,6 +25,15 @@ message VideoFrame {
}
}
message DisplayInfo {
sint32 x = 1;
sint32 y = 2;
int32 width = 3;
int32 height = 4;
string name = 5;
bool online = 6;
}
message PortForward {
string host = 1;
int32 port = 2;
@@ -51,10 +58,20 @@ message LoginRequest {
message ChatMessage { string text = 1; }
message PeerInfo {
string username = 1;
string hostname = 2;
string platform = 3;
repeated DisplayInfo displays = 4;
int32 current_display = 5;
bool sas_enabled = 6;
string version = 7;
}
message LoginResponse {
oneof union {
string error = 1;
base.PeerInfo peer_info = 2;
PeerInfo peer_info = 2;
}
}

View File

@@ -1,8 +1,4 @@
pub mod compress;
#[path = "./protos/base_proto.rs"]
pub mod base_proto;
#[path = "./protos/discovery.rs"]
pub mod discovery_proto;
#[path = "./protos/message.rs"]
pub mod message_proto;
#[path = "./protos/rendezvous.rs"]

View File

@@ -1,14 +0,0 @@
[package]
name = "socket_cs"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
hbb_common = { path = "../hbb_common" }
async-trait = "0.1"
[dev-dependencies]
clap = "2.33"

View File

@@ -1,90 +0,0 @@
use hbb_common::{
base_proto::PeerInfo,
discovery_proto::{Discovery as DiscoveryProto, DiscoveryBack as DiscoveryBackProto},
env_logger::*,
log, protobuf, tokio,
};
use socket_cs::{discovery::*, udp::*};
use std::env;
async fn lan_discover(port: u16, port_back: u16) {
let peer = PeerInfo {
username: "client username".to_owned(),
hostname: "client hostname".to_owned(),
..Default::default()
};
let client = DiscoveryClient::create(DiscoveryProto {
id: "client id".to_owned(),
peer: protobuf::MessageField::from_option(Some(peer)),
port: port_back as i32,
..Default::default()
})
.await
.unwrap();
client.lan_discover(port).await.unwrap();
}
async fn listen_discovery_back(port: u16) {
fn proc_peer(info: DiscoveryBackProto) {
log::info!(
"peer recived, id: {}, username: {}, hostname: {}",
info.id,
info.peer.as_ref().unwrap().username,
info.peer.as_ref().unwrap().hostname
);
}
let handlers = UdpHandlers::new().handle(
CMD_DISCOVERY_BACK.as_bytes().to_vec(),
Box::new(HandlerDiscoveryBack::new(proc_peer)),
);
let mut server = Server::create(port).unwrap();
server.start(handlers).await.unwrap();
loop {
std::thread::sleep(std::time::Duration::from_millis(1000));
}
}
async fn listen_discovery(port: u16) {
let info = DiscoveryBackProto {
id: "server id".to_owned(),
peer: protobuf::MessageField::from_option(Some(PeerInfo {
username: "server username".to_owned(),
hostname: "server hostname".to_owned(),
..Default::default()
})),
..Default::default()
};
let handlers = UdpHandlers::new().handle(
CMD_DISCOVERY.as_bytes().to_vec(),
Box::new(HandlerDiscovery::new(Some(|| true), info)),
);
let mut server = Server::create(port).unwrap();
server.start(handlers).await.unwrap();
loop {
std::thread::sleep(std::time::Duration::from_millis(1000));
}
}
#[tokio::main]
async fn main() {
init_from_env(Env::default().filter_or(DEFAULT_FILTER_ENV, "trace"));
let args: Vec<String> = env::args().collect();
let port_back = 9801u16;
let server_port: u16 = 9802u16;
if args.len() == 1 {
lan_discover(server_port, port_back).await;
} else if args.len() == 2 {
listen_discovery_back(port_back).await;
} else {
listen_discovery(server_port).await;
}
}

View File

@@ -1,131 +0,0 @@
use super::udp::UdpRequest;
use async_trait::async_trait;
use hbb_common::{
discovery_proto::{Discovery as DiscoveryProto, DiscoveryBack as DiscoveryBackProto},
log,
protobuf::Message,
tokio::net::UdpSocket,
ResultType,
};
use std::net::SocketAddr;
pub const CMD_DISCOVERY: &str = "discovery";
pub const CMD_DISCOVERY_BACK: &str = "discovery_back";
pub struct DiscoveryClient {
socket: UdpSocket,
send_data: Vec<u8>,
}
fn make_send_data(cmd: &str, msg: &impl Message) -> ResultType<Vec<u8>> {
let info_bytes = msg.write_to_bytes()?;
let mut send_data = cmd.as_bytes().to_vec();
send_data.push(crate::CMD_TOKEN);
send_data.extend(info_bytes);
Ok(send_data)
}
impl DiscoveryClient {
pub async fn create(info: DiscoveryProto) -> ResultType<Self> {
let addr = "0.0.0.0:0";
let socket = UdpSocket::bind(addr).await?;
log::trace!("succeeded to bind {} for discovery client", addr);
socket.set_broadcast(true)?;
log::trace!("Broadcast mode set to ON");
let send_data = make_send_data(CMD_DISCOVERY, &info)?;
Ok(Self { socket, send_data })
}
pub async fn lan_discover(&self, peer_port: u16) -> ResultType<()> {
let addr = SocketAddr::from(([255, 255, 255, 255], peer_port));
self.socket.send_to(&self.send_data, addr).await?;
Ok(())
}
}
pub struct HandlerDiscovery {
get_allow: Option<fn() -> bool>,
id: String,
send_data: Vec<u8>,
}
impl HandlerDiscovery {
pub fn new(get_allow: Option<fn() -> bool>, self_info: DiscoveryBackProto) -> Self {
let send_data = make_send_data(CMD_DISCOVERY_BACK, &self_info).unwrap();
Self {
get_allow,
id: self_info.id,
send_data,
}
}
}
#[async_trait]
impl crate::Handler<UdpRequest> for HandlerDiscovery {
async fn call(&self, request: UdpRequest) -> ResultType<()> {
let discovery = DiscoveryProto::parse_from_bytes(&request.data)?;
if discovery.id == self.id {
return Ok(());
}
let peer = discovery.peer.as_ref().take().unwrap();
log::trace!(
"received discovery query from {} {}",
peer.username,
peer.hostname
);
let allowed = self.get_allow.map_or(false, |f| f());
if !allowed {
// log::info!(
// "received discovery query from {} {} {}, but discovery is not allowed",
// request.addr,
// peer.hostname,
// peer.username
// );
return Ok(());
}
let addr = "0.0.0.0:0";
let socket = UdpSocket::bind(addr).await?;
let mut peer_addr = request.addr;
peer_addr.set_port(discovery.port as u16);
log::trace!("send self peer info to {}", peer_addr);
let send_len = self.send_data.len();
let mut cur_len = 0usize;
while cur_len < send_len {
let len = socket
.send_to(&self.send_data[cur_len..], peer_addr)
.await?;
cur_len += len;
}
log::trace!("send self peer info to {} done", peer_addr);
Ok(())
}
}
pub struct HandlerDiscoveryBack {
proc: fn(info: DiscoveryBackProto),
}
impl HandlerDiscoveryBack {
pub fn new(proc: fn(info: DiscoveryBackProto)) -> Self {
Self { proc }
}
}
#[async_trait]
impl crate::Handler<UdpRequest> for HandlerDiscoveryBack {
async fn call(&self, request: UdpRequest) -> ResultType<()> {
log::trace!("recved discover back from {}", request.addr);
let info = DiscoveryBackProto::parse_from_bytes(&request.data)?;
(self.proc)(info);
Ok(())
}
}

View File

@@ -1,12 +0,0 @@
use async_trait::async_trait;
pub use hbb_common::ResultType;
pub mod discovery;
pub mod udp;
const CMD_TOKEN: u8 = '\n' as u8;
/// Use tower::Service may be better ?
#[async_trait]
pub trait Handler<Request>: Send + Sync {
async fn call(&self, request: Request) -> ResultType<()>;
}

View File

@@ -1,187 +0,0 @@
use async_trait::async_trait;
use hbb_common::{
log,
tokio::{self, runtime::Runtime, sync::Notify, task::JoinHandle},
udp::FramedSocket,
ResultType,
};
use std::{collections::HashMap, future::Future, net::SocketAddr, sync::Arc};
/// Simple udp server
pub struct Server {
port: u16,
exit_notify: Arc<Notify>,
rt: Arc<Runtime>,
join_handler: Option<JoinHandle<()>>,
}
pub struct UdpRequest {
pub data: Vec<u8>,
pub addr: SocketAddr,
}
type UdpHandler = Box<dyn crate::Handler<UdpRequest>>;
pub struct UdpFnHandler<F>(F);
/// Handlers of udp server.
/// After udp server received data. Command should be parsed.
/// Handler will then be used to process data.
pub struct UdpHandlers {
handlers: HashMap<Vec<u8>, UdpHandler>,
}
impl Server {
pub fn create(port: u16) -> ResultType<Self> {
let rt = Arc::new(Runtime::new()?);
let exit_notify = Arc::new(Notify::new());
Ok(Self {
port,
exit_notify,
rt,
join_handler: None,
})
}
/// Start server with the handlers.
pub async fn start(&mut self, handlers: UdpHandlers) -> ResultType<()> {
let exit_notify = self.exit_notify.clone();
let addr = SocketAddr::from(([0, 0, 0, 0], self.port));
let mut server = FramedSocket::new(addr).await?;
log::trace!("succeeded to bind {} for discovery server", addr);
let rt = self.rt.clone();
let join_handler = rt.clone().spawn(async move {
let handlers = Arc::new(handlers.handlers);
loop {
tokio::select! {
_ = exit_notify.notified() => {
log::debug!("exit server graceful");
break;
}
n = server.next() => {
let handlers = handlers.clone();
let rt = rt.clone();
match n {
Some(Ok((data, addr))) => {
match data.iter().position(|x| x == &crate::CMD_TOKEN) {
Some(p) => {
rt.spawn(async move {
let cmd = data[0..p].to_vec();
match handlers.get(&cmd) {
Some(h) => {
let request = UdpRequest {data: data[p+1..].to_vec(), addr};
if let Err(_e) = h.call(request).await {
// log::error!("handle {:?} failed, {}", cmd, e);
}
}
None => {
// log::warn!("no handler for {:?}", &cmd);
}
}
});
}
None => {
// log::error!("failed to parse command token");
}
}
}
Some(Err(_e)) => {
// log::error!("recv error: {}", e)
}
None => {
log::error!("should never reach here");
}
}
}
}
}
});
self.join_handler = Some(join_handler);
Ok(())
}
pub async fn shutdonw(&mut self) {
self.exit_notify.notify_one();
if let Some(h) = self.join_handler.take() {
if let Err(e) = h.await {
log::error!("failed to join udp server, {}", e);
}
}
}
}
impl Drop for Server {
fn drop(&mut self) {
self.rt.clone().block_on(async {
self.shutdonw().await;
})
}
}
impl UdpHandlers {
pub fn new() -> Self {
Self {
handlers: HashMap::new(),
}
}
/// Insert <cmd, handler> pair.
///
/// # Example
///
/// ```rust
/// extern crate socket_cs;
/// use socket_cs::{ResultType, udp::{UdpHandlers, UdpRequest}};
/// use async_trait::async_trait;
///
/// struct SimpleHandler;
///
/// #[async_trait]
/// impl socket_cs::Handler<UdpRequest> for SimpleHandler {
/// async fn call(&self, _: UdpRequest) -> ResultType<()> {
/// Ok(())
/// }
/// }
/// async fn simple_ignore(_: UdpRequest) -> ResultType<()> {
/// Ok(())
/// }
/// let handlers = UdpHandlers::new();
///
/// handlers
/// .handle(b"cmd".to_vec(), Box::new(SimpleHandler))
/// .handle(b"cmd2".to_vec(), simple_ignore.into());
///
/// ```
///
/// **Notice** Same cmd where override the previous one.
///
pub fn handle(mut self, cmd: Vec<u8>, h: UdpHandler) -> Self {
self.handlers.insert(cmd, h);
self
}
}
/// TODO: more generice Request.
#[async_trait]
impl<F, Fut> crate::Handler<UdpRequest> for UdpFnHandler<F>
where
Fut: Future<Output = ResultType<()>> + Send,
F: Fn(UdpRequest) -> Fut + Send + Sync,
{
async fn call(&self, request: UdpRequest) -> ResultType<()> {
self.0(request).await
}
}
impl<F, Fut> From<F> for UdpHandler
where
Fut: Future<Output = ResultType<()>> + Send,
F: Fn(UdpRequest) -> Fut + Send + Sync + 'static,
{
fn from(f: F) -> Self {
Box::new(UdpFnHandler(f))
}
}