fd_manager: add initial runtime module

This commit is contained in:
hosted-fornet 2024-09-25 22:07:41 -07:00
parent 74c8458124
commit fea159ab2d
5 changed files with 237 additions and 0 deletions

1
Cargo.lock generated
View File

@ -3665,6 +3665,7 @@ dependencies = [
"kit",
"lazy_static",
"lib",
"libc",
"nohash-hasher",
"open",
"public-ip",

View File

@ -62,6 +62,7 @@ indexmap = "2.4"
jwt = "0.16"
lib = { path = "../lib" }
lazy_static = "1.4.0"
libc = "0.2"
nohash-hasher = "0.2.0"
open = "5.1.4"
public-ip = "0.2.2"

192
kinode/src/fd_manager.rs Normal file
View File

@ -0,0 +1,192 @@
use lib::types::core::{
KernelMessage, Message, MessageReceiver, MessageSender, PrintSender,
Printout, ProcessId, Request, FdManagerRequest, FdManagerError, FD_MANAGER_PROCESS_ID,
};
use std::{
collections::HashMap,
sync::Arc,
};
const DEFAULT_MAX_OPEN_FDS: u64 = 180;
const DEFAULT_FDS_AS_FRACTION_OF_ULIMIT_PERCENTAGE: u64 = 60;
const DEFAULT_UPDATE_ULIMIT_SECS: u64 = 3600;
const DEFAULT_CULL_FRACTION_DENOMINATOR: u64 = 2;
struct State {
fds: HashMap<ProcessId, u64>,
mode: Mode,
total_fds: u64,
max_fds: u64,
cull_fraction_denominator: u64,
}
enum Mode {
/// don't update the max_fds except by user input
StaticMax,
/// check the system's ulimit periodically and update max_fds accordingly
DynamicMax {
max_fds_as_fraction_of_ulimit_percentage: u64,
update_ulimit_secs: u64,
}
}
impl State {
fn new() -> Self {
Self::default()
}
fn default() -> Self {
Self {
fds: HashMap::new(),
mode: Mode::default(),
total_fds: 0,
max_fds: DEFAULT_MAX_OPEN_FDS,
cull_fraction_denominator: DEFAULT_CULL_FRACTION_DENOMINATOR,
}
}
fn update_max_fds_from_ulimit(&mut self, ulimit_max_fds: u64) {
let Mode::DynamicMax { ref max_fds_as_fraction_of_ulimit_percentage, .. } = self.mode else {
return;
};
self.max_fds = ulimit_max_fds * max_fds_as_fraction_of_ulimit_percentage / 100;
}
}
impl Mode {
fn default() -> Self {
Self::DynamicMax {
max_fds_as_fraction_of_ulimit_percentage: DEFAULT_FDS_AS_FRACTION_OF_ULIMIT_PERCENTAGE,
update_ulimit_secs: DEFAULT_UPDATE_ULIMIT_SECS,
}
}
}
/// The fd_manager entrypoint.
pub async fn fd_manager(
our_node: Arc<String>,
send_to_loop: MessageSender,
send_to_terminal: PrintSender,
mut recv_from_loop: MessageReceiver,
) -> anyhow::Result<()> {
let mut state = State::new();
let mut interval = {
// in code block to release the reference into state
let Mode::DynamicMax { ref update_ulimit_secs, .. } = state.mode else {
return Ok(())
};
tokio::time::interval(tokio::time::Duration::from_secs(
update_ulimit_secs.clone()
))
};
let our_node = our_node.as_str();
loop {
tokio::select! {
Some(message) = recv_from_loop.recv() => {
handle_message(message, &mut interval, &mut state)?;
}
_ = interval.tick() => {
update_max_fds(&send_to_terminal, &mut state).await?;
}
}
if state.total_fds >= state.max_fds {
send_cull(our_node, &send_to_loop, &state).await?;
}
}
}
fn handle_message(km: KernelMessage, _interval: &mut tokio::time::Interval, state: &mut State) -> anyhow::Result<()> {
let Message::Request(Request {
body,
..
}) = km.message else {
return Err(FdManagerError::NotARequest.into());
};
let request: FdManagerRequest = serde_json::from_slice(&body)
.map_err(|_e| FdManagerError::BadRequest)?;
match request {
FdManagerRequest::OpenFds { number_opened } => {
state.total_fds += number_opened;
state.fds
.entry(km.source.process)
.and_modify(|e| *e += number_opened)
.or_insert(number_opened);
}
FdManagerRequest::CloseFds { mut number_closed } => {
assert!(state.total_fds >= number_closed);
state.total_fds -= number_closed;
state.fds
.entry(km.source.process)
.and_modify(|e| {
assert!(e >= &mut number_closed);
*e -= number_closed
})
.or_insert(number_closed);
}
FdManagerRequest::Cull { .. } => {
return Err(FdManagerError::FdManagerWasSentCull.into());
}
FdManagerRequest::UpdateMaxFdsAsFractionOfUlimitPercentage(_new) => {
unimplemented!();
}
FdManagerRequest::UpdateUpdateUlimitSecs(_new) => {
unimplemented!();
}
FdManagerRequest::UpdateCullFractionDenominator(_new) => {
unimplemented!();
}
}
Ok(())
}
async fn update_max_fds(send_to_terminal: &PrintSender, state: &mut State) -> anyhow::Result<()> {
let ulimit_max_fds = match get_max_fd_limit() {
Ok(ulimit_max_fds) => ulimit_max_fds,
Err(_) => {
Printout::new(1, "Couldn't update max fd limit: ulimit failed")
.send(send_to_terminal).await;
return Ok(());
}
};
state.update_max_fds_from_ulimit(ulimit_max_fds);
Ok(())
}
async fn send_cull(our_node: &str, send_to_loop: &MessageSender, state: &State) -> anyhow::Result<()> {
let message = Message::Request(Request {
inherit: false,
expects_response: None,
body: serde_json::to_vec(&FdManagerRequest::Cull {
cull_fraction_denominator: state.cull_fraction_denominator.clone(),
}).unwrap(),
metadata: None,
capabilities: vec![],
});
for process_id in state.fds.keys() {
KernelMessage::builder()
.id(rand::random())
.source((our_node.clone(), FD_MANAGER_PROCESS_ID.clone()))
.target((our_node.clone(), process_id.clone()))
.message(message.clone())
.build()
.unwrap()
.send(send_to_loop)
.await;
}
Ok(())
}
fn get_max_fd_limit() -> anyhow::Result<u64> {
let mut rlim = libc::rlimit {
rlim_cur: 0, // Current limit
rlim_max: 0, // Maximum limit value
};
// RLIMIT_NOFILE is the resource indicating the maximum file descriptor number.
if unsafe { libc::getrlimit(libc::RLIMIT_NOFILE, &mut rlim) } == 0 {
Ok(rlim.rlim_cur as u64)
} else {
Err(anyhow::anyhow!("Failed to get the resource limit."))
}
}

View File

@ -17,6 +17,7 @@ use tokio::sync::mpsc;
mod eth;
#[cfg(feature = "simulation-mode")]
mod fakenet;
mod fd_manager;
mod http;
mod kernel;
mod keygen;
@ -42,6 +43,7 @@ const VFS_CHANNEL_CAPACITY: usize = 1_000;
const CAP_CHANNEL_CAPACITY: usize = 1_000;
const KV_CHANNEL_CAPACITY: usize = 1_000;
const SQLITE_CHANNEL_CAPACITY: usize = 1_000;
const FD_MANAGER_CHANNEL_CAPACITY: usize = 1_000;
const VERSION: &str = env!("CARGO_PKG_VERSION");
const WS_MIN_PORT: u16 = 9_000;
const TCP_MIN_PORT: u16 = 10_000;
@ -175,6 +177,9 @@ async fn main() {
// vfs maintains metadata about files in fs for processes
let (vfs_message_sender, vfs_message_receiver): (MessageSender, MessageReceiver) =
mpsc::channel(VFS_CHANNEL_CAPACITY);
// fd_manager makes sure we don't overrun the `ulimit -n`: max number of file descriptors
let (fd_manager_sender, fd_manager_receiver): (MessageSender, MessageReceiver) =
mpsc::channel(FD_MANAGER_CHANNEL_CAPACITY);
// terminal receives prints via this channel, all other modules send prints
let (print_sender, print_receiver): (PrintSender, PrintReceiver) =
mpsc::channel(TERMINAL_CHANNEL_CAPACITY);
@ -282,6 +287,12 @@ async fn main() {
None,
false,
),
(
ProcessId::new(Some("fd_manager"), "distro", "sys"),
fd_manager_sender,
None,
false,
),
];
/*
@ -351,6 +362,12 @@ async fn main() {
db,
home_directory_path.clone(),
));
tasks.spawn(fd_manager::fd_manager(
our_name_arc.clone(),
kernel_message_sender.clone(),
print_sender.clone(),
fd_manager_receiver,
));
tasks.spawn(kv::kv(
our_name_arc.clone(),
kernel_message_sender.clone(),

View File

@ -17,6 +17,7 @@ lazy_static::lazy_static! {
pub static ref STATE_PROCESS_ID: ProcessId = ProcessId::new(Some("state"), "distro", "sys");
pub static ref KV_PROCESS_ID: ProcessId = ProcessId::new(Some("kv"), "distro", "sys");
pub static ref SQLITE_PROCESS_ID: ProcessId = ProcessId::new(Some("sqlite"), "distro", "sys");
pub static ref FD_MANAGER_PROCESS_ID: ProcessId = ProcessId::new(Some("fd_manager"), "distro", "sys");
}
//
@ -2068,3 +2069,28 @@ impl KnsUpdate {
self.ports.get(protocol)
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum FdManagerRequest {
/// other process -> fd_manager
OpenFds { number_opened: u64 },
CloseFds { number_closed: u64 },
/// fd_manager -> other process
Cull { cull_fraction_denominator: u64 },
/// administrative
UpdateMaxFdsAsFractionOfUlimitPercentage(u64),
UpdateUpdateUlimitSecs(u64),
UpdateCullFractionDenominator(u64),
}
#[derive(Debug, Error)]
pub enum FdManagerError {
#[error("fd_manager: received a non-Request message")]
NotARequest,
#[error("fd_manager: received a non-FdManangerRequest")]
BadRequest,
#[error("fd_manager: received a FdManagerRequest::Cull, but I am the one who culls")]
FdManagerWasSentCull,
}