mirror of
https://github.com/uqbar-dao/nectar.git
synced 2024-12-19 14:41:42 +03:00
fix timer bugs, add our
replacement in kernel event loop
This commit is contained in:
parent
986ad845c8
commit
653341352b
@ -1951,7 +1951,15 @@ async fn make_event_loop(
|
||||
}
|
||||
},
|
||||
kernel_message = recv_in_loop.recv() => {
|
||||
let kernel_message = kernel_message.expect("fatal: event loop died");
|
||||
let mut kernel_message = kernel_message.expect("fatal: event loop died");
|
||||
// the kernel treats the node-string "our" as a special case,
|
||||
// and replaces it with the name of the node this kernel is running.
|
||||
if kernel_message.source.node == "our" {
|
||||
kernel_message.source.node = our_name.clone();
|
||||
}
|
||||
if kernel_message.target.node == "our" {
|
||||
kernel_message.target.node = our_name.clone();
|
||||
}
|
||||
//
|
||||
// here: are the special kernel-level capabilities checks!
|
||||
//
|
||||
|
@ -228,7 +228,7 @@ async fn main() {
|
||||
|
||||
// the boolean flag determines whether the runtime module is *public* or not,
|
||||
// where public means that any process can always message it.
|
||||
let mut runtime_extensions = vec![
|
||||
let runtime_extensions = vec![
|
||||
(
|
||||
ProcessId::new(Some("filesystem"), "sys", "uqbar"),
|
||||
fs_message_sender,
|
||||
|
42
src/timer.rs
42
src/timer.rs
@ -79,33 +79,52 @@ pub async fn timer_service(
|
||||
loop {
|
||||
tokio::select! {
|
||||
Some(km) = timer_message_receiver.recv() => {
|
||||
// we only handle Requests which contain a little-endian u64 as IPC
|
||||
// ignore Requests sent from other nodes
|
||||
if km.source.node != our { continue };
|
||||
// we only handle Requests which contain a little-endian u64 as IPC,
|
||||
// except for a special "debug" message, which prints the current state
|
||||
let Message::Request(req) = km.message else { continue };
|
||||
if req.ipc == "debug".as_bytes() {
|
||||
let _ = print_tx.send(Printout {
|
||||
verbosity: 0,
|
||||
content: format!("timer service active timers ({}):", timer_map.timers.len()),
|
||||
}).await;
|
||||
for (k, v) in timer_map.timers.iter() {
|
||||
let _ = print_tx.send(Printout {
|
||||
verbosity: 0,
|
||||
content: format!("{}: {:?}", k, v),
|
||||
}).await;
|
||||
}
|
||||
continue
|
||||
}
|
||||
let Ok(bytes): Result<[u8; 8], _> = req.ipc.try_into() else { continue };
|
||||
let time = u64::from_le_bytes(bytes);
|
||||
// if the timer is set to pop in the past, we immediately respond
|
||||
let timer_secs = u64::from_le_bytes(bytes);
|
||||
// if the timer is set to pop in 0 seconds, we immediately respond
|
||||
// otherwise, store in our persisted map, and spawn a task that
|
||||
// sleeps for the given time, then sends the response
|
||||
let now = std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_secs();
|
||||
if time <= now {
|
||||
let pop_time = now + timer_secs;
|
||||
if timer_secs == 0 {
|
||||
send_response(&our, km.id, km.rsvp.unwrap_or(km.source), &kernel_message_sender).await;
|
||||
} else if timer_map.contains(time) {
|
||||
timer_map.insert(time, km.id, km.rsvp.unwrap_or(km.source));
|
||||
} else {
|
||||
timer_map.insert(time, km.id, km.rsvp.unwrap_or(km.source));
|
||||
continue
|
||||
}
|
||||
if !timer_map.contains(pop_time) {
|
||||
timer_tasks.spawn(async move {
|
||||
tokio::time::sleep(tokio::time::Duration::from_secs(time - now)).await;
|
||||
return time
|
||||
tokio::time::sleep(std::time::Duration::from_secs(timer_secs)).await;
|
||||
return pop_time
|
||||
});
|
||||
}
|
||||
timer_map.insert(pop_time, km.id, km.rsvp.unwrap_or(km.source));
|
||||
persist_state(&our, &timer_map, &kernel_message_sender).await;
|
||||
}
|
||||
Some(Ok(time)) = timer_tasks.join_next() => {
|
||||
// when a timer pops, we send the response to the process(es) that set
|
||||
// the timer(s), and then remove it from our persisted map
|
||||
let Some(timers) = timer_map.remove(time) else { continue };
|
||||
persist_state(&our, &timer_map, &kernel_message_sender).await;
|
||||
for (id, addr) in timers {
|
||||
send_response(&our, id, addr, &kernel_message_sender).await;
|
||||
}
|
||||
@ -126,7 +145,8 @@ impl TimerMap {
|
||||
fn insert(&mut self, pop_time: u64, id: u64, addr: Address) {
|
||||
self.timers
|
||||
.entry(pop_time)
|
||||
.and_modify(|v| v.push((id, addr)));
|
||||
.or_insert(vec![])
|
||||
.push((id, addr));
|
||||
}
|
||||
|
||||
fn contains(&mut self, pop_time: u64) -> bool {
|
||||
|
Loading…
Reference in New Issue
Block a user