try out muliticast on multiple interface

This commit is contained in:
open-trade
2022-01-14 18:16:00 +08:00
parent 07aa0898c5
commit 0a2bc1cf8a
5 changed files with 322 additions and 106 deletions

View File

@@ -519,11 +519,29 @@ pub fn get_mac() -> String {
}
async fn lan_discovery() -> ResultType<()> {
let mut socket = udp::bind_multicast(Some(get_multicast_addr()))?;
log::info!("lan discovery listener started");
let mut jobs = Vec::new();
for iface in pnet::datalink::interfaces() {
for i in 0..iface.ips.len() {
let x = iface.ips[i];
if let pnet::ipnetwork::IpNetwork::V4(v) = x {
if v.prefix() >= 16 {
jobs.push(tokio::spawn(async move {
allow_err!(lan_discovery_interface(v.ip()).await);
}));
}
}
}
}
join_all(jobs).await;
Ok(())
}
async fn lan_discovery_interface(interface: std::net::Ipv4Addr) -> ResultType<()> {
let mut socket = udp::bind_multicast(Some(get_multicast_addr()), interface)?;
log::info!("lan discovery listener started on {:?}", interface);
loop {
select! {
Some(Ok((bytes, addr))) = socket.next() => {
Some(Ok((bytes, addr))) = socket.next_timeout(1000) => {
if let Ok(msg_in) = Message::parse_from_bytes(&bytes) {
match msg_in.union {
Some(rendezvous_message::Union::peer_discovery(p)) => {
@@ -553,18 +571,30 @@ async fn lan_discovery() -> ResultType<()> {
#[tokio::main(flavor = "current_thread")]
pub async fn discover() -> ResultType<()> {
let mut socket = udp::bind_multicast(None)?;
let maddr = SocketAddr::V4(get_multicast_addr());
let mut sockets = Vec::new();
for iface in pnet::datalink::interfaces() {
iface.ips.iter().for_each(|x| {
if let pnet::ipnetwork::IpNetwork::V4(v) = x {
if v.prefix() >= 16 {
if let Ok(s) = udp::bind_multicast(None, v.ip()) {
sockets.push(s);
}
}
}
});
}
let mut msg_out = Message::new();
let peer = PeerDiscovery {
cmd: "ping".to_owned(),
..Default::default()
};
msg_out.set_peer_discovery(peer);
let maddr = SocketAddr::V4(get_multicast_addr());
socket.send(&msg_out, maddr).await?;
log::debug!("discover ping sent");
const TIMER_OUT: Duration = Duration::from_millis(100);
let mut timer = interval(TIMER_OUT);
for i in 0..sockets.len() {
let socket = &mut sockets[i];
socket.send(&msg_out, maddr).await?;
}
log::info!("discover ping sent");
let mut last_recv_time = Instant::now();
let mut last_write_time = Instant::now();
let mut last_write_n = 0;
@@ -572,8 +602,9 @@ pub async fn discover() -> ResultType<()> {
let mut peers = Vec::new();
let mac = get_mac();
loop {
select! {
Some(Ok((bytes, _))) = socket.next() => {
for i in 0..sockets.len() {
let socket = &mut sockets[i];
if let Some(Ok((bytes, _))) = socket.next_timeout(10).await {
if let Ok(msg_in) = Message::parse_from_bytes(&bytes) {
match msg_in.union {
Some(rendezvous_message::Union::peer_discovery(p)) => {
@@ -587,20 +618,18 @@ pub async fn discover() -> ResultType<()> {
_ => {}
}
}
},
_ = timer.tick() => {
if last_write_time.elapsed().as_millis() > 300 && last_write_n != peers.len() {
config::LanPeers::store(serde_json::to_string(&peers)?);
last_write_time = Instant::now();
last_write_n = peers.len();
}
if last_recv_time.elapsed().as_millis() > 3_000 {
break;
}
}
}
if last_write_time.elapsed().as_millis() > 300 && last_write_n != peers.len() {
config::LanPeers::store(serde_json::to_string(&peers)?);
last_write_time = Instant::now();
last_write_n = peers.len();
}
if last_recv_time.elapsed().as_millis() > 3_000 {
break;
}
}
log::debug!("discover ping done");
log::info!("discover ping done");
config::LanPeers::store(serde_json::to_string(&peers)?);
Ok(())
}

View File

@@ -1,4 +1,6 @@
use crate::ipc::{ConnectionTmpl, Data};
#[cfg(target_os = "macos")]
use crate::ipc::ConnectionTmpl;
use crate::ipc::Data;
use connection::{ConnInner, Connection};
use hbb_common::{
allow_err,
@@ -15,14 +17,14 @@ use hbb_common::{
};
#[cfg(target_os = "macos")]
use notify::{watcher, RecursiveMode, Watcher};
#[cfg(target_os = "macos")]
use parity_tokio_ipc::ConnectionClient;
use service::{GenericService, Service, ServiceTmpl, Subscriber};
use std::path::PathBuf;
use std::time::Duration;
use std::{
collections::HashMap,
net::SocketAddr,
sync::{Arc, Mutex, RwLock, Weak},
time::Duration,
};
mod audio_service;
@@ -261,7 +263,7 @@ pub fn check_zombie() {
}
}
drop(lock);
std::thread::sleep(std::time::Duration::from_millis(100));
std::thread::sleep(Duration::from_millis(100));
});
}
@@ -407,7 +409,7 @@ async fn sync_config_to_user(conn: &mut ConnectionTmpl<ConnectionClient>) -> Res
#[cfg(target_os = "macos")]
async fn sync_config_to_root(
conn: &mut ConnectionTmpl<ConnectionClient>,
from: PathBuf,
from: std::path::PathBuf,
) -> ResultType<()> {
allow_err!(
conn.send(&Data::SyncConfigToRootReq {