From c84245531449362519750caf6caed1c61653e9b7 Mon Sep 17 00:00:00 2001 From: dr-frmr Date: Mon, 17 Jun 2024 23:28:25 -0400 Subject: [PATCH] add builders for `KernelMessage` and `Printout`, WIP use them everywhere --- kinode/src/net/utils.rs | 14 +---- kinode/src/terminal/mod.rs | 47 ++++++-------- kinode/src/timer.rs | 54 ++++++---------- kinode/src/vfs.rs | 126 ++++++++++++++++--------------------- lib/src/core.rs | 85 +++++++++++++++++++++++++ 5 files changed, 178 insertions(+), 148 deletions(-) diff --git a/kinode/src/net/utils.rs b/kinode/src/net/utils.rs index cb46b343..22d87abc 100644 --- a/kinode/src/net/utils.rs +++ b/kinode/src/net/utils.rs @@ -361,20 +361,10 @@ pub async fn parse_hello_message( /// Create a terminal printout at verbosity level 0. pub async fn print_loud(print_tx: &PrintSender, content: &str) { - let _ = print_tx - .send(Printout { - verbosity: 0, - content: content.into(), - }) - .await; + Printout::new(0, content).send(print_tx).await; } /// Create a terminal printout at verbosity level 2. pub async fn print_debug(print_tx: &PrintSender, content: &str) { - let _ = print_tx - .send(Printout { - verbosity: 2, - content: content.into(), - }) - .await; + Printout::new(2, content).send(print_tx).await; } diff --git a/kinode/src/terminal/mod.rs b/kinode/src/terminal/mod.rs index 9c5a9eb1..998a8c08 100644 --- a/kinode/src/terminal/mod.rs +++ b/kinode/src/terminal/mod.rs @@ -218,17 +218,12 @@ pub async fn terminal( 2 => verbose_mode = 3, _ => verbose_mode = 0, } - let _ = print_tx.send( - Printout { - verbosity: 0, - content: match verbose_mode { - 0 => "verbose mode: off".into(), - 1 => "verbose mode: debug".into(), - 2 => "verbose mode: super-debug".into(), - _ => "verbose mode: full event loop".into(), - } - } - ).await; + Printout::new(0, format!("verbose mode: {}", match verbose_mode { + 0 => "off", + 1 => "debug", + 2 => "super-debug", + _ => "full event loop", + })).send(&print_tx).await; if verbose_mode == 3 { let _ = debug_event_loop.send(DebugCommand::ToggleEventLoop).await; } @@ -243,15 +238,12 @@ pub async fn terminal( }) => { let _ = debug_event_loop.send(DebugCommand::ToggleStepthrough).await; in_step_through = !in_step_through; - let _ = print_tx.send( - Printout { - verbosity: 0, - content: match in_step_through { - false => "debug mode off".into(), - true => "debug mode on: use CTRL+S to step through events".into(), - } - } - ).await; + Printout::new(0, format!("debug mode {}", match in_step_through { + false => "off", + true => "on: use CTRL+S to step through events", + })) + .send(&print_tx) + .await; }, // @@ -273,15 +265,12 @@ pub async fn terminal( .. }) => { logging_mode = !logging_mode; - let _ = print_tx.send( - Printout { - verbosity: 0, - content: match logging_mode { - true => "logging mode: on".into(), - false => "logging mode: off".into(), - } - } - ).await; + Printout::new( + 0, + format!("logging mode: {}", if logging_mode { "on" } else { "off" }) + ) + .send(&print_tx) + .await; }, // // UP / CTRL+P: go up one command in history diff --git a/kinode/src/timer.rs b/kinode/src/timer.rs index b4fe5c6f..2a01282f 100644 --- a/kinode/src/timer.rs +++ b/kinode/src/timer.rs @@ -39,23 +39,14 @@ pub async fn timer_service( // we only handle Requests let Message::Request(req) = km.message else { continue }; let Ok(timer_action) = serde_json::from_slice::(&req.body) else { - let _ = print_tx.send(Printout { - verbosity: 1, - content: "timer service received a request with an invalid body".to_string(), - }).await; + Printout::new(1, "timer service received a request with an invalid body").send(&print_tx).await; continue }; match timer_action { TimerAction::Debug => { - let _ = print_tx.send(Printout { - verbosity: 0, - content: format!("timer service active timers ({}):", timer_map.timers.len()), - }).await; + Printout::new(0, format!("timer service active timers ({}):", timer_map.timers.len())).send(&print_tx).await; for (k, v) in timer_map.timers.iter() { - let _ = print_tx.send(Printout { - verbosity: 0, - content: format!("{}: {:?}", k, v), - }).await; + Printout::new(0, format!("{k}: {v:?}")).send(&print_tx).await; } continue } @@ -72,10 +63,7 @@ pub async fn timer_service( send_response(&our, km.id, km.rsvp.unwrap_or(km.source), &kernel_message_sender).await; continue } - let _ = print_tx.send(Printout { - verbosity: 3, - content: format!("set timer to pop in {}ms", timer_millis), - }).await; + Printout::new(3, format!("set timer to pop in {timer_millis}ms")).send(&print_tx).await; if !timer_map.contains(pop_time) { timer_tasks.spawn(async move { tokio::time::sleep(std::time::Duration::from_millis(timer_millis - 1)).await; @@ -121,25 +109,21 @@ impl TimerMap { } async fn send_response(our_node: &str, id: u64, target: Address, send_to_loop: &MessageSender) { - let _ = send_to_loop - .send(KernelMessage { - id, - source: Address { - node: our_node.to_string(), - process: TIMER_PROCESS_ID.clone(), + KernelMessage::builder() + .id(id) + .source((our_node, TIMER_PROCESS_ID.clone())) + .target(target) + .message(Message::Response(( + Response { + inherit: false, + body: vec![], + metadata: None, + capabilities: vec![], }, - target, - rsvp: None, - message: Message::Response(( - Response { - inherit: false, - body: vec![], - metadata: None, - capabilities: vec![], - }, - None, - )), - lazy_load_blob: None, - }) + None, + ))) + .build() + .unwrap() + .send(send_to_loop) .await; } diff --git a/kinode/src/vfs.rs b/kinode/src/vfs.rs index c1e5d8d9..bed0f695 100644 --- a/kinode/src/vfs.rs +++ b/kinode/src/vfs.rs @@ -36,15 +36,15 @@ pub async fn vfs( while let Some(km) = recv_from_loop.recv().await { if *our_node != km.source.node { - let _ = send_to_terminal - .send(Printout { - verbosity: 1, - content: format!( - "vfs: got request from {}, but requests must come from our node {our_node}\r", - km.source.node, + Printout::new( + 1, + format!( + "vfs: got request from {}, but requests must come from our node {our_node}", + km.source.node ), - }) - .await; + ) + .send(&send_to_terminal) + .await; continue; } @@ -81,20 +81,10 @@ pub async fn vfs( ) .await { - let _ = send_to_terminal - .send(Printout { - verbosity: 1, - content: format!("vfs: {e}\r"), - }) - .await; - let _ = send_to_loop - .send(make_error_message( - our_node.to_string(), - km_id, - km_source, - e, - )) + Printout::new(1, format!("vfs: {e}")) + .send(&send_to_terminal) .await; + make_error_message(&our_node, km_id, km_source, e, &send_to_loop).await; } } }); @@ -152,15 +142,11 @@ async fn handle_request( entries.push(dir_entry); } - let response = KernelMessage { - id: km.id, - source: Address { - node: our_node.to_string(), - process: VFS_PROCESS_ID.clone(), - }, - target: km.source, - rsvp: None, - message: Message::Response(( + KernelMessage::builder() + .id(km.id) + .source((our_node, VFS_PROCESS_ID.clone())) + .target(km.source) + .message(Message::Response(( Response { inherit: false, body: serde_json::to_vec(&VfsResponse::ReadDir(entries)).unwrap(), @@ -168,11 +154,11 @@ async fn handle_request( capabilities: vec![], }, None, - )), - lazy_load_blob: None, - }; - - let _ = send_to_loop.send(response).await; + ))) + .build() + .unwrap() + .send(send_to_loop) + .await; return Ok(()); } else { return Err(VfsError::NoCap { @@ -451,29 +437,26 @@ async fn handle_request( process: km.source.process, }) }) { - let _ = send_to_loop - .send(KernelMessage { - id: km.id, - source: Address { - node: our_node.to_string(), - process: VFS_PROCESS_ID.clone(), + KernelMessage::builder() + .id(km.id) + .source((our_node, VFS_PROCESS_ID.clone())) + .target(target) + .message(Message::Response(( + Response { + inherit: false, + body: serde_json::to_vec(&response_body).unwrap(), + metadata, + capabilities: vec![], }, - target, - rsvp: None, - message: Message::Response(( - Response { - inherit: false, - body: serde_json::to_vec(&response_body).unwrap(), - metadata, - capabilities: vec![], - }, - None, - )), - lazy_load_blob: bytes.map(|bytes| LazyLoadBlob { - mime: Some("application/octet-stream".into()), - bytes, - }), - }) + None, + ))) + .lazy_load_blob(bytes.map(|bytes| LazyLoadBlob { + mime: Some("application/octet-stream".into()), + bytes, + })) + .build() + .unwrap() + .send(send_to_loop) .await; } @@ -822,21 +805,18 @@ fn join_paths_safely(base: &PathBuf, extension: &str) -> PathBuf { base.join(extension_path) } -fn make_error_message( - our_node: String, +async fn make_error_message( + our_node: &str, id: u64, source: Address, error: VfsError, -) -> KernelMessage { - KernelMessage { - id, - source: Address { - node: our_node, - process: VFS_PROCESS_ID.clone(), - }, - target: source, - rsvp: None, - message: Message::Response(( + send_to_loop: &MessageSender, +) { + KernelMessage::builder() + .id(id) + .source((our_node, VFS_PROCESS_ID.clone())) + .target(source) + .message(Message::Response(( Response { inherit: false, body: serde_json::to_vec(&VfsResponse::Err(error)).unwrap(), @@ -844,7 +824,9 @@ fn make_error_message( capabilities: vec![], }, None, - )), - lazy_load_blob: None, - } + ))) + .build() + .unwrap() + .send(send_to_loop) + .await; } diff --git a/lib/src/core.rs b/lib/src/core.rs index a675e778..b97a7c61 100644 --- a/lib/src/core.rs +++ b/lib/src/core.rs @@ -1138,6 +1138,75 @@ pub struct KernelMessage { pub lazy_load_blob: Option, } +impl KernelMessage { + pub fn builder() -> KernelMessageBuilder { + KernelMessageBuilder::default() + } + + pub async fn send(self, sender: &MessageSender) { + sender.send(self).await.expect("kernel message sender died"); + } +} + +#[derive(Default)] +pub struct KernelMessageBuilder { + id: u64, + source: Option
, + target: Option
, + rsvp: Rsvp, + message: Option, + lazy_load_blob: Option, +} + +impl KernelMessageBuilder { + pub fn id(mut self, id: u64) -> Self { + self.id = id; + self + } + + pub fn source(mut self, source: T) -> Self + where + T: Into
, + { + self.source = Some(source.into()); + self + } + + pub fn target(mut self, target: T) -> Self + where + T: Into
, + { + self.target = Some(target.into()); + self + } + + pub fn rsvp(mut self, rsvp: Rsvp) -> Self { + self.rsvp = rsvp; + self + } + + pub fn message(mut self, message: Message) -> Self { + self.message = Some(message); + self + } + + pub fn lazy_load_blob(mut self, blob: Option) -> Self { + self.lazy_load_blob = blob; + self + } + + pub fn build(self) -> Result { + Ok(KernelMessage { + id: self.id, + source: self.source.ok_or("Source address is required")?, + target: self.target.ok_or("Target address is required")?, + rsvp: self.rsvp, + message: self.message.ok_or("Message is required")?, + lazy_load_blob: self.lazy_load_blob, + }) + } +} + impl std::fmt::Display for KernelMessage { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { write!( @@ -1173,6 +1242,22 @@ pub struct Printout { pub content: String, } +impl Printout { + pub fn new(verbosity: u8, content: T) -> Self + where + T: Into, + { + Self { + verbosity, + content: content.into(), + } + } + + pub async fn send(self, sender: &PrintSender) { + sender.send(self).await.expect("print sender died"); + } +} + // kernel sets in case, e.g., // A requests response from B does not request response from C // -> kernel sets `Some(A) = Rsvp` for B's request to C