fix: in net, use AtomicU64 in Peers to respect fd_manager limits

dr-frmr 2024-10-11 14:01:32 -04:00
parent d1b8aea0c9
commit 6148d2df07
3 changed files with 57 additions and 33 deletions
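The gist of the fix: `Peers` (and the `NetData` that holds it) is cloned into spawned tasks, so a plain `u64` limit is copied at construction time and never sees a later fd_manager update; a limit held behind an `Arc<AtomicU64>` is shared by every clone. A minimal, self-contained sketch of that pattern, with illustrative names rather than the real types:

use std::sync::{
    atomic::{AtomicU64, Ordering},
    Arc,
};

// Illustrative stand-in for the `Peers` limit: every clone shares the same
// AtomicU64, so a store made through one handle is seen by all the others.
#[derive(Clone)]
struct SharedLimit(Arc<AtomicU64>);

impl SharedLimit {
    fn new(limit: u64) -> Self {
        Self(Arc::new(AtomicU64::new(limit)))
    }
    fn get(&self) -> u64 {
        self.0.load(Ordering::Relaxed)
    }
    fn set(&self, limit: u64) {
        self.0.store(limit, Ordering::Relaxed);
    }
}

fn main() {
    let limit = SharedLimit::new(32);
    let clone_in_task = limit.clone();   // a plain `u64` field would be copied here
    limit.set(10);                       // fd_manager lowers the budget via one handle
    assert_eq!(clone_in_task.get(), 10); // every other clone observes the new cap
}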

View File

@@ -6,25 +6,40 @@ use tokio::sync::mpsc;
/// if target is a peer, queue to be routed
/// otherwise, create peer and initiate routing
pub async fn send_to_peer(ext: &IdentityExt, data: &NetData, km: KernelMessage) {
pub async fn send_to_peer(ext: &IdentityExt, data: &NetData, mut km: KernelMessage) {
if let Some(mut peer) = data.peers.get_mut(&km.target.node) {
peer.sender.send(km).expect("net: peer sender was dropped");
peer.set_last_message();
} else {
let Some(peer_id) = data.pki.get(&km.target.node) else {
return utils::error_offline(km, &ext.network_error_tx).await;
};
let (mut peer, peer_rx) = Peer::new(peer_id.clone(), false);
// send message to be routed
peer.send(km);
data.peers.insert(peer_id.name.clone(), peer).await;
tokio::spawn(connect_to_peer(
ext.clone(),
data.clone(),
peer_id.clone(),
peer_rx,
));
match peer.send(km) {
Ok(()) => {
peer.set_last_message();
return;
}
Err(e_km) => {
// peer connection was closed, remove it and try to reconnect
data.peers.remove(&peer.identity.name).await;
km = e_km.0;
}
}
}
let Some(peer_id) = data.pki.get(&km.target.node) else {
return utils::error_offline(km, &ext.network_error_tx).await;
};
let (mut peer, peer_rx) = Peer::new(peer_id.clone(), false);
// send message to be routed
match peer.send(km) {
Ok(()) => {
peer.set_last_message();
}
Err(e_km) => {
return utils::error_offline(e_km.0, &ext.network_error_tx).await;
}
};
data.peers.insert(peer_id.name.clone(), peer).await;
tokio::spawn(connect_to_peer(
ext.clone(),
data.clone(),
peer_id.clone(),
peer_rx,
));
}
/// based on peer's identity, either use one of their
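The retry path above leans on a property of tokio's unbounded channel: when the receiving half has been dropped, `send` hands the rejected value back inside the `SendError`, which is why `e_km.0` recovers the original `KernelMessage` for the reconnect attempt. A small standalone sketch, using a `String` in place of `KernelMessage`:

use tokio::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::unbounded_channel::<String>();
    drop(rx); // simulate the peer's connection task having exited

    match tx.send("queued message".to_string()) {
        Ok(()) => println!("delivered to the peer task"),
        // SendError is a tuple struct wrapping the value it could not
        // deliver, so `.0` gives the message back for a retry elsewhere
        Err(e) => println!("receiver gone, recovered: {:?}", e.0),
    }
}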

View File

@@ -69,7 +69,6 @@ pub async fn networking(
peers,
pending_passthroughs,
active_passthroughs,
max_peers,
max_passthroughs,
fds_limit: 10, // small hardcoded limit that gets replaced by fd_manager soon after boot
};
@@ -212,7 +211,7 @@ async fn handle_local_request(
printout.push_str(&format!(
"we have connections with {} peers ({} max):\r\n",
data.peers.peers().len(),
data.max_peers,
data.peers.max_peers(),
));
let now = std::time::SystemTime::now()
@@ -342,16 +341,13 @@ async fn handle_fdman(km: &KernelMessage, request_body: &[u8], data: &mut NetDat
match req {
lib::core::FdManagerRequest::FdsLimit(fds_limit) => {
data.fds_limit = fds_limit;
if data.max_peers > fds_limit {
data.max_peers = fds_limit;
}
data.peers.set_max_peers(fds_limit);
// TODO combine with max_peers check
if data.max_passthroughs > fds_limit {
data.max_passthroughs = fds_limit;
}
data.max_passthroughs = fds_limit;
// TODO cull passthroughs too
if data.peers.peers().len() >= data.fds_limit as usize {
let diff = data.peers.peers().len() - data.fds_limit as usize;
println!("net: culling {diff} peers\r\n");
data.peers.cull(diff).await;
}
}
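For the limit update itself, the flow is: the incoming `FdsLimit` value becomes the new peer cap via `set_max_peers`, and if the peer map already holds more entries than the limit, the surplus is culled, oldest first. The cull count is just the difference; a sketch of that arithmetic, with illustrative names:

// Sketch of the surplus computation used above.
fn peers_to_cull(current_peers: usize, fds_limit: u64) -> usize {
    current_peers.saturating_sub(fds_limit as usize)
}

fn main() {
    assert_eq!(peers_to_cull(12, 10), 2); // over the limit: drop the 2 oldest peers
    assert_eq!(peers_to_cull(10, 10), 0); // at the limit: nothing to drop
    assert_eq!(peers_to_cull(7, 10), 0);  // under the limit: nothing to drop
}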

View File

@@ -6,6 +6,7 @@ use {
dashmap::DashMap,
ring::signature::Ed25519KeyPair,
serde::{Deserialize, Serialize},
std::sync::atomic::AtomicU64,
std::sync::Arc,
tokio::net::TcpStream,
tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender},
@@ -57,7 +58,7 @@ pub struct RoutingRequest {
#[derive(Clone)]
pub struct Peers {
max_peers: u64,
max_peers: Arc<AtomicU64>,
send_to_loop: MessageSender,
peers: Arc<DashMap<String, Peer>>,
}
@@ -65,7 +66,7 @@ pub struct Peers {
impl Peers {
pub fn new(max_peers: u64, send_to_loop: MessageSender) -> Self {
Self {
max_peers,
max_peers: Arc::new(max_peers.into()),
send_to_loop,
peers: Arc::new(DashMap::new()),
}
@@ -75,6 +76,15 @@ impl Peers {
&self.peers
}
pub fn max_peers(&self) -> u64 {
self.max_peers.load(std::sync::atomic::Ordering::Relaxed)
}
pub fn set_max_peers(&self, max_peers: u64) {
self.max_peers
.store(max_peers, std::sync::atomic::Ordering::Relaxed);
}
pub fn get(&self, name: &str) -> Option<dashmap::mapref::one::Ref<'_, String, Peer>> {
self.peers.get(name)
}
@@ -94,7 +104,7 @@ impl Peers {
/// remove the one with the oldest last_message.
pub async fn insert(&self, name: String, peer: Peer) {
self.peers.insert(name, peer);
if self.peers.len() > self.max_peers as usize {
if self.peers.len() as u64 > self.max_peers.load(std::sync::atomic::Ordering::Relaxed) {
let oldest = self
.peers
.iter()
@@ -102,7 +112,7 @@ impl Peers {
.unwrap()
.key()
.clone();
self.peers.remove(&oldest);
self.remove(&oldest).await;
crate::fd_manager::send_fd_manager_hit_fds_limit(
&Address::new("our", NET_PROCESS_ID.clone()),
&self.send_to_loop,
@@ -122,7 +132,7 @@ impl Peers {
sorted_peers.sort_by_key(|p| p.last_message);
to_remove.extend(sorted_peers.iter().take(n));
for peer in to_remove {
self.peers.remove(&peer.identity.name);
self.remove(&peer.identity.name).await;
}
crate::fd_manager::send_fd_manager_hit_fds_limit(
&Address::new("our", NET_PROCESS_ID.clone()),
@@ -189,9 +199,13 @@ impl Peer {
}
/// Send a message to the peer.
pub fn send(&mut self, km: KernelMessage) {
self.sender.send(km).expect("net: peer sender was dropped");
pub fn send(
&mut self,
km: KernelMessage,
) -> Result<(), tokio::sync::mpsc::error::SendError<KernelMessage>> {
self.sender.send(km)?;
self.set_last_message();
Ok(())
}
/// Update the last message time to now.
@@ -222,7 +236,6 @@ pub struct NetData {
pub pending_passthroughs: PendingPassthroughs,
/// only used by routers
pub active_passthroughs: ActivePassthroughs,
pub max_peers: u64,
pub max_passthroughs: u64,
pub fds_limit: u64,
}
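Zooming out on the `Peers` changes: both the over-limit eviction in `insert` and the bulk `cull` pick victims by their `last_message` timestamp, so the peers that stay connected are the ones most recently talked to. A pared-down, runnable illustration of that selection over a `DashMap`; the `Entry` type and the `.os` names are stand-ins, not the real `Peer`:

use dashmap::DashMap;
use std::sync::Arc;

// Stand-in for `Peer`: only the field the eviction decision looks at.
struct Entry {
    last_message: u64,
}

// If the map has grown past `max`, drop the entry with the oldest
// `last_message` and return its key, mirroring `Peers::insert` above.
fn evict_oldest(peers: &Arc<DashMap<String, Entry>>, max: usize) -> Option<String> {
    if peers.len() <= max {
        return None;
    }
    let oldest = peers
        .iter()
        .min_by_key(|p| p.value().last_message)?
        .key()
        .clone();
    peers.remove(&oldest);
    Some(oldest)
}

fn main() {
    let peers = Arc::new(DashMap::new());
    peers.insert("alice.os".to_string(), Entry { last_message: 10 });
    peers.insert("bob.os".to_string(), Entry { last_message: 3 });
    peers.insert("carol.os".to_string(), Entry { last_message: 7 });

    // with a cap of 2, the least recently messaged peer is the one evicted
    assert_eq!(evict_oldest(&peers, 2), Some("bob.os".to_string()));
    assert_eq!(peers.len(), 2);
}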