MVP of limit-based fd_manager

dr-frmr 2024-10-04 16:37:39 -04:00
parent b661e7b593
commit 1da3132392
9 changed files with 180 additions and 215 deletions

View File

@@ -474,7 +474,10 @@ fn serve_paths(
         &our.node().to_string(),
     ) {
         Ok(_) => {
-            println!("successfully installed package: {:?}", process_package_id);
+            println!(
+                "successfully installed {}:{}",
+                process_package_id.package_name, process_package_id.publisher_node
+            );
             Ok((StatusCode::CREATED, None, vec![]))
         }
         Err(e) => Ok((

View File

@@ -261,8 +261,8 @@ fn handle_local_request(
            match utils::install(&package_id, metadata, &version_hash, state, &our.node) {
                Ok(()) => {
                    println!(
-                       "successfully installed package: {:?}",
-                       &package_id.to_process_lib()
+                       "successfully installed {}:{}",
+                       package_id.package_name, package_id.publisher_node
                    );
                    LocalResponse::InstallResponse(InstallResponse::Success)
                }

View File

@@ -1,5 +1,5 @@
 use lib::types::core::{
-    Address, FdManagerError, FdManagerRequest, FdManagerResponse, KernelMessage, Message,
+    Address, FdManagerError, FdManagerRequest, FdManagerResponse, FdsLimit, KernelMessage, Message,
     MessageReceiver, MessageSender, PrintSender, Printout, ProcessId, Request,
     FD_MANAGER_PROCESS_ID,
 };
@@ -8,14 +8,12 @@ use std::{collections::HashMap, sync::Arc};
 const DEFAULT_MAX_OPEN_FDS: u64 = 180;
 const DEFAULT_FDS_AS_FRACTION_OF_ULIMIT_PERCENTAGE: u64 = 60;
 const DEFAULT_UPDATE_ULIMIT_SECS: u64 = 3600;
-const DEFAULT_CULL_FRACTION_DENOMINATOR: u64 = 2;
+const _DEFAULT_CULL_FRACTION_DENOMINATOR: u64 = 2;

 struct State {
-    fds: HashMap<ProcessId, u64>,
+    fds_limits: HashMap<ProcessId, FdsLimit>,
     mode: Mode,
-    total_fds: u64,
     max_fds: u64,
-    cull_fraction_denominator: u64,
 }

 enum Mode {
@@ -35,14 +33,12 @@
     fn default(static_max_fds: Option<u64>) -> Self {
         Self {
-            fds: HashMap::new(),
+            fds_limits: HashMap::new(),
             mode: Mode::default(static_max_fds),
-            total_fds: 0,
             max_fds: match static_max_fds {
                 Some(max) => max,
                 None => DEFAULT_MAX_OPEN_FDS,
             },
-            cull_fraction_denominator: DEFAULT_CULL_FRACTION_DENOMINATOR,
         }
     }
@@ -91,11 +87,11 @@ pub async fn fd_manager(
        };
        tokio::time::interval(tokio::time::Duration::from_secs(*update_ulimit_secs))
    };
-   let our_node = our_node.as_str();
    loop {
        tokio::select! {
            Some(message) = recv_from_loop.recv() => {
                match handle_message(
-                   &our_node,
                    message,
                    &mut interval,
                    &mut state,
@@ -120,23 +116,11 @@ pub async fn fd_manager(
                }
            }
        }
-       if state.total_fds >= state.max_fds {
-           Printout::new(
-               2,
-               format!(
-                   "Have {} open >= {} max fds; sending Cull Request...",
-                   state.total_fds, state.max_fds,
-               ),
-           )
-           .send(&send_to_terminal)
-           .await;
-           send_cull(our_node, &send_to_loop, &state).await;
-       }
    }
}

async fn handle_message(
-   our_node: &str,
    km: KernelMessage,
    _interval: &mut tokio::time::Interval,
    state: &mut State,
@@ -153,56 +137,37 @@ async fn handle_message(
    let request: FdManagerRequest =
        serde_json::from_slice(&body).map_err(|_e| FdManagerError::BadRequest)?;
    let return_value = match request {
-       FdManagerRequest::OpenFds { number_opened } => {
-           state.total_fds += number_opened;
-           state
-               .fds
-               .entry(km.source.process)
-               .and_modify(|e| *e += number_opened)
-               .or_insert(number_opened);
+       FdManagerRequest::RequestFdsLimit => {
+           // divide max_fds by number of processes requesting fds limits,
+           // then send each process its new limit
+           // TODO can weight different processes differently
+           let per_process_limit = state.max_fds / (state.fds_limits.len() + 1) as u64;
+           state.fds_limits.insert(
+               km.source.process,
+               FdsLimit {
+                   limit: per_process_limit,
+                   hit_count: 0,
+               },
+           );
+           state.fds_limits.iter_mut().for_each(|(_process, limit)| {
+               limit.limit = per_process_limit;
+               limit.hit_count = 0;
+           });
+           send_all_fds_limits(our_node, send_to_loop, state).await;
            None
        }
-       FdManagerRequest::CloseFds { mut number_closed } => {
-           if !state.fds.contains_key(&km.source.process) {
-               return Err(anyhow::anyhow!(
-                   "{} attempted to CloseFds {} but does not have any open!",
-                   km.source.process,
-                   number_closed,
-               ));
-           }
-           let mut return_value = Some(format!(
-               "{} closed {} of {} total;",
-               km.source.process, number_closed, state.total_fds,
-           ));
-           if state.total_fds < number_closed {
-               return_value.as_mut().unwrap().push_str(&format!(
-                   " !!process claims to have closed more fds ({}) than we have open for all processes ({})!!",
-                   number_closed,
-                   state.total_fds,
-               ));
-               state.total_fds = 0;
-           } else {
-               state.total_fds -= number_closed;
-           }
-           state.fds.entry(km.source.process).and_modify(|e| {
-               if e < &mut number_closed {
-                   return_value.as_mut().unwrap().push_str(&format!(
-                       " !!process claims to have closed more fds ({}) than it had open: {}!!",
-                       number_closed, e,
-                   ));
-                   *e = 0;
-               } else {
-                   *e -= number_closed;
-               }
-               return_value
-                   .as_mut()
-                   .unwrap()
-                   .push_str(&format!(" ({e} left to process after close)"));
+       FdManagerRequest::FdsLimitHit => {
+           // sender process hit its fd limit
+           // TODO react to this
+           state.fds_limits.get_mut(&km.source.process).map(|limit| {
+               limit.hit_count += 1;
            });
-           return_value
+           Some(format!("{} hit its fd limit", km.source.process))
        }
-       FdManagerRequest::Cull { .. } => {
-           return Err(FdManagerError::FdManagerWasSentCull.into());
+       FdManagerRequest::FdsLimit(_) => {
+           // should only send this, never receive it
+           return Err(FdManagerError::FdManagerWasSentLimit.into());
        }
        FdManagerRequest::UpdateMaxFdsAsFractionOfUlimitPercentage(new) => {
            match state.mode {
@@ -224,8 +189,8 @@
            }
            None
        }
-       FdManagerRequest::UpdateCullFractionDenominator(new) => {
-           state.cull_fraction_denominator = new;
+       FdManagerRequest::UpdateCullFractionDenominator(_new) => {
+           // state.cull_fraction_denominator = new;
            None
        }
        FdManagerRequest::GetState => {
@@ -237,7 +202,7 @@
                    .message(Message::Response((
                        lib::core::Response {
                            body: serde_json::to_vec(&FdManagerResponse::GetState(
-                               state.fds.clone(),
+                               state.fds_limits.clone(),
                            ))
                            .unwrap(),
                            inherit: false,
@@ -253,7 +218,7 @@
            }
            None
        }
-       FdManagerRequest::GetProcessFdCount(process) => {
+       FdManagerRequest::GetProcessFdLimit(process) => {
            if expects_response.is_some() {
                KernelMessage::builder()
                    .id(km.id)
@@ -261,8 +226,12 @@
                    .target(km.rsvp.unwrap_or(km.source))
                    .message(Message::Response((
                        lib::core::Response {
-                           body: serde_json::to_vec(&FdManagerResponse::GetProcessFdCount(
-                               *state.fds.get(&process).unwrap_or(&0),
+                           body: serde_json::to_vec(&FdManagerResponse::GetProcessFdLimit(
+                               state
+                                   .fds_limits
+                                   .get(&process)
+                                   .map(|limit| limit.limit)
+                                   .unwrap_or(0),
                            ))
                            .unwrap(),
                            inherit: false,
@@ -289,23 +258,19 @@ async fn update_max_fds(state: &mut State) -> anyhow::Result<()> {
    Ok(())
}

-async fn send_cull(our_node: &str, send_to_loop: &MessageSender, state: &State) {
-   let message = Message::Request(Request {
-       inherit: false,
-       expects_response: None,
-       body: serde_json::to_vec(&FdManagerRequest::Cull {
-           cull_fraction_denominator: state.cull_fraction_denominator.clone(),
-       })
-       .unwrap(),
-       metadata: None,
-       capabilities: vec![],
-   });
-   for process_id in state.fds.keys() {
+async fn send_all_fds_limits(our_node: &str, send_to_loop: &MessageSender, state: &State) {
+   for (process_id, limit) in &state.fds_limits {
        KernelMessage::builder()
            .id(rand::random())
            .source((our_node, FD_MANAGER_PROCESS_ID.clone()))
            .target((our_node, process_id.clone()))
-           .message(message.clone())
+           .message(Message::Request(Request {
+               inherit: false,
+               expects_response: None,
+               body: serde_json::to_vec(&FdManagerRequest::FdsLimit(limit.limit)).unwrap(),
+               metadata: None,
+               capabilities: vec![],
+           }))
            .build()
            .unwrap()
            .send(send_to_loop)
@@ -327,43 +292,29 @@ fn get_max_fd_limit() -> anyhow::Result<u64> {
    }
}

-pub async fn send_fd_manager_open(
-   our: &Address,
-   number_opened: u64,
-   send_to_loop: &MessageSender,
-) -> anyhow::Result<()> {
+pub async fn send_fd_manager_request_fds_limit(our: &Address, send_to_loop: &MessageSender) {
    let message = Message::Request(Request {
        inherit: false,
        expects_response: None,
-       body: serde_json::to_vec(&FdManagerRequest::OpenFds { number_opened }).unwrap(),
+       body: serde_json::to_vec(&FdManagerRequest::RequestFdsLimit).unwrap(),
        metadata: None,
        capabilities: vec![],
    });
-   send_to_fd_manager(our, message, send_to_loop).await?;
-   Ok(())
+   send_to_fd_manager(our, message, send_to_loop).await
}

-pub async fn send_fd_manager_close(
-   our: &Address,
-   number_closed: u64,
-   send_to_loop: &MessageSender,
-) -> anyhow::Result<()> {
+pub async fn send_fd_manager_hit_fds_limit(our: &Address, send_to_loop: &MessageSender) {
    let message = Message::Request(Request {
        inherit: false,
        expects_response: None,
-       body: serde_json::to_vec(&FdManagerRequest::CloseFds { number_closed }).unwrap(),
+       body: serde_json::to_vec(&FdManagerRequest::FdsLimitHit).unwrap(),
        metadata: None,
        capabilities: vec![],
    });
-   send_to_fd_manager(our, message, send_to_loop).await?;
-   Ok(())
+   send_to_fd_manager(our, message, send_to_loop).await
}

-async fn send_to_fd_manager(
-   our: &Address,
-   message: Message,
-   send_to_loop: &MessageSender,
-) -> anyhow::Result<()> {
+async fn send_to_fd_manager(our: &Address, message: Message, send_to_loop: &MessageSender) {
    KernelMessage::builder()
        .id(rand::random())
        .source(our.clone())
@@ -372,6 +323,5 @@ async fn send_to_fd_manager(
        .build()
        .unwrap()
        .send(send_to_loop)
-       .await;
-   Ok(())
+       .await
}
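The RequestFdsLimit arm above replaces per-fd bookkeeping with a simple policy: split max_fds evenly across every process that has asked for a budget, then re-send the new share to all of them. Below is a minimal, self-contained sketch of that allocation step; ProcessId and FdsLimit are simplified stand-ins and the function name on_request_fds_limit is illustrative, not from this commit.

use std::collections::HashMap;

// simplified stand-ins for the kernel types
type ProcessId = String;

#[derive(Debug, Clone, Copy)]
struct FdsLimit {
    limit: u64,
    hit_count: u64,
}

/// Register a new requester and re-split `max_fds` evenly across all
/// registered processes, mirroring the `RequestFdsLimit` arm above.
fn on_request_fds_limit(
    max_fds: u64,
    fds_limits: &mut HashMap<ProcessId, FdsLimit>,
    requester: ProcessId,
) {
    // +1 accounts for the process that is registering right now
    let per_process_limit = max_fds / (fds_limits.len() + 1) as u64;
    fds_limits.insert(
        requester,
        FdsLimit {
            limit: per_process_limit,
            hit_count: 0,
        },
    );
    // every previously registered process gets the new, smaller share
    for limit in fds_limits.values_mut() {
        limit.limit = per_process_limit;
        limit.hit_count = 0;
    }
}

fn main() {
    let mut limits = HashMap::new();
    on_request_fds_limit(180, &mut limits, "vfs:distro:sys".to_string());
    on_request_fds_limit(180, &mut limits, "net:distro:sys".to_string());
    // with two registered processes and max_fds = 180, each is allotted 90
    for (process, l) in &limits {
        println!("{process} -> {}", l.limit);
    }
}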

View File

@@ -49,8 +49,8 @@ const WS_MIN_PORT: u16 = 9_000;
 const TCP_MIN_PORT: u16 = 10_000;
 const MAX_PORT: u16 = 65_535;
-const DEFAULT_MAX_PEERS: u32 = 32;
-const DEFAULT_MAX_PASSTHROUGHS: u32 = 0;
+const DEFAULT_MAX_PEERS: u64 = 32;
+const DEFAULT_MAX_PASSTHROUGHS: u64 = 0;
 /// default routers as a eth-provider fallback
 const DEFAULT_ETH_PROVIDERS: &str = include_str!("eth/default_providers_mainnet.json");
@@ -358,10 +358,10 @@ async fn main() {
        net_message_receiver,
        *matches.get_one::<bool>("reveal-ip").unwrap_or(&true),
        *matches
-           .get_one::<u32>("max-peers")
+           .get_one::<u64>("max-peers")
            .unwrap_or(&DEFAULT_MAX_PEERS),
        *matches
-           .get_one::<u32>("max-passthroughs")
+           .get_one::<u64>("max-passthroughs")
            .unwrap_or(&DEFAULT_MAX_PASSTHROUGHS),
    ));
    tasks.spawn(state::state_sender(

View File

@@ -1,6 +1,9 @@
-use lib::types::core::{
-    Identity, KernelMessage, MessageReceiver, MessageSender, NetAction, NetResponse,
-    NetworkErrorSender, NodeRouting, PrintSender,
+use lib::{
+    core::Address,
+    types::core::{
+        Identity, KernelMessage, MessageReceiver, MessageSender, NetAction, NetResponse,
+        NetworkErrorSender, NodeRouting, PrintSender, NET_PROCESS_ID,
+    },
 };
 use types::{
     ActivePassthroughs, IdentityExt, NetData, OnchainPKI, Peers, PendingPassthroughs, TCP_PROTOCOL,
@@ -34,10 +37,16 @@ pub async fn networking(
    kernel_message_rx: MessageReceiver,
    // only used if indirect -- TODO use
    _reveal_ip: bool,
-   max_peers: u32,
+   max_peers: u64,
    // only used by routers
-   max_passthroughs: u32,
+   max_passthroughs: u64,
 ) -> anyhow::Result<()> {
+   crate::fd_manager::send_fd_manager_request_fds_limit(
+       &Address::new(&our.name, NET_PROCESS_ID.clone()),
+       &kernel_message_tx,
+   )
+   .await;
    let ext = IdentityExt {
        our: Arc::new(our),
        our_ip: Arc::new(our_ip),
@@ -62,6 +71,7 @@ pub async fn networking(
        active_passthroughs,
        max_peers,
        max_passthroughs,
+       fds_limit: 100, // TODO blocking request to fd_manager to get max num of fds at boot
    };

    let mut tasks = JoinSet::<anyhow::Result<()>>::new();
@@ -116,12 +126,12 @@ pub async fn networking(
 async fn local_recv(
    ext: IdentityExt,
    mut kernel_message_rx: MessageReceiver,
-   data: NetData,
+   mut data: NetData,
 ) -> anyhow::Result<()> {
    while let Some(km) = kernel_message_rx.recv().await {
        if km.target.node == ext.our.name {
            // handle messages sent to us
-           handle_message(&ext, km, &data).await;
+           handle_message(&ext, km, &mut data).await;
        } else {
            connect::send_to_peer(&ext, &data, km).await;
        }
@@ -129,7 +139,7 @@ async fn local_recv(
    Err(anyhow::anyhow!("net: kernel message channel was dropped"))
}

-async fn handle_message(ext: &IdentityExt, km: KernelMessage, data: &NetData) {
+async fn handle_message(ext: &IdentityExt, km: KernelMessage, data: &mut NetData) {
    match &km.message {
        lib::core::Message::Request(request) => handle_request(ext, &km, &request.body, data).await,
        lib::core::Message::Response((response, _context)) => {
@@ -142,7 +152,7 @@ async fn handle_request(
    ext: &IdentityExt,
    km: &KernelMessage,
    request_body: &[u8],
-   data: &NetData,
+   data: &mut NetData,
 ) {
    if km.source.node == ext.our.name {
        handle_local_request(ext, km, request_body, data).await;
@@ -158,7 +168,7 @@ async fn handle_local_request(
    ext: &IdentityExt,
    km: &KernelMessage,
    request_body: &[u8],
-   data: &NetData,
+   data: &mut NetData,
 ) {
    match rmp_serde::from_slice::<NetAction>(request_body) {
        Err(_e) => {
@@ -322,7 +332,7 @@ async fn handle_local_request(
    }
}

-async fn handle_fdman(km: &KernelMessage, request_body: &[u8], data: &NetData) {
+async fn handle_fdman(km: &KernelMessage, request_body: &[u8], data: &mut NetData) {
    if km.source.process != *lib::core::FD_MANAGER_PROCESS_ID {
        return;
    }
@@ -330,12 +340,19 @@ async fn handle_fdman(km: &KernelMessage, request_body: &[u8], data: &NetData) {
        return;
    };
    match req {
-       lib::core::FdManagerRequest::Cull {
-           cull_fraction_denominator,
-       } => {
-           // we are requested to cull a fraction of our peers!
+       lib::core::FdManagerRequest::FdsLimit(fds_limit) => {
+           data.fds_limit = fds_limit;
+           if data.max_peers > fds_limit {
+               data.max_peers = fds_limit;
+           }
+           // TODO combine with max_peers check
+           if data.max_passthroughs > fds_limit {
+               data.max_passthroughs = fds_limit;
+           }
            // TODO cull passthroughs too?
-           data.peers.cull(cull_fraction_denominator).await;
+           if data.peers.peers().len() >= data.fds_limit as usize {
+               data.peers.cull(2).await;
+           }
        }
        _ => return,
    }
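On the net side, the FdsLimit handler records the budget, clamps max_peers and max_passthroughs down to it, and culls peers if the node is already over. A rough sketch of that reaction, using a simplified struct in place of NetData (NetCaps and apply_fds_limit are illustrative names, not part of the commit):

/// Simplified stand-in for the fields of NetData that the FdsLimit arm touches.
struct NetCaps {
    max_peers: u64,
    max_passthroughs: u64,
    fds_limit: u64,
    open_peer_count: u64,
}

impl NetCaps {
    /// Mirror of the `FdManagerRequest::FdsLimit(fds_limit)` arm: record the
    /// new budget, clamp the configured maximums to it, and report whether a
    /// cull (the real code closes roughly half the peers) is needed.
    fn apply_fds_limit(&mut self, fds_limit: u64) -> bool {
        self.fds_limit = fds_limit;
        if self.max_peers > fds_limit {
            self.max_peers = fds_limit;
        }
        if self.max_passthroughs > fds_limit {
            self.max_passthroughs = fds_limit;
        }
        self.open_peer_count >= self.fds_limit
    }
}

fn main() {
    let mut caps = NetCaps {
        max_peers: 32,
        max_passthroughs: 0,
        fds_limit: 100,
        open_peer_count: 40,
    };
    let should_cull = caps.apply_fds_limit(16);
    println!("max_peers={} cull={}", caps.max_peers, should_cull); // max_peers=16 cull=true
}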

View File

@@ -1,6 +1,6 @@
-use crate::net::utils;
 use lib::types::core::{
-    Identity, KernelMessage, MessageSender, NetworkErrorSender, NodeId, PrintSender,
+    Address, Identity, KernelMessage, MessageSender, NetworkErrorSender, NodeId, PrintSender,
+    NET_PROCESS_ID,
 };
 use {
     dashmap::DashMap,
@@ -57,13 +57,13 @@ pub struct RoutingRequest {
 #[derive(Clone)]
 pub struct Peers {
-    max_peers: u32,
+    max_peers: u64,
     send_to_loop: MessageSender,
     peers: Arc<DashMap<String, Peer>>,
 }

 impl Peers {
-    pub fn new(max_peers: u32, send_to_loop: MessageSender) -> Self {
+    pub fn new(max_peers: u64, send_to_loop: MessageSender) -> Self {
         Self {
             max_peers,
             send_to_loop,
@@ -103,13 +103,15 @@ impl Peers {
                .key()
                .clone();
            self.peers.remove(&oldest);
-       } else {
-           utils::send_fd_manager_open(1, &self.send_to_loop).await;
+           crate::fd_manager::send_fd_manager_hit_fds_limit(
+               &Address::new("our", NET_PROCESS_ID.clone()),
+               &self.send_to_loop,
+           )
+           .await;
        }
    }

    pub async fn remove(&self, name: &str) -> Option<(String, Peer)> {
-       utils::send_fd_manager_close(1, &self.send_to_loop).await;
        self.peers.remove(name)
    }
@@ -123,7 +125,11 @@ impl Peers {
        for peer in to_remove {
            self.peers.remove(&peer.identity.name);
        }
-       utils::send_fd_manager_close(num_to_remove as u64, &self.send_to_loop).await;
+       crate::fd_manager::send_fd_manager_hit_fds_limit(
+           &Address::new("our", NET_PROCESS_ID.clone()),
+           &self.send_to_loop,
+       )
+       .await;
    }
}
@@ -217,6 +223,7 @@ pub struct NetData {
     pub pending_passthroughs: PendingPassthroughs,
     /// only used by routers
     pub active_passthroughs: ActivePassthroughs,
-    pub max_peers: u32,
-    pub max_passthroughs: u32,
+    pub max_peers: u64,
+    pub max_passthroughs: u64,
+    pub fds_limit: u64,
 }

View File

@@ -3,9 +3,9 @@ use crate::net::types::{
     RoutingRequest, TCP_PROTOCOL, WS_PROTOCOL,
 };
 use lib::types::core::{
-    Address, Identity, KernelMessage, KnsUpdate, Message, MessageSender, NetAction,
-    NetworkErrorSender, NodeId, NodeRouting, PrintSender, Printout, Request, Response, SendError,
-    SendErrorKind, WrappedSendError, NET_PROCESS_ID,
+    Identity, KernelMessage, KnsUpdate, Message, MessageSender, NetAction, NetworkErrorSender,
+    NodeId, NodeRouting, PrintSender, Printout, Request, Response, SendError, SendErrorKind,
+    WrappedSendError,
 };
 use {
     futures::{SinkExt, StreamExt},
@@ -427,18 +427,6 @@ pub async fn parse_hello_message(
    .await;
}

-/// Send an OpenFds message to the fd_manager.
-pub async fn send_fd_manager_open(num_opened: u64, kernel_message_tx: &MessageSender) {
-    let our: Address = Address::new("our", NET_PROCESS_ID.clone());
-    let _ = crate::fd_manager::send_fd_manager_open(&our, num_opened, kernel_message_tx).await;
-}
-
-/// Send a CloseFds message to the fd_manager.
-pub async fn send_fd_manager_close(num_closed: u64, kernel_message_tx: &MessageSender) {
-    let our: Address = Address::new("our", NET_PROCESS_ID.clone());
-    let _ = crate::fd_manager::send_fd_manager_close(&our, num_closed, kernel_message_tx).await;
-}
-
 /// Create a terminal printout at verbosity level 0.
 pub async fn print_loud(print_tx: &PrintSender, content: &str) {
     Printout::new(0, content).send(print_tx).await;

View File

@@ -51,12 +51,14 @@ pub async fn vfs(
    let files = Files::new(
        Address::new(our_node.as_str(), VFS_PROCESS_ID.clone()),
-       send_to_loop.clone(),
+       send_to_loop,
    );

    let process_queues: HashMap<ProcessId, Arc<Mutex<VecDeque<KernelMessage>>>> =
        HashMap::default();

+   crate::fd_manager::send_fd_manager_request_fds_limit(&files.our, &files.send_to_loop).await;
+
    while let Some(km) = recv_from_loop.recv().await {
        if *our_node != km.source.node {
            Printout::new(
@@ -72,10 +74,10 @@ pub async fn vfs(
        }

        if km.source.process == *FD_MANAGER_PROCESS_ID {
-           let files = files.clone();
+           let mut files = files.clone();
            let send_to_terminal = send_to_terminal.clone();
            tokio::spawn(async move {
-               if let Err(e) = handle_fd_request(km, files).await {
+               if let Err(e) = handle_fd_request(km, &mut files).await {
                    Printout::new(
                        1,
                        format!("vfs: got request from fd_manager that errored: {e:?}"),
@@ -99,9 +101,8 @@ pub async fn vfs(
        // Clone Arcs for the new task
        let our_node = our_node.clone();
-       let send_to_loop = send_to_loop.clone();
        let send_to_caps_oracle = send_to_caps_oracle.clone();
-       let files = files.clone();
+       let mut files = files.clone();
        let vfs_path = vfs_path.clone();

        tokio::spawn(async move {
@@ -110,15 +111,8 @@ pub async fn vfs(
            let (km_id, km_rsvp) =
                (km.id.clone(), km.rsvp.clone().unwrap_or(km.source.clone()));

-           if let Err(e) = handle_request(
-               &our_node,
-               km,
-               files,
-               &send_to_loop,
-               &send_to_caps_oracle,
-               &vfs_path,
-           )
-           .await
+           if let Err(e) =
+               handle_request(&our_node, km, &mut files, &send_to_caps_oracle, &vfs_path).await
            {
                KernelMessage::builder()
                    .id(km_id)
@@ -135,7 +129,7 @@ pub async fn vfs(
                    )))
                    .build()
                    .unwrap()
-                   .send(&send_to_loop)
+                   .send(&files.send_to_loop)
                    .await;
            }
        }
@@ -153,8 +147,9 @@ struct Files {
     cursor_positions: Arc<DashMap<PathBuf, u64>>,
     /// access order of files
     access_order: Arc<Mutex<UniqueQueue<PathBuf>>>,
-    our: Address,
-    send_to_loop: MessageSender,
+    pub our: Address,
+    pub send_to_loop: MessageSender,
+    pub fds_limit: u64,
 }

 struct FileEntry {
@@ -170,6 +165,7 @@ impl Files {
            access_order: Arc::new(Mutex::new(UniqueQueue::new())),
            our,
            send_to_loop,
+           fds_limit: 100, // TODO blocking request to fd_manager to get max num of fds at boot
        }
    }
@@ -200,22 +196,19 @@ impl Files {
            },
        );
        self.update_access_order(&path).await;
-       crate::fd_manager::send_fd_manager_open(&self.our, 1, &self.send_to_loop)
-           .await
-           .map_err(|e| VfsError::Other {
-               error: e.to_string(),
-           })?;
+       // if open files >= fds_limit, close the (limit/2) least recently used files
+       if self.open_files.len() as u64 >= self.fds_limit {
+           crate::fd_manager::send_fd_manager_hit_fds_limit(&self.our, &self.send_to_loop).await;
+           self.close_least_recently_used_files(self.fds_limit / 2)
+               .await?;
+       }
        Ok(file)
    }

    async fn remove_file(&self, path: &Path) -> Result<(), VfsError> {
-       if self.open_files.remove(path).is_some() {
-           crate::fd_manager::send_fd_manager_close(&self.our, 1, &self.send_to_loop)
-               .await
-               .map_err(|e| VfsError::Other {
-                   error: e.to_string(),
-               })?;
-       }
+       self.open_files.remove(path);
        Ok(())
    }
@@ -249,11 +242,6 @@ impl Files {
                break; // no more files to close
            }
        }
-       crate::fd_manager::send_fd_manager_close(&self.our, closed, &self.send_to_loop)
-           .await
-           .map_err(|e| VfsError::Other {
-               error: e.to_string(),
-           })?;
        Ok(())
    }
@@ -290,8 +278,7 @@ impl Files {
 async fn handle_request(
     our_node: &str,
     km: KernelMessage,
-    files: Files,
-    send_to_loop: &MessageSender,
+    files: &mut Files,
     send_to_caps_oracle: &CapMessageSender,
     vfs_path: &PathBuf,
 ) -> Result<(), VfsError> {
@@ -347,7 +334,7 @@ async fn handle_request(
            )))
            .build()
            .unwrap()
-           .send(send_to_loop)
+           .send(&files.send_to_loop)
            .await;
        return Ok(());
    } else {
@@ -661,7 +648,7 @@ async fn handle_request(
        }))
        .build()
        .unwrap()
-       .send(send_to_loop)
+       .send(&files.send_to_loop)
        .await;
    }
@@ -1030,7 +1017,7 @@ fn join_paths_safely(base: &PathBuf, extension: &str) -> PathBuf {
    base.join(extension_path)
}

-async fn handle_fd_request(km: KernelMessage, files: Files) -> anyhow::Result<()> {
+async fn handle_fd_request(km: KernelMessage, files: &mut Files) -> anyhow::Result<()> {
    let Message::Request(Request { body, .. }) = km.message else {
        return Err(anyhow::anyhow!("not a request"));
    };
@@ -1038,14 +1025,18 @@ async fn handle_fd_request(km: KernelMessage, files: Files) -> anyhow::Result<()
    let request: FdManagerRequest = serde_json::from_slice(&body)?;

    match request {
-       FdManagerRequest::Cull {
-           cull_fraction_denominator,
-       } => {
-           let fraction_to_close = files.open_files.len() as u64 / cull_fraction_denominator;
+       FdManagerRequest::FdsLimit(fds_limit) => {
+           files.fds_limit = fds_limit;
+           if files.open_files.len() as u64 >= fds_limit {
+               crate::fd_manager::send_fd_manager_hit_fds_limit(&files.our, &files.send_to_loop)
+                   .await;
                files
-                   .close_least_recently_used_files(fraction_to_close)
+                   .close_least_recently_used_files(
+                       (files.open_files.len() as u64 - fds_limit) / 2,
+                   )
                    .await?;
            }
+       }
        _ => {
            return Err(anyhow::anyhow!("non-Cull FdManagerRequest"));
        }
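The vfs reaction to FdsLimit is to record the new budget and, when the number of open files already meets or exceeds it, close half of the overage starting from the least recently used file. A small sketch of that arithmetic; paths_to_close is an illustrative helper, not part of the commit, and a plain VecDeque stands in for Files::open_files plus its UniqueQueue access order.

use std::collections::VecDeque;

/// `access_order` holds paths from least to most recently used. When the
/// open-file count reaches the fd limit imposed by fd_manager, close half of
/// the overage, oldest first.
fn paths_to_close(access_order: &VecDeque<String>, fds_limit: u64) -> Vec<String> {
    let open = access_order.len() as u64;
    if open < fds_limit {
        return Vec::new();
    }
    let to_close = (open - fds_limit) / 2;
    access_order.iter().take(to_close as usize).cloned().collect()
}

fn main() {
    let order: VecDeque<String> = (0..12).map(|i| format!("/tmp/file{i}")).collect();
    // 12 files open against a limit of 8: close (12 - 8) / 2 = 2 oldest files
    println!("{:?}", paths_to_close(&order, 8));
}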

View File

@@ -2077,12 +2077,15 @@ impl KnsUpdate {
 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub enum FdManagerRequest {
     /// other process -> fd_manager
-    OpenFds { number_opened: u64 },
+    /// must send this to fd_manager to get an initial fds_limit
+    RequestFdsLimit,
     /// other process -> fd_manager
-    CloseFds { number_closed: u64 },
+    /// send this to notify fd_manager that limit was hit,
+    /// which may or may not be reacted to
+    FdsLimitHit,
     /// fd_manager -> other process
-    Cull { cull_fraction_denominator: u64 },
+    FdsLimit(u64),
     /// administrative
     UpdateMaxFdsAsFractionOfUlimitPercentage(u64),
@@ -2091,18 +2094,24 @@ pub enum FdManagerRequest {
     /// administrative
     UpdateCullFractionDenominator(u64),
-    /// get a `HashMap` of all `ProcessId`s to their known number of open file descriptors.
+    /// get a `HashMap` of all `ProcessId`s to their number of allocated file descriptors.
     GetState,
-    /// get the `u64` known number of file descriptors used by `ProcessId`.
-    GetProcessFdCount(ProcessId),
+    /// get the `u64` number of file descriptors allocated to `ProcessId`.
+    GetProcessFdLimit(ProcessId),
 }

 #[derive(Debug, Serialize, Deserialize)]
 pub enum FdManagerResponse {
     /// response to [`FdManagerRequest::GetState`]
-    GetState(HashMap<ProcessId, u64>),
-    /// response to [`FdManagerRequest::GetProcessFdCount`]
-    GetProcessFdCount(u64),
+    GetState(HashMap<ProcessId, FdsLimit>),
+    /// response to [`FdManagerRequest::GetProcessFdLimit`]
+    GetProcessFdLimit(u64),
+}
+
+#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
+pub struct FdsLimit {
+    pub limit: u64,
+    pub hit_count: u64,
 }

 #[derive(Debug, Error)]
@@ -2111,6 +2120,6 @@ pub enum FdManagerError {
     NotARequest,
     #[error("fd_manager: received a non-FdManangerRequest")]
     BadRequest,
-    #[error("fd_manager: received a FdManagerRequest::Cull, but I am the one who culls")]
-    FdManagerWasSentCull,
+    #[error("fd_manager: received a FdManagerRequest::FdsLimit, but I am the one who sets limits")]
+    FdManagerWasSentLimit,
 }