From 1b903d3831c70d7e7badb0051ee52b732de2b209 Mon Sep 17 00:00:00 2001 From: dr-frmr Date: Mon, 5 Feb 2024 19:23:21 -0300 Subject: [PATCH 1/3] WIP --- kinode/src/kernel/mod.rs | 5 +-- kinode/src/kernel/process.rs | 59 +++++++++++++++++------------- kinode/src/kernel/standard_host.rs | 4 +- lib/src/core.rs | 8 ---- 4 files changed, 37 insertions(+), 39 deletions(-) diff --git a/kinode/src/kernel/mod.rs b/kinode/src/kernel/mod.rs index 016845fd..b114decc 100644 --- a/kinode/src/kernel/mod.rs +++ b/kinode/src/kernel/mod.rs @@ -43,7 +43,6 @@ enum ProcessSender { } /// persist kernel's process_map state for next bootup -/// and (TODO) wait for filesystem to respond in the affirmative async fn persist_state( our_name: &str, send_to_loop: &t::MessageSender, @@ -76,7 +75,7 @@ async fn persist_state( Ok(()) } -/// handle messages sent directly to kernel. source is always our own node. +/// handle commands inside messages sent directly to kernel. source is always our own node. async fn handle_kernel_request( our_name: String, keypair: Arc<signature::Ed25519KeyPair>, @@ -528,7 +527,7 @@ async fn handle_kernel_request( } } -// double check immediate run +/// spawn a process loop and insert the process in the relevant kernel state maps async fn start_process( our_name: String, keypair: Arc<signature::Ed25519KeyPair>, diff --git a/kinode/src/kernel/process.rs b/kinode/src/kernel/process.rs index 02d35594..979f5ac5 100644 --- a/kinode/src/kernel/process.rs +++ b/kinode/src/kernel/process.rs @@ -1,8 +1,10 @@ use crate::kernel::{ProcessMessageReceiver, ProcessMessageSender}; use crate::KERNEL_PROCESS_ID; use anyhow::Result; -//pub use kinode::process::standard as wit; -//pub use kinode::process::standard::Host as StandardHost; +use lib::types::core as t; +pub use lib::wit; +pub use lib::wit::Host as StandardHost; +pub use lib::Process; use ring::signature::{self, KeyPair}; use std::collections::{HashMap, VecDeque}; use std::sync::Arc; @@ -11,26 +13,31 @@ use wasmtime::component::*; use wasmtime::{Engine, Store}; use wasmtime_wasi::preview2::{pipe::MemoryOutputPipe, Table, WasiCtx, WasiCtxBuilder, WasiView}; -use lib::types::core as t; -pub use lib::wit; -pub use lib::wit::Host as StandardHost; -pub use lib::Process; - -// bindgen!({ -// path: "wit", -// world: "process", -// async: true, -// }); +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct ProcessContext { + // store ultimate in order to set prompting message if needed + pub message: Option<t::KernelMessage>, + // can be empty if a request doesn't set context, but still needs to inherit + pub context: Option<wit::Context>, +} pub struct ProcessState { + /// our node's networking keypair pub keypair: Arc<signature::Ed25519KeyPair>, + /// information about ourself pub metadata: t::ProcessMetadata, + /// pipe from which we get messages from the main event loop pub recv_in_process: ProcessMessageReceiver, + /// pipe to send messages to ourself (received in `recv_in_process`) pub self_sender: ProcessMessageSender, + /// pipe for sending messages to the main event loop pub send_to_loop: t::MessageSender, + /// pipe for sending [`t::Printout`]s to the terminal pub send_to_terminal: t::PrintSender, - pub prompting_message: Option<t::KernelMessage>, - pub last_blob: Option<t::LazyLoadBlob>, + /// store the nested request, if any + pub nested_request: Option<t::KernelMessage>, + /// store the current incoming message that we've gotten from receive() + pub current_incoming_message: Option<t::KernelMessage>, pub contexts: HashMap<u64, (t::ProcessContext, tokio::task::JoinHandle<()>)>, pub message_queue: VecDeque<Result<t::KernelMessage, t::WrappedSendError>>, pub caps_oracle: t::CapMessageSender, @@ -124,9 +131,9 @@ impl ProcessState { // otherwise, id is generated randomly let request_id: u64 = if request.inherit &&
request.expects_response.is_none() - && self.prompting_message.is_some() + && self.current_incoming_message.is_some() { - self.prompting_message.as_ref().unwrap().id + self.current_incoming_message.as_ref().unwrap().id } else { loop { let id = rand::random(); @@ -178,7 +185,7 @@ impl ProcessState { rsvp: match ( request.inherit, request.expects_response, - &self.prompting_message, + &self.current_incoming_message, ) { // this request expects response, so receives any response // make sure to use the real source, not a fake injected-by-kernel source @@ -219,8 +226,8 @@ impl ProcessState { request_id, ( t::ProcessContext { - prompting_message: if self.prompting_message.is_some() { - self.prompting_message.clone() + prompting_message: if self.current_incoming_message.is_some() { + self.current_incoming_message.clone() } else { None }, @@ -341,21 +348,21 @@ impl ProcessState { Ok(km) => match &km.message { t::Message::Request(_) => { self.last_blob = km.lazy_load_blob.clone(); - self.prompting_message = Some(km.clone()); + self.current_incoming_message = Some(km.clone()); (None, km) } t::Message::Response(_) => { if let Some((context, timeout_handle)) = self.contexts.remove(&km.id) { timeout_handle.abort(); self.last_blob = km.lazy_load_blob.clone(); - self.prompting_message = match context.prompting_message { + self.current_incoming_message = match context.prompting_message { None => Some(km.clone()), Some(prompting_message) => Some(prompting_message), }; (context.context, km) } else { self.last_blob = km.lazy_load_blob.clone(); - self.prompting_message = Some(km.clone()); + self.current_incoming_message = Some(km.clone()); (None, km) } } @@ -364,7 +371,7 @@ impl ProcessState { None => return Err((t::en_wit_send_error(e.error), None)), Some((context, timeout_handle)) => { timeout_handle.abort(); - self.prompting_message = context.prompting_message; + self.current_incoming_message = context.prompting_message; return Err((t::en_wit_send_error(e.error), context.context)); } }, @@ -431,7 +438,7 @@ impl ProcessState { /// a response it emits should have. This takes into /// account the `rsvp` of the prompting message, if any. 
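/// In sketch form, the rule the function below implements (illustrative only,
/// not code from this file):
/// `let target = prompting_message.rsvp.clone().unwrap_or_else(|| prompting_message.source.clone());`
/// i.e. a response is addressed to the prompting message's rsvp when one is
/// set, and falls back to that message's original source otherwise.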
async fn make_response_id_target(&self) -> Option<(u64, t::Address)> { - let Some(ref prompting_message) = self.prompting_message else { + let Some(ref prompting_message) = self.current_incoming_message else { println!("need non-None prompting_message to handle Response"); return None; }; @@ -513,8 +520,8 @@ pub async fn make_process_loop( self_sender: send_to_process, send_to_loop: send_to_loop.clone(), send_to_terminal: send_to_terminal.clone(), - prompting_message: None, - last_blob: None, + nested_request: None, + current_incoming_message: None, contexts: HashMap::new(), message_queue: VecDeque::new(), caps_oracle: caps_oracle.clone(), diff --git a/kinode/src/kernel/standard_host.rs b/kinode/src/kernel/standard_host.rs index 3e9c30f4..52ae8089 100644 --- a/kinode/src/kernel/standard_host.rs +++ b/kinode/src/kernel/standard_host.rs @@ -1,6 +1,4 @@ use crate::kernel::process; -//use crate::kernel::process::kinode::process::standard as wit; -//use crate::kernel::process::StandardHost; use crate::KERNEL_PROCESS_ID; use crate::VFS_PROCESS_ID; use anyhow::Result; @@ -27,6 +25,7 @@ impl StandardHost for process::ProcessWasi { // // system utils: // + async fn print_to_terminal(&mut self, verbosity: u8, content: String) -> Result<()> { self.process .send_to_terminal @@ -376,6 +375,7 @@ impl StandardHost for process::ProcessWasi { print_debug(&self.process, "spawned a new process").await; Ok(Ok(new_process_id.en_wit().to_owned())) } + // // capabilities management // diff --git a/lib/src/core.rs b/lib/src/core.rs index d9885be7..398f6ab8 100644 --- a/lib/src/core.rs +++ b/lib/src/core.rs @@ -1030,14 +1030,6 @@ impl std::fmt::Display for PersistedProcess { } } -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct ProcessContext { - // store ultimate in order to set prompting message if needed - pub prompting_message: Option, - // can be empty if a request doesn't set context, but still needs to inherit - pub context: Option, -} - pub type PackageVersion = (u32, u32, u32); /// the type that gets deserialized from `metadata.json` in a package From f80c36717aac2de6610e480f25606d510582b4fc Mon Sep 17 00:00:00 2001 From: dr-frmr Date: Tue, 6 Feb 2024 17:22:25 -0300 Subject: [PATCH 2/3] WIP; minor performance enhancements --- kinode/src/kernel/mod.rs | 9 +- kinode/src/kernel/process.rs | 290 +++++++++++++++++++++-------------- 2 files changed, 174 insertions(+), 125 deletions(-) diff --git a/kinode/src/kernel/mod.rs b/kinode/src/kernel/mod.rs index b114decc..1083da6a 100644 --- a/kinode/src/kernel/mod.rs +++ b/kinode/src/kernel/mod.rs @@ -1096,11 +1096,11 @@ pub async fn kernel( match process_map.get(&on) { None => vec![], Some(p) => { - caps.iter().filter_map(|cap| { + caps.into_iter().filter_map(|cap| { // if issuer is message source, then sign the cap if cap.issuer.process == on { Some(( - cap.clone(), + cap, keypair .sign(&rmp_serde::to_vec(&cap).unwrap()) .as_ref() @@ -1109,10 +1109,7 @@ pub async fn kernel( // otherwise, only attach previously saved caps // NOTE we don't need to verify the sigs! 
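// (verification only matters when a foreign node presents a cap that we
// allegedly issued; see the retain() filter added to process.rs in the third
// patch. caps already persisted in the process map were signed by this kernel
// when they were saved, so re-checking them here would be redundant work)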
} else { - match p.capabilities.get(cap) { - None => None, - Some(sig) => Some((cap.clone(), sig.clone())) - } + p.capabilities.get(&cap).map(|sig| (cap, sig.clone())) } }).collect() }, diff --git a/kinode/src/kernel/process.rs b/kinode/src/kernel/process.rs index 979f5ac5..76494026 100644 --- a/kinode/src/kernel/process.rs +++ b/kinode/src/kernel/process.rs @@ -13,6 +13,8 @@ use wasmtime::component::*; use wasmtime::{Engine, Store}; use wasmtime_wasi::preview2::{pipe::MemoryOutputPipe, Table, WasiCtx, WasiCtxBuilder, WasiView}; +const STACK_TRACE_SIZE: usize = 5000; + #[derive(Clone, Debug, Serialize, Deserialize)] pub struct ProcessContext { // store ultimate in order to set prompting message if needed @@ -38,8 +40,12 @@ pub struct ProcessState { pub nested_request: Option, /// store the current incoming message that we've gotten from receive() pub current_incoming_message: Option, + /// store the contexts and timeout task of all outstanding requests pub contexts: HashMap)>, + /// store the messages that we've gotten from event loop but haven't processed yet + /// TODO make this an ordered map for O(1) retrieval by ID pub message_queue: VecDeque>, + /// pipe for getting info about capabilities pub caps_oracle: t::CapMessageSender, } @@ -64,8 +70,6 @@ impl WasiView for ProcessWasi { } } -const STACK_TRACE_SIZE: usize = 5000; - pub async fn send_and_await_response( process: &mut ProcessWasi, source: Option, @@ -98,42 +102,95 @@ pub async fn send_and_await_response( } impl ProcessState { - /// Ingest latest message directed to this process, and mark it as the prompting message. + /// Ingest latest message directed to this process, and save it as the current message. /// If there is no message in the queue, wait async until one is received. - /// The message will only be saved as the prompting-message if it's a Request. pub async fn get_next_message_for_process( &mut self, ) -> Result<(wit::Address, wit::Message), (wit::SendError, Option)> { let res = match self.message_queue.pop_front() { Some(message_from_queue) => message_from_queue, - None => self.recv_in_process.recv().await.unwrap(), + None => self + .recv_in_process + .recv() + .await + .expect("fatal: process couldn't receive next message"), }; self.kernel_message_to_process_receive(res) } + /// instead of ingesting latest, wait for a specific ID and queue all others + async fn get_specific_message_for_process( + &mut self, + awaited_message_id: u64, + ) -> Result<(wit::Address, wit::Message), (wit::SendError, Option)> { + // first, check if the awaited message is already in the queue and handle if so + for (i, message) in self.message_queue.iter().enumerate() { + match message { + Ok(ref km) if km.id == awaited_message_id => { + let km = self.message_queue.remove(i).unwrap(); + return self.kernel_message_to_process_receive(km); + } + _ => continue, + } + } + // next, wait for the awaited message to arrive + loop { + let res = self + .recv_in_process + .recv() + .await + .expect("fatal: process couldn't receive next message"); + let id = match &res { + Ok(km) => km.id, + Err(e) => e.id, + }; + if id == awaited_message_id { + return self.kernel_message_to_process_receive(res); + } else { + self.message_queue.push_back(res); + } + } + } + /// takes Request generated by a process and sends it to the main event loop. /// will only fail if process does not have capability to send to target. /// if the request has a timeout (expects response), start a task to track /// that timeout and return timeout error if it expires. 
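/// In outline, the timeout bookkeeping works like this (a condensed,
/// illustrative sketch of the body that follows, not a drop-in snippet):
/// ```ignore
/// let timeout_handle = tokio::spawn(async move {
///     tokio::time::sleep(std::time::Duration::from_secs(timeout_secs)).await;
///     // inject a Timeout SendError back into our own mailbox
///     let _ = self_sender.send(Err(wrapped_timeout_error)).await;
/// });
/// // aborted by kernel_message_to_process_receive if a response lands first
/// contexts.insert(request_id, (process_context, timeout_handle));
/// ```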
pub async fn send_request( &mut self, - fake_source: Option<t::Address>, // only used when kernel steps in to get/set state + /// only used when kernel steps in to get/set state + fake_source: Option<t::Address>, target: wit::Address, request: wit::Request, new_context: Option<wit::Context>, blob: Option<wit::LazyLoadBlob>, ) -> Result<u64> { - let source = match &fake_source { - Some(_) => fake_source.unwrap(), - None => self.metadata.our.clone(), + let source = fake_source.unwrap_or(self.metadata.our.clone()); + let mut request = t::de_wit_request(request); + + // if request chooses to inherit, it means to take the lazy_load_blob, if any, + // from the last request it ingested. if current_incoming_message is a request, + // it will be the inherited one. if not, we check nested_request and inherit + // from that if it exists. + // if neither exist as requests, inherit flag will be ignored. + let predecessor_request = match &self.current_incoming_message { + Some(t::KernelMessage { + message: t::Message::Request(request), + .. + }) => self.current_incoming_message.as_ref(), + _ => match &self.nested_request { + Some(t::KernelMessage { + message: t::Message::Request(request), + .. + }) => self.nested_request.as_ref(), + _ => None, + }, }; - // if request chooses to inherit context, match id to prompting_message + + // if request chooses to inherit, match id to predecessor // otherwise, id is generated randomly - let request_id: u64 = if request.inherit - && request.expects_response.is_none() - && self.current_incoming_message.is_some() - { - self.current_incoming_message.as_ref().unwrap().id + let request_id: u64 = if request.inherit && predecessor_request.is_some() { + predecessor_request.unwrap().id } else { loop { let id = rand::random(); @@ -143,81 +200,57 @@ impl ProcessState { } }; + // if a blob is provided, it will be used; otherwise, if inherit is true, + // and a predecessor exists, its blob will be used; otherwise, no blob will be used. let blob = match blob { Some(p) => Some(t::LazyLoadBlob { mime: p.mime, bytes: p.bytes, }), None => match request.inherit { - true => self.last_blob.clone(), + true => predecessor_request + .and_then(|km| km.lazy_load_blob.clone()) + .or(None), false => None, }, }; - let mut inner_request = t::de_wit_request(request.clone()); + if !request.capabilities.is_empty() { + request.capabilities = { + let (tx, rx) = tokio::sync::oneshot::channel(); + self.caps_oracle + .send(t::CapMessage::FilterCaps { + on: self.metadata.our.process.clone(), + caps: request + .capabilities + .iter() + .map(|cap| t::de_wit_capability(cap.clone()).0) + .collect(), + responder: tx, + }) + .await?; + rx.await? + }; + } - inner_request.capabilities = { - let (tx, rx) = tokio::sync::oneshot::channel(); - self.caps_oracle - .send(t::CapMessage::FilterCaps { - on: self.metadata.our.process.clone(), - caps: request - .capabilities - .iter() - .map(|cap| t::de_wit_capability(cap.clone()).0) - .collect(), - responder: tx, - }) - .await?; - rx.await? - }; - - // rsvp is set if there was a Request expecting Response - // followed by inheriting Request(s) not expecting Response; - // this is done such that the ultimate request handler knows that, - // in fact, a Response *is* expected.
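// a worked example of the chain described above, with hypothetical
// processes A, B, and C:
//
//   A -> B: Request { expects_response: Some(30), .. }
//           kernel sets rsvp = A, since A awaits the response itself
//   B -> C: Request { inherit: true, expects_response: None, .. }
//           rsvp = A is carried over from B's prompting message
//   C -> A: C's eventual Response is routed straight to A via the rsvp
//
// so C, the ultimate handler, knows a response is expected even though its
// direct requester B never awaited one.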
- // could also be None if entire chain of Requests are - // not expecting Response - let kernel_message = t::KernelMessage { - id: request_id, - source: source.clone(), - target: t::Address::de_wit(target.clone()), - rsvp: match ( - request.inherit, - request.expects_response, - &self.current_incoming_message, - ) { - // this request expects response, so receives any response - // make sure to use the real source, not a fake injected-by-kernel source - (_, Some(_), _) => Some(self.metadata.our.clone()), - // this request inherits, so response will be routed to prompting message - (true, None, Some(ref prompt)) => prompt.rsvp.clone(), - // this request doesn't inherit, and doesn't itself want a response - (false, None, _) => None, - // no rsvp because neither prompting message nor this request wants a response - (_, None, None) => None, - }, - message: t::Message::Request(inner_request), - lazy_load_blob: blob.clone(), - }; - - // modify the process' context map as needed. - // if there is a prompting message, we need to store the ultimate - // even if there is no new context string. - // TODO optimize this significantly + // if the request expects a response, modify the process' context map as needed + // and set a timer. + // TODO optimize this SIGNIFICANTLY: stop spawning tasks + // and use a global clock + garbage collect step to check for timeouts if let Some(timeout_secs) = request.expects_response { let self_sender = self.self_sender.clone(); + let original_target = t::Address::de_wit(target.clone()); let timeout_handle = tokio::spawn(async move { tokio::time::sleep(std::time::Duration::from_secs(timeout_secs)).await; let _ = self_sender .send(Err(t::WrappedSendError { id: request_id, - source: t::Address::de_wit(target.clone()), // TODO check this + source: original_target.clone(), error: t::SendError { kind: t::SendErrorKind::Timeout, - target: t::Address::de_wit(target), - message: t::Message::Request(t::de_wit_request(request.clone())), - lazy_load_blob: blob, + target: original_target, + message: t::Message::Request(request.clone()), + lazy_load_blob: blob.clone(), }, })) .await; @@ -225,12 +258,8 @@ impl ProcessState { self.contexts.insert( request_id, ( - t::ProcessContext { - prompting_message: if self.current_incoming_message.is_some() { - self.current_incoming_message.clone() - } else { - None - }, + ProcessContext { + message: predecessor_request.cloned(), context: new_context, }, timeout_handle, @@ -238,6 +267,34 @@ impl ProcessState { ); } + // rsvp is set based on this priority: + // 1. whether this request expects a response -- if so, rsvp = our address, always + // 2. whether this request inherits -- if so, rsvp = prompting message's rsvp + // 3. 
if neither, rsvp = None + let kernel_message = t::KernelMessage { + id: request_id, + source, + target: t::Address::de_wit(target), + rsvp: match ( + request.expects_response, + request.inherit, + &self.current_incoming_message, + ) { + (Some(_), _, _) => { + // this request expects response, so receives any response + // make sure to use the real source, not a fake injected-by-kernel source + Some(self.metadata.our.clone()) + } + (None, true, Some(ref prompt)) => { + // this request inherits, so response will be routed to prompting message + prompt.rsvp.clone() + } + _ => None, + }, + message: t::Message::Request(request), + lazy_load_blob: blob, + }; + self.send_to_loop .send(kernel_message) .await @@ -307,39 +364,10 @@ impl ProcessState { .expect("fatal: kernel couldn't send response"); } - /// instead of ingesting latest, wait for a specific ID and queue all others - async fn get_specific_message_for_process( - &mut self, - awaited_message_id: u64, - ) -> Result<(wit::Address, wit::Message), (wit::SendError, Option)> { - // first, check if the awaited message is already in the queue and handle if so - for (i, message) in self.message_queue.iter().enumerate() { - match message { - Ok(ref km) if km.id == awaited_message_id => { - let km = self.message_queue.remove(i).unwrap(); - return self.kernel_message_to_process_receive(km.clone()); - } - _ => continue, - } - } - // next, wait for the awaited message to arrive - loop { - let res = self.recv_in_process.recv().await.unwrap(); - match res { - Ok(ref km) if km.id == awaited_message_id => { - return self.kernel_message_to_process_receive(Ok(km.clone())) - } - Ok(km) => self.message_queue.push_back(Ok(km)), - Err(e) if e.id == awaited_message_id => { - return self.kernel_message_to_process_receive(Err(e)) - } - Err(e) => self.message_queue.push_back(Err(e)), - } - } - } - - /// convert a message from the main event loop into a result for the process to receive - /// if the message is a response or error, get context if we have one + /// Convert a message from the main event loop into a result for the process to receive. + /// If the message is a response or error, get context if we have one. + /// If there exists a current message before this, if it is a Request, it will get + /// saved as nested_request. fn kernel_message_to_process_receive( &mut self, res: Result, @@ -436,19 +464,43 @@ impl ProcessState { /// Given the current process state, return the id and target that /// a response it emits should have. This takes into - /// account the `rsvp` of the prompting message, if any. + /// account the `rsvp` of the current or nested message, if any. async fn make_response_id_target(&self) -> Option<(u64, t::Address)> { - let Some(ref prompting_message) = self.current_incoming_message else { - println!("need non-None prompting_message to handle Response"); + let Some(ref current_incoming_message) = self.current_incoming_message else { + println!("need non-None incoming_message to handle Response\r"); return None; }; - Some(( - prompting_message.id, - match &prompting_message.rsvp { - None => prompting_message.source.clone(), - Some(address) => address.clone(), - }, - )) + match current_incoming_message.message { + t::Message::Request(ref request) => { + // if the current message is a Request, the response will always go there. 
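+                // (more precisely: to that message's rsvp when one is set,
+                // falling back to its original source, as matched below)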
+ Some(( + current_incoming_message.id, + match &current_incoming_message.rsvp { + None => current_incoming_message.source.clone(), + Some(address) => address.clone(), + }, + )) + } + t::Message::Response((ref response, ref _maybe_context)) => { + // if the current message is a Response, we look at the nested request + // to see where the response should go. + match &self.nested_request { + None => { + println!( + "need non-None incoming message or nested_request to handle Response\r" + ); + return None; + } + Some(ref nested) => Some(( + nested.id, + match &nested.rsvp { + None => nested.source.clone(), + Some(address) => address.clone(), + }, + )), + } + } + } } } From 05e6e422b12c760b844514cdb94b8ae1fcb58c00 Mon Sep 17 00:00:00 2001 From: dr-frmr Date: Thu, 8 Feb 2024 16:08:57 -0300 Subject: [PATCH 3/3] refactor process.rs for less copying and legibility --- kinode/src/kernel/mod.rs | 9 +- kinode/src/kernel/process.rs | 310 +++++++++++++++-------------------- 2 files changed, 137 insertions(+), 182 deletions(-) diff --git a/kinode/src/kernel/mod.rs b/kinode/src/kernel/mod.rs index 1083da6a..08b9293c 100644 --- a/kinode/src/kernel/mod.rs +++ b/kinode/src/kernel/mod.rs @@ -1099,13 +1099,8 @@ pub async fn kernel( caps.into_iter().filter_map(|cap| { // if issuer is message source, then sign the cap if cap.issuer.process == on { - Some(( - cap, - keypair - .sign(&rmp_serde::to_vec(&cap).unwrap()) - .as_ref() - .to_vec() - )) + let sig = keypair.sign(&rmp_serde::to_vec(&cap).unwrap()); + Some((cap, sig.as_ref().to_vec())) // otherwise, only attach previously saved caps // NOTE we don't need to verify the sigs! } else { p.capabilities.get(&cap).map(|sig| (cap, sig.clone())) } }).collect() }, diff --git a/kinode/src/kernel/process.rs b/kinode/src/kernel/process.rs index 76494026..0b9dbe43 100644 --- a/kinode/src/kernel/process.rs +++ b/kinode/src/kernel/process.rs @@ -15,12 +15,11 @@ use wasmtime_wasi::preview2::{pipe::MemoryOutputPipe, Table, WasiCtx, WasiCtxBui const STACK_TRACE_SIZE: usize = 5000; -#[derive(Clone, Debug, Serialize, Deserialize)] pub struct ProcessContext { - // store ultimate in order to set prompting message if needed - pub message: Option<t::KernelMessage>, + // store predecessor in order to set prompting message when popped + pub prompting_message: Option<t::KernelMessage>, // can be empty if a request doesn't set context, but still needs to inherit - pub context: Option<wit::Context>, + pub context: Option<t::Context>, } pub struct ProcessState { @@ -36,12 +35,16 @@ pub struct ProcessState { pub send_to_loop: t::MessageSender, /// pipe for sending [`t::Printout`]s to the terminal pub send_to_terminal: t::PrintSender, - /// store the nested request, if any - pub nested_request: Option<t::KernelMessage>, - /// store the current incoming message that we've gotten from receive() - pub current_incoming_message: Option<t::KernelMessage>, + /// store the current incoming message that we've gotten from receive(), if it + /// is a request. if it is a response, the context map is used to reset this to + /// the message that prompted the outgoing request that the response answers. + /// note: the message stored here never carries a blob; the blob of the last + /// message received from the event loop, if any, always lives in last_blob.
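+    /// in sketch form, the resulting inherit rule for outgoing blobs
+    /// (illustrative only):
+    /// `let blob = explicit_blob.or_else(|| if inherit { self.last_blob.clone() } else { None });`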
+ pub prompting_message: Option<t::KernelMessage>, + pub last_blob: Option<t::LazyLoadBlob>, /// store the contexts and timeout task of all outstanding requests - pub contexts: HashMap<u64, (t::ProcessContext, tokio::task::JoinHandle<()>)>, + pub contexts: HashMap<u64, (ProcessContext, tokio::task::JoinHandle<()>)>, /// store the messages that we've gotten from event loop but haven't processed yet /// TODO make this an ordered map for O(1) retrieval by ID pub message_queue: VecDeque<Result<t::KernelMessage, t::WrappedSendError>>, /// pipe for getting info about capabilities pub caps_oracle: t::CapMessageSender, } @@ -158,7 +161,7 @@ impl ProcessState { /// that timeout and return timeout error if it expires. pub async fn send_request( &mut self, - /// only used when kernel steps in to get/set state + // only used when kernel steps in to get/set state fake_source: Option<t::Address>, target: wit::Address, request: wit::Request, new_context: Option<wit::Context>, blob: Option<wit::LazyLoadBlob>, ) -> Result<u64> { let source = fake_source.unwrap_or(self.metadata.our.clone()); let mut request = t::de_wit_request(request); - // if request chooses to inherit, it means to take the lazy_load_blob, if any, - // from the last request it ingested. if current_incoming_message is a request, - // it will be the inherited one. if not, we check nested_request and inherit - // from that if it exists. - // if neither exist as requests, inherit flag will be ignored. - let predecessor_request = match &self.current_incoming_message { - Some(t::KernelMessage { - message: t::Message::Request(request), - .. - }) => self.current_incoming_message.as_ref(), - _ => match &self.nested_request { - Some(t::KernelMessage { - message: t::Message::Request(request), - .. - }) => self.nested_request.as_ref(), - _ => None, - }, - }; + // if request chooses to inherit, it means to take the ID and lazy_load_blob, + // if any, from the last message it ingested // if request chooses to inherit, match id to predecessor // otherwise, id is generated randomly - let request_id: u64 = if request.inherit && predecessor_request.is_some() { - predecessor_request.unwrap().id + let request_id: u64 = if request.inherit && self.prompting_message.is_some() { + self.prompting_message.as_ref().unwrap().id } else { loop { let id = rand::random(); @@ -208,9 +195,7 @@ impl ProcessState { bytes: p.bytes, }), None => match request.inherit { - true => predecessor_request - .and_then(|km| km.lazy_load_blob.clone()) - .or(None), + true => self.last_blob.clone(), false => None, }, }; @@ -223,13 +208,15 @@ impl ProcessState { on: self.metadata.our.process.clone(), caps: request .capabilities - .iter() - .map(|cap| t::de_wit_capability(cap.clone()).0) + .into_iter() + .map(|(cap, _)| cap) .collect(), responder: tx, }) - .await?; - rx.await?
+ .await + .expect("fatal: process couldn't access capabilities oracle"); + rx.await + .expect("fatal: process couldn't receive capabilities") }; } @@ -238,6 +225,8 @@ impl ProcessState { // TODO optimize this SIGNIFICANTLY: stop spawning tasks // and use a global clock + garbage collect step to check for timeouts if let Some(timeout_secs) = request.expects_response { + let this_request = request.clone(); + let this_blob = blob.clone(); let self_sender = self.self_sender.clone(); let original_target = t::Address::de_wit(target.clone()); let timeout_handle = tokio::spawn(async move { @@ -249,8 +238,8 @@ impl ProcessState { error: t::SendError { kind: t::SendErrorKind::Timeout, target: original_target, - message: t::Message::Request(request.clone()), - lazy_load_blob: blob.clone(), + message: t::Message::Request(this_request), + lazy_load_blob: this_blob, }, })) .await; @@ -259,7 +248,7 @@ impl ProcessState { request_id, ( ProcessContext { - message: predecessor_request.cloned(), + prompting_message: self.prompting_message.clone(), context: new_context, }, timeout_handle, @@ -278,7 +267,7 @@ impl ProcessState { rsvp: match ( request.expects_response, request.inherit, - &self.current_incoming_message, + &self.prompting_message, ) { (Some(_), _, _) => { // this request expects response, so receives any response @@ -309,43 +298,53 @@ impl ProcessState { response: wit::Response, blob: Option, ) { - let (id, target) = match self.make_response_id_target().await { - Some(r) => r, - None => { - let _ = self - .send_to_terminal - .send(t::Printout { - verbosity: 2, - content: format!("kernel: dropping Response {:?}", response), - }) - .await; - return; - } + let mut response = t::de_wit_response(response); + + // the process requires a prompting_message in order to issue a response + let Some(ref prompting_message) = self.prompting_message else { + print( + &self.send_to_terminal, + 0, + format!("kernel: need non-None prompting_message to handle Response {response:?}"), + ) + .await; + return; }; + // given the current process state, produce the id and target that + // a response it emits should have. 
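+        // i.e. the id echoes the prompting message, and the target is its
+        // rsvp when set, falling back to its source: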
+ let (id, target) = ( + prompting_message.id, + match &prompting_message.rsvp { + None => prompting_message.source.clone(), + Some(rsvp) => rsvp.clone(), + }, + ); + let blob = match response.inherit { true => self.last_blob.clone(), false => t::de_wit_blob(blob), }; - let mut inner_response = t::de_wit_response(response.clone()); - - inner_response.capabilities = { - let (tx, rx) = tokio::sync::oneshot::channel(); - let _ = self - .caps_oracle - .send(t::CapMessage::FilterCaps { - on: self.metadata.our.process.clone(), - caps: response - .capabilities - .iter() - .map(|cap| t::de_wit_capability(cap.clone()).0) - .collect(), - responder: tx, - }) - .await; - rx.await.expect("fatal: process couldn't get caps") - }; + if !response.capabilities.is_empty() { + response.capabilities = { + let (tx, rx) = tokio::sync::oneshot::channel(); + self.caps_oracle + .send(t::CapMessage::FilterCaps { + on: self.metadata.our.process.clone(), + caps: response + .capabilities + .into_iter() + .map(|(cap, _)| cap) + .collect(), + responder: tx, + }) + .await + .expect("fatal: process couldn't access capabilities oracle"); + rx.await + .expect("fatal: process couldn't receive capabilities") + }; + } self.send_to_loop .send(t::KernelMessage { @@ -354,7 +353,7 @@ impl ProcessState { target, rsvp: None, message: t::Message::Response(( - inner_response, + response, // the context will be set by the process receiving this Response. None, )), @@ -366,32 +365,30 @@ impl ProcessState { /// Convert a message from the main event loop into a result for the process to receive. /// If the message is a response or error, get context if we have one. - /// If there exists a current message before this, if it is a Request, it will get - /// saved as nested_request. fn kernel_message_to_process_receive( &mut self, - res: Result, + incoming: Result, ) -> Result<(wit::Address, wit::Message), (wit::SendError, Option)> { - let (context, km) = match res { - Ok(km) => match &km.message { + let (mut km, context) = match incoming { + Ok(mut km) => match km.message { t::Message::Request(_) => { - self.last_blob = km.lazy_load_blob.clone(); - self.current_incoming_message = Some(km.clone()); - (None, km) + self.last_blob = km.lazy_load_blob; + km.lazy_load_blob = None; + self.prompting_message = Some(km.clone()); + (km, None) } t::Message::Response(_) => { if let Some((context, timeout_handle)) = self.contexts.remove(&km.id) { timeout_handle.abort(); - self.last_blob = km.lazy_load_blob.clone(); - self.current_incoming_message = match context.prompting_message { - None => Some(km.clone()), - Some(prompting_message) => Some(prompting_message), - }; - (context.context, km) + self.last_blob = km.lazy_load_blob; + km.lazy_load_blob = None; + self.prompting_message = context.prompting_message; + (km, context.context) } else { - self.last_blob = km.lazy_load_blob.clone(); - self.current_incoming_message = Some(km.clone()); - (None, km) + self.last_blob = km.lazy_load_blob; + km.lazy_load_blob = None; + self.prompting_message = Some(km.clone()); + (km, None) } } }, @@ -399,7 +396,7 @@ impl ProcessState { None => return Err((t::en_wit_send_error(e.error), None)), Some((context, timeout_handle)) => { timeout_handle.abort(); - self.current_incoming_message = context.prompting_message; + self.prompting_message = context.prompting_message; return Err((t::en_wit_send_error(e.error), context.context)); } }, @@ -410,98 +407,54 @@ impl ProcessState { self.keypair.as_ref().public_key(), ); + // prune any invalid capabilities before handing to process + 
// where invalid = supposedly issued by us, but not signed properly by us + match &mut km.message { + t::Message::Request(request) => { + request.capabilities.retain(|(cap, sig)| { + // The only time we verify a cap's signature is when a foreign node + // sends us a cap that we (allegedly) issued + if km.source.node != self.metadata.our.node + && cap.issuer.node == self.metadata.our.node + { + match pk.verify(&rmp_serde::to_vec(&cap).unwrap_or_default(), sig) { + Ok(_) => true, + Err(_) => false, + } + } else { + return true; + } + }); + } + t::Message::Response((response, _)) => { + response.capabilities.retain(|(cap, sig)| { + // The only time we verify a cap's signature is when a foreign node + // sends us a cap that we (allegedly) issued + if km.source.node != self.metadata.our.node + && cap.issuer.node == self.metadata.our.node + { + match pk.verify(&rmp_serde::to_vec(&cap).unwrap_or_default(), sig) { + Ok(_) => true, + Err(_) => false, + } + } else { + return true; + } + }); + } + }; + Ok(( km.source.en_wit(), match km.message { - t::Message::Request(mut request) => { - // prune any invalid caps before sending - request.capabilities = request - .capabilities - .iter() - .filter_map(|(cap, sig)| { - // The only time we verify a cap's signature is when a foreign node - // sends us a cap that we (allegedly) issued - if km.source.node != self.metadata.our.node - && cap.issuer.node == self.metadata.our.node - { - match pk.verify(&rmp_serde::to_vec(&cap).unwrap_or_default(), sig) { - Ok(_) => Some((cap.clone(), sig.clone())), - Err(_) => None, - } - } else { - return Some((cap.clone(), sig.clone())); - } - }) - .collect::)>>(); - wit::Message::Request(t::en_wit_request(request)) - } + t::Message::Request(request) => wit::Message::Request(t::en_wit_request(request)), // NOTE: we throw away whatever context came from the sender, that's not ours - t::Message::Response((mut response, _context)) => { - // prune any invalid caps before sending - response.capabilities = response - .capabilities - .iter() - .filter_map(|(cap, sig)| { - // The only time we verify a cap's signature is when a foreign node - // sends us a cap that we (allegedly) issued - if km.source.node != self.metadata.our.node - && cap.issuer.node == self.metadata.our.node - { - match pk.verify(&rmp_serde::to_vec(&cap).unwrap_or_default(), sig) { - Ok(_) => Some((cap.clone(), sig.clone())), - Err(_) => None, - } - } else { - return Some((cap.clone(), sig.clone())); - } - }) - .collect::)>>(); + t::Message::Response((response, _sent_context)) => { wit::Message::Response((t::en_wit_response(response), context)) } }, )) } - - /// Given the current process state, return the id and target that - /// a response it emits should have. This takes into - /// account the `rsvp` of the current or nested message, if any. - async fn make_response_id_target(&self) -> Option<(u64, t::Address)> { - let Some(ref current_incoming_message) = self.current_incoming_message else { - println!("need non-None incoming_message to handle Response\r"); - return None; - }; - match current_incoming_message.message { - t::Message::Request(ref request) => { - // if the current message is a Request, the response will always go there. 
- Some(( - current_incoming_message.id, - match &current_incoming_message.rsvp { - None => current_incoming_message.source.clone(), - Some(address) => address.clone(), - }, - )) - } - t::Message::Response((ref response, ref _maybe_context)) => { - // if the current message is a Response, we look at the nested request - // to see where the response should go. - match &self.nested_request { - None => { - println!( - "need non-None incoming message or nested_request to handle Response\r" - ); - return None; - } - Some(ref nested) => Some(( - nested.id, - match &nested.rsvp { - None => nested.source.clone(), - Some(address) => address.clone(), - }, - )), - } - } - } - } } /// create a specific process, and generate a task that will run it. @@ -572,8 +525,8 @@ pub async fn make_process_loop( self_sender: send_to_process, send_to_loop: send_to_loop.clone(), send_to_terminal: send_to_terminal.clone(), - nested_request: None, - current_incoming_message: None, + prompting_message: None, + last_blob: None, contexts: HashMap::new(), message_queue: VecDeque::new(), caps_oracle: caps_oracle.clone(), @@ -799,3 +752,10 @@ pub async fn make_process_loop( } Ok(()) } + +async fn print(sender: &t::PrintSender, verbosity: u8, content: String) { + let _ = sender + .send(t::Printout { verbosity, content }) + .await + .expect("fatal: kernel terminal print pipe died!"); +}
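Note on the capability flow this series converges on, condensed from the
mod.rs and kernel_message_to_process_receive hunks above (a hedged sketch with
simplified variable names, not verbatim code from the repo):

    // signing side, in the kernel's event loop: caps issued by the process
    // the message came from get signed with our node's networking keypair
    let sig = keypair.sign(&rmp_serde::to_vec(&cap).unwrap());
    attached.push((cap, sig.as_ref().to_vec()));

    // receiving side, in process.rs: verify only caps that claim to be
    // issued by us but arrived from another node; all others are kept as-is
    request.capabilities.retain(|(cap, sig)| {
        if km.source.node != our_node && cap.issuer.node == our_node {
            pk.verify(&rmp_serde::to_vec(cap).unwrap_or_default(), sig)
                .is_ok()
        } else {
            true
        }
    });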