From 86c02bc3e404a2e85f28a971e651407f98e835f5 Mon Sep 17 00:00:00 2001 From: dr-frmr Date: Tue, 14 Nov 2023 15:34:17 -0500 Subject: [PATCH 1/3] new timer module --- src/main.rs | 20 ++++- src/timer.rs | 248 +++++++++++++++++++++++++++++++++++++++++++++++++++ src/types.rs | 1 + 3 files changed, 268 insertions(+), 1 deletion(-) create mode 100644 src/timer.rs diff --git a/src/main.rs b/src/main.rs index 146cf69e..ecb8a6c4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,3 +1,5 @@ +#![feature(btree_extract_if)] + use crate::types::*; use anyhow::Result; use clap::{arg, Command}; @@ -16,6 +18,7 @@ mod keygen; mod net; mod register; mod terminal; +mod timer; mod types; mod vfs; @@ -89,9 +92,11 @@ async fn main() { // http server channel w/ websockets (eyre) let (http_server_sender, http_server_receiver): (MessageSender, MessageReceiver) = mpsc::channel(HTTP_CHANNEL_CAPACITY); - // http client performs http requests on behalf of processes + let (timer_service_sender, timer_service_receiver): (MessageSender, MessageReceiver) = + mpsc::channel(HTTP_CHANNEL_CAPACITY); let (eth_rpc_sender, eth_rpc_receiver): (MessageSender, MessageReceiver) = mpsc::channel(ETH_RPC_CHANNEL_CAPACITY); + // http client performs http requests on behalf of processes let (http_client_sender, http_client_receiver): (MessageSender, MessageReceiver) = mpsc::channel(HTTP_CLIENT_CHANNEL_CAPACITY); // vfs maintains metadata about files in fs for processes @@ -219,6 +224,8 @@ async fn main() { println!("registration complete!"); + // the boolean flag determines whether the runtime module is *public* or not, + // where public means that any process can always message it. let mut runtime_extensions = vec![ ( ProcessId::new(Some("filesystem"), "sys", "uqbar"), @@ -235,6 +242,11 @@ async fn main() { http_client_sender, false, ), + ( + ProcessId::new(Some("timer"), "sys", "uqbar"), + timer_service_sender, + true, + ), ( ProcessId::new(Some("eth_rpc"), "sys", "uqbar"), eth_rpc_sender, @@ -339,6 +351,12 @@ async fn main() { http_client_receiver, print_sender.clone(), )); + tasks.spawn(timer::timer_service( + our.name.clone(), + kernel_message_sender.clone(), + timer_service_receiver, + print_sender.clone(), + )); tasks.spawn(eth_rpc::eth_rpc( our.name.clone(), rpc_url.clone(), diff --git a/src/timer.rs b/src/timer.rs new file mode 100644 index 00000000..b6c59a72 --- /dev/null +++ b/src/timer.rs @@ -0,0 +1,248 @@ +use crate::types::{ + Address, FsAction, FsError, FsResponse, KernelMessage, Message, MessageReceiver, MessageSender, + Payload, PrintSender, Printout, Request, Response, FILESYSTEM_PROCESS_ID, TIMER_PROCESS_ID, +}; +use anyhow::Result; +use serde::{Deserialize, Serialize}; +use std::collections::BTreeMap; + +/// A runtime module that allows processes to set timers. Interacting with the +/// timer is done with a simple Request/Response pattern, and the timer module +/// is public, so it can be used by any local process. It will not respond to +/// requests made by other nodes. +/// +/// The interface of the timer module is as follows: +/// One kind of request is accepted: the IPC must be a little-endian byte-representation +/// of an unsigned 64-bit integer, in seconds. This request should always expect a Response. +/// If the request does not expect a Response, the timer will not be set. +/// +/// A proper Request will trigger the timer module to send a Response. The Response will be +/// empty, so the user should either `send_and_await` the Request, or attach a `context` so +/// they can match the Response with their purpose. +/// +pub async fn timer_service( + our: String, + kernel_message_sender: MessageSender, + mut timer_message_receiver: MessageReceiver, + print_tx: PrintSender, +) -> Result<()> { + // if we have a persisted state file, load it + let mut timer_map = + match load_state_from_reboot(&our, &kernel_message_sender, &mut timer_message_receiver) + .await + { + Ok(timer_map) => timer_map, + Err(e) => { + let _ = print_tx + .send(Printout { + verbosity: 1, + content: format!("Failed to load state from reboot: {:?}", e), + }) + .await; + TimerMap { + timers: BTreeMap::new(), + } + } + }; + // for any persisted timers that have popped, send their responses + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs(); + for (id, addr) in timer_map.drain_expired(now) { + let _ = kernel_message_sender + .send(KernelMessage { + id, + source: Address { + node: our.clone(), + process: TIMER_PROCESS_ID.clone(), + }, + target: addr, + rsvp: None, + message: Message::Response(( + Response { + inherit: false, + ipc: vec![], + metadata: None, + }, + None, + )), + payload: None, + signed_capabilities: None, + }) + .await; + } + // and then re-persist the new state of the timer map + persist_state(&our, &timer_map, &kernel_message_sender).await; + // joinset holds active in-mem timers + let mut timer_tasks = tokio::task::JoinSet::::new(); + loop { + tokio::select! { + Some(km) = timer_message_receiver.recv() => { + // we only handle Requests which contain a little-endian u64 as IPC + let Message::Request(req) = km.message else { continue }; + let Ok(bytes): Result<[u8; 8], _> = req.ipc.try_into() else { continue }; + let time = u64::from_le_bytes(bytes); + // if the timer is set to pop in the past, we immediately respond + // otherwise, store in our persisted map, and spawn a task that + // sleeps for the given time, then sends the response + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs(); + if time <= now { + send_response(&our, km.id, km.rsvp.unwrap_or(km.source), &kernel_message_sender).await; + } else if timer_map.contains(time) { + timer_map.insert(time, km.id, km.rsvp.unwrap_or(km.source)); + } else { + timer_map.insert(time, km.id, km.rsvp.unwrap_or(km.source)); + timer_tasks.spawn(async move { + tokio::time::sleep(tokio::time::Duration::from_secs(time - now)).await; + return time + }); + } + } + Some(Ok(time)) = timer_tasks.join_next() => { + // when a timer pops, we send the response to the process(es) that set + // the timer(s), and then remove it from our persisted map + let Some(timers) = timer_map.remove(time) else { continue }; + for (id, addr) in timers { + send_response(&our, id, addr, &kernel_message_sender).await; + } + } + } + } +} + +#[derive(Serialize, Deserialize, Debug)] +struct TimerMap { + // key: the unix timestamp at which the timer pops + // value: a vector of KernelMessage ids and who to send Response to + // this is because multiple processes can set timers for the same time + timers: BTreeMap>, +} + +impl TimerMap { + fn insert(&mut self, pop_time: u64, id: u64, addr: Address) { + self.timers + .entry(pop_time) + .and_modify(|v| v.push((id, addr))); + } + + fn contains(&mut self, pop_time: u64) -> bool { + self.timers.contains_key(&pop_time) + } + + fn remove(&mut self, pop_time: u64) -> Option> { + self.timers.remove(&pop_time) + } + + fn drain_expired(&mut self, time: u64) -> Vec<(u64, Address)> { + return self + .timers + .extract_if(|k, _| *k <= time) + .map(|(_, v)| v) + .flatten() + .collect(); + } +} + +async fn send_response(our_node: &str, id: u64, target: Address, send_to_loop: &MessageSender) { + let _ = send_to_loop + .send(KernelMessage { + id, + source: Address { + node: our_node.to_string(), + process: TIMER_PROCESS_ID.clone(), + }, + target, + rsvp: None, + message: Message::Response(( + Response { + inherit: false, + ipc: vec![], + metadata: None, + }, + None, + )), + payload: None, + signed_capabilities: None, + }) + .await; +} + +async fn persist_state(our_node: &str, state: &TimerMap, send_to_loop: &MessageSender) { + let _ = send_to_loop + .send(KernelMessage { + id: rand::random(), + source: Address { + node: our_node.to_string(), + process: TIMER_PROCESS_ID.clone(), + }, + target: Address { + node: our_node.to_string(), + process: FILESYSTEM_PROCESS_ID.clone(), + }, + rsvp: None, + message: Message::Request(Request { + inherit: false, + expects_response: None, + ipc: serde_json::to_vec(&FsAction::SetState(TIMER_PROCESS_ID.clone())).unwrap(), + metadata: None, + }), + payload: Some(Payload { + mime: None, + bytes: bincode::serialize(&state).unwrap(), + }), + signed_capabilities: None, + }) + .await; +} + +async fn load_state_from_reboot( + our_node: &str, + send_to_loop: &MessageSender, + recv_from_loop: &mut MessageReceiver, +) -> Result { + let _ = send_to_loop + .send(KernelMessage { + id: rand::random(), + source: Address { + node: our_node.to_string(), + process: TIMER_PROCESS_ID.clone(), + }, + target: Address { + node: our_node.to_string(), + process: FILESYSTEM_PROCESS_ID.clone(), + }, + rsvp: None, + message: Message::Request(Request { + inherit: true, + expects_response: Some(5), // TODO evaluate + ipc: serde_json::to_vec(&FsAction::GetState(TIMER_PROCESS_ID.clone())).unwrap(), + metadata: None, + }), + payload: None, + signed_capabilities: None, + }) + .await; + let km = recv_from_loop.recv().await; + let Some(km) = km else { + return Err(anyhow::anyhow!("Failed to load state from reboot!")); + }; + + let KernelMessage { + message, payload, .. + } = km; + let Message::Response((Response { ipc, .. }, None)) = message else { + return Err(anyhow::anyhow!("Failed to load state from reboot!")); + }; + let Ok(Ok(FsResponse::GetState)) = serde_json::from_slice::>(&ipc) + else { + return Err(anyhow::anyhow!("Failed to load state from reboot!")); + }; + let Some(payload) = payload else { + return Err(anyhow::anyhow!("Failed to load state from reboot!")); + }; + return Ok(bincode::deserialize::(&payload.bytes)?); +} diff --git a/src/types.rs b/src/types.rs index 7fe2018e..cd613c26 100644 --- a/src/types.rs +++ b/src/types.rs @@ -12,6 +12,7 @@ lazy_static::lazy_static! { pub static ref HTTP_SERVER_PROCESS_ID: ProcessId = ProcessId::new(Some("http_server"), "sys", "uqbar"); pub static ref KERNEL_PROCESS_ID: ProcessId = ProcessId::new(Some("kernel"), "sys", "uqbar"); pub static ref TERMINAL_PROCESS_ID: ProcessId = ProcessId::new(Some("terminal"), "terminal", "uqbar"); + pub static ref TIMER_PROCESS_ID: ProcessId = ProcessId::new(Some("timer"), "sys", "uqbar"); pub static ref VFS_PROCESS_ID: ProcessId = ProcessId::new(Some("vfs"), "sys", "uqbar"); } From 653341352bc093cb0ed3fd6148b66f9d4a44996f Mon Sep 17 00:00:00 2001 From: dr-frmr Date: Tue, 14 Nov 2023 22:23:15 -0500 Subject: [PATCH 2/3] fix timer bugs, add `our` replacement in kernel event loop --- src/kernel/mod.rs | 10 +++++++++- src/main.rs | 2 +- src/timer.rs | 42 +++++++++++++++++++++++++++++++----------- 3 files changed, 41 insertions(+), 13 deletions(-) diff --git a/src/kernel/mod.rs b/src/kernel/mod.rs index e60c5f2a..937cd47a 100644 --- a/src/kernel/mod.rs +++ b/src/kernel/mod.rs @@ -1951,7 +1951,15 @@ async fn make_event_loop( } }, kernel_message = recv_in_loop.recv() => { - let kernel_message = kernel_message.expect("fatal: event loop died"); + let mut kernel_message = kernel_message.expect("fatal: event loop died"); + // the kernel treats the node-string "our" as a special case, + // and replaces it with the name of the node this kernel is running. + if kernel_message.source.node == "our" { + kernel_message.source.node = our_name.clone(); + } + if kernel_message.target.node == "our" { + kernel_message.target.node = our_name.clone(); + } // // here: are the special kernel-level capabilities checks! // diff --git a/src/main.rs b/src/main.rs index c40892c7..d08fad73 100644 --- a/src/main.rs +++ b/src/main.rs @@ -228,7 +228,7 @@ async fn main() { // the boolean flag determines whether the runtime module is *public* or not, // where public means that any process can always message it. - let mut runtime_extensions = vec![ + let runtime_extensions = vec![ ( ProcessId::new(Some("filesystem"), "sys", "uqbar"), fs_message_sender, diff --git a/src/timer.rs b/src/timer.rs index b6c59a72..72e210a1 100644 --- a/src/timer.rs +++ b/src/timer.rs @@ -79,33 +79,52 @@ pub async fn timer_service( loop { tokio::select! { Some(km) = timer_message_receiver.recv() => { - // we only handle Requests which contain a little-endian u64 as IPC + // ignore Requests sent from other nodes + if km.source.node != our { continue }; + // we only handle Requests which contain a little-endian u64 as IPC, + // except for a special "debug" message, which prints the current state let Message::Request(req) = km.message else { continue }; + if req.ipc == "debug".as_bytes() { + let _ = print_tx.send(Printout { + verbosity: 0, + content: format!("timer service active timers ({}):", timer_map.timers.len()), + }).await; + for (k, v) in timer_map.timers.iter() { + let _ = print_tx.send(Printout { + verbosity: 0, + content: format!("{}: {:?}", k, v), + }).await; + } + continue + } let Ok(bytes): Result<[u8; 8], _> = req.ipc.try_into() else { continue }; - let time = u64::from_le_bytes(bytes); - // if the timer is set to pop in the past, we immediately respond + let timer_secs = u64::from_le_bytes(bytes); + // if the timer is set to pop in 0 seconds, we immediately respond // otherwise, store in our persisted map, and spawn a task that // sleeps for the given time, then sends the response let now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap() .as_secs(); - if time <= now { + let pop_time = now + timer_secs; + if timer_secs == 0 { send_response(&our, km.id, km.rsvp.unwrap_or(km.source), &kernel_message_sender).await; - } else if timer_map.contains(time) { - timer_map.insert(time, km.id, km.rsvp.unwrap_or(km.source)); - } else { - timer_map.insert(time, km.id, km.rsvp.unwrap_or(km.source)); + continue + } + if !timer_map.contains(pop_time) { timer_tasks.spawn(async move { - tokio::time::sleep(tokio::time::Duration::from_secs(time - now)).await; - return time + tokio::time::sleep(std::time::Duration::from_secs(timer_secs)).await; + return pop_time }); } + timer_map.insert(pop_time, km.id, km.rsvp.unwrap_or(km.source)); + persist_state(&our, &timer_map, &kernel_message_sender).await; } Some(Ok(time)) = timer_tasks.join_next() => { // when a timer pops, we send the response to the process(es) that set // the timer(s), and then remove it from our persisted map let Some(timers) = timer_map.remove(time) else { continue }; + persist_state(&our, &timer_map, &kernel_message_sender).await; for (id, addr) in timers { send_response(&our, id, addr, &kernel_message_sender).await; } @@ -126,7 +145,8 @@ impl TimerMap { fn insert(&mut self, pop_time: u64, id: u64, addr: Address) { self.timers .entry(pop_time) - .and_modify(|v| v.push((id, addr))); + .or_insert(vec![]) + .push((id, addr)); } fn contains(&mut self, pop_time: u64) -> bool { From 4d05fd9718c0756563b2c484019bf77eb6250b6e Mon Sep 17 00:00:00 2001 From: dr-frmr Date: Wed, 15 Nov 2023 10:51:09 -0500 Subject: [PATCH 3/3] remove all persisted aspects of timer service --- src/timer.rs | 136 +++------------------------------------------------ 1 file changed, 7 insertions(+), 129 deletions(-) diff --git a/src/timer.rs b/src/timer.rs index 72e210a1..81efaf26 100644 --- a/src/timer.rs +++ b/src/timer.rs @@ -1,6 +1,6 @@ use crate::types::{ - Address, FsAction, FsError, FsResponse, KernelMessage, Message, MessageReceiver, MessageSender, - Payload, PrintSender, Printout, Request, Response, FILESYSTEM_PROCESS_ID, TIMER_PROCESS_ID, + Address, KernelMessage, Message, MessageReceiver, MessageSender, PrintSender, Printout, + Response, TIMER_PROCESS_ID, }; use anyhow::Result; use serde::{Deserialize, Serialize}; @@ -27,54 +27,10 @@ pub async fn timer_service( print_tx: PrintSender, ) -> Result<()> { // if we have a persisted state file, load it - let mut timer_map = - match load_state_from_reboot(&our, &kernel_message_sender, &mut timer_message_receiver) - .await - { - Ok(timer_map) => timer_map, - Err(e) => { - let _ = print_tx - .send(Printout { - verbosity: 1, - content: format!("Failed to load state from reboot: {:?}", e), - }) - .await; - TimerMap { - timers: BTreeMap::new(), - } - } - }; - // for any persisted timers that have popped, send their responses - let now = std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap() - .as_secs(); - for (id, addr) in timer_map.drain_expired(now) { - let _ = kernel_message_sender - .send(KernelMessage { - id, - source: Address { - node: our.clone(), - process: TIMER_PROCESS_ID.clone(), - }, - target: addr, - rsvp: None, - message: Message::Response(( - Response { - inherit: false, - ipc: vec![], - metadata: None, - }, - None, - )), - payload: None, - signed_capabilities: None, - }) - .await; - } - // and then re-persist the new state of the timer map - persist_state(&our, &timer_map, &kernel_message_sender).await; - // joinset holds active in-mem timers + let mut timer_map = TimerMap { + timers: BTreeMap::new(), + }; + // joinset holds 1 active timer per expiration-time let mut timer_tasks = tokio::task::JoinSet::::new(); loop { tokio::select! { @@ -118,13 +74,11 @@ pub async fn timer_service( }); } timer_map.insert(pop_time, km.id, km.rsvp.unwrap_or(km.source)); - persist_state(&our, &timer_map, &kernel_message_sender).await; } Some(Ok(time)) = timer_tasks.join_next() => { // when a timer pops, we send the response to the process(es) that set // the timer(s), and then remove it from our persisted map let Some(timers) = timer_map.remove(time) else { continue }; - persist_state(&our, &timer_map, &kernel_message_sender).await; for (id, addr) in timers { send_response(&our, id, addr, &kernel_message_sender).await; } @@ -157,7 +111,7 @@ impl TimerMap { self.timers.remove(&pop_time) } - fn drain_expired(&mut self, time: u64) -> Vec<(u64, Address)> { + fn _drain_expired(&mut self, time: u64) -> Vec<(u64, Address)> { return self .timers .extract_if(|k, _| *k <= time) @@ -190,79 +144,3 @@ async fn send_response(our_node: &str, id: u64, target: Address, send_to_loop: & }) .await; } - -async fn persist_state(our_node: &str, state: &TimerMap, send_to_loop: &MessageSender) { - let _ = send_to_loop - .send(KernelMessage { - id: rand::random(), - source: Address { - node: our_node.to_string(), - process: TIMER_PROCESS_ID.clone(), - }, - target: Address { - node: our_node.to_string(), - process: FILESYSTEM_PROCESS_ID.clone(), - }, - rsvp: None, - message: Message::Request(Request { - inherit: false, - expects_response: None, - ipc: serde_json::to_vec(&FsAction::SetState(TIMER_PROCESS_ID.clone())).unwrap(), - metadata: None, - }), - payload: Some(Payload { - mime: None, - bytes: bincode::serialize(&state).unwrap(), - }), - signed_capabilities: None, - }) - .await; -} - -async fn load_state_from_reboot( - our_node: &str, - send_to_loop: &MessageSender, - recv_from_loop: &mut MessageReceiver, -) -> Result { - let _ = send_to_loop - .send(KernelMessage { - id: rand::random(), - source: Address { - node: our_node.to_string(), - process: TIMER_PROCESS_ID.clone(), - }, - target: Address { - node: our_node.to_string(), - process: FILESYSTEM_PROCESS_ID.clone(), - }, - rsvp: None, - message: Message::Request(Request { - inherit: true, - expects_response: Some(5), // TODO evaluate - ipc: serde_json::to_vec(&FsAction::GetState(TIMER_PROCESS_ID.clone())).unwrap(), - metadata: None, - }), - payload: None, - signed_capabilities: None, - }) - .await; - let km = recv_from_loop.recv().await; - let Some(km) = km else { - return Err(anyhow::anyhow!("Failed to load state from reboot!")); - }; - - let KernelMessage { - message, payload, .. - } = km; - let Message::Response((Response { ipc, .. }, None)) = message else { - return Err(anyhow::anyhow!("Failed to load state from reboot!")); - }; - let Ok(Ok(FsResponse::GetState)) = serde_json::from_slice::>(&ipc) - else { - return Err(anyhow::anyhow!("Failed to load state from reboot!")); - }; - let Some(payload) = payload else { - return Err(anyhow::anyhow!("Failed to load state from reboot!")); - }; - return Ok(bincode::deserialize::(&payload.bytes)?); -}