diff --git a/src/kernel/mod.rs b/src/kernel/mod.rs index e60c5f2a..937cd47a 100644 --- a/src/kernel/mod.rs +++ b/src/kernel/mod.rs @@ -1951,7 +1951,15 @@ async fn make_event_loop( } }, kernel_message = recv_in_loop.recv() => { - let kernel_message = kernel_message.expect("fatal: event loop died"); + let mut kernel_message = kernel_message.expect("fatal: event loop died"); + // the kernel treats the node-string "our" as a special case, + // and replaces it with the name of the node this kernel is running. + if kernel_message.source.node == "our" { + kernel_message.source.node = our_name.clone(); + } + if kernel_message.target.node == "our" { + kernel_message.target.node = our_name.clone(); + } // // here: are the special kernel-level capabilities checks! // diff --git a/src/main.rs b/src/main.rs index c40892c7..d08fad73 100644 --- a/src/main.rs +++ b/src/main.rs @@ -228,7 +228,7 @@ async fn main() { // the boolean flag determines whether the runtime module is *public* or not, // where public means that any process can always message it. - let mut runtime_extensions = vec![ + let runtime_extensions = vec![ ( ProcessId::new(Some("filesystem"), "sys", "uqbar"), fs_message_sender, diff --git a/src/timer.rs b/src/timer.rs index b6c59a72..72e210a1 100644 --- a/src/timer.rs +++ b/src/timer.rs @@ -79,33 +79,52 @@ pub async fn timer_service( loop { tokio::select! { Some(km) = timer_message_receiver.recv() => { - // we only handle Requests which contain a little-endian u64 as IPC + // ignore Requests sent from other nodes + if km.source.node != our { continue }; + // we only handle Requests which contain a little-endian u64 as IPC, + // except for a special "debug" message, which prints the current state let Message::Request(req) = km.message else { continue }; + if req.ipc == "debug".as_bytes() { + let _ = print_tx.send(Printout { + verbosity: 0, + content: format!("timer service active timers ({}):", timer_map.timers.len()), + }).await; + for (k, v) in timer_map.timers.iter() { + let _ = print_tx.send(Printout { + verbosity: 0, + content: format!("{}: {:?}", k, v), + }).await; + } + continue + } let Ok(bytes): Result<[u8; 8], _> = req.ipc.try_into() else { continue }; - let time = u64::from_le_bytes(bytes); - // if the timer is set to pop in the past, we immediately respond + let timer_secs = u64::from_le_bytes(bytes); + // if the timer is set to pop in 0 seconds, we immediately respond // otherwise, store in our persisted map, and spawn a task that // sleeps for the given time, then sends the response let now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap() .as_secs(); - if time <= now { + let pop_time = now + timer_secs; + if timer_secs == 0 { send_response(&our, km.id, km.rsvp.unwrap_or(km.source), &kernel_message_sender).await; - } else if timer_map.contains(time) { - timer_map.insert(time, km.id, km.rsvp.unwrap_or(km.source)); - } else { - timer_map.insert(time, km.id, km.rsvp.unwrap_or(km.source)); + continue + } + if !timer_map.contains(pop_time) { timer_tasks.spawn(async move { - tokio::time::sleep(tokio::time::Duration::from_secs(time - now)).await; - return time + tokio::time::sleep(std::time::Duration::from_secs(timer_secs)).await; + return pop_time }); } + timer_map.insert(pop_time, km.id, km.rsvp.unwrap_or(km.source)); + persist_state(&our, &timer_map, &kernel_message_sender).await; } Some(Ok(time)) = timer_tasks.join_next() => { // when a timer pops, we send the response to the process(es) that set // the timer(s), and then remove it from our persisted map let Some(timers) = timer_map.remove(time) else { continue }; + persist_state(&our, &timer_map, &kernel_message_sender).await; for (id, addr) in timers { send_response(&our, id, addr, &kernel_message_sender).await; } @@ -126,7 +145,8 @@ impl TimerMap { fn insert(&mut self, pop_time: u64, id: u64, addr: Address) { self.timers .entry(pop_time) - .and_modify(|v| v.push((id, addr))); + .or_insert(vec![]) + .push((id, addr)); } fn contains(&mut self, pop_time: u64) -> bool {