From 11a0c40cd7da06f57639e8f881829b2fc6427b6f Mon Sep 17 00:00:00 2001 From: dr-frmr Date: Thu, 16 Nov 2023 12:27:16 -0500 Subject: [PATCH] timer -> millisecond granularity --- Cargo.lock | 7 +++++++ Cargo.toml | 1 + src/timer.rs | 30 ++++++++++-------------------- 3 files changed, 18 insertions(+), 20 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index aeede84c..8f39d09a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2921,6 +2921,12 @@ version = "0.1.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72ef4a56884ca558e5ddb05a1d1e7e1bfd9a68d9ed024c21704cc98872dae1bb" +[[package]] +name = "nohash-hasher" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2bf50223579dc7cdcfb3bfcacf7069ff68243f8c363f62ffa99cf000a6b9c451" + [[package]] name = "num-bigint" version = "0.4.4" @@ -5113,6 +5119,7 @@ dependencies = [ "jwt", "lazy_static", "log", + "nohash-hasher", "num-traits", "open", "public-ip", diff --git a/Cargo.toml b/Cargo.toml index 7c947b7e..07be5539 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,6 +41,7 @@ http = "0.2.9" jwt = "0.16" lazy_static = "1.4.0" log = "*" +nohash-hasher = "0.2.0" num-traits = "0.2" open = "5.0.0" public-ip = "0.2.2" diff --git a/src/timer.rs b/src/timer.rs index 81efaf26..cdd7cefd 100644 --- a/src/timer.rs +++ b/src/timer.rs @@ -4,7 +4,6 @@ use crate::types::{ }; use anyhow::Result; use serde::{Deserialize, Serialize}; -use std::collections::BTreeMap; /// A runtime module that allows processes to set timers. Interacting with the /// timer is done with a simple Request/Response pattern, and the timer module @@ -13,7 +12,7 @@ use std::collections::BTreeMap; /// /// The interface of the timer module is as follows: /// One kind of request is accepted: the IPC must be a little-endian byte-representation -/// of an unsigned 64-bit integer, in seconds. This request should always expect a Response. +/// of an unsigned 64-bit integer, in milliseconds. This request should always expect a Response. /// If the request does not expect a Response, the timer will not be set. /// /// A proper Request will trigger the timer module to send a Response. The Response will be @@ -28,7 +27,7 @@ pub async fn timer_service( ) -> Result<()> { // if we have a persisted state file, load it let mut timer_map = TimerMap { - timers: BTreeMap::new(), + timers: nohash_hasher::IntMap::default(), }; // joinset holds 1 active timer per expiration-time let mut timer_tasks = tokio::task::JoinSet::::new(); @@ -54,22 +53,22 @@ pub async fn timer_service( continue } let Ok(bytes): Result<[u8; 8], _> = req.ipc.try_into() else { continue }; - let timer_secs = u64::from_le_bytes(bytes); - // if the timer is set to pop in 0 seconds, we immediately respond + let timer_millis = u64::from_le_bytes(bytes); + // if the timer is set to pop in 0 millis, we immediately respond // otherwise, store in our persisted map, and spawn a task that // sleeps for the given time, then sends the response let now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap() - .as_secs(); - let pop_time = now + timer_secs; - if timer_secs == 0 { + .as_millis() as u64; + let pop_time = now + timer_millis; + if timer_millis == 0 { send_response(&our, km.id, km.rsvp.unwrap_or(km.source), &kernel_message_sender).await; continue } if !timer_map.contains(pop_time) { timer_tasks.spawn(async move { - tokio::time::sleep(std::time::Duration::from_secs(timer_secs)).await; + tokio::time::sleep(std::time::Duration::from_millis(timer_millis - 1)).await; return pop_time }); } @@ -89,10 +88,10 @@ pub async fn timer_service( #[derive(Serialize, Deserialize, Debug)] struct TimerMap { - // key: the unix timestamp at which the timer pops + // key: the unix timestamp in milliseconds at which the timer pops // value: a vector of KernelMessage ids and who to send Response to // this is because multiple processes can set timers for the same time - timers: BTreeMap>, + timers: nohash_hasher::IntMap>, } impl TimerMap { @@ -110,15 +109,6 @@ impl TimerMap { fn remove(&mut self, pop_time: u64) -> Option> { self.timers.remove(&pop_time) } - - fn _drain_expired(&mut self, time: u64) -> Vec<(u64, Address)> { - return self - .timers - .extract_if(|k, _| *k <= time) - .map(|(_, v)| v) - .flatten() - .collect(); - } } async fn send_response(our_node: &str, id: u64, target: Address, send_to_loop: &MessageSender) {