new timer module

This commit is contained in:
dr-frmr 2023-11-14 15:34:17 -05:00
parent cab7fea897
commit 86c02bc3e4
No known key found for this signature in database
3 changed files with 268 additions and 1 deletions

View File

@ -1,3 +1,5 @@
#![feature(btree_extract_if)]
use crate::types::*;
use anyhow::Result;
use clap::{arg, Command};
@ -16,6 +18,7 @@ mod keygen;
mod net;
mod register;
mod terminal;
mod timer;
mod types;
mod vfs;
@ -89,9 +92,11 @@ async fn main() {
// http server channel w/ websockets (eyre)
let (http_server_sender, http_server_receiver): (MessageSender, MessageReceiver) =
mpsc::channel(HTTP_CHANNEL_CAPACITY);
// http client performs http requests on behalf of processes
let (timer_service_sender, timer_service_receiver): (MessageSender, MessageReceiver) =
mpsc::channel(HTTP_CHANNEL_CAPACITY);
let (eth_rpc_sender, eth_rpc_receiver): (MessageSender, MessageReceiver) =
mpsc::channel(ETH_RPC_CHANNEL_CAPACITY);
// http client performs http requests on behalf of processes
let (http_client_sender, http_client_receiver): (MessageSender, MessageReceiver) =
mpsc::channel(HTTP_CLIENT_CHANNEL_CAPACITY);
// vfs maintains metadata about files in fs for processes
@ -219,6 +224,8 @@ async fn main() {
println!("registration complete!");
// the boolean flag determines whether the runtime module is *public* or not,
// where public means that any process can always message it.
let mut runtime_extensions = vec![
(
ProcessId::new(Some("filesystem"), "sys", "uqbar"),
@ -235,6 +242,11 @@ async fn main() {
http_client_sender,
false,
),
(
ProcessId::new(Some("timer"), "sys", "uqbar"),
timer_service_sender,
true,
),
(
ProcessId::new(Some("eth_rpc"), "sys", "uqbar"),
eth_rpc_sender,
@ -339,6 +351,12 @@ async fn main() {
http_client_receiver,
print_sender.clone(),
));
tasks.spawn(timer::timer_service(
our.name.clone(),
kernel_message_sender.clone(),
timer_service_receiver,
print_sender.clone(),
));
tasks.spawn(eth_rpc::eth_rpc(
our.name.clone(),
rpc_url.clone(),

248
src/timer.rs Normal file
View File

@ -0,0 +1,248 @@
use crate::types::{
Address, FsAction, FsError, FsResponse, KernelMessage, Message, MessageReceiver, MessageSender,
Payload, PrintSender, Printout, Request, Response, FILESYSTEM_PROCESS_ID, TIMER_PROCESS_ID,
};
use anyhow::Result;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
/// A runtime module that allows processes to set timers. Interacting with the
/// timer is done with a simple Request/Response pattern, and the timer module
/// is public, so it can be used by any local process. It will not respond to
/// requests made by other nodes.
///
/// The interface of the timer module is as follows:
/// One kind of request is accepted: the IPC must be a little-endian byte-representation
/// of an unsigned 64-bit integer, in seconds. This request should always expect a Response.
/// If the request does not expect a Response, the timer will not be set.
///
/// A proper Request will trigger the timer module to send a Response. The Response will be
/// empty, so the user should either `send_and_await` the Request, or attach a `context` so
/// they can match the Response with their purpose.
///
pub async fn timer_service(
our: String,
kernel_message_sender: MessageSender,
mut timer_message_receiver: MessageReceiver,
print_tx: PrintSender,
) -> Result<()> {
// if we have a persisted state file, load it
let mut timer_map =
match load_state_from_reboot(&our, &kernel_message_sender, &mut timer_message_receiver)
.await
{
Ok(timer_map) => timer_map,
Err(e) => {
let _ = print_tx
.send(Printout {
verbosity: 1,
content: format!("Failed to load state from reboot: {:?}", e),
})
.await;
TimerMap {
timers: BTreeMap::new(),
}
}
};
// for any persisted timers that have popped, send their responses
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs();
for (id, addr) in timer_map.drain_expired(now) {
let _ = kernel_message_sender
.send(KernelMessage {
id,
source: Address {
node: our.clone(),
process: TIMER_PROCESS_ID.clone(),
},
target: addr,
rsvp: None,
message: Message::Response((
Response {
inherit: false,
ipc: vec![],
metadata: None,
},
None,
)),
payload: None,
signed_capabilities: None,
})
.await;
}
// and then re-persist the new state of the timer map
persist_state(&our, &timer_map, &kernel_message_sender).await;
// joinset holds active in-mem timers
let mut timer_tasks = tokio::task::JoinSet::<u64>::new();
loop {
tokio::select! {
Some(km) = timer_message_receiver.recv() => {
// we only handle Requests which contain a little-endian u64 as IPC
let Message::Request(req) = km.message else { continue };
let Ok(bytes): Result<[u8; 8], _> = req.ipc.try_into() else { continue };
let time = u64::from_le_bytes(bytes);
// if the timer is set to pop in the past, we immediately respond
// otherwise, store in our persisted map, and spawn a task that
// sleeps for the given time, then sends the response
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs();
if time <= now {
send_response(&our, km.id, km.rsvp.unwrap_or(km.source), &kernel_message_sender).await;
} else if timer_map.contains(time) {
timer_map.insert(time, km.id, km.rsvp.unwrap_or(km.source));
} else {
timer_map.insert(time, km.id, km.rsvp.unwrap_or(km.source));
timer_tasks.spawn(async move {
tokio::time::sleep(tokio::time::Duration::from_secs(time - now)).await;
return time
});
}
}
Some(Ok(time)) = timer_tasks.join_next() => {
// when a timer pops, we send the response to the process(es) that set
// the timer(s), and then remove it from our persisted map
let Some(timers) = timer_map.remove(time) else { continue };
for (id, addr) in timers {
send_response(&our, id, addr, &kernel_message_sender).await;
}
}
}
}
}
#[derive(Serialize, Deserialize, Debug)]
struct TimerMap {
// key: the unix timestamp at which the timer pops
// value: a vector of KernelMessage ids and who to send Response to
// this is because multiple processes can set timers for the same time
timers: BTreeMap<u64, Vec<(u64, Address)>>,
}
impl TimerMap {
fn insert(&mut self, pop_time: u64, id: u64, addr: Address) {
self.timers
.entry(pop_time)
.and_modify(|v| v.push((id, addr)));
}
fn contains(&mut self, pop_time: u64) -> bool {
self.timers.contains_key(&pop_time)
}
fn remove(&mut self, pop_time: u64) -> Option<Vec<(u64, Address)>> {
self.timers.remove(&pop_time)
}
fn drain_expired(&mut self, time: u64) -> Vec<(u64, Address)> {
return self
.timers
.extract_if(|k, _| *k <= time)
.map(|(_, v)| v)
.flatten()
.collect();
}
}
async fn send_response(our_node: &str, id: u64, target: Address, send_to_loop: &MessageSender) {
let _ = send_to_loop
.send(KernelMessage {
id,
source: Address {
node: our_node.to_string(),
process: TIMER_PROCESS_ID.clone(),
},
target,
rsvp: None,
message: Message::Response((
Response {
inherit: false,
ipc: vec![],
metadata: None,
},
None,
)),
payload: None,
signed_capabilities: None,
})
.await;
}
async fn persist_state(our_node: &str, state: &TimerMap, send_to_loop: &MessageSender) {
let _ = send_to_loop
.send(KernelMessage {
id: rand::random(),
source: Address {
node: our_node.to_string(),
process: TIMER_PROCESS_ID.clone(),
},
target: Address {
node: our_node.to_string(),
process: FILESYSTEM_PROCESS_ID.clone(),
},
rsvp: None,
message: Message::Request(Request {
inherit: false,
expects_response: None,
ipc: serde_json::to_vec(&FsAction::SetState(TIMER_PROCESS_ID.clone())).unwrap(),
metadata: None,
}),
payload: Some(Payload {
mime: None,
bytes: bincode::serialize(&state).unwrap(),
}),
signed_capabilities: None,
})
.await;
}
async fn load_state_from_reboot(
our_node: &str,
send_to_loop: &MessageSender,
recv_from_loop: &mut MessageReceiver,
) -> Result<TimerMap> {
let _ = send_to_loop
.send(KernelMessage {
id: rand::random(),
source: Address {
node: our_node.to_string(),
process: TIMER_PROCESS_ID.clone(),
},
target: Address {
node: our_node.to_string(),
process: FILESYSTEM_PROCESS_ID.clone(),
},
rsvp: None,
message: Message::Request(Request {
inherit: true,
expects_response: Some(5), // TODO evaluate
ipc: serde_json::to_vec(&FsAction::GetState(TIMER_PROCESS_ID.clone())).unwrap(),
metadata: None,
}),
payload: None,
signed_capabilities: None,
})
.await;
let km = recv_from_loop.recv().await;
let Some(km) = km else {
return Err(anyhow::anyhow!("Failed to load state from reboot!"));
};
let KernelMessage {
message, payload, ..
} = km;
let Message::Response((Response { ipc, .. }, None)) = message else {
return Err(anyhow::anyhow!("Failed to load state from reboot!"));
};
let Ok(Ok(FsResponse::GetState)) = serde_json::from_slice::<Result<FsResponse, FsError>>(&ipc)
else {
return Err(anyhow::anyhow!("Failed to load state from reboot!"));
};
let Some(payload) = payload else {
return Err(anyhow::anyhow!("Failed to load state from reboot!"));
};
return Ok(bincode::deserialize::<TimerMap>(&payload.bytes)?);
}

View File

@ -12,6 +12,7 @@ lazy_static::lazy_static! {
pub static ref HTTP_SERVER_PROCESS_ID: ProcessId = ProcessId::new(Some("http_server"), "sys", "uqbar");
pub static ref KERNEL_PROCESS_ID: ProcessId = ProcessId::new(Some("kernel"), "sys", "uqbar");
pub static ref TERMINAL_PROCESS_ID: ProcessId = ProcessId::new(Some("terminal"), "terminal", "uqbar");
pub static ref TIMER_PROCESS_ID: ProcessId = ProcessId::new(Some("timer"), "sys", "uqbar");
pub static ref VFS_PROCESS_ID: ProcessId = ProcessId::new(Some("vfs"), "sys", "uqbar");
}