From 6148d2df076d3222e3c8883d049f1c6de91638d4 Mon Sep 17 00:00:00 2001
From: dr-frmr
Date: Fri, 11 Oct 2024 14:01:32 -0400
Subject: [PATCH] fix: in net, use AtomicU64 in Peers to respect fd_manager
 limits

---
 kinode/src/net/connect.rs | 49 +++++++++++++++++++++++++--------------
 kinode/src/net/mod.rs     | 12 ++++------
 kinode/src/net/types.rs   | 29 ++++++++++++++++-------
 3 files changed, 57 insertions(+), 33 deletions(-)

diff --git a/kinode/src/net/connect.rs b/kinode/src/net/connect.rs
index 12931a2a..b06ea7b5 100644
--- a/kinode/src/net/connect.rs
+++ b/kinode/src/net/connect.rs
@@ -6,25 +6,40 @@ use tokio::sync::mpsc;
 
 /// if target is a peer, queue to be routed
 /// otherwise, create peer and initiate routing
-pub async fn send_to_peer(ext: &IdentityExt, data: &NetData, km: KernelMessage) {
+pub async fn send_to_peer(ext: &IdentityExt, data: &NetData, mut km: KernelMessage) {
     if let Some(mut peer) = data.peers.get_mut(&km.target.node) {
-        peer.sender.send(km).expect("net: peer sender was dropped");
-        peer.set_last_message();
-    } else {
-        let Some(peer_id) = data.pki.get(&km.target.node) else {
-            return utils::error_offline(km, &ext.network_error_tx).await;
-        };
-        let (mut peer, peer_rx) = Peer::new(peer_id.clone(), false);
-        // send message to be routed
-        peer.send(km);
-        data.peers.insert(peer_id.name.clone(), peer).await;
-        tokio::spawn(connect_to_peer(
-            ext.clone(),
-            data.clone(),
-            peer_id.clone(),
-            peer_rx,
-        ));
+        match peer.send(km) {
+            Ok(()) => {
+                peer.set_last_message();
+                return;
+            }
+            Err(e_km) => {
+                // peer connection was closed, remove it and try to reconnect
+                data.peers.remove(&peer.identity.name).await;
+                km = e_km.0;
+            }
+        }
     }
+    let Some(peer_id) = data.pki.get(&km.target.node) else {
+        return utils::error_offline(km, &ext.network_error_tx).await;
+    };
+    let (mut peer, peer_rx) = Peer::new(peer_id.clone(), false);
+    // send message to be routed
+    match peer.send(km) {
+        Ok(()) => {
+            peer.set_last_message();
+        }
+        Err(e_km) => {
+            return utils::error_offline(e_km.0, &ext.network_error_tx).await;
+        }
+    };
+    data.peers.insert(peer_id.name.clone(), peer).await;
+    tokio::spawn(connect_to_peer(
+        ext.clone(),
+        data.clone(),
+        peer_id.clone(),
+        peer_rx,
+    ));
 }
 
 /// based on peer's identity, either use one of their
diff --git a/kinode/src/net/mod.rs b/kinode/src/net/mod.rs
index 11226dc2..190468c4 100644
--- a/kinode/src/net/mod.rs
+++ b/kinode/src/net/mod.rs
@@ -69,7 +69,6 @@ pub async fn networking(
         peers,
         pending_passthroughs,
         active_passthroughs,
-        max_peers,
         max_passthroughs,
         fds_limit: 10, // small hardcoded limit that gets replaced by fd_manager soon after boot
     };
@@ -212,7 +211,7 @@ async fn handle_local_request(
             printout.push_str(&format!(
                 "we have connections with {} peers ({} max):\r\n",
                 data.peers.peers().len(),
-                data.max_peers,
+                data.peers.max_peers(),
             ));
 
             let now = std::time::SystemTime::now()
@@ -342,16 +341,13 @@ async fn handle_fdman(km: &KernelMessage, request_body: &[u8], data: &mut NetDat
     match req {
         lib::core::FdManagerRequest::FdsLimit(fds_limit) => {
             data.fds_limit = fds_limit;
-            if data.max_peers > fds_limit {
-                data.max_peers = fds_limit;
-            }
+            data.peers.set_max_peers(fds_limit);
             // TODO combine with max_peers check
-            if data.max_passthroughs > fds_limit {
-                data.max_passthroughs = fds_limit;
-            }
+            data.max_passthroughs = fds_limit;
             // TODO cull passthroughs too
             if data.peers.peers().len() >= data.fds_limit as usize {
                 let diff = data.peers.peers().len() - data.fds_limit as usize;
+                println!("net: culling {diff} peers\r\n");
                 data.peers.cull(diff).await;
             }
         }
diff --git a/kinode/src/net/types.rs b/kinode/src/net/types.rs
index f5119283..7b37d918 100644
--- a/kinode/src/net/types.rs
+++ b/kinode/src/net/types.rs
@@ -6,6 +6,7 @@ use {
     dashmap::DashMap,
     ring::signature::Ed25519KeyPair,
     serde::{Deserialize, Serialize},
+    std::sync::atomic::AtomicU64,
    std::sync::Arc,
     tokio::net::TcpStream,
     tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender},
@@ -57,7 +58,7 @@ pub struct RoutingRequest {
 
 #[derive(Clone)]
 pub struct Peers {
-    max_peers: u64,
+    max_peers: Arc<AtomicU64>,
     send_to_loop: MessageSender,
     peers: Arc<DashMap<String, Peer>>,
 }
@@ -65,7 +66,7 @@ impl Peers {
     pub fn new(max_peers: u64, send_to_loop: MessageSender) -> Self {
         Self {
-            max_peers,
+            max_peers: Arc::new(max_peers.into()),
             send_to_loop,
             peers: Arc::new(DashMap::new()),
         }
     }
@@ -75,6 +76,15 @@ impl Peers {
     pub fn peers(&self) -> &DashMap<String, Peer> {
         &self.peers
     }
+    pub fn max_peers(&self) -> u64 {
+        self.max_peers.load(std::sync::atomic::Ordering::Relaxed)
+    }
+
+    pub fn set_max_peers(&self, max_peers: u64) {
+        self.max_peers
+            .store(max_peers, std::sync::atomic::Ordering::Relaxed);
+    }
+
     pub fn get(&self, name: &str) -> Option<dashmap::mapref::one::Ref<'_, String, Peer>> {
         self.peers.get(name)
     }
@@ -94,7 +104,7 @@
     /// remove the one with the oldest last_message.
     pub async fn insert(&self, name: String, peer: Peer) {
         self.peers.insert(name, peer);
-        if self.peers.len() > self.max_peers as usize {
+        if self.peers.len() as u64 > self.max_peers.load(std::sync::atomic::Ordering::Relaxed) {
             let oldest = self
                 .peers
                 .iter()
                 .min_by_key(|p| p.last_message)
                 .unwrap()
                 .key()
                 .clone();
-            self.peers.remove(&oldest);
+            self.remove(&oldest).await;
             crate::fd_manager::send_fd_manager_hit_fds_limit(
                 &Address::new("our", NET_PROCESS_ID.clone()),
                 &self.send_to_loop,
             )
@@ -122,7 +132,7 @@
         sorted_peers.sort_by_key(|p| p.last_message);
         to_remove.extend(sorted_peers.iter().take(n));
         for peer in to_remove {
-            self.peers.remove(&peer.identity.name);
+            self.remove(&peer.identity.name).await;
         }
         crate::fd_manager::send_fd_manager_hit_fds_limit(
             &Address::new("our", NET_PROCESS_ID.clone()),
             &self.send_to_loop,
         )
@@ -189,9 +199,13 @@
     }
 
     /// Send a message to the peer.
-    pub fn send(&mut self, km: KernelMessage) {
-        self.sender.send(km).expect("net: peer sender was dropped");
+    pub fn send(
+        &mut self,
+        km: KernelMessage,
+    ) -> Result<(), tokio::sync::mpsc::error::SendError<KernelMessage>> {
+        self.sender.send(km)?;
         self.set_last_message();
+        Ok(())
     }
 
     /// Update the last message time to now.
@@ -222,7 +236,6 @@ pub struct NetData {
     pub pending_passthroughs: PendingPassthroughs,
     /// only used by routers
     pub active_passthroughs: ActivePassthroughs,
-    pub max_peers: u64,
     pub max_passthroughs: u64,
     pub fds_limit: u64,
 }
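
The heart of the change is that the peer limit now lives inside the cloneable Peers handle as an Arc<AtomicU64> instead of as a plain u64 field on NetData, so an fd_manager update made through one handle is immediately visible to every clone (the networking loop, connection tasks, and so on). Below is a minimal standalone sketch of that pattern; the PeerLimit type and main function are illustrative stand-ins, not the real kinode types.

use std::sync::{
    atomic::{AtomicU64, Ordering},
    Arc,
};

/// Simplified stand-in for the Peers handle: every clone shares the same
/// atomic counter, so a limit update through any clone is seen by all.
#[derive(Clone)]
struct PeerLimit {
    max_peers: Arc<AtomicU64>,
}

impl PeerLimit {
    fn new(max_peers: u64) -> Self {
        Self {
            max_peers: Arc::new(AtomicU64::new(max_peers)),
        }
    }

    fn max_peers(&self) -> u64 {
        self.max_peers.load(Ordering::Relaxed)
    }

    fn set_max_peers(&self, max_peers: u64) {
        self.max_peers.store(max_peers, Ordering::Relaxed);
    }
}

fn main() {
    let limit = PeerLimit::new(32);
    let clone_held_by_another_task = limit.clone();

    // an fd_manager-style update through one handle...
    limit.set_max_peers(10);

    // ...is observed through every other clone without re-plumbing NetData.
    assert_eq!(clone_held_by_another_task.max_peers(), 10);
    println!("max_peers is now {}", clone_held_by_another_task.max_peers());
}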
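The other half of the patch makes Peer::send fallible: instead of panicking when the peer's connection task has dropped its receiver, the tokio SendError hands the KernelMessage back (via .0) so send_to_peer can remove the stale Peer entry and retry over a fresh connection. A small sketch of that recovery pattern with a tokio unbounded channel follows; the names and flow here are illustrative only, not the kinode API.

use tokio::sync::mpsc;

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let (tx, rx) = mpsc::unbounded_channel::<String>();

    // simulate the peer's connection task going away: drop the receiver
    drop(rx);

    // the send now fails instead of panicking, and the error carries the
    // message so the caller can reconnect and send it again
    match tx.send("hello".to_string()) {
        Ok(()) => println!("queued"),
        Err(e) => {
            let recovered: String = e.0;
            println!("peer gone, reconnecting and retrying: {recovered}");
            // in the real code this is where the stale Peer is removed and
            // connect_to_peer is spawned before re-sending the message
        }
    }
}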