wire fdmanager cull up to net

This commit is contained in:
dr-frmr 2024-09-30 14:57:13 -04:00
parent f69c3d616e
commit f81af42461
No known key found for this signature in database
3 changed files with 50 additions and 8 deletions

View File

@ -48,6 +48,10 @@ const VERSION: &str = env!("CARGO_PKG_VERSION");
const WS_MIN_PORT: u16 = 9_000;
const TCP_MIN_PORT: u16 = 10_000;
const MAX_PORT: u16 = 65_535;
const DEFAULT_MAX_PEERS: u32 = 32;
const DEFAULT_MAX_PASSTHROUGHS: u32 = 0;
/// default routers as a eth-provider fallback
const DEFAULT_ETH_PROVIDERS: &str = include_str!("eth/default_providers_mainnet.json");
#[cfg(not(feature = "simulation-mode"))]
@ -353,8 +357,12 @@ async fn main() {
print_sender.clone(),
net_message_receiver,
*matches.get_one::<bool>("reveal-ip").unwrap_or(&true),
*matches.get_one::<u32>("max-peers").unwrap_or(&100),
*matches.get_one::<u32>("max-passthroughs").unwrap_or(&0),
*matches
.get_one::<u32>("max-peers")
.unwrap_or(&DEFAULT_MAX_PEERS),
*matches
.get_one::<u32>("max-passthroughs")
.unwrap_or(&DEFAULT_MAX_PASSTHROUGHS),
));
tasks.spawn(state::state_sender(
our_name_arc.clone(),
@ -699,7 +707,7 @@ fn build_command() -> Command {
.value_parser(value_parser!(u64)),
)
.arg(
arg!(--"max-peers" <MAX_PEERS> "Maximum number of peers to hold active connections with (default 100)")
arg!(--"max-peers" <MAX_PEERS> "Maximum number of peers to hold active connections with (default 32)")
.value_parser(value_parser!(u32)),
)
.arg(

View File

@ -167,7 +167,8 @@ async fn handle_local_request(
) {
match rmp_serde::from_slice::<NetAction>(request_body) {
Err(_e) => {
// ignore
// only other possible message is from fd_manager -- handle here
handle_fdman(km, request_body, data).await;
}
Ok(NetAction::ConnectionRequest(_)) => {
// we shouldn't get these locally, ignore
@ -222,10 +223,12 @@ async fn handle_local_request(
));
}
printout.push_str(&format!(
"we allow {} max passthroughs\r\n",
data.max_passthroughs
));
if data.max_passthroughs > 0 {
printout.push_str(&format!(
"we allow {} max passthroughs\r\n",
data.max_passthroughs
));
}
if !data.pending_passthroughs.is_empty() {
printout.push_str(&format!(
@ -324,6 +327,25 @@ async fn handle_local_request(
}
}
async fn handle_fdman(km: &KernelMessage, request_body: &[u8], data: &NetData) {
if km.source.process != *lib::core::FD_MANAGER_PROCESS_ID {
return;
}
let Ok(req) = rmp_serde::from_slice::<lib::core::FdManagerRequest>(request_body) else {
return;
};
match req {
lib::core::FdManagerRequest::Cull {
cull_fraction_denominator,
} => {
// we are requested to cull a fraction of our peers!
// TODO cull passthroughs too?
data.peers.cull(cull_fraction_denominator);
}
_ => return,
}
}
async fn handle_remote_request(
ext: &IdentityExt,
km: &KernelMessage,

View File

@ -100,6 +100,18 @@ impl Peers {
pub fn remove(&self, name: &str) -> Option<(String, Peer)> {
self.peers.remove(name)
}
/// close the (peer count / fraction) oldest connections
pub fn cull(&self, fraction: u64) {
let num_to_remove = (self.peers.len() as f64 / fraction as f64).ceil() as usize;
let mut to_remove = Vec::with_capacity(num_to_remove);
let mut sorted_peers: Vec<_> = self.peers.iter().collect();
sorted_peers.sort_by_key(|p| p.last_message);
to_remove.extend(sorted_peers.iter().take(num_to_remove));
for peer in to_remove {
self.peers.remove(&peer.identity.name);
}
}
}
pub type OnchainPKI = Arc<DashMap<String, Identity>>;