Merge pull request #66 from uqbar-dao/dr/timer-ms

timer -> millisecond granularity
This commit is contained in:
dr-frmr 2023-11-16 12:28:10 -05:00 committed by GitHub
commit 483f6decdc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 18 additions and 20 deletions

7
Cargo.lock generated
View File

@ -2921,6 +2921,12 @@ version = "0.1.14"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "72ef4a56884ca558e5ddb05a1d1e7e1bfd9a68d9ed024c21704cc98872dae1bb" checksum = "72ef4a56884ca558e5ddb05a1d1e7e1bfd9a68d9ed024c21704cc98872dae1bb"
[[package]]
name = "nohash-hasher"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2bf50223579dc7cdcfb3bfcacf7069ff68243f8c363f62ffa99cf000a6b9c451"
[[package]] [[package]]
name = "num-bigint" name = "num-bigint"
version = "0.4.4" version = "0.4.4"
@ -5113,6 +5119,7 @@ dependencies = [
"jwt", "jwt",
"lazy_static", "lazy_static",
"log", "log",
"nohash-hasher",
"num-traits", "num-traits",
"open", "open",
"public-ip", "public-ip",

View File

@ -41,6 +41,7 @@ http = "0.2.9"
jwt = "0.16" jwt = "0.16"
lazy_static = "1.4.0" lazy_static = "1.4.0"
log = "*" log = "*"
nohash-hasher = "0.2.0"
num-traits = "0.2" num-traits = "0.2"
open = "5.0.0" open = "5.0.0"
public-ip = "0.2.2" public-ip = "0.2.2"

View File

@ -4,7 +4,6 @@ use crate::types::{
}; };
use anyhow::Result; use anyhow::Result;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
/// A runtime module that allows processes to set timers. Interacting with the /// A runtime module that allows processes to set timers. Interacting with the
/// timer is done with a simple Request/Response pattern, and the timer module /// timer is done with a simple Request/Response pattern, and the timer module
@ -13,7 +12,7 @@ use std::collections::BTreeMap;
/// ///
/// The interface of the timer module is as follows: /// The interface of the timer module is as follows:
/// One kind of request is accepted: the IPC must be a little-endian byte-representation /// One kind of request is accepted: the IPC must be a little-endian byte-representation
/// of an unsigned 64-bit integer, in seconds. This request should always expect a Response. /// of an unsigned 64-bit integer, in milliseconds. This request should always expect a Response.
/// If the request does not expect a Response, the timer will not be set. /// If the request does not expect a Response, the timer will not be set.
/// ///
/// A proper Request will trigger the timer module to send a Response. The Response will be /// A proper Request will trigger the timer module to send a Response. The Response will be
@ -28,7 +27,7 @@ pub async fn timer_service(
) -> Result<()> { ) -> Result<()> {
// if we have a persisted state file, load it // if we have a persisted state file, load it
let mut timer_map = TimerMap { let mut timer_map = TimerMap {
timers: BTreeMap::new(), timers: nohash_hasher::IntMap::default(),
}; };
// joinset holds 1 active timer per expiration-time // joinset holds 1 active timer per expiration-time
let mut timer_tasks = tokio::task::JoinSet::<u64>::new(); let mut timer_tasks = tokio::task::JoinSet::<u64>::new();
@ -54,22 +53,22 @@ pub async fn timer_service(
continue continue
} }
let Ok(bytes): Result<[u8; 8], _> = req.ipc.try_into() else { continue }; let Ok(bytes): Result<[u8; 8], _> = req.ipc.try_into() else { continue };
let timer_secs = u64::from_le_bytes(bytes); let timer_millis = u64::from_le_bytes(bytes);
// if the timer is set to pop in 0 seconds, we immediately respond // if the timer is set to pop in 0 millis, we immediately respond
// otherwise, store in our persisted map, and spawn a task that // otherwise, store in our persisted map, and spawn a task that
// sleeps for the given time, then sends the response // sleeps for the given time, then sends the response
let now = std::time::SystemTime::now() let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH) .duration_since(std::time::UNIX_EPOCH)
.unwrap() .unwrap()
.as_secs(); .as_millis() as u64;
let pop_time = now + timer_secs; let pop_time = now + timer_millis;
if timer_secs == 0 { if timer_millis == 0 {
send_response(&our, km.id, km.rsvp.unwrap_or(km.source), &kernel_message_sender).await; send_response(&our, km.id, km.rsvp.unwrap_or(km.source), &kernel_message_sender).await;
continue continue
} }
if !timer_map.contains(pop_time) { if !timer_map.contains(pop_time) {
timer_tasks.spawn(async move { timer_tasks.spawn(async move {
tokio::time::sleep(std::time::Duration::from_secs(timer_secs)).await; tokio::time::sleep(std::time::Duration::from_millis(timer_millis - 1)).await;
return pop_time return pop_time
}); });
} }
@ -89,10 +88,10 @@ pub async fn timer_service(
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
struct TimerMap { struct TimerMap {
// key: the unix timestamp at which the timer pops // key: the unix timestamp in milliseconds at which the timer pops
// value: a vector of KernelMessage ids and who to send Response to // value: a vector of KernelMessage ids and who to send Response to
// this is because multiple processes can set timers for the same time // this is because multiple processes can set timers for the same time
timers: BTreeMap<u64, Vec<(u64, Address)>>, timers: nohash_hasher::IntMap<u64, Vec<(u64, Address)>>,
} }
impl TimerMap { impl TimerMap {
@ -110,15 +109,6 @@ impl TimerMap {
fn remove(&mut self, pop_time: u64) -> Option<Vec<(u64, Address)>> { fn remove(&mut self, pop_time: u64) -> Option<Vec<(u64, Address)>> {
self.timers.remove(&pop_time) self.timers.remove(&pop_time)
} }
fn _drain_expired(&mut self, time: u64) -> Vec<(u64, Address)> {
return self
.timers
.extract_if(|k, _| *k <= time)
.map(|(_, v)| v)
.flatten()
.collect();
}
} }
async fn send_response(our_node: &str, id: u64, target: Address, send_to_loop: &MessageSender) { async fn send_response(our_node: &str, id: u64, target: Address, send_to_loop: &MessageSender) {