Merge pull request #63 from uqbar-dao/dr/timer-module

Timer runtime module
dr-frmr 2023-11-15 12:25:44 -05:00 committed by GitHub
commit 6198e873ae
4 changed files with 176 additions and 3 deletions


@@ -1951,7 +1951,15 @@ async fn make_event_loop(
}
},
kernel_message = recv_in_loop.recv() => {
let kernel_message = kernel_message.expect("fatal: event loop died");
let mut kernel_message = kernel_message.expect("fatal: event loop died");
// the kernel treats the node-string "our" as a special case,
// and replaces it with the name of the node this kernel is running.
if kernel_message.source.node == "our" {
kernel_message.source.node = our_name.clone();
}
if kernel_message.target.node == "our" {
kernel_message.target.node = our_name.clone();
}
//
// here: are the special kernel-level capabilities checks!
//
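A minimal illustration of the shorthand this hunk adds (not part of the diff; the Address shape and TIMER_PROCESS_ID are taken from src/timer.rs and the types change later in this commit, and the concrete node name is made up):

// a local process can target its own node without knowing the node's name
let target = Address {
    node: "our".to_string(),
    process: TIMER_PROCESS_ID.clone(),
};
// before the capability checks below run, the event loop rewrites "our" in place,
// so the message is delivered as if it had been addressed to, say,
// Address { node: "some-node.uq".to_string(), process: TIMER_PROCESS_ID.clone() }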


@@ -1,3 +1,5 @@
#![feature(btree_extract_if)]
use crate::types::*;
use anyhow::Result;
use clap::{arg, Command};
@@ -16,6 +18,7 @@ mod keygen;
mod net;
mod register;
mod terminal;
mod timer;
mod types;
mod vfs;
@@ -95,9 +98,11 @@ async fn main() {
// http server channel w/ websockets (eyre)
let (http_server_sender, http_server_receiver): (MessageSender, MessageReceiver) =
mpsc::channel(HTTP_CHANNEL_CAPACITY);
// http client performs http requests on behalf of processes
let (timer_service_sender, timer_service_receiver): (MessageSender, MessageReceiver) =
mpsc::channel(HTTP_CHANNEL_CAPACITY);
let (eth_rpc_sender, eth_rpc_receiver): (MessageSender, MessageReceiver) =
mpsc::channel(ETH_RPC_CHANNEL_CAPACITY);
// http client performs http requests on behalf of processes
let (http_client_sender, http_client_receiver): (MessageSender, MessageReceiver) =
mpsc::channel(HTTP_CLIENT_CHANNEL_CAPACITY);
// vfs maintains metadata about files in fs for processes
@@ -226,7 +231,9 @@ async fn main() {
println!("registration complete!");
let mut runtime_extensions = vec![
// the boolean flag determines whether the runtime module is *public* or not,
// where public means that any process can always message it.
let runtime_extensions = vec![
(
ProcessId::new(Some("filesystem"), "sys", "uqbar"),
fs_message_sender,
@@ -242,6 +249,11 @@ async fn main() {
http_client_sender,
false,
),
(
ProcessId::new(Some("timer"), "sys", "uqbar"),
timer_service_sender,
true,
),
(
ProcessId::new(Some("eth_rpc"), "sys", "uqbar"),
eth_rpc_sender,
@@ -346,6 +358,12 @@ async fn main() {
http_client_receiver,
print_sender.clone(),
));
tasks.spawn(timer::timer_service(
our.name.clone(),
kernel_message_sender.clone(),
timer_service_receiver,
print_sender.clone(),
));
tasks.spawn(eth_rpc::eth_rpc(
our.name.clone(),
rpc_url.clone(),

src/timer.rs (new file, 146 lines added)

@@ -0,0 +1,146 @@
use crate::types::{
Address, KernelMessage, Message, MessageReceiver, MessageSender, PrintSender, Printout,
Response, TIMER_PROCESS_ID,
};
use anyhow::Result;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
/// A runtime module that allows processes to set timers. Interacting with the
/// timer is done with a simple Request/Response pattern, and the timer module
/// is public, so it can be used by any local process. It will not respond to
/// requests made by other nodes.
///
/// The interface of the timer module is as follows:
/// One kind of request is accepted: the IPC must be a little-endian byte-representation
/// of an unsigned 64-bit integer, in seconds. This request should always expect a Response.
/// If the request does not expect a Response, the timer will not be set.
///
/// A proper Request will trigger the timer module to send a Response. The Response will be
/// empty, so the user should either `send_and_await` the Request, or attach a `context` so
/// they can match the Response to the Request that set the timer.
///
pub async fn timer_service(
our: String,
kernel_message_sender: MessageSender,
mut timer_message_receiver: MessageReceiver,
print_tx: PrintSender,
) -> Result<()> {
// if we have a persisted state file, load it
let mut timer_map = TimerMap {
timers: BTreeMap::new(),
};
// joinset holds 1 active timer per expiration-time
let mut timer_tasks = tokio::task::JoinSet::<u64>::new();
loop {
tokio::select! {
Some(km) = timer_message_receiver.recv() => {
// ignore Requests sent from other nodes
if km.source.node != our { continue };
// we only handle Requests which contain a little-endian u64 as IPC,
// except for a special "debug" message, which prints the current state
let Message::Request(req) = km.message else { continue };
if req.ipc == "debug".as_bytes() {
let _ = print_tx.send(Printout {
verbosity: 0,
content: format!("timer service active timers ({}):", timer_map.timers.len()),
}).await;
for (k, v) in timer_map.timers.iter() {
let _ = print_tx.send(Printout {
verbosity: 0,
content: format!("{}: {:?}", k, v),
}).await;
}
continue
}
let Ok(bytes): Result<[u8; 8], _> = req.ipc.try_into() else { continue };
let timer_secs = u64::from_le_bytes(bytes);
// if the timer is set to pop in 0 seconds, we immediately respond
// otherwise, store in our persisted map, and spawn a task that
// sleeps for the given time, then sends the response
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs();
let pop_time = now + timer_secs;
if timer_secs == 0 {
send_response(&our, km.id, km.rsvp.unwrap_or(km.source), &kernel_message_sender).await;
continue
}
if !timer_map.contains(pop_time) {
timer_tasks.spawn(async move {
tokio::time::sleep(std::time::Duration::from_secs(timer_secs)).await;
return pop_time
});
}
timer_map.insert(pop_time, km.id, km.rsvp.unwrap_or(km.source));
}
Some(Ok(time)) = timer_tasks.join_next() => {
// when a timer pops, we send the response to the process(es) that set
// the timer(s), and then remove it from our persisted map
let Some(timers) = timer_map.remove(time) else { continue };
for (id, addr) in timers {
send_response(&our, id, addr, &kernel_message_sender).await;
}
}
}
}
}
#[derive(Serialize, Deserialize, Debug)]
struct TimerMap {
// key: the unix timestamp at which the timer pops
// value: a vector of KernelMessage ids and who to send Response to
// this is because multiple processes can set timers for the same time
timers: BTreeMap<u64, Vec<(u64, Address)>>,
}
impl TimerMap {
fn insert(&mut self, pop_time: u64, id: u64, addr: Address) {
self.timers
.entry(pop_time)
.or_insert(vec![])
.push((id, addr));
}
fn contains(&mut self, pop_time: u64) -> bool {
self.timers.contains_key(&pop_time)
}
fn remove(&mut self, pop_time: u64) -> Option<Vec<(u64, Address)>> {
self.timers.remove(&pop_time)
}
fn _drain_expired(&mut self, time: u64) -> Vec<(u64, Address)> {
return self
.timers
.extract_if(|k, _| *k <= time)
.map(|(_, v)| v)
.flatten()
.collect();
}
}
async fn send_response(our_node: &str, id: u64, target: Address, send_to_loop: &MessageSender) {
let _ = send_to_loop
.send(KernelMessage {
id,
source: Address {
node: our_node.to_string(),
process: TIMER_PROCESS_ID.clone(),
},
target,
rsvp: None,
message: Message::Response((
Response {
inherit: false,
ipc: vec![],
metadata: None,
},
None,
)),
payload: None,
signed_capabilities: None,
})
.await;
}
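
A sketch of the wire format the doc comment above describes, seen from the caller's side (illustrative only: how a process builds and sends a Request is outside this diff, so no messaging helper is shown; the byte layout and the "debug" special case come straight from this file):

// IPC for "pop a timer in 30 seconds": a little-endian u64, 8 bytes
let timer_secs: u64 = 30;
let ipc: Vec<u8> = timer_secs.to_le_bytes().to_vec();
// the service decodes it exactly the way it was encoded
let decoded = u64::from_le_bytes(ipc.as_slice().try_into().unwrap()); // 30
// the special-cased IPC that prints the active-timer map at verbosity 0
let debug_ipc: Vec<u8> = "debug".as_bytes().to_vec();

Since the Response's ipc is empty, a caller that sets several timers should either `send_and_await` each Request or attach a `context` so the pops can be told apart.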


@@ -12,6 +12,7 @@ lazy_static::lazy_static! {
pub static ref HTTP_SERVER_PROCESS_ID: ProcessId = ProcessId::new(Some("http_server"), "sys", "uqbar");
pub static ref KERNEL_PROCESS_ID: ProcessId = ProcessId::new(Some("kernel"), "sys", "uqbar");
pub static ref TERMINAL_PROCESS_ID: ProcessId = ProcessId::new(Some("terminal"), "terminal", "uqbar");
pub static ref TIMER_PROCESS_ID: ProcessId = ProcessId::new(Some("timer"), "sys", "uqbar");
pub static ref VFS_PROCESS_ID: ProcessId = ProcessId::new(Some("vfs"), "sys", "uqbar");
}