From fea159ab2dbee45fc1f76da208c6b8628cf52ee6 Mon Sep 17 00:00:00 2001 From: hosted-fornet Date: Wed, 25 Sep 2024 22:07:41 -0700 Subject: [PATCH] fd_manager: add initial runtime module --- Cargo.lock | 1 + kinode/Cargo.toml | 1 + kinode/src/fd_manager.rs | 192 +++++++++++++++++++++++++++++++++++++++ kinode/src/main.rs | 17 ++++ lib/src/core.rs | 26 ++++++ 5 files changed, 237 insertions(+) create mode 100644 kinode/src/fd_manager.rs diff --git a/Cargo.lock b/Cargo.lock index c1af03ec..b9f99d0c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3665,6 +3665,7 @@ dependencies = [ "kit", "lazy_static", "lib", + "libc", "nohash-hasher", "open", "public-ip", diff --git a/kinode/Cargo.toml b/kinode/Cargo.toml index b6834c61..31c17650 100644 --- a/kinode/Cargo.toml +++ b/kinode/Cargo.toml @@ -62,6 +62,7 @@ indexmap = "2.4" jwt = "0.16" lib = { path = "../lib" } lazy_static = "1.4.0" +libc = "0.2" nohash-hasher = "0.2.0" open = "5.1.4" public-ip = "0.2.2" diff --git a/kinode/src/fd_manager.rs b/kinode/src/fd_manager.rs new file mode 100644 index 00000000..8cbf3693 --- /dev/null +++ b/kinode/src/fd_manager.rs @@ -0,0 +1,192 @@ +use lib::types::core::{ + KernelMessage, Message, MessageReceiver, MessageSender, PrintSender, + Printout, ProcessId, Request, FdManagerRequest, FdManagerError, FD_MANAGER_PROCESS_ID, +}; +use std::{ + collections::HashMap, + sync::Arc, +}; + +const DEFAULT_MAX_OPEN_FDS: u64 = 180; +const DEFAULT_FDS_AS_FRACTION_OF_ULIMIT_PERCENTAGE: u64 = 60; +const DEFAULT_UPDATE_ULIMIT_SECS: u64 = 3600; +const DEFAULT_CULL_FRACTION_DENOMINATOR: u64 = 2; + +struct State { + fds: HashMap, + mode: Mode, + total_fds: u64, + max_fds: u64, + cull_fraction_denominator: u64, +} + +enum Mode { + /// don't update the max_fds except by user input + StaticMax, + /// check the system's ulimit periodically and update max_fds accordingly + DynamicMax { + max_fds_as_fraction_of_ulimit_percentage: u64, + update_ulimit_secs: u64, + } +} + +impl State { + fn new() -> Self { + Self::default() + } + + fn default() -> Self { + Self { + fds: HashMap::new(), + mode: Mode::default(), + total_fds: 0, + max_fds: DEFAULT_MAX_OPEN_FDS, + cull_fraction_denominator: DEFAULT_CULL_FRACTION_DENOMINATOR, + } + } + + fn update_max_fds_from_ulimit(&mut self, ulimit_max_fds: u64) { + let Mode::DynamicMax { ref max_fds_as_fraction_of_ulimit_percentage, .. } = self.mode else { + return; + }; + self.max_fds = ulimit_max_fds * max_fds_as_fraction_of_ulimit_percentage / 100; + } +} + +impl Mode { + fn default() -> Self { + Self::DynamicMax { + max_fds_as_fraction_of_ulimit_percentage: DEFAULT_FDS_AS_FRACTION_OF_ULIMIT_PERCENTAGE, + update_ulimit_secs: DEFAULT_UPDATE_ULIMIT_SECS, + } + } +} + +/// The fd_manager entrypoint. +pub async fn fd_manager( + our_node: Arc, + send_to_loop: MessageSender, + send_to_terminal: PrintSender, + mut recv_from_loop: MessageReceiver, +) -> anyhow::Result<()> { + let mut state = State::new(); + let mut interval = { + // in code block to release the reference into state + let Mode::DynamicMax { ref update_ulimit_secs, .. } = state.mode else { + return Ok(()) + }; + tokio::time::interval(tokio::time::Duration::from_secs( + update_ulimit_secs.clone() + )) + }; + let our_node = our_node.as_str(); + loop { + tokio::select! { + Some(message) = recv_from_loop.recv() => { + handle_message(message, &mut interval, &mut state)?; + } + _ = interval.tick() => { + update_max_fds(&send_to_terminal, &mut state).await?; + } + } + + if state.total_fds >= state.max_fds { + send_cull(our_node, &send_to_loop, &state).await?; + } + } +} + +fn handle_message(km: KernelMessage, _interval: &mut tokio::time::Interval, state: &mut State) -> anyhow::Result<()> { + let Message::Request(Request { + body, + .. + }) = km.message else { + return Err(FdManagerError::NotARequest.into()); + }; + let request: FdManagerRequest = serde_json::from_slice(&body) + .map_err(|_e| FdManagerError::BadRequest)?; + match request { + FdManagerRequest::OpenFds { number_opened } => { + state.total_fds += number_opened; + state.fds + .entry(km.source.process) + .and_modify(|e| *e += number_opened) + .or_insert(number_opened); + } + FdManagerRequest::CloseFds { mut number_closed } => { + assert!(state.total_fds >= number_closed); + state.total_fds -= number_closed; + state.fds + .entry(km.source.process) + .and_modify(|e| { + assert!(e >= &mut number_closed); + *e -= number_closed + }) + .or_insert(number_closed); + } + FdManagerRequest::Cull { .. } => { + return Err(FdManagerError::FdManagerWasSentCull.into()); + } + FdManagerRequest::UpdateMaxFdsAsFractionOfUlimitPercentage(_new) => { + unimplemented!(); + } + FdManagerRequest::UpdateUpdateUlimitSecs(_new) => { + unimplemented!(); + } + FdManagerRequest::UpdateCullFractionDenominator(_new) => { + unimplemented!(); + } + } + Ok(()) +} + +async fn update_max_fds(send_to_terminal: &PrintSender, state: &mut State) -> anyhow::Result<()> { + let ulimit_max_fds = match get_max_fd_limit() { + Ok(ulimit_max_fds) => ulimit_max_fds, + Err(_) => { + Printout::new(1, "Couldn't update max fd limit: ulimit failed") + .send(send_to_terminal).await; + return Ok(()); + } + }; + state.update_max_fds_from_ulimit(ulimit_max_fds); + Ok(()) +} + +async fn send_cull(our_node: &str, send_to_loop: &MessageSender, state: &State) -> anyhow::Result<()> { + let message = Message::Request(Request { + inherit: false, + expects_response: None, + body: serde_json::to_vec(&FdManagerRequest::Cull { + cull_fraction_denominator: state.cull_fraction_denominator.clone(), + }).unwrap(), + metadata: None, + capabilities: vec![], + }); + for process_id in state.fds.keys() { + KernelMessage::builder() + .id(rand::random()) + .source((our_node.clone(), FD_MANAGER_PROCESS_ID.clone())) + .target((our_node.clone(), process_id.clone())) + .message(message.clone()) + .build() + .unwrap() + .send(send_to_loop) + .await; + } + Ok(()) +} + +fn get_max_fd_limit() -> anyhow::Result { + let mut rlim = libc::rlimit { + rlim_cur: 0, // Current limit + rlim_max: 0, // Maximum limit value + }; + + // RLIMIT_NOFILE is the resource indicating the maximum file descriptor number. + if unsafe { libc::getrlimit(libc::RLIMIT_NOFILE, &mut rlim) } == 0 { + Ok(rlim.rlim_cur as u64) + } else { + Err(anyhow::anyhow!("Failed to get the resource limit.")) + } +} diff --git a/kinode/src/main.rs b/kinode/src/main.rs index 61619479..50c4fe7e 100644 --- a/kinode/src/main.rs +++ b/kinode/src/main.rs @@ -17,6 +17,7 @@ use tokio::sync::mpsc; mod eth; #[cfg(feature = "simulation-mode")] mod fakenet; +mod fd_manager; mod http; mod kernel; mod keygen; @@ -42,6 +43,7 @@ const VFS_CHANNEL_CAPACITY: usize = 1_000; const CAP_CHANNEL_CAPACITY: usize = 1_000; const KV_CHANNEL_CAPACITY: usize = 1_000; const SQLITE_CHANNEL_CAPACITY: usize = 1_000; +const FD_MANAGER_CHANNEL_CAPACITY: usize = 1_000; const VERSION: &str = env!("CARGO_PKG_VERSION"); const WS_MIN_PORT: u16 = 9_000; const TCP_MIN_PORT: u16 = 10_000; @@ -175,6 +177,9 @@ async fn main() { // vfs maintains metadata about files in fs for processes let (vfs_message_sender, vfs_message_receiver): (MessageSender, MessageReceiver) = mpsc::channel(VFS_CHANNEL_CAPACITY); + // fd_manager makes sure we don't overrun the `ulimit -n`: max number of file descriptors + let (fd_manager_sender, fd_manager_receiver): (MessageSender, MessageReceiver) = + mpsc::channel(FD_MANAGER_CHANNEL_CAPACITY); // terminal receives prints via this channel, all other modules send prints let (print_sender, print_receiver): (PrintSender, PrintReceiver) = mpsc::channel(TERMINAL_CHANNEL_CAPACITY); @@ -282,6 +287,12 @@ async fn main() { None, false, ), + ( + ProcessId::new(Some("fd_manager"), "distro", "sys"), + fd_manager_sender, + None, + false, + ), ]; /* @@ -351,6 +362,12 @@ async fn main() { db, home_directory_path.clone(), )); + tasks.spawn(fd_manager::fd_manager( + our_name_arc.clone(), + kernel_message_sender.clone(), + print_sender.clone(), + fd_manager_receiver, + )); tasks.spawn(kv::kv( our_name_arc.clone(), kernel_message_sender.clone(), diff --git a/lib/src/core.rs b/lib/src/core.rs index b8a28676..a626fc86 100644 --- a/lib/src/core.rs +++ b/lib/src/core.rs @@ -17,6 +17,7 @@ lazy_static::lazy_static! { pub static ref STATE_PROCESS_ID: ProcessId = ProcessId::new(Some("state"), "distro", "sys"); pub static ref KV_PROCESS_ID: ProcessId = ProcessId::new(Some("kv"), "distro", "sys"); pub static ref SQLITE_PROCESS_ID: ProcessId = ProcessId::new(Some("sqlite"), "distro", "sys"); + pub static ref FD_MANAGER_PROCESS_ID: ProcessId = ProcessId::new(Some("fd_manager"), "distro", "sys"); } // @@ -2068,3 +2069,28 @@ impl KnsUpdate { self.ports.get(protocol) } } + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum FdManagerRequest { + /// other process -> fd_manager + OpenFds { number_opened: u64 }, + CloseFds { number_closed: u64 }, + + /// fd_manager -> other process + Cull { cull_fraction_denominator: u64 }, + + /// administrative + UpdateMaxFdsAsFractionOfUlimitPercentage(u64), + UpdateUpdateUlimitSecs(u64), + UpdateCullFractionDenominator(u64), +} + +#[derive(Debug, Error)] +pub enum FdManagerError { + #[error("fd_manager: received a non-Request message")] + NotARequest, + #[error("fd_manager: received a non-FdManangerRequest")] + BadRequest, + #[error("fd_manager: received a FdManagerRequest::Cull, but I am the one who culls")] + FdManagerWasSentCull, +}