From 1da3132392e4db48950cbfbb671b84cc3884f710 Mon Sep 17 00:00:00 2001
From: dr-frmr
Date: Fri, 4 Oct 2024 16:37:39 -0400
Subject: [PATCH] MVP of limit-based fd_manager

---
 .../app_store/app_store/src/http_api.rs |   5 +-
 .../packages/app_store/app_store/src/lib.rs |   4 +-
 kinode/src/fd_manager.rs | 170 +++++++-----------
 kinode/src/main.rs |   8 +-
 kinode/src/net/mod.rs |  49 +++--
 kinode/src/net/types.rs |  27 +--
 kinode/src/net/utils.rs |  18 +-
 kinode/src/vfs.rs |  83 ++++-----
 lib/src/core.rs |  31 ++--
 9 files changed, 180 insertions(+), 215 deletions(-)

diff --git a/kinode/packages/app_store/app_store/src/http_api.rs b/kinode/packages/app_store/app_store/src/http_api.rs
index edc088d4..3788d737 100644
--- a/kinode/packages/app_store/app_store/src/http_api.rs
+++ b/kinode/packages/app_store/app_store/src/http_api.rs
@@ -474,7 +474,10 @@ fn serve_paths(
         &our.node().to_string(),
     ) {
         Ok(_) => {
-            println!("successfully installed package: {:?}", process_package_id);
+            println!(
+                "successfully installed {}:{}",
+                process_package_id.package_name, process_package_id.publisher_node
+            );
             Ok((StatusCode::CREATED, None, vec![]))
         }
         Err(e) => Ok((
diff --git a/kinode/packages/app_store/app_store/src/lib.rs b/kinode/packages/app_store/app_store/src/lib.rs
index bcfeb7a1..f3b5731b 100644
--- a/kinode/packages/app_store/app_store/src/lib.rs
+++ b/kinode/packages/app_store/app_store/src/lib.rs
@@ -261,8 +261,8 @@ fn handle_local_request(
             match utils::install(&package_id, metadata, &version_hash, state, &our.node) {
                 Ok(()) => {
                     println!(
-                        "successfully installed package: {:?}",
-                        &package_id.to_process_lib()
+                        "successfully installed {}:{}",
+                        package_id.package_name, package_id.publisher_node
                     );
                     LocalResponse::InstallResponse(InstallResponse::Success)
                 }
diff --git a/kinode/src/fd_manager.rs b/kinode/src/fd_manager.rs
index ad8590f0..4b597038 100644
--- a/kinode/src/fd_manager.rs
+++ b/kinode/src/fd_manager.rs
@@ -1,5 +1,5 @@
 use lib::types::core::{
-    Address, FdManagerError, FdManagerRequest, FdManagerResponse, KernelMessage, Message,
+    Address, FdManagerError, FdManagerRequest, FdManagerResponse, FdsLimit, KernelMessage, Message,
     MessageReceiver, MessageSender, PrintSender, Printout, ProcessId, Request,
     FD_MANAGER_PROCESS_ID,
 };
 use std::{collections::HashMap, sync::Arc};
 const DEFAULT_MAX_OPEN_FDS: u64 = 180;
 const DEFAULT_FDS_AS_FRACTION_OF_ULIMIT_PERCENTAGE: u64 = 60;
 const DEFAULT_UPDATE_ULIMIT_SECS: u64 = 3600;
-const DEFAULT_CULL_FRACTION_DENOMINATOR: u64 = 2;
+const _DEFAULT_CULL_FRACTION_DENOMINATOR: u64 = 2;
 struct State {
-    fds: HashMap<ProcessId, u64>,
+    fds_limits: HashMap<ProcessId, FdsLimit>,
     mode: Mode,
-    total_fds: u64,
     max_fds: u64,
-    cull_fraction_denominator: u64,
 }
 enum Mode {
@@ -35,14 +33,12 @@ impl State {
     fn default(static_max_fds: Option<u64>) -> Self {
         Self {
-            fds: HashMap::new(),
+            fds_limits: HashMap::new(),
             mode: Mode::default(static_max_fds),
-            total_fds: 0,
             max_fds: match static_max_fds {
                 Some(max) => max,
                 None => DEFAULT_MAX_OPEN_FDS,
             },
-            cull_fraction_denominator: DEFAULT_CULL_FRACTION_DENOMINATOR,
         }
     }
@@ -91,11 +87,11 @@ pub async fn fd_manager(
         };
         tokio::time::interval(tokio::time::Duration::from_secs(*update_ulimit_secs))
     };
-    let our_node = our_node.as_str();
     loop {
         tokio::select! {
             Some(message) = recv_from_loop.recv() => {
                 match handle_message(
+                    &our_node,
                     message,
                     &mut interval,
                     &mut state,
                     &send_to_loop,
                     &send_to_terminal,
                 )
                 .await
                 {
                     Ok(Some(to_print)) => {
@@ -120,23 +116,11 @@ pub async fn fd_manager(
                 }
             }
         }
-
-        if state.total_fds >= state.max_fds {
-            Printout::new(
-                2,
-                format!(
-                    "Have {} open >= {} max fds; sending Cull Request...",
-                    state.total_fds, state.max_fds,
-                ),
-            )
-            .send(&send_to_terminal)
-            .await;
-            send_cull(our_node, &send_to_loop, &state).await;
-        }
     }
 }
 async fn handle_message(
+    our_node: &str,
     km: KernelMessage,
     _interval: &mut tokio::time::Interval,
     state: &mut State,
@@ -153,56 +137,37 @@ async fn handle_message(
     let request: FdManagerRequest =
         serde_json::from_slice(&body).map_err(|_e| FdManagerError::BadRequest)?;
     let return_value = match request {
-        FdManagerRequest::OpenFds { number_opened } => {
-            state.total_fds += number_opened;
-            state
-                .fds
-                .entry(km.source.process)
-                .and_modify(|e| *e += number_opened)
-                .or_insert(number_opened);
+        FdManagerRequest::RequestFdsLimit => {
+            // divide max_fds by number of processes requesting fds limits,
+            // then send each process its new limit
+            // TODO can weight different processes differently
+            let per_process_limit = state.max_fds / (state.fds_limits.len() + 1) as u64;
+            state.fds_limits.insert(
+                km.source.process,
+                FdsLimit {
+                    limit: per_process_limit,
+                    hit_count: 0,
+                },
+            );
+            state.fds_limits.iter_mut().for_each(|(_process, limit)| {
+                limit.limit = per_process_limit;
+                limit.hit_count = 0;
+            });
+            send_all_fds_limits(our_node, send_to_loop, state).await;
+            None
         }
-        FdManagerRequest::CloseFds { mut number_closed } => {
-            if !state.fds.contains_key(&km.source.process) {
-                return Err(anyhow::anyhow!(
-                    "{} attempted to CloseFds {} but does not have any open!",
-                    km.source.process,
-                    number_closed,
-                ));
-            }
-            let mut return_value = Some(format!(
-                "{} closed {} of {} total;",
-                km.source.process, number_closed, state.total_fds,
-            ));
-            if state.total_fds < number_closed {
-                return_value.as_mut().unwrap().push_str(&format!(
-                    " !!process claims to have closed more fds ({}) than we have open for all processes ({})!!",
-                    number_closed,
-                    state.total_fds,
-                ));
-                state.total_fds = 0;
-            } else {
-                state.total_fds -= number_closed;
-            }
-            state.fds.entry(km.source.process).and_modify(|e| {
-                if e < &mut number_closed {
-                    return_value.as_mut().unwrap().push_str(&format!(
-                        " !!process claims to have closed more fds ({}) than it had open: {}!!",
-                        number_closed, e,
-                    ));
-                    *e = 0;
-                } else {
-                    *e -= number_closed;
-                }
-                return_value
-                    .as_mut()
-                    .unwrap()
-                    .push_str(&format!(" ({e} left to process after close)"));
+        FdManagerRequest::FdsLimitHit => {
+            // sender process hit its fd limit
+            // TODO react to this
+            state.fds_limits.get_mut(&km.source.process).map(|limit| {
+                limit.hit_count += 1;
             });
-            return_value
+            Some(format!("{} hit its fd limit", km.source.process))
         }
-        FdManagerRequest::Cull { .. } => {
-            return Err(FdManagerError::FdManagerWasSentCull.into());
+        FdManagerRequest::FdsLimit(_) => {
+            // should only send this, never receive it
+            return Err(FdManagerError::FdManagerWasSentLimit.into());
         }
         FdManagerRequest::UpdateMaxFdsAsFractionOfUlimitPercentage(new) => {
             match state.mode {
@@ -224,8 +189,8 @@ async fn handle_message(
             }
             None
         }
-        FdManagerRequest::UpdateCullFractionDenominator(new) => {
-            state.cull_fraction_denominator = new;
+        FdManagerRequest::UpdateCullFractionDenominator(_new) => {
+            // state.cull_fraction_denominator = new;
             None
         }
         FdManagerRequest::GetState => {
@@ -237,7 +202,7 @@ async fn handle_message(
                     .message(Message::Response((
                         lib::core::Response {
                             body: serde_json::to_vec(&FdManagerResponse::GetState(
-                                state.fds.clone(),
+                                state.fds_limits.clone(),
                             ))
                             .unwrap(),
                             inherit: false,
@@ -253,7 +218,7 @@ async fn handle_message(
             }
             None
         }
-        FdManagerRequest::GetProcessFdCount(process) => {
+        FdManagerRequest::GetProcessFdLimit(process) => {
             if expects_response.is_some() {
                 KernelMessage::builder()
                     .id(km.id)
@@ -261,8 +226,12 @@ async fn handle_message(
                     .target(km.rsvp.unwrap_or(km.source))
                     .message(Message::Response((
                         lib::core::Response {
-                            body: serde_json::to_vec(&FdManagerResponse::GetProcessFdCount(
-                                *state.fds.get(&process).unwrap_or(&0),
+                            body: serde_json::to_vec(&FdManagerResponse::GetProcessFdLimit(
+                                state
+                                    .fds_limits
+                                    .get(&process)
+                                    .map(|limit| limit.limit)
+                                    .unwrap_or(0),
                             ))
                             .unwrap(),
                             inherit: false,
@@ -289,23 +258,19 @@ async fn update_max_fds(state: &mut State) -> anyhow::Result<()> {
     Ok(())
 }
-async fn send_cull(our_node: &str, send_to_loop: &MessageSender, state: &State) {
-    let message = Message::Request(Request {
-        inherit: false,
-        expects_response: None,
-        body: serde_json::to_vec(&FdManagerRequest::Cull {
-            cull_fraction_denominator: state.cull_fraction_denominator.clone(),
-        })
-        .unwrap(),
-        metadata: None,
-        capabilities: vec![],
-    });
-    for process_id in state.fds.keys() {
+async fn send_all_fds_limits(our_node: &str, send_to_loop: &MessageSender, state: &State) {
+    for (process_id, limit) in &state.fds_limits {
         KernelMessage::builder()
             .id(rand::random())
             .source((our_node, FD_MANAGER_PROCESS_ID.clone()))
             .target((our_node, process_id.clone()))
-            .message(message.clone())
+            .message(Message::Request(Request {
+                inherit: false,
+                expects_response: None,
+                body: serde_json::to_vec(&FdManagerRequest::FdsLimit(limit.limit)).unwrap(),
+                metadata: None,
+                capabilities: vec![],
+            }))
             .build()
             .unwrap()
             .send(send_to_loop)
@@ -327,43 +292,29 @@ fn get_max_fd_limit() -> anyhow::Result<u64> {
     }
 }
-pub async fn send_fd_manager_open(
-    our: &Address,
-    number_opened: u64,
-    send_to_loop: &MessageSender,
-) -> anyhow::Result<()> {
+pub async fn send_fd_manager_request_fds_limit(our: &Address, send_to_loop: &MessageSender) {
     let message = Message::Request(Request {
         inherit: false,
         expects_response: None,
-        body: serde_json::to_vec(&FdManagerRequest::OpenFds { number_opened }).unwrap(),
+        body: serde_json::to_vec(&FdManagerRequest::RequestFdsLimit).unwrap(),
         metadata: None,
         capabilities: vec![],
     });
-    send_to_fd_manager(our, message, send_to_loop).await?;
-    Ok(())
+    send_to_fd_manager(our, message, send_to_loop).await
 }
-pub async fn send_fd_manager_close(
-    our: &Address,
-    number_closed: u64,
-    send_to_loop: &MessageSender,
-) -> anyhow::Result<()> {
+pub async fn send_fd_manager_hit_fds_limit(our: &Address, send_to_loop: &MessageSender) {
     let message = Message::Request(Request {
         inherit: false,
         expects_response: None,
-        body: serde_json::to_vec(&FdManagerRequest::CloseFds { number_closed }).unwrap(),
+        body: serde_json::to_vec(&FdManagerRequest::FdsLimitHit).unwrap(),
         metadata: None,
         capabilities: vec![],
     });
-    send_to_fd_manager(our, message, send_to_loop).await?;
-    Ok(())
+    send_to_fd_manager(our, message, send_to_loop).await
 }
-async fn send_to_fd_manager(
-    our: &Address,
-    message: Message,
-    send_to_loop: &MessageSender,
-) -> anyhow::Result<()> {
+async fn send_to_fd_manager(our: &Address, message: Message, send_to_loop: &MessageSender) {
     KernelMessage::builder()
         .id(rand::random())
         .source(our.clone())
@@ -372,6 +323,5 @@ async fn send_to_fd_manager(
         .build()
         .unwrap()
         .send(send_to_loop)
-        .await;
-    Ok(())
+        .await
 }
diff --git a/kinode/src/main.rs b/kinode/src/main.rs
index 100b2a99..1b97f9fe 100644
--- a/kinode/src/main.rs
+++ b/kinode/src/main.rs
@@ -49,8 +49,8 @@ const WS_MIN_PORT: u16 = 9_000;
 const TCP_MIN_PORT: u16 = 10_000;
 const MAX_PORT: u16 = 65_535;
-const DEFAULT_MAX_PEERS: u32 = 32;
-const DEFAULT_MAX_PASSTHROUGHS: u32 = 0;
+const DEFAULT_MAX_PEERS: u64 = 32;
+const DEFAULT_MAX_PASSTHROUGHS: u64 = 0;
 /// default routers as a eth-provider fallback
 const DEFAULT_ETH_PROVIDERS: &str = include_str!("eth/default_providers_mainnet.json");
@@ -358,10 +358,10 @@ async fn main() {
         net_message_receiver,
         *matches.get_one::<bool>("reveal-ip").unwrap_or(&true),
         *matches
-            .get_one::<u32>("max-peers")
+            .get_one::<u64>("max-peers")
             .unwrap_or(&DEFAULT_MAX_PEERS),
         *matches
-            .get_one::<u32>("max-passthroughs")
+            .get_one::<u64>("max-passthroughs")
             .unwrap_or(&DEFAULT_MAX_PASSTHROUGHS),
     ));
     tasks.spawn(state::state_sender(
diff --git a/kinode/src/net/mod.rs b/kinode/src/net/mod.rs
index 8a1aab10..52bc2216 100644
--- a/kinode/src/net/mod.rs
+++ b/kinode/src/net/mod.rs
@@ -1,6 +1,9 @@
-use lib::types::core::{
-    Identity, KernelMessage, MessageReceiver, MessageSender, NetAction, NetResponse,
-    NetworkErrorSender, NodeRouting, PrintSender,
+use lib::{
+    core::Address,
+    types::core::{
+        Identity, KernelMessage, MessageReceiver, MessageSender, NetAction, NetResponse,
+        NetworkErrorSender, NodeRouting, PrintSender, NET_PROCESS_ID,
+    },
 };
 use types::{
     ActivePassthroughs, IdentityExt, NetData, OnchainPKI, Peers, PendingPassthroughs, TCP_PROTOCOL,
@@ -34,10 +37,16 @@ pub async fn networking(
     kernel_message_rx: MessageReceiver,
     // only used if indirect -- TODO use
     _reveal_ip: bool,
-    max_peers: u32,
+    max_peers: u64,
     // only used by routers
-    max_passthroughs: u32,
+    max_passthroughs: u64,
 ) -> anyhow::Result<()> {
+    crate::fd_manager::send_fd_manager_request_fds_limit(
+        &Address::new(&our.name, NET_PROCESS_ID.clone()),
+        &kernel_message_tx,
+    )
+    .await;
+
     let ext = IdentityExt {
         our: Arc::new(our),
         our_ip: Arc::new(our_ip),
@@ -62,6 +71,7 @@ pub async fn networking(
         active_passthroughs,
         max_peers,
         max_passthroughs,
+        fds_limit: 100, // TODO blocking request to fd_manager to get max num of fds at boot
     };
     let mut tasks = JoinSet::<anyhow::Result<()>>::new();
@@ -116,12 +126,12 @@ pub async fn networking(
 async fn local_recv(
     ext: IdentityExt,
     mut kernel_message_rx: MessageReceiver,
-    data: NetData,
+    mut data: NetData,
 ) -> anyhow::Result<()> {
     while let Some(km) = kernel_message_rx.recv().await {
         if km.target.node == ext.our.name {
             // handle messages sent to us
-            handle_message(&ext, km, &data).await;
+            handle_message(&ext, km, &mut data).await;
         } else {
             connect::send_to_peer(&ext, &data, km).await;
         }
@@ -129,7 +139,7 @@ async fn local_recv(
     Err(anyhow::anyhow!("net: kernel message channel was dropped"))
 }
-async fn handle_message(ext: &IdentityExt, km: KernelMessage, data: &NetData) {
+async fn handle_message(ext: &IdentityExt, km: KernelMessage, data: &mut NetData) {
     match &km.message {
         lib::core::Message::Request(request) => handle_request(ext, &km, &request.body, data).await,
         lib::core::Message::Response((response, _context)) => {
@@ -142,7 +152,7 @@ async fn handle_request(
     ext: &IdentityExt,
     km: &KernelMessage,
     request_body: &[u8],
-    data: &NetData,
+    data: &mut NetData,
 ) {
     if km.source.node == ext.our.name {
         handle_local_request(ext, km, request_body, data).await;
@@ -158,7 +168,7 @@ async fn handle_local_request(
     ext: &IdentityExt,
     km: &KernelMessage,
     request_body: &[u8],
-    data: &NetData,
+    data: &mut NetData,
 ) {
     match rmp_serde::from_slice::<NetAction>(request_body) {
         Err(_e) => {
@@ -322,7 +332,7 @@ async fn handle_local_request(
     }
 }
-async fn handle_fdman(km: &KernelMessage, request_body: &[u8], data: &NetData) {
+async fn handle_fdman(km: &KernelMessage, request_body: &[u8], data: &mut NetData) {
     if km.source.process != *lib::core::FD_MANAGER_PROCESS_ID {
         return;
     }
@@ -330,12 +340,19 @@ async fn handle_fdman(km: &KernelMessage, request_body: &[u8], data: &NetData) {
         return;
     };
     match req {
-        lib::core::FdManagerRequest::Cull {
-            cull_fraction_denominator,
-        } => {
-            // we are requested to cull a fraction of our peers!
+        lib::core::FdManagerRequest::FdsLimit(fds_limit) => {
+            data.fds_limit = fds_limit;
+            if data.max_peers > fds_limit {
+                data.max_peers = fds_limit;
+            }
+            // TODO combine with max_peers check
+            if data.max_passthroughs > fds_limit {
+                data.max_passthroughs = fds_limit;
+            }
             // TODO cull passthroughs too?
-            data.peers.cull(cull_fraction_denominator).await;
+            if data.peers.peers().len() >= data.fds_limit as usize {
+                data.peers.cull(2).await;
+            }
         }
         _ => return,
     }
 }
diff --git a/kinode/src/net/types.rs b/kinode/src/net/types.rs
index 1212ad30..67cb1285 100644
--- a/kinode/src/net/types.rs
+++ b/kinode/src/net/types.rs
@@ -1,6 +1,6 @@
-use crate::net::utils;
 use lib::types::core::{
-    Identity, KernelMessage, MessageSender, NetworkErrorSender, NodeId, PrintSender,
+    Address, Identity, KernelMessage, MessageSender, NetworkErrorSender, NodeId, PrintSender,
+    NET_PROCESS_ID,
 };
 use {
     dashmap::DashMap,
@@ -57,13 +57,13 @@ pub struct RoutingRequest {
 #[derive(Clone)]
 pub struct Peers {
-    max_peers: u32,
+    max_peers: u64,
     send_to_loop: MessageSender,
     peers: Arc<DashMap<NodeId, Peer>>,
 }
 impl Peers {
-    pub fn new(max_peers: u32, send_to_loop: MessageSender) -> Self {
+    pub fn new(max_peers: u64, send_to_loop: MessageSender) -> Self {
         Self {
             max_peers,
             send_to_loop,
@@ -103,13 +103,15 @@ impl Peers {
                 .key()
                 .clone();
             self.peers.remove(&oldest);
-        } else {
-            utils::send_fd_manager_open(1, &self.send_to_loop).await;
+            crate::fd_manager::send_fd_manager_hit_fds_limit(
+                &Address::new("our", NET_PROCESS_ID.clone()),
+                &self.send_to_loop,
+            )
+            .await;
         }
     }
     pub async fn remove(&self, name: &str) -> Option<(String, Peer)> {
-        utils::send_fd_manager_close(1, &self.send_to_loop).await;
         self.peers.remove(name)
     }
@@ -123,7 +125,11 @@ impl Peers {
         for peer in to_remove {
             self.peers.remove(&peer.identity.name);
         }
-        utils::send_fd_manager_close(num_to_remove as u64, &self.send_to_loop).await;
+        crate::fd_manager::send_fd_manager_hit_fds_limit(
+            &Address::new("our", NET_PROCESS_ID.clone()),
+            &self.send_to_loop,
+        )
+        .await;
     }
 }
@@ -217,6 +223,7 @@ pub struct NetData {
     pub pending_passthroughs: PendingPassthroughs,
     /// only used by routers
     pub active_passthroughs: ActivePassthroughs,
-    pub max_peers: u32,
-    pub max_passthroughs: u32,
+    pub max_peers: u64,
+    pub max_passthroughs: u64,
+    pub fds_limit: u64,
 }
diff --git a/kinode/src/net/utils.rs b/kinode/src/net/utils.rs
index 592cf125..8c9d108e 100644
--- a/kinode/src/net/utils.rs
+++ b/kinode/src/net/utils.rs
@@ -3,9 +3,9 @@ use crate::net::types::{
     RoutingRequest, TCP_PROTOCOL, WS_PROTOCOL,
 };
 use lib::types::core::{
-    Address, Identity, KernelMessage, KnsUpdate, Message, MessageSender, NetAction,
-    NetworkErrorSender, NodeId, NodeRouting, PrintSender, Printout, Request, Response, SendError,
-    SendErrorKind, WrappedSendError, NET_PROCESS_ID,
+    Identity, KernelMessage, KnsUpdate, Message, MessageSender, NetAction, NetworkErrorSender,
+    NodeId, NodeRouting, PrintSender, Printout, Request, Response, SendError, SendErrorKind,
+    WrappedSendError,
 };
 use {
     futures::{SinkExt, StreamExt},
@@ -427,18 +427,6 @@ pub async fn parse_hello_message(
     .await;
 }
-/// Send an OpenFds message to the fd_manager.
-pub async fn send_fd_manager_open(num_opened: u64, kernel_message_tx: &MessageSender) {
-    let our: Address = Address::new("our", NET_PROCESS_ID.clone());
-    let _ = crate::fd_manager::send_fd_manager_open(&our, num_opened, kernel_message_tx).await;
-}
-
-/// Send a CloseFds message to the fd_manager.
-pub async fn send_fd_manager_close(num_closed: u64, kernel_message_tx: &MessageSender) {
-    let our: Address = Address::new("our", NET_PROCESS_ID.clone());
-    let _ = crate::fd_manager::send_fd_manager_close(&our, num_closed, kernel_message_tx).await;
-}
-
 /// Create a terminal printout at verbosity level 0.
 pub async fn print_loud(print_tx: &PrintSender, content: &str) {
     Printout::new(0, content).send(print_tx).await;
diff --git a/kinode/src/vfs.rs b/kinode/src/vfs.rs
index f9362aff..87f65d30 100644
--- a/kinode/src/vfs.rs
+++ b/kinode/src/vfs.rs
@@ -51,12 +51,14 @@ pub async fn vfs(
     let files = Files::new(
         Address::new(our_node.as_str(), VFS_PROCESS_ID.clone()),
-        send_to_loop.clone(),
+        send_to_loop,
     );
     let process_queues: HashMap<ProcessId, Arc<Mutex<VecDeque<KernelMessage>>>> =
         HashMap::default();
+    crate::fd_manager::send_fd_manager_request_fds_limit(&files.our, &files.send_to_loop).await;
+
     while let Some(km) = recv_from_loop.recv().await {
         if *our_node != km.source.node {
             Printout::new(
@@ -72,10 +74,10 @@ pub async fn vfs(
         }
         if km.source.process == *FD_MANAGER_PROCESS_ID {
-            let files = files.clone();
+            let mut files = files.clone();
             let send_to_terminal = send_to_terminal.clone();
             tokio::spawn(async move {
-                if let Err(e) = handle_fd_request(km, files).await {
+                if let Err(e) = handle_fd_request(km, &mut files).await {
                     Printout::new(
                         1,
                         format!("vfs: got request from fd_manager that errored: {e:?}"),
@@ -99,9 +101,8 @@ pub async fn vfs(
         // Clone Arcs for the new task
         let our_node = our_node.clone();
-        let send_to_loop = send_to_loop.clone();
         let send_to_caps_oracle = send_to_caps_oracle.clone();
-        let files = files.clone();
+        let mut files = files.clone();
         let vfs_path = vfs_path.clone();
         tokio::spawn(async move {
@@ -110,15 +111,8 @@ pub async fn vfs(
             let (km_id, km_rsvp) =
                 (km.id.clone(), km.rsvp.clone().unwrap_or(km.source.clone()));
-            if let Err(e) = handle_request(
-                &our_node,
-                km,
-                files,
-                &send_to_loop,
-                &send_to_caps_oracle,
-                &vfs_path,
-            )
-            .await
+            if let Err(e) =
+                handle_request(&our_node, km, &mut files, &send_to_caps_oracle, &vfs_path).await
             {
                 KernelMessage::builder()
                     .id(km_id)
@@ -135,7 +129,7 @@ pub async fn vfs(
                     )))
                     .build()
                     .unwrap()
-                    .send(&send_to_loop)
+                    .send(&files.send_to_loop)
                     .await;
             }
         }
@@ -153,8 +147,9 @@ struct Files {
     cursor_positions: Arc<DashMap<PathBuf, u64>>,
     /// access order of files
     access_order: Arc<Mutex<UniqueQueue<PathBuf>>>,
-    our: Address,
-    send_to_loop: MessageSender,
+    pub our: Address,
+    pub send_to_loop: MessageSender,
+    pub fds_limit: u64,
 }
 struct FileEntry {
@@ -170,6 +165,7 @@ impl Files {
             access_order: Arc::new(Mutex::new(UniqueQueue::new())),
             our,
             send_to_loop,
+            fds_limit: 100, // TODO blocking request to fd_manager to get max num of fds at boot
         }
     }
@@ -200,22 +196,19 @@ impl Files {
             },
         );
         self.update_access_order(&path).await;
-        crate::fd_manager::send_fd_manager_open(&self.our, 1, &self.send_to_loop)
-            .await
-            .map_err(|e| VfsError::Other {
-                error: e.to_string(),
-            })?;
+
+        // if open files >= fds_limit, close the (limit/2) least recently used files
+        if self.open_files.len() as u64 >= self.fds_limit {
+            crate::fd_manager::send_fd_manager_hit_fds_limit(&self.our, &self.send_to_loop).await;
+            self.close_least_recently_used_files(self.fds_limit / 2)
+                .await?;
+        }
+
         Ok(file)
     }
     async fn remove_file(&self, path: &Path) -> Result<(), VfsError> {
-        if self.open_files.remove(path).is_some() {
-            crate::fd_manager::send_fd_manager_close(&self.our, 1, &self.send_to_loop)
-                .await
-                .map_err(|e| VfsError::Other {
-                    error: e.to_string(),
-                })?;
-        }
+        self.open_files.remove(path);
         Ok(())
     }
@@ -249,11 +242,6 @@ impl Files {
                 break; // no more files to close
             }
         }
-        crate::fd_manager::send_fd_manager_close(&self.our, closed, &self.send_to_loop)
-            .await
-            .map_err(|e| VfsError::Other {
-                error: e.to_string(),
-            })?;
         Ok(())
     }
@@ -290,8 +278,7 @@ impl Files {
 async fn handle_request(
     our_node: &str,
     km: KernelMessage,
-    files: Files,
-    send_to_loop: &MessageSender,
+    files: &mut Files,
     send_to_caps_oracle: &CapMessageSender,
     vfs_path: &PathBuf,
 ) -> Result<(), VfsError> {
@@ -347,7 +334,7 @@ async fn handle_request(
             )))
             .build()
             .unwrap()
-            .send(send_to_loop)
+            .send(&files.send_to_loop)
             .await;
             return Ok(());
         } else {
@@ -661,7 +648,7 @@ async fn handle_request(
         }))
         .build()
         .unwrap()
-        .send(send_to_loop)
+        .send(&files.send_to_loop)
         .await;
     }
@@ -1030,7 +1017,7 @@ fn join_paths_safely(base: &PathBuf, extension: &str) -> PathBuf {
     base.join(extension_path)
 }
-async fn handle_fd_request(km: KernelMessage, files: Files) -> anyhow::Result<()> {
+async fn handle_fd_request(km: KernelMessage, files: &mut Files) -> anyhow::Result<()> {
     let Message::Request(Request { body, .. }) = km.message else {
         return Err(anyhow::anyhow!("not a request"));
     };
@@ -1038,13 +1025,17 @@ async fn handle_fd_request(km: KernelMessage, files: Files) -> anyhow::Result<()
     let request: FdManagerRequest = serde_json::from_slice(&body)?;
     match request {
-        FdManagerRequest::Cull {
-            cull_fraction_denominator,
-        } => {
-            let fraction_to_close = files.open_files.len() as u64 / cull_fraction_denominator;
-            files
-                .close_least_recently_used_files(fraction_to_close)
-                .await?;
+        FdManagerRequest::FdsLimit(fds_limit) => {
+            files.fds_limit = fds_limit;
+            if files.open_files.len() as u64 >= fds_limit {
+                crate::fd_manager::send_fd_manager_hit_fds_limit(&files.our, &files.send_to_loop)
+                    .await;
+                files
+                    .close_least_recently_used_files(
+                        (files.open_files.len() as u64 - fds_limit) / 2,
+                    )
+                    .await?;
+            }
         }
         _ => {
             return Err(anyhow::anyhow!("non-Cull FdManagerRequest"));
diff --git a/lib/src/core.rs b/lib/src/core.rs
index 27472fa7..f3fb16ce 100644
--- a/lib/src/core.rs
+++ b/lib/src/core.rs
@@ -2077,12 +2077,15 @@ impl KnsUpdate {
 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub enum FdManagerRequest {
     /// other process -> fd_manager
-    OpenFds { number_opened: u64 },
+    /// must send this to fd_manager to get an initial fds_limit
+    RequestFdsLimit,
     /// other process -> fd_manager
-    CloseFds { number_closed: u64 },
+    /// send this to notify fd_manager that limit was hit,
+    /// which may or may not be reacted to
+    FdsLimitHit,
     /// fd_manager -> other process
-    Cull { cull_fraction_denominator: u64 },
+    FdsLimit(u64),
     /// administrative
     UpdateMaxFdsAsFractionOfUlimitPercentage(u64),
@@ -2091,18 +2094,24 @@ pub enum FdManagerRequest {
     /// administrative
     UpdateCullFractionDenominator(u64),
-    /// get a `HashMap` of all `ProcessId`s to their known number of open file descriptors.
+    /// get a `HashMap` of all `ProcessId`s to their number of allocated file descriptors.
     GetState,
-    /// get the `u64` known number of file descriptors used by `ProcessId`.
-    GetProcessFdCount(ProcessId),
+    /// get the `u64` number of file descriptors allocated to `ProcessId`.
+    GetProcessFdLimit(ProcessId),
 }
 #[derive(Debug, Serialize, Deserialize)]
 pub enum FdManagerResponse {
     /// response to [`FdManagerRequest::GetState`]
-    GetState(HashMap<ProcessId, u64>),
-    /// response to [`FdManagerRequest::GetProcessFdCount`]
-    GetProcessFdCount(u64),
+    GetState(HashMap<ProcessId, FdsLimit>),
+    /// response to [`FdManagerRequest::GetProcessFdLimit`]
+    GetProcessFdLimit(u64),
+}
+
+#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
+pub struct FdsLimit {
+    pub limit: u64,
+    pub hit_count: u64,
 }
 #[derive(Debug, Error)]
@@ -2111,6 +2120,6 @@ pub enum FdManagerError {
     NotARequest,
     #[error("fd_manager: received a non-FdManangerRequest")]
     BadRequest,
-    #[error("fd_manager: received a FdManagerRequest::Cull, but I am the one who culls")]
-    FdManagerWasSentCull,
+    #[error("fd_manager: received a FdManagerRequest::FdsLimit, but I am the one who sets limits")]
+    FdManagerWasSentLimit,
 }