From cb22abc4dc1a9c7e666c5e7d53da574a34ab5741 Mon Sep 17 00:00:00 2001 From: dr-frmr Date: Tue, 2 Jul 2024 16:13:33 +0200 Subject: [PATCH] ...spring cleaning --- README.md | 1 - kinode/packages/chess/chess/src/lib.rs | 12 +- kinode/packages/terminal/terminal/src/lib.rs | 211 ++- kinode/src/eth/mod.rs | 11 +- kinode/src/kernel/mod.rs | 1226 ++++++++---------- kinode/src/kernel/process.rs | 475 +++---- kinode/src/kernel/standard_host.rs | 126 +- kinode/src/kernel/standard_host_v0.rs | 33 +- kinode/src/kv.rs | 16 +- kinode/src/sqlite.rs | 34 +- kinode/src/state.rs | 19 +- kinode/src/vfs.rs | 70 +- lib/src/core.rs | 135 +- 13 files changed, 1035 insertions(+), 1334 deletions(-) diff --git a/README.md b/README.md index f227845f..b7311aa4 100644 --- a/README.md +++ b/README.md @@ -129,7 +129,6 @@ In order to call a script with shorthand, a user may apply an *alias* using the alias ``` Subsequent use of the shorthand will then be interpolated as the process ID. -Aliases are not currently persisted between boots, although this may change. A list of the other terminal scripts included in this distro: diff --git a/kinode/packages/chess/chess/src/lib.rs b/kinode/packages/chess/chess/src/lib.rs index 2e8fac50..d0fdeeec 100644 --- a/kinode/packages/chess/chess/src/lib.rs +++ b/kinode/packages/chess/chess/src/lib.rs @@ -49,10 +49,14 @@ fn load_chess_state() -> ChessState { games, clients: HashSet::new(), }, - None => ChessState { - games: HashMap::new(), - clients: HashSet::new(), - }, + None => { + let state = ChessState { + games: HashMap::new(), + clients: HashSet::new(), + }; + save_chess_state(&state); + state + } } } diff --git a/kinode/packages/terminal/terminal/src/lib.rs b/kinode/packages/terminal/terminal/src/lib.rs index fb876f04..8e7bd9e4 100644 --- a/kinode/packages/terminal/terminal/src/lib.rs +++ b/kinode/packages/terminal/terminal/src/lib.rs @@ -2,8 +2,8 @@ use anyhow::anyhow; use kinode_process_lib::kernel_types as kt; use kinode_process_lib::kinode::process::standard as wit; use kinode_process_lib::{ - call_init, get_blob, get_typed_state, our_capabilities, print_to_terminal, println, set_state, - vfs, Address, Capability, ProcessId, Request, + call_init, get_blob, get_typed_state, our_capabilities, println, set_state, vfs, Address, + Capability, ProcessId, Request, }; use serde::{Deserialize, Serialize}; use std::collections::{HashMap, HashSet}; @@ -42,69 +42,70 @@ fn parse_command(state: &mut TerminalState, line: &str) -> anyhow::Result<()> { }, }; - match handle_run(&state.our, &process, args.to_string()) { - Ok(_) => Ok(()), // TODO clean up process - Err(e) => Err(anyhow!("failed to instantiate script: {}", e)), - } + handle_run(&state.our, &process, args.to_string()) } call_init!(init); fn init(our: Address) { let mut state: TerminalState = match get_typed_state(|bytes| Ok(bincode::deserialize(bytes)?)) { Some(s) => s, - None => TerminalState { - our, - aliases: HashMap::from([ - ( - "alias".to_string(), - ProcessId::new(Some("alias"), "terminal", "sys"), - ), - ( - "cat".to_string(), - ProcessId::new(Some("cat"), "terminal", "sys"), - ), - ( - "echo".to_string(), - ProcessId::new(Some("echo"), "terminal", "sys"), - ), - ( - "hi".to_string(), - ProcessId::new(Some("hi"), "terminal", "sys"), - ), - ( - "kill".to_string(), - ProcessId::new(Some("kill"), "terminal", "sys"), - ), - ( - "kfetch".to_string(), - ProcessId::new(Some("kfetch"), "terminal", "sys"), - ), - ( - "m".to_string(), - ProcessId::new(Some("m"), "terminal", "sys"), - ), - ( - "namehash_to_name".to_string(), - 
ProcessId::new(Some("namehash_to_name"), "terminal", "sys"), - ), - ( - "net_diagnostics".to_string(), - ProcessId::new(Some("net_diagnostics"), "terminal", "sys"), - ), - ( - "peer".to_string(), - ProcessId::new(Some("peer"), "terminal", "sys"), - ), - ( - "peers".to_string(), - ProcessId::new(Some("peers"), "terminal", "sys"), - ), - ( - "top".to_string(), - ProcessId::new(Some("top"), "terminal", "sys"), - ), - ]), - }, + None => { + let state = TerminalState { + our, + aliases: HashMap::from([ + ( + "alias".to_string(), + ProcessId::new(Some("alias"), "terminal", "sys"), + ), + ( + "cat".to_string(), + ProcessId::new(Some("cat"), "terminal", "sys"), + ), + ( + "echo".to_string(), + ProcessId::new(Some("echo"), "terminal", "sys"), + ), + ( + "hi".to_string(), + ProcessId::new(Some("hi"), "terminal", "sys"), + ), + ( + "kill".to_string(), + ProcessId::new(Some("kill"), "terminal", "sys"), + ), + ( + "kfetch".to_string(), + ProcessId::new(Some("kfetch"), "terminal", "sys"), + ), + ( + "m".to_string(), + ProcessId::new(Some("m"), "terminal", "sys"), + ), + ( + "namehash_to_name".to_string(), + ProcessId::new(Some("namehash_to_name"), "terminal", "sys"), + ), + ( + "net_diagnostics".to_string(), + ProcessId::new(Some("net_diagnostics"), "terminal", "sys"), + ), + ( + "peer".to_string(), + ProcessId::new(Some("peer"), "terminal", "sys"), + ), + ( + "peers".to_string(), + ProcessId::new(Some("peers"), "terminal", "sys"), + ), + ( + "top".to_string(), + ProcessId::new(Some("top"), "terminal", "sys"), + ), + ]), + }; + set_state(&bincode::serialize(&state).unwrap()); + state + } }; loop { @@ -126,7 +127,7 @@ fn init(our: Address) { // checks for a request from a terminal script (different process, same package) } else if state.our.node == source.node && state.our.package() == source.package() { let Ok(action) = serde_json::from_slice::(&body) else { - println!("failed to parse action from: {}", source); + println!("failed to parse action from {source}"); continue; }; match action { @@ -138,7 +139,7 @@ fn init(our: Address) { } } } else { - println!("ignoring message from: {}", source); + println!("ignoring message from {source}"); continue; } } @@ -154,26 +155,16 @@ fn init(our: Address) { } fn handle_run(our: &Address, process: &ProcessId, args: String) -> anyhow::Result<()> { - let wasm_path = format!("{}.wasm", process.process()); - let package = format!("{}:{}", process.package(), process.publisher()); - let drive_path = format!("/{}/pkg", package); + let drive_path = format!("/{}:{}/pkg", process.package(), process.publisher()); let Ok(entry) = get_entry(process) else { return Err(anyhow::anyhow!("script not in scripts.json file")); }; - let wasm_path = if wasm_path.starts_with("/") { - wasm_path - } else { - format!("/{}", wasm_path) - }; - let wasm_path = format!("{}{}", drive_path, wasm_path); - // build initial caps - let process_id = format!("{}:{}", rand::random::(), package); // all scripts are given random process IDs - let Ok(parsed_new_process_id) = process_id.parse::() else { - return Err(anyhow::anyhow!("invalid process id!")); - }; + let wasm_path = format!("{drive_path}/{}.wasm", process.process()); - let _bytes_response = Request::new() - .target(("our", "vfs", "distro", "sys")) + // all scripts are given random process IDs + let process_id = ProcessId::new(None, process.package(), process.publisher()); + + Request::to(("our", "vfs", "distro", "sys")) .body(serde_json::to_vec(&vfs::VfsRequest { path: wasm_path.clone(), action: vfs::VfsAction::Read, @@ -191,7 +182,7 @@ fn 
handle_run(our: &Address, process: &ProcessId, args: String) -> anyhow::Resul Capability { issuer: Address { node: our.node.clone(), - process: parsed_new_process_id.clone(), + process: process_id.clone(), }, params: "\"messaging\"".into(), }, @@ -211,7 +202,7 @@ fn handle_run(our: &Address, process: &ProcessId, args: String) -> anyhow::Resul Capability { issuer: Address { node: our.node.clone(), - process: parsed_new_process_id.clone(), + process: process_id.clone(), }, params: params.to_string(), }, @@ -227,8 +218,7 @@ fn handle_run(our: &Address, process: &ProcessId, args: String) -> anyhow::Resul } } for (process, cap) in granted_caps.into_iter() { - Request::new() - .target(("our", "kernel", "distro", "sys")) + Request::to(("our", "kernel", "distro", "sys")) .body(serde_json::to_vec(&kt::KernelCommand::GrantCapabilities { target: process, capabilities: vec![kt::de_wit_capability(cap)], @@ -237,10 +227,9 @@ fn handle_run(our: &Address, process: &ProcessId, args: String) -> anyhow::Resul } // inherits the blob from the previous request, `_bytes_response`, // containing the wasm byte code of the process - Request::new() - .target(("our", "kernel", "distro", "sys")) + Request::to(("our", "kernel", "distro", "sys")) .body(serde_json::to_vec(&kt::KernelCommand::InitializeProcess { - id: parsed_new_process_id.clone(), + id: process_id.clone(), wasm_bytes_handle: wasm_path.clone(), wit_version: entry.wit_version, on_exit: kt::OnExit::None, @@ -305,42 +294,20 @@ fn handle_run(our: &Address, process: &ProcessId, args: String) -> anyhow::Resul requested_caps.push(kt::de_wit_capability(cap.clone())); } } - print_to_terminal( - 3, - &format!( - "{}: Process {{\n wasm_bytes_handle: {},\n on_exit: {:?},\n public: {}\n capabilities: {}\n}}", - parsed_new_process_id.clone(), - wasm_path.clone(), - kt::OnExit::None, - entry.public, - { - let mut caps_string = "[".to_string(); - for cap in requested_caps.iter() { - caps_string += &format!("\n {}({})", cap.issuer.to_string(), cap.params); - } - caps_string + "\n ]" - }, - ), - ); - Request::new() - .target(("our", "kernel", "distro", "sys")) + Request::to(("our", "kernel", "distro", "sys")) .body(serde_json::to_vec(&kt::KernelCommand::GrantCapabilities { - target: parsed_new_process_id.clone(), + target: process_id.clone(), capabilities: requested_caps, })?) .send()?; - let _ = Request::new() - .target(("our", "kernel", "distro", "sys")) + Request::to(("our", "kernel", "distro", "sys")) .body(serde_json::to_vec(&kt::KernelCommand::RunProcess( - parsed_new_process_id.clone(), + process_id.clone(), ))?) 
.send_and_await_response(5)??; - let req = Request::new() - .target(("our", parsed_new_process_id)) - .body(args.into_bytes()); - - req.send().unwrap(); - + Request::to(("our", process_id)) + .body(args.into_bytes()) + .send()?; Ok(()) } @@ -351,20 +318,15 @@ fn handle_alias_change( ) -> anyhow::Result<()> { match process { Some(process) => { - // first check to make sure the script is actually a script - let Ok(_) = get_entry(&process) else { - return Err(anyhow!("process {} not found", process)); - }; - - state.aliases.insert(alias.clone(), process.clone()); - println!("alias {} set to {}", alias, process); + println!("alias {alias} set for {process}"); + state.aliases.insert(alias, process); } None => { if state.aliases.contains_key(&alias) { state.aliases.remove(&alias); - println!("alias {} removed", alias); + println!("alias {alias} removed"); } else { - println!("alias {} not found", alias); + println!("alias {alias} not found"); } } } @@ -374,10 +336,9 @@ fn handle_alias_change( fn get_entry(process: &ProcessId) -> anyhow::Result { let drive_path = format!("/{}:{}/pkg", process.package(), process.publisher()); - Request::new() - .target(("our", "vfs", "distro", "sys")) + Request::to(("our", "vfs", "distro", "sys")) .body(serde_json::to_vec(&vfs::VfsRequest { - path: format!("{}/scripts.json", drive_path), + path: format!("{drive_path}/scripts.json"), action: vfs::VfsAction::Read, })?) .send_and_await_response(5)??; diff --git a/kinode/src/eth/mod.rs b/kinode/src/eth/mod.rs index a9999059..2ca7d26d 100644 --- a/kinode/src/eth/mod.rs +++ b/kinode/src/eth/mod.rs @@ -908,16 +908,7 @@ async fn check_for_root_cap( caps_oracle .send(CapMessage::Has { on: process.clone(), - cap: Capability { - issuer: Address { - node: our.to_string(), - process: ETH_PROCESS_ID.clone(), - }, - params: serde_json::to_string(&serde_json::json!({ - "root": true, - })) - .unwrap(), - }, + cap: Capability::new((our, ETH_PROCESS_ID.clone()), "{\"root\":true}"), responder: send_cap_bool, }) .await diff --git a/kinode/src/kernel/mod.rs b/kinode/src/kernel/mod.rs index eca399f9..2a1844fa 100644 --- a/kinode/src/kernel/mod.rs +++ b/kinode/src/kernel/mod.rs @@ -1,16 +1,12 @@ -use crate::KERNEL_PROCESS_ID; -use anyhow::Result; -use ring::signature; +use lib::types::core::{self as t, KERNEL_PROCESS_ID, STATE_PROCESS_ID, VFS_PROCESS_ID}; use serde::{Deserialize, Serialize}; -use std::collections::HashMap; -use std::str::FromStr; -use std::sync::Arc; -use tokio::sync::mpsc; -use tokio::task::JoinHandle; +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, +}; +use tokio::{sync::mpsc, task::JoinHandle}; use wasmtime::{Config, Engine, WasmBacktraceDetails}; -use lib::types::core::{self as t, STATE_PROCESS_ID, VFS_PROCESS_ID}; - /// Manipulate a single process. pub mod process; /// Implement the functions served to processes by `wit-v0.7.0/kinode.wit`. @@ -18,9 +14,8 @@ mod standard_host; /// Implement the functions served to processes by `wit-v0.8.0/kinode.wit`. mod standard_host_v0; -const PROCESS_CHANNEL_CAPACITY: usize = 100; - pub const LATEST_WIT_VERSION: u32 = 0; +const PROCESS_CHANNEL_CAPACITY: usize = 100; #[derive(Serialize, Deserialize)] struct StartProcessMetadata { @@ -33,7 +28,7 @@ struct StartProcessMetadata { // live in event loop type Senders = HashMap; // handles are for managing liveness, map is for persistence and metadata. 
-type ProcessHandles = HashMap>>; +type ProcessHandles = HashMap>>; enum ProcessSender { Runtime { @@ -44,51 +39,42 @@ enum ProcessSender { } /// persist kernel's process_map state for next bootup -async fn persist_state( - our_name: &str, - send_to_loop: &t::MessageSender, - process_map: &t::ProcessMap, -) -> Result<()> { - let bytes = bincode::serialize(process_map)?; - send_to_loop - .send(t::KernelMessage { - id: rand::random(), - source: t::Address { - node: our_name.to_string(), - process: KERNEL_PROCESS_ID.clone(), - }, - target: t::Address { - node: our_name.to_string(), - process: STATE_PROCESS_ID.clone(), - }, - rsvp: None, - message: t::Message::Request(t::Request { - inherit: true, - expects_response: None, - body: serde_json::to_vec(&t::StateAction::SetState(KERNEL_PROCESS_ID.clone())) - .unwrap(), - metadata: None, - capabilities: vec![], - }), - lazy_load_blob: Some(t::LazyLoadBlob { mime: None, bytes }), - }) - .await?; - Ok(()) +/// TODO refactor this to hit the DB directly for performance's sake +async fn persist_state(send_to_loop: &t::MessageSender, process_map: &t::ProcessMap) { + t::KernelMessage::builder() + .id(rand::random()) + .source(("our", KERNEL_PROCESS_ID.clone())) + .target(("our", STATE_PROCESS_ID.clone())) + .message(t::Message::Request(t::Request { + inherit: false, + expects_response: None, + body: serde_json::to_vec(&t::StateAction::SetState(KERNEL_PROCESS_ID.clone())).unwrap(), + metadata: None, + capabilities: vec![], + })) + .lazy_load_blob(Some(t::LazyLoadBlob { + mime: None, + bytes: bincode::serialize(process_map) + .expect("fatal: kernel couldn't serialize process map"), + })) + .build() + .unwrap() + .send(send_to_loop) + .await; } -/// handle commands inside messages sent directly to kernel. source is always our own node. +/// handle commands inside messages sent directly to kernel. source must be our own node. /// returns Some(()) if the kernel should shut down. 
async fn handle_kernel_request( - our_name: String, - keypair: Arc, + our_name: &str, + keypair: &Arc, km: t::KernelMessage, - send_to_loop: t::MessageSender, - send_to_terminal: t::PrintSender, + send_to_loop: &t::MessageSender, + send_to_terminal: &t::PrintSender, senders: &mut Senders, process_handles: &mut ProcessHandles, process_map: &mut t::ProcessMap, - reverse_cap_index: &mut t::ReverseCapIndex, - caps_oracle: t::CapMessageSender, + caps_oracle: &t::CapMessageSender, engine: &Engine, home_directory_path: &str, ) -> Option<()> { @@ -97,51 +83,47 @@ async fn handle_kernel_request( }; let command: t::KernelCommand = match serde_json::from_slice(&request.body) { Err(e) => { - let _ = send_to_terminal - .send(t::Printout { - verbosity: 0, - content: format!("kernel: couldn't parse command: {:?}", e), - }) + t::Printout::new(0, format!("kernel: couldn't parse command: {e:?}")) + .send(send_to_terminal) .await; return None; } Ok(c) => c, }; match command { + t::KernelCommand::Shutdown => { + for handle in process_handles.values() { + handle.abort(); + } + Some(()) + } + // + // sent from kernel to kernel: we've completed boot sequence, and can + // now go ahead and actually start executing persisted userspace processes + // t::KernelCommand::Booted => { for (process_id, process_sender) in senders { let ProcessSender::Userspace(sender) = process_sender else { continue; }; - let _ = sender - .send(Ok(t::KernelMessage { - id: km.id, - source: t::Address { - node: our_name.clone(), - process: KERNEL_PROCESS_ID.clone(), - }, - target: t::Address { - node: our_name.clone(), - process: process_id.clone(), - }, - rsvp: None, - message: t::Message::Request(t::Request { + sender + .send(Ok(t::KernelMessage::builder() + .id(km.id) + .source((our_name, KERNEL_PROCESS_ID.clone())) + .target((our_name, process_id.clone())) + .message(t::Message::Request(t::Request { inherit: false, expects_response: None, body: b"run".to_vec(), metadata: None, capabilities: vec![], - }), - lazy_load_blob: None, - })) - .await; + })) + .build() + .unwrap())) + .await + .expect("fatal: kernel couldn't send run message to process"); } - } - t::KernelCommand::Shutdown => { - for handle in process_handles.values() { - handle.abort(); - } - return Some(()); + None } // // initialize a new process. this is the only way to create a new process. 
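// --- editor's note (not part of the patch) ---------------------------------
// The kernel hunks above replace hand-built `t::KernelMessage { .. }` literals
// and `send_to_terminal.send(t::Printout { .. })` calls with
// `t::KernelMessage::builder()` and `t::Printout::new(verbosity, text).send(..)`.
// The real helpers live in lib/src/core.rs (listed in the diffstat but not shown
// in full in this excerpt); the sketch below is a simplified, self-contained
// approximation of the pattern, with an invented, stripped-down field set, to
// show why the call sites in the diff get so much shorter.

use tokio::sync::mpsc;

#[derive(Debug)]
struct Printout {
    verbosity: u8,
    content: String,
}

impl Printout {
    fn new(verbosity: u8, content: impl Into<String>) -> Self {
        Self { verbosity, content: content.into() }
    }
    // fire-and-forget: callers no longer repeat a struct literal plus `.send()`
    async fn send(self, sender: &mpsc::Sender<Printout>) {
        sender.send(self).await.ok();
    }
}

#[derive(Debug)]
struct KernelMessage {
    id: u64,
    source: String,
    target: String,
    body: Vec<u8>,
}

#[derive(Debug, Default)]
struct KernelMessageBuilder {
    id: Option<u64>,
    source: Option<String>,
    target: Option<String>,
    body: Option<Vec<u8>>,
}

impl KernelMessage {
    fn builder() -> KernelMessageBuilder {
        KernelMessageBuilder::default()
    }
    async fn send(self, sender: &mpsc::Sender<KernelMessage>) {
        sender.send(self).await.expect("fatal: event loop died");
    }
}

impl KernelMessageBuilder {
    fn id(mut self, id: u64) -> Self { self.id = Some(id); self }
    fn source(mut self, s: impl Into<String>) -> Self { self.source = Some(s.into()); self }
    fn target(mut self, t: impl Into<String>) -> Self { self.target = Some(t.into()); self }
    fn body(mut self, b: Vec<u8>) -> Self { self.body = Some(b); self }
    // build() fails if a required field was never set, which mirrors the
    // `.build().unwrap()` calls scattered through the diff
    fn build(self) -> Result<KernelMessage, &'static str> {
        Ok(KernelMessage {
            id: self.id.ok_or("missing id")?,
            source: self.source.ok_or("missing source")?,
            target: self.target.ok_or("missing target")?,
            body: self.body.unwrap_or_default(),
        })
    }
}

#[tokio::main]
async fn main() {
    let (print_tx, mut print_rx) = mpsc::channel::<Printout>(8);
    let (loop_tx, mut loop_rx) = mpsc::channel::<KernelMessage>(8);

    Printout::new(0, "kernel: booted").send(&print_tx).await;

    KernelMessage::builder()
        .id(42) // stand-in for rand::random()
        .source("our@kernel:distro:sys")
        .target("our@net:distro:sys")
        .body(b"run".to_vec())
        .build()
        .unwrap()
        .send(&loop_tx)
        .await;

    println!("{:?}", print_rx.recv().await);
    println!("{:?}", loop_rx.recv().await);
}
// ---------------------------------------------------------------------------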
@@ -155,38 +137,28 @@ async fn handle_kernel_request( public, } => { let Some(blob) = km.lazy_load_blob else { - let _ = send_to_terminal - .send(t::Printout { - verbosity: 0, - content: "kernel: process startup requires bytes".into(), - }) + t::Printout::new(0, "kernel: process startup requires bytes") + .send(send_to_terminal) .await; // fire an error back - send_to_loop - .send(t::KernelMessage { - id: km.id, - source: t::Address { - node: our_name.clone(), - process: KERNEL_PROCESS_ID.clone(), - }, - target: km.rsvp.unwrap_or(km.source), - rsvp: None, - message: t::Message::Response(( - t::Response { - inherit: false, - body: serde_json::to_vec( - &t::KernelResponse::InitializeProcessError, - ) + t::KernelMessage::builder() + .id(km.id) + .source(("our", KERNEL_PROCESS_ID.clone())) + .target(km.rsvp.unwrap_or(km.source)) + .message(t::Message::Response(( + t::Response { + inherit: false, + body: serde_json::to_vec(&t::KernelResponse::InitializeProcessError) .unwrap(), - metadata: None, - capabilities: vec![], - }, - None, - )), - lazy_load_blob: None, - }) - .await - .expect("event loop: fatal: sender died"); + metadata: None, + capabilities: vec![], + }, + None, + ))) + .build() + .unwrap() + .send(send_to_loop) + .await; return None; }; @@ -207,11 +179,16 @@ async fn handle_kernel_request( valid_capabilities.insert(cap, sig.to_vec()); } None => { - println!( - "kernel: InitializeProcess caller {} doesn't have capability\r", - km.source.process - ); - continue; + t::Printout::new( + 0, + format!( + "kernel: InitializeProcess caller {} doesn't have capability {}", + km.source.process, + cap + ) + ) + .send(send_to_terminal) + .await; } } } @@ -219,255 +196,175 @@ async fn handle_kernel_request( // give the initializer and itself the messaging cap. // NOTE: we do this even if the process is public, because // a process might redundantly call grant_capabilities. 
- let msg_cap = t::Capability { - issuer: t::Address { - node: our_name.clone(), - process: id.clone(), - }, - params: "\"messaging\"".into(), - }; - valid_capabilities.insert( - msg_cap.clone(), - keypair - .sign(&rmp_serde::to_vec(&msg_cap).unwrap()) - .as_ref() - .to_vec(), - ); + let msg_cap = t::Capability::messaging((our_name, id.clone())); + let cap_sig = keypair.sign(&rmp_serde::to_vec(&msg_cap).unwrap()); + valid_capabilities.insert(msg_cap.clone(), cap_sig.as_ref().to_vec()); + caps_oracle .send(t::CapMessage::Add { on: km.source.process.clone(), - caps: vec![t::Capability { - issuer: t::Address { - node: our_name.clone(), - process: id.clone(), - }, - params: "\"messaging\"".into(), - }], - responder: tokio::sync::oneshot::channel().0, + caps: vec![msg_cap], + responder: None, }) .await .expect("event loop: fatal: sender died"); - // fires "success" response back if successful - match start_process( - our_name.clone(), + let start_process_metadata = StartProcessMetadata { + source: if let Some(ref rsvp) = km.rsvp { + rsvp.clone() + } else { + km.source.clone() + }, + process_id: id, + persisted: t::PersistedProcess { + wasm_bytes_handle, + wit_version, + on_exit, + capabilities: valid_capabilities, + public, + }, + reboot: false, + }; + let response = match start_process( + our_name, keypair.clone(), - km.id, blob.bytes, - send_to_loop.clone(), + send_to_loop, send_to_terminal, senders, process_handles, - process_map, engine, caps_oracle, - &StartProcessMetadata { - source: if let Some(ref rsvp) = km.rsvp { - rsvp.clone() - } else { - km.source.clone() - }, - process_id: id, - persisted: t::PersistedProcess { - wasm_bytes_handle, - wit_version, - on_exit, - capabilities: valid_capabilities, - public, - }, - reboot: false, - }, + &start_process_metadata, &home_directory_path, ) .await { - Ok(()) => (), - Err(_e) => { - send_to_loop - .send(t::KernelMessage { - id: km.id, - source: t::Address { - node: our_name.clone(), - process: KERNEL_PROCESS_ID.clone(), - }, - target: km.rsvp.unwrap_or(km.source), - rsvp: None, - message: t::Message::Response(( - t::Response { - inherit: false, - body: serde_json::to_vec( - &t::KernelResponse::InitializeProcessError, - ) - .unwrap(), - metadata: None, - capabilities: vec![], - }, - None, - )), - lazy_load_blob: None, - }) - .await - .expect("event loop: fatal: sender died"); + Ok(()) => { + let on_exit_none = start_process_metadata.persisted.on_exit.is_none(); + process_map.insert( + start_process_metadata.process_id, + start_process_metadata.persisted, + ); + if !start_process_metadata.reboot && !on_exit_none { + // if new, and not totally transient, persist + persist_state(&send_to_loop, process_map).await; + } + t::KernelResponse::InitializedProcess } - } + Err(e) => { + t::Printout::new(0, format!("kernel: error initializing process: {e:?}")) + .send(send_to_terminal) + .await; + t::KernelResponse::InitializeProcessError + } + }; + t::KernelMessage::builder() + .id(km.id) + .source(("our", KERNEL_PROCESS_ID.clone())) + .target(km.rsvp.unwrap_or(km.source)) + .message(t::Message::Response(( + t::Response { + inherit: false, + body: serde_json::to_vec(&response).unwrap(), + metadata: None, + capabilities: vec![], + }, + None, + ))) + .build() + .unwrap() + .send(send_to_loop) + .await; + None } t::KernelCommand::GrantCapabilities { target, capabilities, } => { - let Some(entry) = process_map.get_mut(&target) else { - let _ = send_to_terminal - .send(t::Printout { - verbosity: 0, - content: format!( - "kernel: no such process {:?} to 
GrantCapabilities", - target - ), - }) - .await; - return None; - }; - let signed_caps: Vec<(t::Capability, Vec)> = capabilities - .iter() - .map(|cap| { - ( - cap.clone(), - keypair - .sign(&rmp_serde::to_vec(&cap).unwrap()) - .as_ref() - .to_vec(), - ) + caps_oracle + .send(t::CapMessage::Add { + on: target, + caps: capabilities, + responder: None, }) - .collect(); - entry.capabilities.extend(signed_caps.clone()); - // add these to reverse cap index - for (cap, _) in &signed_caps { - reverse_cap_index - .entry(cap.clone().issuer.process) - .or_insert_with(HashMap::new) - .entry(target.clone()) - .or_insert_with(Vec::new) - .push(cap.clone()); - } - let _ = persist_state(&our_name, &send_to_loop, process_map).await; + .await + .expect("event loop: fatal: sender died"); + None } t::KernelCommand::DropCapabilities { target, capabilities, } => { - let Some(entry) = process_map.get_mut(&target) else { - let _ = send_to_terminal - .send(t::Printout { - verbosity: 1, - content: format!( - "kernel: no such process {:?} to DropCapabilities", - target - ), - }) - .await; - return None; - }; - for cap in capabilities { - entry.capabilities.remove(&cap); - } - let _ = persist_state(&our_name, &send_to_loop, process_map).await; + caps_oracle + .send(t::CapMessage::Drop { + on: target, + caps: capabilities, + responder: None, + }) + .await + .expect("event loop: fatal: sender died"); + None } // send 'run' message to a process that's already been initialized t::KernelCommand::RunProcess(process_id) => { - if let Some(ProcessSender::Userspace(process_sender)) = senders.get(&process_id) { - if let Ok(()) = process_sender - .send(Ok(t::KernelMessage { - id: rand::random(), - source: t::Address { - node: our_name.clone(), - process: KERNEL_PROCESS_ID.clone(), - }, - target: t::Address { - node: our_name.clone(), - process: process_id, - }, - rsvp: None, - message: t::Message::Request(t::Request { - inherit: false, - expects_response: None, - body: b"run".to_vec(), - metadata: None, - capabilities: vec![], - }), - lazy_load_blob: None, - })) - .await - { - send_to_loop - .send(t::KernelMessage { - id: km.id, - source: t::Address { - node: our_name.clone(), - process: KERNEL_PROCESS_ID.clone(), - }, - target: km.rsvp.unwrap_or(km.source), - rsvp: None, - message: t::Message::Response(( - t::Response { - inherit: false, - body: serde_json::to_vec(&t::KernelResponse::StartedProcess) - .unwrap(), - metadata: None, - capabilities: vec![], - }, - None, - )), - lazy_load_blob: None, - }) - .await - .expect("event loop: fatal: sender died"); - } - } else { - let _ = send_to_terminal - .send(t::Printout { - verbosity: 0, - content: format!("kernel: no such process {:?} to run", process_id), - }) - .await; - // fire an error back - send_to_loop - .send(t::KernelMessage { - id: km.id, - source: t::Address { - node: our_name.clone(), - process: KERNEL_PROCESS_ID.clone(), - }, - target: km.rsvp.unwrap_or(km.source), - rsvp: None, - message: t::Message::Response(( - t::Response { + let response = + if let Some(ProcessSender::Userspace(process_sender)) = senders.get(&process_id) { + if let Ok(()) = process_sender + .send(Ok(t::KernelMessage::builder() + .id(rand::random()) + .source((our_name, KERNEL_PROCESS_ID.clone())) + .target((our_name, process_id)) + .message(t::Message::Request(t::Request { inherit: false, - body: serde_json::to_vec(&t::KernelResponse::RunProcessError) - .unwrap(), + expects_response: None, + body: b"run".to_vec(), metadata: None, capabilities: vec![], - }, - None, - )), - lazy_load_blob: None, - 
}) - .await - .expect("event loop: fatal: sender died"); - } + })) + .build() + .unwrap())) + .await + { + t::KernelResponse::StartedProcess + } else { + t::KernelResponse::RunProcessError + } + } else { + t::Printout::new(0, format!("kernel: no such process {process_id} to run")) + .send(send_to_terminal) + .await; + t::KernelResponse::RunProcessError + }; + t::KernelMessage::builder() + .id(km.id) + .source(("our", KERNEL_PROCESS_ID.clone())) + .target(km.rsvp.unwrap_or(km.source)) + .message(t::Message::Response(( + t::Response { + inherit: false, + body: serde_json::to_vec(&response).unwrap(), + metadata: None, + capabilities: vec![], + }, + None, + ))) + .build() + .unwrap() + .send(send_to_loop) + .await; + None } t::KernelCommand::KillProcess(process_id) => { // brutal and savage killing: aborting the task. // do not do this to a process if you don't want to risk // dropped messages / un-replied-to-requests / revoked caps - let _ = senders.remove(&process_id); + senders.remove(&process_id); let process_handle = match process_handles.remove(&process_id) { Some(ph) => ph, None => { - let _ = send_to_terminal - .send(t::Printout { - verbosity: 2, - content: format!("kernel: no such process {process_id} to kill"), - }) + t::Printout::new(2, format!("kernel: no such process {process_id} to kill")) + .send(send_to_terminal) .await; return None; } @@ -477,48 +374,38 @@ async fn handle_kernel_request( caps_oracle .send(t::CapMessage::RevokeAll { on: process_id.clone(), - responder: tokio::sync::oneshot::channel().0, + responder: None, }) .await .expect("event loop: fatal: sender died"); if request.expects_response.is_none() { - let _ = send_to_terminal - .send(t::Printout { - verbosity: 2, - content: format!("killing process {process_id}"), - }) + t::Printout::new(2, format!("kernel: killing process {process_id}")) + .send(send_to_terminal) .await; return None; } - let _ = send_to_terminal - .send(t::Printout { - verbosity: 0, - content: format!("kernel: killing process {}", process_id), - }) + t::Printout::new(0, format!("kernel: killing process {process_id}")) + .send(send_to_terminal) .await; - send_to_loop - .send(t::KernelMessage { - id: km.id, - source: t::Address { - node: our_name.clone(), - process: KERNEL_PROCESS_ID.clone(), + t::KernelMessage::builder() + .id(km.id) + .source(("our", KERNEL_PROCESS_ID.clone())) + .target(km.rsvp.unwrap_or(km.source)) + .message(t::Message::Response(( + t::Response { + inherit: false, + body: serde_json::to_vec(&t::KernelResponse::KilledProcess(process_id)) + .unwrap(), + metadata: None, + capabilities: vec![], }, - target: km.rsvp.unwrap_or(km.source), - rsvp: None, - message: t::Message::Response(( - t::Response { - inherit: false, - body: serde_json::to_vec(&t::KernelResponse::KilledProcess(process_id)) - .unwrap(), - metadata: None, - capabilities: vec![], - }, - None, - )), - lazy_load_blob: None, - }) - .await - .expect("event loop: fatal: sender died"); + None, + ))) + .build() + .unwrap() + .send(send_to_loop) + .await; + None } t::KernelCommand::Debug(kind) => { let response = match kind { @@ -538,60 +425,47 @@ async fn handle_kernel_request( .map(|p| p.capabilities.contains_key(&cap)), ), }; - send_to_loop - .send(t::KernelMessage { - id: km.id, - source: t::Address { - node: our_name.clone(), - process: KERNEL_PROCESS_ID.clone(), + t::KernelMessage::builder() + .id(km.id) + .source(("our", KERNEL_PROCESS_ID.clone())) + .target(km.rsvp.unwrap_or(km.source)) + .message(t::Message::Response(( + t::Response { + inherit: false, + body: 
serde_json::to_vec(&t::KernelResponse::Debug(response)).unwrap(), + metadata: None, + capabilities: vec![], }, - target: km.rsvp.unwrap_or(km.source), - rsvp: None, - message: t::Message::Response(( - t::Response { - inherit: false, - body: serde_json::to_vec(&t::KernelResponse::Debug(response)).unwrap(), - metadata: None, - capabilities: vec![], - }, - None, - )), - lazy_load_blob: None, - }) - .await - .expect("event loop: fatal: sender died"); + None, + ))) + .build() + .unwrap() + .send(send_to_loop) + .await; + None } } - None } /// spawn a process loop and insert the process in the relevant kernel state maps async fn start_process( - our_name: String, - keypair: Arc, - km_id: u64, + our_name: &str, + keypair: Arc, km_blob_bytes: Vec, - send_to_loop: t::MessageSender, - send_to_terminal: t::PrintSender, + send_to_loop: &t::MessageSender, + send_to_terminal: &t::PrintSender, senders: &mut Senders, process_handles: &mut ProcessHandles, - process_map: &mut t::ProcessMap, engine: &Engine, - caps_oracle: t::CapMessageSender, + caps_oracle: &t::CapMessageSender, process_metadata: &StartProcessMetadata, home_directory_path: &str, -) -> Result<()> { +) -> anyhow::Result<()> { let (send_to_process, recv_in_process) = mpsc::channel::>(PROCESS_CHANNEL_CAPACITY); let id = &process_metadata.process_id; if senders.contains_key(id) { - let _ = send_to_terminal - .send(t::Printout { - verbosity: 0, - content: format!("kernel: process with ID {} already exists", id), - }) - .await; - return Err(anyhow::anyhow!("process with ID {} already exists", id)); + return Err(anyhow::anyhow!("process with ID {id} already exists")); } senders.insert( id.clone(), @@ -599,7 +473,7 @@ async fn start_process( ); let metadata = t::ProcessMetadata { our: t::Address { - node: our_name.clone(), + node: our_name.to_string(), process: id.clone(), }, wasm_bytes_handle: process_metadata.persisted.wasm_bytes_handle.clone(), @@ -611,44 +485,17 @@ async fn start_process( id.clone(), tokio::spawn(process::make_process_loop( keypair.clone(), - metadata.clone(), + metadata, send_to_loop.clone(), send_to_terminal.clone(), recv_in_process, send_to_process, km_blob_bytes, - caps_oracle, + caps_oracle.clone(), engine.clone(), home_directory_path.to_string(), )), ); - - process_map.insert(id.clone(), process_metadata.persisted.clone()); - if !process_metadata.reboot { - // if new, persist - persist_state(&our_name, &send_to_loop, process_map).await?; - } - send_to_loop - .send(t::KernelMessage { - id: km_id, - source: t::Address { - node: our_name.clone(), - process: KERNEL_PROCESS_ID.clone(), - }, - target: process_metadata.source.clone(), - rsvp: None, - message: t::Message::Response(( - t::Response { - inherit: false, - body: serde_json::to_vec(&t::KernelResponse::InitializedProcess)?, - metadata: None, - capabilities: vec![], - }, - None, - )), - lazy_load_blob: None, - }) - .await?; Ok(()) } @@ -656,7 +503,7 @@ async fn start_process( /// all processes (Wasm apps) and also runtime tasks. 
pub async fn kernel( our: t::Identity, - keypair: Arc, + keypair: Arc, mut process_map: t::ProcessMap, mut reverse_cap_index: t::ReverseCapIndex, caps_oracle_sender: t::CapMessageSender, @@ -675,7 +522,7 @@ pub async fn kernel( bool, )>, default_pki_entries: Vec, -) -> Result<()> { +) -> anyhow::Result<()> { let mut config = Config::new(); config.cache_config_load_default().unwrap(); config.wasm_backtrace_details(WasmBacktraceDetails::Enable); @@ -683,12 +530,12 @@ pub async fn kernel( config.async_support(true); let engine = Engine::new(&config).unwrap(); - let vfs_path = format!("{}/vfs", home_directory_path); + let vfs_path = format!("{home_directory_path}/vfs"); tokio::fs::create_dir_all(&vfs_path) .await .expect("kernel startup fatal: couldn't create vfs dir"); - let mut senders: Senders = HashMap::new(); + let mut senders: Senders = HashMap::with_capacity(process_map.len() + runtime_extensions.len()); senders.insert( t::ProcessId::new(Some("net"), "distro", "sys"), ProcessSender::Runtime { @@ -707,54 +554,43 @@ pub async fn kernel( } // each running process is stored in this map - let mut process_handles: ProcessHandles = HashMap::new(); + let mut process_handles: ProcessHandles = HashMap::with_capacity(process_map.len()); - let mut is_debug: bool = false; + let mut in_stepthrough_mode: bool = false; // this flag starts as true, and terminal will alert us if we can // skip sending prints for every event. let mut print_full_event_loop: bool = true; - let mut reboot_processes: Vec<(t::ProcessId, StartProcessMetadata, Vec)> = vec![]; - // filter out OnExit::None processes from process_map - process_map.retain(|_, persisted| !persisted.on_exit.is_none()); + // create a list of processes which are successfully rebooted, + // keeping only them in the updated post-boot process map + let mut non_rebooted_processes: HashSet = HashSet::new(); for (process_id, persisted) in &process_map { + // filter out OnExit::None processes from process_map + if persisted.on_exit.is_none() { + non_rebooted_processes.insert(process_id.clone()); + continue; + } // runtime extensions will have a bytes_handle of "", because they have no // WASM code saved in filesystem. if persisted.wasm_bytes_handle.is_empty() { continue; } // read wasm bytes directly from vfs - // start process. 
let wasm_bytes = - match tokio::fs::read(format!("{}/{}", vfs_path, persisted.wasm_bytes_handle)).await { + match tokio::fs::read(format!("{vfs_path}/{}", persisted.wasm_bytes_handle)).await { Ok(bytes) => bytes, Err(e) => { - let _ = send_to_terminal - .send(t::Printout { - verbosity: 0, - content: format!( - "kernel: couldn't read wasm bytes for process: {:?} with error: {}", - process_id, e - ), - }) - .await; + t::Printout::new( + 0, + format!("kernel: couldn't read wasm bytes for process: {process_id}: {e}"), + ) + .send(&send_to_terminal) + .await; + non_rebooted_processes.insert(process_id.clone()); continue; } }; - reboot_processes.push(( - process_id.clone(), - StartProcessMetadata { - source: t::Address { - node: our.name.clone(), - process: KERNEL_PROCESS_ID.clone(), - }, - process_id: process_id.clone(), - persisted: persisted.clone(), - reboot: true, - }, - wasm_bytes, - )); if let t::OnExit::Requests(requests) = &persisted.on_exit { // if a persisted process had on-death-requests, we should perform them now // even in death, a process can only message processes it has capabilities for @@ -763,111 +599,98 @@ pub async fn kernel( let mut request = request.to_owned(); request.expects_response = None; // TODO not sure if we need to verify the signature - if persisted.capabilities.contains_key(&t::Capability { - issuer: address.clone(), - params: "\"messaging\"".into(), - }) { - send_to_loop - .send(t::KernelMessage { - id: rand::random(), - source: t::Address { - node: our.name.clone(), - process: process_id.clone(), - }, - target: address.clone(), - rsvp: None, - message: t::Message::Request(request), - lazy_load_blob: blob.clone(), - }) - .await - .expect("fatal: kernel event loop died"); + if persisted + .capabilities + .contains_key(&t::Capability::messaging(address.clone())) + { + t::KernelMessage::builder() + .id(rand::random()) + .source((&our.name, process_id.clone())) + .target(address.clone()) + .message(t::Message::Request(request)) + .lazy_load_blob(blob.clone()) + .build() + .unwrap() + .send(&send_to_loop) + .await; } } } - } - for (process_id, metadata, wasm_bytes) in reboot_processes { + let start_process_metadata = StartProcessMetadata { + source: t::Address { + node: our.name.clone(), + process: KERNEL_PROCESS_ID.clone(), + }, + process_id: process_id.clone(), + persisted: persisted.clone(), + reboot: true, + }; + match start_process( - our.name.clone(), + &our.name, keypair.clone(), - rand::random(), wasm_bytes, - send_to_loop.clone(), - send_to_terminal.clone(), + &send_to_loop, + &send_to_terminal, &mut senders, &mut process_handles, - &mut process_map, &engine, - caps_oracle_sender.clone(), - &metadata, + &caps_oracle_sender, + &start_process_metadata, home_directory_path.as_str(), ) .await { - Ok(()) => (), + Ok(()) => {} Err(e) => { - let _ = send_to_terminal - .send(t::Printout { - verbosity: 0, - content: format!( - "kernel: couldn't reboot process {:?} with error: {}", - process_id, e - ), - }) + t::Printout::new(0, format!("kernel: couldn't reboot process: {e}")) + .send(&send_to_terminal) .await; + non_rebooted_processes.insert(process_id.clone()); } } } + + process_map.retain(|process_id, _| !non_rebooted_processes.contains(process_id)); + + // persist new state + persist_state(&send_to_loop, &process_map).await; + // after all bootstrapping messages are handled, send a Booted kernelcommand // to turn it on - send_to_loop - .send(t::KernelMessage { - id: rand::random(), - source: t::Address { - node: our.name.clone(), - process: 
KERNEL_PROCESS_ID.clone(), - }, - target: t::Address { - node: our.name.clone(), - process: KERNEL_PROCESS_ID.clone(), - }, - rsvp: None, - message: t::Message::Request(t::Request { - inherit: true, - expects_response: None, - body: serde_json::to_vec(&t::KernelCommand::Booted).unwrap(), - metadata: None, - capabilities: vec![], - }), - lazy_load_blob: None, - }) - .await - .expect("fatal: kernel event loop died"); + t::KernelMessage::builder() + .id(rand::random()) + .source((&our.name, KERNEL_PROCESS_ID.clone())) + .target((&our.name, KERNEL_PROCESS_ID.clone())) + .message(t::Message::Request(t::Request { + inherit: true, + expects_response: None, + body: serde_json::to_vec(&t::KernelCommand::Booted).unwrap(), + metadata: None, + capabilities: vec![], + })) + .build() + .unwrap() + .send(&send_to_loop) + .await; + // sending hard coded pki entries into networking for bootstrapped rpc - send_to_loop - .send(t::KernelMessage { - id: rand::random(), - source: t::Address { - node: our.name.clone(), - process: KERNEL_PROCESS_ID.clone(), - }, - target: t::Address { - node: our.name.clone(), - process: t::ProcessId::from_str("net:distro:sys").unwrap(), - }, - rsvp: None, - message: t::Message::Request(t::Request { - inherit: false, - expects_response: None, - body: rmp_serde::to_vec(&t::NetAction::KnsBatchUpdate(default_pki_entries)) - .unwrap(), - metadata: None, - capabilities: vec![], - }), - lazy_load_blob: None, - }) - .await - .expect("fatal: kernel event loop died"); + t::KernelMessage::builder() + .id(rand::random()) + .source((&our.name, KERNEL_PROCESS_ID.clone())) + .target((our.name.as_str(), "net", "distro", "sys")) + .message(t::Message::Request(t::Request { + inherit: false, + expects_response: None, + body: rmp_serde::to_vec(&t::NetAction::KnsBatchUpdate(default_pki_entries)).unwrap(), + metadata: None, + capabilities: vec![], + })) + .build() + .unwrap() + .send(&send_to_loop) + .await; // main event loop loop { @@ -876,7 +699,7 @@ pub async fn kernel( Some(debug_command) = recv_debug_in_loop.recv() => { match debug_command { t::DebugCommand::ToggleStepthrough => { - is_debug = !is_debug; + in_stepthrough_mode = !in_stepthrough_mode; }, t::DebugCommand::Step => { // can't step here, must be in stepthrough-mode @@ -890,42 +713,37 @@ pub async fn kernel( // directly from the networking task in runtime, and filter them to the // sender of the original attempted message. Some(wrapped_network_error) = network_error_recv.recv() => { - let _ = send_to_terminal.send( - t::Printout { - verbosity: 3, - content: format!("{wrapped_network_error:?}") - } - ).await; + // display every single event when verbose + if print_full_event_loop { + t::Printout::new(3, format!("{wrapped_network_error:?}")).send(&send_to_terminal).await; + } // forward the error to the relevant process match senders.get(&wrapped_network_error.source.process) { Some(ProcessSender::Userspace(sender)) => { - let _ = sender.send(Err(wrapped_network_error)).await; + sender.send(Err(wrapped_network_error)).await.ok(); } Some(ProcessSender::Runtime { net_errors, .. 
}) => { if let Some(net_errors) = net_errors { - let _ = net_errors.send(wrapped_network_error).await; + net_errors.send(wrapped_network_error).await.ok(); } } None => { - let _ = send_to_terminal - .send(t::Printout { - verbosity: 0, - content: format!( - "event loop: {} failed to deliver a message {}; sender has already terminated", - wrapped_network_error.source.process, - match wrapped_network_error.error.kind { - t::SendErrorKind::Timeout => "due to timeout", - t::SendErrorKind::Offline => "because the receiver is offline", - }, - ) - }) - .await; + t::Printout::new( + 0, + format!( + "event loop: {} failed to deliver a message {}; but process has already terminated", + wrapped_network_error.source.process, + match wrapped_network_error.error.kind { + t::SendErrorKind::Timeout => "due to timeout", + t::SendErrorKind::Offline => "because the receiver is offline", + }, + ) + ).send(&send_to_terminal).await; } } }, // main message receiver: kernel filters and dispatches messages - kernel_message = recv_in_loop.recv() => { - let mut kernel_message = kernel_message.expect("fatal: event loop died"); + Some(mut kernel_message) = recv_in_loop.recv() => { // the kernel treats the node-string "our" as a special case, // and replaces it with the name of the node this kernel is running. if kernel_message.source.node == "our" { @@ -935,7 +753,7 @@ pub async fn kernel( kernel_message.target.node = our.name.clone(); } // - // here: are the special kernel-level capabilities checks! + // here are the special kernel-level capabilities checks! // // enforce capabilities by matching from our set based on fixed format // enforce that if message is directed over the network, process has capability to do so @@ -945,25 +763,17 @@ pub async fn kernel( continue; }; if !proc.capabilities.contains_key( - &t::Capability { - issuer: t::Address { - node: our.name.clone(), - process: KERNEL_PROCESS_ID.clone(), - }, - params: "\"network\"".into(), - } + &t::Capability::new((&our.name, KERNEL_PROCESS_ID.clone()), "\"network\"") ) { // capabilities are not correct! skip this message. - throw_timeout(&our.name, &senders, &kernel_message).await; - let _ = send_to_terminal.send( - t::Printout { - verbosity: 0, - content: format!( - "event loop: process {} doesn't have capability to send networked messages", - kernel_message.source.process - ) - } - ).await; + t::Printout::new( + 0, + format!( + "event loop: process {} doesn't have capability to send networked messages", + kernel_message.source.process + ) + ).send(&send_to_terminal).await; + throw_timeout(&our.name, &senders, kernel_message).await; continue; } } else if kernel_message.source.node != our.name { @@ -971,44 +781,35 @@ pub async fn kernel( // your process can be messaged by any process remotely if it has // networking capabilities. 
let Some(persisted) = process_map.get(&kernel_message.target.process) else { - let _ = send_to_terminal - .send(t::Printout { - verbosity: 0, - content: format!( - "event loop: got {} from network for {}, but process does not exist{}", - match kernel_message.message { - t::Message::Request(_) => "Request", - t::Message::Response(_) => "Response", - }, - kernel_message.target.process, - match kernel_message.message { - t::Message::Request(_) => "", - t::Message::Response(_) => - "\nhint: if you are using `m`, try awaiting the Response: `m --await 5 ...`", - } - ) - }) - .await; + t::Printout::new( + 0, + format!( + "event loop: got {} from network for {}, but process does not exist{}", + match kernel_message.message { + t::Message::Request(_) => "Request", + t::Message::Response(_) => "Response", + }, + kernel_message.target.process, + match kernel_message.message { + t::Message::Request(_) => "", + t::Message::Response(_) => + "\nhint: if you are using `m`, try awaiting the Response: `m --await 5 ...`", + } + ) + ).send(&send_to_terminal).await; continue; }; if !persisted.capabilities.contains_key( - &t::Capability { - issuer: t::Address { - node: our.name.clone(), - process: KERNEL_PROCESS_ID.clone(), - }, - params: "\"network\"".into(), - }) { + &t::Capability::new((&our.name, KERNEL_PROCESS_ID.clone()), "\"network\"") + ) { // capabilities are not correct! skip this message. - let _ = send_to_terminal.send( - t::Printout { - verbosity: 0, - content: format!( - "event loop: process {} got a message from over the network, but doesn't have capability to receive networked messages", - kernel_message.target.process - ) - } - ).await; + t::Printout::new( + 0, + format!( + "event loop: process {} got a message from over the network, but doesn't have capability to receive networked messages", + kernel_message.target.process + ) + ).send(&send_to_terminal).await; continue; } } else { @@ -1019,82 +820,68 @@ pub async fn kernel( && kernel_message.source.process != *VFS_PROCESS_ID { let Some(persisted_source) = process_map.get(&kernel_message.source.process) else { - throw_timeout(&our.name, &senders, &kernel_message).await; + throw_timeout(&our.name, &senders, kernel_message).await; continue; }; let Some(persisted_target) = process_map.get(&kernel_message.target.process) else { - throw_timeout(&our.name, &senders, &kernel_message).await; - let _ = send_to_terminal.send( - t::Printout { - verbosity: 2, - content: format!( - "event loop: process {} sent message to non-existing {}; dropping message", - kernel_message.source.process, kernel_message.target.process - ) - } - ).await; + t::Printout::new( + 2, + format!( + "event loop: process {} sent message to non-existing {}; dropping message", + kernel_message.source.process, kernel_message.target.process + ) + ).send(&send_to_terminal).await; + throw_timeout(&our.name, &senders, kernel_message).await; continue; }; - if !persisted_target.public && !persisted_source.capabilities.contains_key(&t::Capability { - issuer: t::Address { - node: our.name.clone(), - process: kernel_message.target.process.clone(), - }, - params: "\"messaging\"".into(), - }) { + if !persisted_target.public + && !persisted_source.capabilities.contains_key( + &t::Capability::messaging((&our.name, kernel_message.target.process.clone())) + ) { // capabilities are not correct! skip this message. 
- throw_timeout(&our.name, &senders, &kernel_message).await; - let _ = send_to_terminal.send( - t::Printout { - verbosity: 0, - content: format!( - "event loop: process {} doesn't have capability to message process {}", - kernel_message.source.process, kernel_message.target.process - ) - } - ).await; + t::Printout::new( + 0, + format!( + "event loop: process {} doesn't have capability to message process {}", + kernel_message.source.process, kernel_message.target.process + ) + ).send(&send_to_terminal).await; + throw_timeout(&our.name, &senders, kernel_message).await; continue; } } } // end capabilities checks + // if debug mode is on, wait for user to step through - while is_debug { + while in_stepthrough_mode { let debug = recv_debug_in_loop.recv().await.expect("event loop: debug channel died"); match debug { - t::DebugCommand::ToggleStepthrough => is_debug = !is_debug, + t::DebugCommand::ToggleStepthrough => in_stepthrough_mode = !in_stepthrough_mode, t::DebugCommand::Step => break, t::DebugCommand::ToggleEventLoop => print_full_event_loop = !print_full_event_loop, } } // display every single event when verbose if print_full_event_loop { - let _ = send_to_terminal.send( - t::Printout { - verbosity: 3, - content: format!("{kernel_message}") - } - ).await; + t::Printout::new(3, format!("{kernel_message}")).send(&send_to_terminal).await; } if our.name != kernel_message.target.node { + // handle messages sent over network send_to_net.send(kernel_message).await.expect("fatal: net module died"); - } else if kernel_message.target.process.process() == "kernel" { - // kernel only accepts messages from our own node - if our.name != kernel_message.source.node { - continue; - } + } else if kernel_message.target.process.process() == "kernel" && kernel_message.source.node == our.name { + // handle messages sent to local kernel if let Some(()) = handle_kernel_request( - our.name.clone(), - keypair.clone(), + &our.name, + &keypair, kernel_message, - send_to_loop.clone(), - send_to_terminal.clone(), + &send_to_loop, + &send_to_terminal, &mut senders, &mut process_handles, &mut process_map, - &mut reverse_cap_index, - caps_oracle_sender.clone(), + &caps_oracle_sender, &engine, &home_directory_path, ).await { @@ -1105,46 +892,42 @@ pub async fn kernel( // pass message to appropriate runtime module or process match senders.get(&kernel_message.target.process) { Some(ProcessSender::Userspace(sender)) => { - let _ = sender.send(Ok(kernel_message)).await; + sender.send(Ok(kernel_message)).await.ok(); } Some(ProcessSender::Runtime { sender, .. 
}) => { sender.send(kernel_message).await.expect("event loop: fatal: runtime module died"); } None => { - throw_timeout(&our.name, &senders, &kernel_message).await; - send_to_terminal - .send(t::Printout { - verbosity: 0, - content: format!( - "event loop: got {} from {:?} for {:?}, but target doesn't exist (perhaps it terminated): {}", - match kernel_message.message { - t::Message::Request(_) => "Request", - t::Message::Response(_) => "Response", - }, - kernel_message.source.process, - kernel_message.target.process, - kernel_message, - ) - }) - .await - .expect("event loop: fatal: terminal sender died"); + t::Printout::new( + 0, + format!( + "event loop: got {} from {:?} for {:?}, but target doesn't exist (perhaps it terminated): {}", + match kernel_message.message { + t::Message::Request(_) => "Request", + t::Message::Response(_) => "Response", + }, + kernel_message.source.process, + kernel_message.target.process, + kernel_message, + ) + ).send(&send_to_terminal).await; + throw_timeout(&our.name, &senders, kernel_message).await; } } } }, // capabilities oracle: handles all requests to add, drop, and check capabilities Some(cap_message) = caps_oracle_receiver.recv() => { - let _ = send_to_terminal.send( - t::Printout { - verbosity: 3, - content: format!("{cap_message:?}") - } - ).await; + if print_full_event_loop { + t::Printout::new(3, format!("{cap_message:?}")).send(&send_to_terminal).await; + } match cap_message { t::CapMessage::Add { on, caps, responder } => { // insert cap in process map let Some(entry) = process_map.get_mut(&on) else { - let _ = responder.send(false); + if let Some(responder) = responder { + responder.send(false).ok(); + } continue; }; let signed_caps: Vec<(t::Capability, Vec)> = @@ -1162,43 +945,54 @@ pub async fn kernel( .or_insert_with(Vec::new) .push(cap.clone()); } - let _ = persist_state(&our.name, &send_to_loop, &process_map).await; - let _ = responder.send(true); + if !entry.on_exit.is_none() { + persist_state(&send_to_loop, &process_map).await; + } + if let Some(responder) = responder { + responder.send(true).ok(); + } }, t::CapMessage::Drop { on, caps, responder } => { // remove cap from process map let Some(entry) = process_map.get_mut(&on) else { - let _ = responder.send(false); + if let Some(responder) = responder { + responder.send(false).ok(); + } continue; }; for cap in &caps { entry.capabilities.remove(&cap); } - let _ = persist_state(&our.name, &send_to_loop, &process_map).await; - let _ = responder.send(true); + if !entry.on_exit.is_none() { + persist_state(&send_to_loop, &process_map).await; + } + if let Some(responder) = responder { + responder.send(true).ok(); + } }, t::CapMessage::Has { on, cap, responder } => { // return boolean on responder - let _ = responder.send( + responder.send( match process_map.get(&on) { None => false, Some(p) => p.capabilities.contains_key(&cap), } - ); + ).ok(); }, t::CapMessage::GetAll { on, responder } => { // return all caps, signed, on responder - let _ = responder.send( + responder.send( match process_map.get(&on) { None => vec![], Some(p) => p.capabilities.clone().into_iter().collect(), } - ); + ).ok(); }, t::CapMessage::RevokeAll { on, responder } => { let Some(granter) = reverse_cap_index.get(&on) else { - let _ = persist_state(&our.name, &send_to_loop, &process_map).await; - let _ = responder.send(true); + if let Some(responder) = responder { + responder.send(true).ok(); + } continue; }; for (grantee, caps) in granter { @@ -1208,11 +1002,13 @@ pub async fn kernel( } }; } - let _ = persist_state(&our.name, 
&send_to_loop, &process_map).await; - let _ = responder.send(true); + persist_state(&send_to_loop, &process_map).await; + if let Some(responder) = responder { + responder.send(true).ok(); + } } t::CapMessage::FilterCaps { on, caps, responder } => { - let _ = responder.send( + responder.send( match process_map.get(&on) { None => vec![], Some(p) => { @@ -1229,7 +1025,7 @@ pub async fn kernel( }).collect() }, } - ); + ).ok(); }, } } @@ -1240,29 +1036,27 @@ pub async fn kernel( async fn throw_timeout( our_name: &str, senders: &HashMap, - km: &t::KernelMessage, + km: t::KernelMessage, ) { if let t::Message::Request(req) = &km.message { if req.expects_response.is_some() { - match senders.get(&km.source.process) { - Some(ProcessSender::Userspace(sender)) => { - let _ = sender - .send(Err(t::WrappedSendError { - id: km.id, - source: t::Address { - node: our_name.to_string(), - process: KERNEL_PROCESS_ID.clone(), - }, - error: t::SendError { - kind: t::SendErrorKind::Timeout, - target: km.target.clone(), - lazy_load_blob: km.lazy_load_blob.clone(), - message: km.message.clone(), - }, - })) - .await; - } - _ => return, + if let Some(ProcessSender::Userspace(sender)) = senders.get(&km.source.process) { + sender + .send(Err(t::WrappedSendError { + id: km.id, + source: t::Address { + node: our_name.to_string(), + process: KERNEL_PROCESS_ID.clone(), + }, + error: t::SendError { + kind: t::SendErrorKind::Timeout, + target: km.target, + lazy_load_blob: km.lazy_load_blob, + message: km.message, + }, + })) + .await + .ok(); } } } diff --git a/kinode/src/kernel/process.rs b/kinode/src/kernel/process.rs index 2e03cb81..63475f3d 100644 --- a/kinode/src/kernel/process.rs +++ b/kinode/src/kernel/process.rs @@ -1,15 +1,15 @@ use crate::KERNEL_PROCESS_ID; -use lib::types::core as t; -pub use lib::v0::ProcessV0; -pub use lib::Process; -use ring::signature; -use std::collections::{HashMap, VecDeque}; -use std::sync::Arc; -use tokio::fs; -use tokio::task::JoinHandle; +use lib::{types::core as t, v0::ProcessV0, Process}; +use std::{ + collections::{HashMap, VecDeque}, + sync::Arc, +}; +use tokio::{fs, task::JoinHandle}; use wasi_common::sync::Dir; -use wasmtime::component::{Component, Linker, ResourceTable as Table}; -use wasmtime::{Engine, Store}; +use wasmtime::{ + component::{Component, Linker, ResourceTable as Table}, + Engine, Store, +}; use wasmtime_wasi::{ pipe::MemoryOutputPipe, DirPerms, FilePerms, WasiCtx, WasiCtxBuilder, WasiView, }; @@ -25,7 +25,7 @@ pub struct ProcessContext { pub struct ProcessState { /// our node's networking keypair - pub keypair: Arc, + pub keypair: Arc, /// information about ourself pub metadata: t::ProcessMetadata, /// pipe from which we get messages from the main event loop @@ -83,29 +83,18 @@ impl WasiView for ProcessWasiV0 { } } -async fn make_component( - engine: Engine, - wasm_bytes: &[u8], +async fn make_table_and_wasi( home_directory_path: String, - process_state: ProcessState, -) -> anyhow::Result<(Process, Store, MemoryOutputPipe)> { - let component = Component::new(&engine, wasm_bytes.to_vec()) - .expect("make_process_loop: couldn't read file"); - - let mut linker = Linker::new(&engine); - Process::add_to_linker(&mut linker, |state: &mut ProcessWasi| state).unwrap(); - + process_state: &ProcessState, +) -> (Table, WasiCtx, MemoryOutputPipe) { let table = Table::new(); let wasi_stderr = MemoryOutputPipe::new(STACK_TRACE_SIZE); - let our_process_id = process_state.metadata.our.process.clone(); - let send_to_terminal = process_state.send_to_terminal.clone(); - let tmp_path = 
     let tmp_path = format!(
         "{}/vfs/{}:{}/tmp",
         home_directory_path,
-        our_process_id.package(),
-        our_process_id.publisher()
+        process_state.metadata.our.process.package(),
+        process_state.metadata.our.process.publisher()
     );
 
     let mut wasi = WasiCtxBuilder::new();
@@ -130,10 +119,26 @@ async fn make_component(
     }
 
-    let wasi = wasi.stderr(wasi_stderr.clone()).build();
+    (table, wasi.stderr(wasi_stderr.clone()).build(), wasi_stderr)
+}
+async fn make_component(
+    engine: Engine,
+    wasm_bytes: &[u8],
+    home_directory_path: String,
+    process_state: ProcessState,
+) -> anyhow::Result<(Process, Store<ProcessWasi>, MemoryOutputPipe)> {
+    let component =
+        Component::new(&engine, wasm_bytes.to_vec()).expect("make_component: couldn't read file");
+
+    let mut linker = Linker::new(&engine);
+    Process::add_to_linker(&mut linker, |state: &mut ProcessWasi| state).unwrap();
+    let (table, wasi, wasi_stderr) = make_table_and_wasi(home_directory_path, &process_state).await;
     wasmtime_wasi::command::add_to_linker(&mut linker).unwrap();
+    let our_process_id = process_state.metadata.our.process.clone();
+    let send_to_terminal = process_state.send_to_terminal.clone();
+
     let mut store = Store::new(
         &engine,
         ProcessWasi {
@@ -147,15 +152,12 @@ async fn make_component(
     match Process::instantiate_async(&mut store, &component, &linker).await {
         Ok(b) => b,
         Err(e) => {
-            let _ = send_to_terminal
-                .send(t::Printout {
-                    verbosity: 0,
-                    content: format!(
-                        "mk: process {:?} failed to instantiate: {:?}",
-                        our_process_id, e,
-                    ),
-                })
-                .await;
+            t::Printout::new(
+                0,
+                format!("kernel: process {our_process_id} failed to instantiate: {e:?}"),
+            )
+            .send(&send_to_terminal)
+            .await;
             return Err(e);
         }
     };
@@ -169,51 +171,17 @@ async fn make_component_v0(
     home_directory_path: String,
     process_state: ProcessState,
 ) -> anyhow::Result<(ProcessV0, Store<ProcessWasiV0>, MemoryOutputPipe)> {
-    let component = Component::new(&engine, wasm_bytes.to_vec())
-        .expect("make_process_loop: couldn't read file");
+    let component =
+        Component::new(&engine, wasm_bytes.to_vec()).expect("make_component: couldn't read file");
 
     let mut linker = Linker::new(&engine);
     ProcessV0::add_to_linker(&mut linker, |state: &mut ProcessWasiV0| state).unwrap();
-
-    let table = Table::new();
-    let wasi_stderr = MemoryOutputPipe::new(STACK_TRACE_SIZE);
+    let (table, wasi, wasi_stderr) = make_table_and_wasi(home_directory_path, &process_state).await;
+    wasmtime_wasi::command::add_to_linker(&mut linker).unwrap();
 
     let our_process_id = process_state.metadata.our.process.clone();
     let send_to_terminal = process_state.send_to_terminal.clone();
-    let tmp_path = format!(
-        "{}/vfs/{}:{}/tmp",
-        home_directory_path,
-        our_process_id.package(),
-        our_process_id.publisher()
-    );
-
-    let mut wasi = WasiCtxBuilder::new();
-
-    // TODO make guarantees about this
-    if let Ok(Ok(())) = tokio::time::timeout(
-        std::time::Duration::from_secs(5),
-        fs::create_dir_all(&tmp_path),
-    )
-    .await
-    {
-        if let Ok(wasi_tempdir) =
-            Dir::open_ambient_dir(tmp_path.clone(), wasi_common::sync::ambient_authority())
-        {
-            wasi.preopened_dir(
-                wasi_tempdir,
-                DirPerms::all(),
-                FilePerms::all(),
-                tmp_path.clone(),
-            )
-            .env("TEMP_DIR", tmp_path);
-        }
-    }
-
-    let wasi = wasi.stderr(wasi_stderr.clone()).build();
-
-    wasmtime_wasi::command::add_to_linker(&mut linker).unwrap();
-
     let mut store = Store::new(
         &engine,
         ProcessWasiV0 {
@@ -227,15 +195,12 @@ async fn make_component_v0(
     match ProcessV0::instantiate_async(&mut store, &component, &linker).await {
         Ok(b) => b,
         Err(e) => {
-            let _ = send_to_terminal
-                .send(t::Printout {
-                    verbosity: 0,
-                    content: format!(
-                        "mk: process {:?} failed to instantiate: {:?}",
-                        our_process_id, e,
-                    ),
-                })
-                .await;
+            t::Printout::new(
+                0,
+                format!("kernel: process {our_process_id} failed to instantiate: {e:?}"),
+            )
+            .send(&send_to_terminal)
+            .await;
             return Err(e);
         }
     };
@@ -245,7 +210,7 @@ async fn make_component_v0(
 
 /// create a specific process, and generate a task that will run it.
 pub async fn make_process_loop(
-    keypair: Arc<signature::Ed25519KeyPair>,
+    keypair: Arc<ring::signature::Ed25519KeyPair>,
     metadata: t::ProcessMetadata,
     send_to_loop: t::MessageSender,
     send_to_terminal: t::PrintSender,
@@ -290,9 +255,12 @@ pub async fn make_process_loop(
         send_to_process.send(message).await?;
     }
 
+    let our = metadata.our.clone();
+    let wit_version = metadata.wit_version.clone();
+
     let process_state = ProcessState {
-        keypair: keypair.clone(),
-        metadata: metadata.clone(),
+        keypair,
+        metadata,
         recv_in_process,
         self_sender: send_to_process,
         send_to_loop: send_to_loop.clone(),
@@ -304,40 +272,28 @@ pub async fn make_process_loop(
         caps_oracle: caps_oracle.clone(),
     };
 
-    let metadata = match metadata.wit_version {
+    let metadata = match wit_version {
         // assume missing version is oldest wit version
         None => {
             let (bindings, mut store, wasi_stderr) =
                 make_component(engine, &wasm_bytes, home_directory_path, process_state).await?;
 
             // the process will run until it returns from init() or crashes
-            match bindings
-                .call_init(&mut store, &metadata.our.to_string())
-                .await
-            {
+            match bindings.call_init(&mut store, &our.to_string()).await {
                 Ok(()) => {
-                    let _ = send_to_terminal
-                        .send(t::Printout {
-                            verbosity: 1,
-                            content: format!(
-                                "process {} returned without error",
-                                metadata.our.process
-                            ),
-                        })
+                    t::Printout::new(1, format!("process {our} returned without error"))
+                        .send(&send_to_terminal)
                         .await;
                 }
                 Err(_) => {
                     let stderr = wasi_stderr.contents().into();
                     let stderr = String::from_utf8(stderr)?;
-                    let _ = send_to_terminal
-                        .send(t::Printout {
-                            verbosity: 0,
-                            content: format!(
-                                "\x1b[38;5;196mprocess {} ended with error:\x1b[0m\n{}",
-                                metadata.our.process, stderr,
-                            ),
-                        })
-                        .await;
+                    t::Printout::new(
+                        0,
+                        format!("\x1b[38;5;196mprocess {our} ended with error:\x1b[0m\n{stderr}",),
+                    )
+                    .send(&send_to_terminal)
+                    .await;
                 }
             };
@@ -351,33 +307,21 @@ pub async fn make_process_loop(
                 make_component_v0(engine, &wasm_bytes, home_directory_path, process_state).await?;
 
             // the process will run until it returns from init() or crashes
-            match bindings
-                .call_init(&mut store, &metadata.our.to_string())
-                .await
-            {
+            match bindings.call_init(&mut store, &our.to_string()).await {
                 Ok(()) => {
-                    let _ = send_to_terminal
-                        .send(t::Printout {
-                            verbosity: 1,
-                            content: format!(
-                                "process {} returned without error",
-                                metadata.our.process
-                            ),
-                        })
+                    t::Printout::new(1, format!("process {our} returned without error"))
                        .send(&send_to_terminal)
                         .await;
                 }
                 Err(_) => {
                     let stderr = wasi_stderr.contents().into();
                     let stderr = String::from_utf8(stderr)?;
-                    let _ = send_to_terminal
-                        .send(t::Printout {
-                            verbosity: 0,
-                            content: format!(
-                                "\x1b[38;5;196mprocess {} ended with error:\x1b[0m\n{}",
-                                metadata.our.process, stderr,
-                            ),
-                        })
-                        .await;
+                    t::Printout::new(
+                        0,
+                        format!("\x1b[38;5;196mprocess {our} ended with error:\x1b[0m\n{stderr}",),
+                    )
+                    .send(&send_to_terminal)
+                    .await;
                 }
             };
@@ -390,19 +334,14 @@ pub async fn make_process_loop(
     // the process has completed, time to perform cleanup
     //
 
-    let our_kernel = t::Address {
-        node: metadata.our.node.clone(),
-        process: KERNEL_PROCESS_ID.clone(),
-    };
-
     // get caps before killing
     let (tx, rx) = tokio::sync::oneshot::channel();
-    let _ = caps_oracle
+    caps_oracle
         .send(t::CapMessage::GetAll {
             on: metadata.our.process.clone(),
             responder: tx,
         })
-        .await;
+        .await?;
     let initial_capabilities = rx
         .await?
         .iter()
@@ -412,164 +351,142 @@ pub async fn make_process_loop(
         })
         .collect();
 
+    t::Printout::new(
+        1,
+        format!(
+            "process {} has OnExit behavior {}",
+            metadata.our.process, metadata.on_exit
+        ),
+    )
+    .send(&send_to_terminal)
+    .await;
+
     // fulfill the designated OnExit behavior
     match metadata.on_exit {
         t::OnExit::None => {
-            send_to_loop
-                .send(t::KernelMessage {
-                    id: rand::random(),
-                    source: our_kernel.clone(),
-                    target: our_kernel.clone(),
-                    rsvp: None,
-                    message: t::Message::Request(t::Request {
-                        inherit: false,
-                        expects_response: None,
-                        body: serde_json::to_vec(&t::KernelCommand::KillProcess(
-                            metadata.our.process.clone(),
-                        ))
-                        .unwrap(),
-                        metadata: None,
-                        capabilities: vec![],
-                    }),
-                    lazy_load_blob: None,
-                })
-                .await?;
-            let _ = send_to_terminal
-                .send(t::Printout {
-                    verbosity: 1,
-                    content: format!("process {} had no OnExit behavior", metadata.our.process),
-                })
+            t::KernelMessage::builder()
+                .id(rand::random())
+                .source((&our.node, KERNEL_PROCESS_ID.clone()))
+                .target((&our.node, KERNEL_PROCESS_ID.clone()))
+                .message(t::Message::Request(t::Request {
+                    inherit: false,
+                    expects_response: None,
+                    body: serde_json::to_vec(&t::KernelCommand::KillProcess(
+                        metadata.our.process.clone(),
+                    ))
+                    .unwrap(),
+                    metadata: None,
+                    capabilities: vec![],
+                }))
+                .build()
+                .unwrap()
+                .send(&send_to_loop)
                 .await;
         }
         // if restart, tell ourselves to init the app again, with same capabilities
         t::OnExit::Restart => {
-            send_to_loop
-                .send(t::KernelMessage {
-                    id: rand::random(),
-                    source: our_kernel.clone(),
-                    target: our_kernel.clone(),
-                    rsvp: None,
-                    message: t::Message::Request(t::Request {
-                        inherit: false,
-                        expects_response: None,
-                        body: serde_json::to_vec(&t::KernelCommand::KillProcess(
-                            metadata.our.process.clone(),
-                        ))
-                        .unwrap(),
-                        metadata: None,
-                        capabilities: vec![],
-                    }),
-                    lazy_load_blob: None,
-                })
-                .await?;
-            let _ = send_to_terminal
-                .send(t::Printout {
-                    verbosity: 1,
-                    content: format!(
-                        "firing OnExit::Restart for process {}",
-                        metadata.our.process
-                    ),
-                })
+            // kill
+            t::KernelMessage::builder()
+                .id(rand::random())
+                .source((&our.node, KERNEL_PROCESS_ID.clone()))
+                .target((&our.node, KERNEL_PROCESS_ID.clone()))
+                .message(t::Message::Request(t::Request {
+                    inherit: false,
+                    expects_response: None,
+                    body: serde_json::to_vec(&t::KernelCommand::KillProcess(
+                        metadata.our.process.clone(),
+                    ))
+                    .unwrap(),
+                    metadata: None,
+                    capabilities: vec![],
+                }))
+                .build()
+                .unwrap()
+                .send(&send_to_loop)
+                .await;
+            // then re-initialize
+            t::KernelMessage::builder()
+                .id(rand::random())
+                .source((&our.node, KERNEL_PROCESS_ID.clone()))
+                .target((&our.node, KERNEL_PROCESS_ID.clone()))
+                .message(t::Message::Request(t::Request {
+                    inherit: false,
+                    expects_response: None,
+                    body: serde_json::to_vec(&t::KernelCommand::InitializeProcess {
+                        id: metadata.our.process.clone(),
+                        wasm_bytes_handle: metadata.wasm_bytes_handle,
+                        wit_version: metadata.wit_version,
+                        on_exit: metadata.on_exit,
+                        initial_capabilities,
+                        public: metadata.public,
+                    })
+                    .unwrap(),
+                    metadata: None,
+                    capabilities: vec![],
+                }))
+                .lazy_load_blob(Some(t::LazyLoadBlob {
+                    mime: None,
+                    bytes: wasm_bytes,
+                }))
+                .build()
+                .unwrap()
+                .send(&send_to_loop)
+                .await;
+            // then run
+            t::KernelMessage::builder()
+                .id(rand::random())
+                .source((&our.node, KERNEL_PROCESS_ID.clone()))
+                .target((&our.node, KERNEL_PROCESS_ID.clone()))
+                .message(t::Message::Request(t::Request {
+                    inherit: false,
+                    expects_response: None,
+                    body: serde_json::to_vec(&t::KernelCommand::RunProcess(
+                        metadata.our.process.clone(),
+                    ))
+                    .unwrap(),
+                    metadata: None,
+                    capabilities: vec![],
+                }))
+                .build()
+                .unwrap()
+                .send(&send_to_loop)
                 .await;
-            send_to_loop
-                .send(t::KernelMessage {
-                    id: rand::random(),
-                    source: our_kernel.clone(),
-                    target: our_kernel.clone(),
-                    rsvp: None,
-                    message: t::Message::Request(t::Request {
-                        inherit: false,
-                        expects_response: None,
-                        body: serde_json::to_vec(&t::KernelCommand::InitializeProcess {
-                            id: metadata.our.process.clone(),
-                            wasm_bytes_handle: metadata.wasm_bytes_handle,
-                            wit_version: metadata.wit_version,
-                            on_exit: metadata.on_exit,
-                            initial_capabilities,
-                            public: metadata.public,
-                        })
-                        .unwrap(),
-                        metadata: None,
-                        capabilities: vec![],
-                    }),
-                    lazy_load_blob: Some(t::LazyLoadBlob {
-                        mime: None,
-                        bytes: wasm_bytes,
-                    }),
-                })
-                .await?;
-            send_to_loop
-                .send(t::KernelMessage {
-                    id: rand::random(),
-                    source: our_kernel.clone(),
-                    target: our_kernel.clone(),
-                    rsvp: None,
-                    message: t::Message::Request(t::Request {
-                        inherit: false,
-                        expects_response: None,
-                        body: serde_json::to_vec(&t::KernelCommand::RunProcess(
-                            metadata.our.process.clone(),
-                        ))
-                        .unwrap(),
-                        metadata: None,
-                        capabilities: vec![],
-                    }),
-                    lazy_load_blob: None,
-                })
-                .await?;
         }
         // if requests, fire them
         // even in death, a process can only message processes it has capabilities for
         t::OnExit::Requests(requests) => {
-            send_to_terminal
-                .send(t::Printout {
-                    verbosity: 1,
-                    content: format!(
-                        "firing OnExit::Requests for process {}",
-                        metadata.our.process
-                    ),
-                })
-                .await?;
             for (address, mut request, blob) in requests {
                 request.expects_response = None;
-                send_to_loop
-                    .send(t::KernelMessage {
-                        id: rand::random(),
-                        source: metadata.our.clone(),
-                        target: address,
-                        rsvp: None,
-                        message: t::Message::Request(request),
-                        lazy_load_blob: blob,
-                    })
-                    .await?;
+                t::KernelMessage::builder()
+                    .id(rand::random())
+                    .source(metadata.our.clone())
+                    .target(address)
+                    .message(t::Message::Request(request))
+                    .lazy_load_blob(blob)
+                    .build()
+                    .unwrap()
+                    .send(&send_to_loop)
+                    .await;
             }
-            send_to_loop
-                .send(t::KernelMessage {
-                    id: rand::random(),
-                    source: our_kernel.clone(),
-                    target: our_kernel.clone(),
-                    rsvp: None,
-                    message: t::Message::Request(t::Request {
-                        inherit: false,
-                        expects_response: None,
-                        body: serde_json::to_vec(&t::KernelCommand::KillProcess(
-                            metadata.our.process.clone(),
-                        ))
-                        .unwrap(),
-                        metadata: None,
-                        capabilities: vec![],
-                    }),
-                    lazy_load_blob: None,
-                })
-                .await?;
+            t::KernelMessage::builder()
+                .id(rand::random())
+                .source((&our.node, KERNEL_PROCESS_ID.clone()))
+                .target((&our.node, KERNEL_PROCESS_ID.clone()))
+                .message(t::Message::Request(t::Request {
+                    inherit: false,
+                    expects_response: None,
+                    body: serde_json::to_vec(&t::KernelCommand::KillProcess(
+                        metadata.our.process.clone(),
+                    ))
+                    .unwrap(),
+                    metadata: None,
+                    capabilities: vec![],
+                }))
+                .build()
+                .unwrap()
+                .send(&send_to_loop)
+                .await;
         }
     }
     Ok(())
 }
-
-pub async fn print(sender: &t::PrintSender, verbosity: u8, content: String) {
-    let _ = sender
-        .send(t::Printout { verbosity, content })
-        .await
-        .expect("fatal: kernel terminal print pipe died!");
-}
diff --git a/kinode/src/kernel/standard_host.rs b/kinode/src/kernel/standard_host.rs
index 84e7b2aa..efdca811 100644
--- a/kinode/src/kernel/standard_host.rs
+++ b/kinode/src/kernel/standard_host.rs
@@ -1,18 +1,13 @@
 use crate::kernel::process;
 use anyhow::Result;
-use lib::core::{KERNEL_PROCESS_ID, VFS_PROCESS_ID};
-use lib::types::core::{self as t, STATE_PROCESS_ID};
-pub use lib::wit;
-pub use lib::wit::Host as StandardHost;
+use lib::types::core::{self as t, KERNEL_PROCESS_ID, STATE_PROCESS_ID, VFS_PROCESS_ID};
+use lib::wit;
+use lib::wit::Host as StandardHost;
 use ring::signature::{self, KeyPair};
 
 async fn print_debug(proc: &process::ProcessState, content: &str) {
-    let _ = proc
-        .send_to_terminal
-        .send(t::Printout {
-            verbosity: 2,
-            content: format!("{}: {}", proc.metadata.our.process, content),
-        })
+    t::Printout::new(2, format!("{}: {}", proc.metadata.our.process, content))
+        .send(&proc.send_to_terminal)
         .await;
 }
@@ -295,34 +290,34 @@ impl process::ProcessState {
         // 1. whether this request expects a response -- if so, rsvp = our address, always
         // 2. whether this request inherits -- if so, rsvp = prompting message's rsvp
         // 3. if neither, rsvp = None
-        let kernel_message = t::KernelMessage {
-            id: request_id,
-            source,
-            target: t::Address::de_wit(target),
-            rsvp: match (
-                request.expects_response,
-                request.inherit,
-                &self.prompting_message,
-            ) {
-                (Some(_), _, _) => {
-                    // this request expects response, so receives any response
-                    // make sure to use the real source, not a fake injected-by-kernel source
-                    Some(self.metadata.our.clone())
-                }
-                (None, true, Some(ref prompt)) => {
-                    // this request inherits, so response will be routed to prompting message
-                    prompt.rsvp.clone()
-                }
-                _ => None,
-            },
-            message: t::Message::Request(request),
-            lazy_load_blob: blob,
-        };
-
-        self.send_to_loop
-            .send(kernel_message)
-            .await
-            .expect("fatal: kernel couldn't send request");
+        t::KernelMessage::builder()
+            .id(request_id)
+            .source(source)
+            .target(t::Address::de_wit(target))
+            .rsvp(
+                match (
+                    request.expects_response,
+                    request.inherit,
+                    &self.prompting_message,
+                ) {
+                    (Some(_), _, _) => {
+                        // this request expects response, so receives any response
+                        // make sure to use the real source, not a fake injected-by-kernel source
+                        Some(self.metadata.our.clone())
+                    }
+                    (None, true, Some(ref prompt)) => {
+                        // this request inherits, so response will be routed to prompting message
+                        prompt.rsvp.clone()
+                    }
+                    _ => None,
+                },
+            )
+            .message(t::Message::Request(request))
+            .lazy_load_blob(blob)
+            .build()
+            .unwrap()
+            .send(&self.send_to_loop)
+            .await;
 
         Ok(request_id)
     }
@@ -333,11 +328,11 @@ impl process::ProcessState {
 
         // the process requires a prompting_message in order to issue a response
         let Some(ref prompting_message) = self.prompting_message else {
-            process::print(
-                &self.send_to_terminal,
+            t::Printout::new(
                 0,
                 format!("kernel: need non-None prompting_message to handle Response {response:?}"),
             )
+            .send(&self.send_to_terminal)
             .await;
             return;
         };
@@ -377,21 +372,20 @@ impl process::ProcessState {
         };
     }
 
-        self.send_to_loop
-            .send(t::KernelMessage {
-                id,
-                source: self.metadata.our.clone(),
-                target,
-                rsvp: None,
-                message: t::Message::Response((
-                    response,
-                    // the context will be set by the process receiving this Response.
-                    None,
-                )),
-                lazy_load_blob: blob,
-            })
-            .await
-            .expect("fatal: kernel couldn't send response");
+        t::KernelMessage::builder()
+            .id(id)
+            .source(self.metadata.our.clone())
+            .target(target)
+            .message(t::Message::Response((
+                response,
+                // the context will be set by the process receiving this Response.
+                None,
+            )))
+            .lazy_load_blob(blob)
+            .build()
+            .unwrap()
+            .send(&self.send_to_loop)
+            .await;
     }
 }
@@ -679,11 +673,8 @@ impl StandardHost for process::ProcessWasi {
             wit_version: self.process.metadata.wit_version,
             on_exit: t::OnExit::de_wit(on_exit),
             initial_capabilities: request_capabilities
-                .iter()
-                .map(|cap| t::Capability {
-                    issuer: t::Address::de_wit(cap.clone().issuer),
-                    params: cap.clone().params,
-                })
+                .into_iter()
+                .map(|cap| t::de_wit_capability(cap).0)
                 .collect(),
             public,
         })
@@ -716,7 +707,7 @@ impl StandardHost for process::ProcessWasi {
                 },
                 params: "\"messaging\"".into(),
             }],
-            responder: tx,
+            responder: Some(tx),
        })
        .await
        .unwrap();
@@ -767,7 +758,7 @@ impl StandardHost for process::ProcessWasi {
                 issuer: self.process.metadata.our.clone(),
                 params: "\"messaging\"".into(),
             }],
-            responder: tx,
+            responder: Some(tx),
        })
        .await
        .unwrap();
@@ -786,7 +777,7 @@ impl StandardHost for process::ProcessWasi {
                },
                params: "\"messaging\"".into(),
            }],
-            responder: tx,
+            responder: Some(tx),
        })
        .await
        .unwrap();
@@ -810,7 +801,7 @@ impl StandardHost for process::ProcessWasi {
                .iter()
                .map(|cap| t::de_wit_capability(cap.clone()).0)
                .collect(),
-            responder: tx,
+            responder: Some(tx),
        })
        .await?;
    let _ = rx.await?;
@@ -828,7 +819,7 @@ impl StandardHost for process::ProcessWasi {
                .iter()
                .map(|cap| t::de_wit_capability(cap.clone()).0)
                .collect(),
-            responder: tx,
+            responder: Some(tx),
        })
        .await?;
    let _ = rx.await?;
@@ -848,10 +839,7 @@ impl StandardHost for process::ProcessWasi {
     let caps = rx.await?;
     Ok(caps
        .into_iter()
-        .map(|cap| wit::Capability {
-            issuer: t::Address::en_wit(&cap.0.issuer),
-            params: cap.0.params,
-        })
+        .map(|cap| t::en_wit_capability(cap))
        .collect())
 }
diff --git a/kinode/src/kernel/standard_host_v0.rs b/kinode/src/kernel/standard_host_v0.rs
index 8b75914c..69f52dfb 100644
--- a/kinode/src/kernel/standard_host_v0.rs
+++ b/kinode/src/kernel/standard_host_v0.rs
@@ -1,9 +1,8 @@
 use crate::kernel::process;
 use anyhow::Result;
-use lib::core::{KERNEL_PROCESS_ID, VFS_PROCESS_ID};
-use lib::types::core::{self as t, STATE_PROCESS_ID};
-pub use lib::v0::wit;
-pub use lib::v0::wit::Host as StandardHost;
+use lib::types::core::{self as t, KERNEL_PROCESS_ID, STATE_PROCESS_ID, VFS_PROCESS_ID};
+use lib::v0::wit;
+use lib::v0::wit::Host as StandardHost;
 use ring::signature::{self, KeyPair};
 
 async fn print_debug(proc: &process::ProcessState, content: &str) {
@@ -335,11 +334,11 @@ impl process::ProcessState {
 
         // the process requires a prompting_message in order to issue a response
         let Some(ref prompting_message) = self.prompting_message else {
-            process::print(
-                &self.send_to_terminal,
+            t::Printout::new(
                 0,
                 format!("kernel: need non-None prompting_message to handle Response {response:?}"),
             )
+            .send(&self.send_to_terminal)
             .await;
             return;
         };
@@ -685,11 +684,8 @@ impl StandardHost for process::ProcessWasiV0 {
             wit_version: self.process.metadata.wit_version,
             on_exit: t::OnExit::de_wit_v0(on_exit),
             initial_capabilities: request_capabilities
-                .iter()
-                .map(|cap| t::Capability {
-                    issuer: t::Address::de_wit_v0(cap.clone().issuer),
-                    params: cap.clone().params,
-                })
+                .into_iter()
+                .map(|cap| t::de_wit_capability_v0(cap).0)
                 .collect(),
             public,
         })
@@ -722,7 +718,7 @@ impl StandardHost for process::ProcessWasiV0 {
                },
                params: "\"messaging\"".into(),
            }],
-            responder: tx,
+            responder: Some(tx),
        })
        .await
        .unwrap();
@@ -773,7 +769,7 @@ impl StandardHost for process::ProcessWasiV0 {
                issuer: self.process.metadata.our.clone(),
                params: "\"messaging\"".into(),
            }],
-            responder: tx,
+            responder: Some(tx),
        })
        .await
        .unwrap();
@@ -792,7 +788,7 @@ impl StandardHost for process::ProcessWasiV0 {
                },
                params: "\"messaging\"".into(),
            }],
-            responder: tx,
+            responder: Some(tx),
        })
        .await
        .unwrap();
@@ -816,7 +812,7 @@ impl StandardHost for process::ProcessWasiV0 {
                .iter()
                .map(|cap| t::de_wit_capability_v0(cap.clone()).0)
                .collect(),
-            responder: tx,
+            responder: Some(tx),
        })
        .await?;
    let _ = rx.await?;
@@ -834,7 +830,7 @@ impl StandardHost for process::ProcessWasiV0 {
                .iter()
                .map(|cap| t::de_wit_capability_v0(cap.clone()).0)
                .collect(),
-            responder: tx,
+            responder: Some(tx),
        })
        .await?;
    let _ = rx.await?;
@@ -854,10 +850,7 @@ impl StandardHost for process::ProcessWasiV0 {
     let caps = rx.await?;
     Ok(caps
        .into_iter()
-        .map(|cap| wit::Capability {
-            issuer: t::Address::en_wit_v0(&cap.0.issuer),
-            params: cap.0.params,
-        })
+        .map(|cap| t::en_wit_capability_v0(cap))
        .collect())
 }
diff --git a/kinode/src/kv.rs b/kinode/src/kv.rs
index b8b74b7d..7c02ffd2 100644
--- a/kinode/src/kv.rs
+++ b/kinode/src/kv.rs
@@ -350,11 +350,11 @@ async fn check_caps(
                     node: our_node.to_string(),
                     process: KV_PROCESS_ID.clone(),
                 },
-                params: serde_json::to_string(&serde_json::json!({
+                params: serde_json::json!({
                     "kind": "write",
                     "db": request.db.to_string(),
-                }))
-                .unwrap(),
+                })
+                .to_string(),
            },
            responder: send_cap_bool,
        })
@@ -376,11 +376,11 @@ async fn check_caps(
                     node: our_node.to_string(),
                     process: KV_PROCESS_ID.clone(),
                 },
-                params: serde_json::to_string(&serde_json::json!({
+                params: serde_json::json!({
                     "kind": "read",
                     "db": request.db.to_string(),
-                }))
-                .unwrap(),
+                })
+                .to_string(),
            },
            responder: send_cap_bool,
        })
@@ -458,14 +458,14 @@ async fn add_capability(
             node: our_node.to_string(),
             process: KV_PROCESS_ID.clone(),
         },
-        params: serde_json::to_string(&serde_json::json!({ "kind": kind, "db": db })).unwrap(),
+        params: serde_json::json!({ "kind": kind, "db": db }).to_string(),
     };
     let (send_cap_bool, recv_cap_bool) = tokio::sync::oneshot::channel();
     send_to_caps_oracle
         .send(CapMessage::Add {
             on: source.process.clone(),
             caps: vec![cap],
-            responder: send_cap_bool,
+            responder: Some(send_cap_bool),
         })
         .await?;
     let _ = recv_cap_bool.await?;
diff --git a/kinode/src/sqlite.rs b/kinode/src/sqlite.rs
index ee03b5cd..dec0145c 100644
--- a/kinode/src/sqlite.rs
+++ b/kinode/src/sqlite.rs
@@ -355,17 +355,14 @@ async fn check_caps(
     send_to_caps_oracle
         .send(CapMessage::Has {
             on: source.process.clone(),
-            cap: Capability {
-                issuer: Address {
-                    node: our_node.to_string(),
-                    process: SQLITE_PROCESS_ID.clone(),
-                },
-                params: serde_json::to_string(&serde_json::json!({
+            cap: Capability::new(
+                (our_node, SQLITE_PROCESS_ID.clone()),
+                serde_json::json!({
                     "kind": "write",
                     "db": request.db.to_string(),
-                }))
-                .unwrap(),
-            },
+                })
+                .to_string(),
+            ),
            responder: send_cap_bool,
        })
        .await?;
@@ -381,17 +378,14 @@ async fn check_caps(
     send_to_caps_oracle
         .send(CapMessage::Has {
             on: source.process.clone(),
-            cap: Capability {
-                issuer: Address {
-                    node: our_node.to_string(),
-                    process: SQLITE_PROCESS_ID.clone(),
-                },
-                params: serde_json::to_string(&serde_json::json!({
+            cap: Capability::new(
+                (our_node, SQLITE_PROCESS_ID.clone()),
+                serde_json::json!({
                     "kind": "read",
                     "db": request.db.to_string(),
-                }))
-                .unwrap(),
-            },
+                })
+                .to_string(),
+            ),
            responder: send_cap_bool,
        })
        .await?;
@@ -477,14 +471,14 @@ async fn add_capability(
             node: our_node.to_string(),
             process: SQLITE_PROCESS_ID.clone(),
         },
-        params: serde_json::to_string(&serde_json::json!({ "kind": kind, "db": db })).unwrap(),
+        params: serde_json::json!({ "kind": kind, "db": db }).to_string(),
     };
     let (send_cap_bool, recv_cap_bool) = tokio::sync::oneshot::channel();
     send_to_caps_oracle
         .send(CapMessage::Add {
             on: source.process.clone(),
             caps: vec![cap],
-            responder: send_cap_bool,
+            responder: Some(send_cap_bool),
         })
         .await?;
     let _ = recv_cap_bool.await?;
diff --git a/kinode/src/state.rs b/kinode/src/state.rs
index 5b830778..8643852f 100644
--- a/kinode/src/state.rs
+++ b/kinode/src/state.rs
@@ -36,7 +36,8 @@ pub async fn load_state(
     let kernel_id_vec = process_to_vec(KERNEL_PROCESS_ID.clone());
     match db.get(&kernel_id_vec) {
         Ok(Some(value)) => {
-            process_map = bincode::deserialize::<ProcessMap>(&value).unwrap();
+            process_map = bincode::deserialize::<ProcessMap>(&value)
+                .expect("failed to deserialize kernel process map");
             // if our networking key changed, we need to re-sign all local caps
             process_map.iter_mut().for_each(|(_id, process)| {
                 process.capabilities.iter_mut().for_each(|(cap, sig)| {
@@ -116,7 +117,6 @@ pub async fn state_sender(
                 let our_node = our_node.clone();
                 let db_clone = db.clone();
                 let send_to_loop = send_to_loop.clone();
-                let send_to_terminal = send_to_terminal.clone();
                 let home_directory_path = home_directory_path.clone();
 
                 tokio::spawn(async move {
@@ -129,9 +129,6 @@ pub async fn state_sender(
                         handle_request(&our_node, km, db_clone, &send_to_loop, &home_directory_path)
                             .await
                     {
-                        Printout::new(1, format!("state: {e}"))
-                            .send(&send_to_terminal)
-                            .await;
                         KernelMessage::builder()
                             .id(km_id)
                             .source((our_node.as_str(), STATE_PROCESS_ID.clone()))
@@ -559,11 +556,11 @@ async fn bootstrap(
                 node: our_name.into(),
                 process: VFS_PROCESS_ID.clone(),
             },
-            params: serde_json::to_string(&serde_json::json!({
+            params: serde_json::json!({
                 "kind": "read",
                 "drive": drive_path,
-            }))
-            .unwrap(),
+            })
+            .to_string(),
        };
        requested_caps.insert(read_cap.clone(), sign_cap(read_cap, keypair.clone()));
        let write_cap = Capability {
            issuer: Address {
                node: our_name.into(),
                process: VFS_PROCESS_ID.clone(),
            },
-            params: serde_json::to_string(&serde_json::json!({
+            params: serde_json::json!({
                "kind": "write",
                "drive": drive_path,
-            }))
-            .unwrap(),
+            })
+            .to_string(),
        };
        requested_caps.insert(write_cap.clone(), sign_cap(write_cap, keypair.clone()));
diff --git a/kinode/src/vfs.rs b/kinode/src/vfs.rs
index 70212e07..d1c4ed9c 100644
--- a/kinode/src/vfs.rs
+++ b/kinode/src/vfs.rs
@@ -391,16 +391,29 @@ async fn handle_request(
         }
         VfsAction::Rename { new_path } => {
             let new_path = join_paths_safely(vfs_path, &new_path);
-            fs::rename(&path, new_path).await?;
+            fs::rename(&path, new_path)
+                .await
+                .map_err(|e| VfsError::IOError {
+                    error: e.to_string(),
+                    path: request.path,
+                })?;
             (VfsResponse::Ok, None)
         }
         VfsAction::CopyFile { new_path } => {
             let new_path = join_paths_safely(vfs_path, &new_path);
-            fs::copy(&path, new_path).await?;
+            fs::copy(&path, new_path)
+                .await
+                .map_err(|e| VfsError::IOError {
+                    error: e.to_string(),
+                    path: request.path,
+                })?;
             (VfsResponse::Ok, None)
         }
         VfsAction::Metadata => {
-            let metadata = fs::metadata(&path).await?;
+            let metadata = fs::metadata(&path).await.map_err(|e| VfsError::IOError {
+                error: e.to_string(),
+                path: request.path,
+            })?;
             let file_type = get_file_type(&metadata);
             let meta = FileMetadata {
                 len: metadata.len(),
@@ -411,13 +424,23 @@
         VfsAction::Len => {
             let file = open_file(open_files, &path, false, false).await?;
             let file = file.lock().await;
-            let len = file.metadata().await?.len();
+            let len = file
+                .metadata()
+                .await
+                .map_err(|e| VfsError::IOError {
+                    error: e.to_string(),
+                    path: request.path,
+                })?
+                .len();
             (VfsResponse::Len(len), None)
         }
         VfsAction::SetLen(len) => {
-            let file = open_file(open_files, path, false, false).await?;
+            let file = open_file(open_files, &path, false, false).await?;
             let file = file.lock().await;
-            file.set_len(len).await?;
+            file.set_len(len).await.map_err(|e| VfsError::IOError {
+                error: e.to_string(),
+                path: request.path,
+            })?;
             (VfsResponse::Ok, None)
         }
         VfsAction::Hash => {
@@ -464,7 +487,7 @@ async fn handle_request(
             let (is_file, is_dir, local_path, file_contents) = {
                 let mut file = zip.by_index(i).map_err(|e| VfsError::IOError {
                     error: e.to_string(),
-                    path: "".into(),
+                    path: request.path.clone(),
                 })?;
                 let is_file = file.is_file();
                 let is_dir = file.is_dir();
@@ -754,20 +777,18 @@ async fn read_capability(
     send_to_caps_oracle: &CapMessageSender,
 ) -> bool {
     let (send_cap_bool, recv_cap_bool) = tokio::sync::oneshot::channel();
+    let cap = Capability::new(
+        (our_node, VFS_PROCESS_ID.clone()),
+        if root {
+            "{\"root\":true}".to_string()
+        } else {
+            format!("{{\"kind\": \"{kind}\", \"drive\": \"{drive}\"}}")
+        },
+    );
     if let Err(_) = send_to_caps_oracle
         .send(CapMessage::Has {
             on: source.process.clone(),
-            cap: Capability {
-                issuer: Address {
-                    node: our_node.to_string(),
-                    process: VFS_PROCESS_ID.clone(),
-                },
-                params: if root {
-                    "{{\"root\": true}}".to_string()
-                } else {
-                    format!("{{\"kind\": \"{kind}\", \"drive\": \"{drive}\"}}")
-                },
-            },
+            cap,
             responder: send_cap_bool,
         })
         .await
@@ -784,19 +805,16 @@ async fn add_capability(
     source: &Address,
     send_to_caps_oracle: &CapMessageSender,
 ) -> Result<(), VfsError> {
-    let cap = Capability {
-        issuer: Address {
-            node: our_node.to_string(),
-            process: VFS_PROCESS_ID.clone(),
-        },
-        params: format!("{{\"kind\": \"{kind}\", \"drive\": \"{drive}\"}}"),
-    };
+    let cap = Capability::new(
+        (our_node, VFS_PROCESS_ID.clone()),
+        format!("{{\"kind\": \"{kind}\", \"drive\": \"{drive}\"}}"),
+    );
     let (send_cap_bool, recv_cap_bool) = tokio::sync::oneshot::channel();
     send_to_caps_oracle
         .send(CapMessage::Add {
             on: source.process.clone(),
             caps: vec![cap],
-            responder: send_cap_bool,
+            responder: Some(send_cap_bool),
         })
         .await?;
     match recv_cap_bool.await? {
diff --git a/lib/src/core.rs b/lib/src/core.rs
index b97a7c61..05e794b6 100644
--- a/lib/src/core.rs
+++ b/lib/src/core.rs
@@ -279,12 +279,13 @@ pub struct Address {
 }
 
 impl Address {
-    pub fn new<T>(node: &str, process: T) -> Address
+    pub fn new<T, U>(node: T, process: U) -> Address
     where
-        T: Into<ProcessId>,
+        T: Into<String>,
+        U: Into<ProcessId>,
     {
         Address {
-            node: node.to_string(),
+            node: node.into(),
             process: process.into(),
         }
     }
@@ -399,11 +400,12 @@ impl From<(&str, &str, &str, &str)> for Address {
     }
 }
 
-impl<T> From<(&str, T)> for Address
+impl<T, U> From<(T, U)> for Address
 where
-    T: Into<ProcessId>,
+    T: Into<String>,
+    U: Into<ProcessId>,
 {
-    fn from(input: (&str, T)) -> Self {
+    fn from(input: (T, U)) -> Self {
         Address::new(input.0, input.1)
     }
 }
@@ -468,21 +470,50 @@ pub enum Message {
     Response((Response, Option<Context>)),
 }
 
-#[derive(Clone, Debug, Eq, Hash, PartialEq, Serialize, Deserialize)]
+#[derive(Clone, Debug, Hash, Serialize, Deserialize)]
 pub struct Capability {
     pub issuer: Address,
-    pub params: String, // JSON-string
+    pub params: String,
+}
+
+impl Eq for Capability {}
+
+impl PartialEq for Capability {
+    fn eq(&self, other: &Self) -> bool {
+        let self_json_params: serde_json::Value =
+            serde_json::from_str(&self.params).unwrap_or_default();
+        let other_json_params: serde_json::Value =
+            serde_json::from_str(&other.params).unwrap_or_default();
+        self.issuer == other.issuer && self_json_params == other_json_params
+    }
+}
+
+impl Capability {
+    pub fn new<T, U>(issuer: T, params: U) -> Self
+    where
+        T: Into<Address>,
+        U: Into<String>,
+    {
+        Capability {
+            issuer: issuer.into(),
+            params: params.into(),
+        }
+    }
+
+    pub fn messaging<T>(issuer: T) -> Self
+    where
+        T: Into<Address>,
+    {
+        Capability {
+            issuer: issuer.into(),
+            params: "\"messaging\"".into(),
+        }
+    }
 }
 
 impl std::fmt::Display for Capability {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        write!(
-            f,
-            "{}({})",
-            self.issuer,
-            serde_json::from_str::<serde_json::Value>(&self.params)
-                .unwrap_or(serde_json::json!("invalid JSON in capability"))
-        )
+        write!(f, "{}({})", self.issuer, self.params)
     }
 }
@@ -597,6 +628,20 @@ impl OnExit {
     }
 }
 
+impl std::fmt::Display for OnExit {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        write!(
+            f,
+            "{}",
+            match self {
+                OnExit::None => "None",
+                OnExit::Restart => "Restart",
+                OnExit::Requests(_) => "Requests",
+            }
+        )
+    }
+}
+
 impl std::fmt::Display for Message {
     fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
         write!(f, "{}", display_message(self, "\n    "))
     }
 }
@@ -839,7 +884,7 @@ pub fn de_wit_capability(wit: wit::Capability) -> (Capability, Vec<u8>) {
                     publisher_node: wit.issuer.process.publisher_node,
                 },
             },
-            params: wit.params,
+            params: serde_json::from_str(&wit.params).unwrap_or_default(),
         },
         vec![],
     )
@@ -856,7 +901,7 @@ pub fn de_wit_capability_v0(wit: crate::v0::wit::Capability) -> (Capability, Vec<u8>) {
                     publisher_node: wit.issuer.process.publisher_node,
                 },
             },
-            params: wit.params,
+            params: serde_json::from_str(&wit.params).unwrap_or_default(),
         },
         vec![],
    )
@@ -865,14 +910,14 @@ pub fn de_wit_capability_v0(wit: crate::v0::wit::Capability) -> (Capability, Vec<u8>) {
 pub fn en_wit_capability(cap: (Capability, Vec<u8>)) -> wit::Capability {
     wit::Capability {
         issuer: cap.0.issuer.en_wit(),
-        params: cap.0.params,
+        params: cap.0.params.to_string(),
     }
 }
 
 pub fn en_wit_capability_v0(cap: (Capability, Vec<u8>)) -> crate::v0::wit::Capability {
     crate::v0::wit::Capability {
         issuer: cap.0.issuer.en_wit_v0(),
-        params: cap.0.params,
+        params: cap.0.params.to_string(),
     }
 }
@@ -1347,13 +1392,13 @@ pub enum CapMessage {
     Add {
         on: ProcessId,
         caps: Vec<Capability>,
-        responder: tokio::sync::oneshot::Sender<bool>,
+        responder: Option<tokio::sync::oneshot::Sender<bool>>,
     },
     /// root delete: uncritically remove all `caps` from `on`
     Drop {
         on: ProcessId,
         caps: Vec<Capability>,
-        responder: tokio::sync::oneshot::Sender<bool>,
+        responder: Option<tokio::sync::oneshot::Sender<bool>>,
     },
     /// does `on` have `cap` in its store?
     Has {
@@ -1370,7 +1415,7 @@ pub enum CapMessage {
     /// Remove all caps issued by `on` from every process on the entire system
     RevokeAll {
         on: ProcessId,
-        responder: tokio::sync::oneshot::Sender<bool>,
+        responder: Option<tokio::sync::oneshot::Sender<bool>>,
     },
     /// before `on` sends a message, filter out any bogus caps it may have attached, sign any new
     /// caps it may have created, and retreive the signature for the caps in its store.
@@ -1492,19 +1537,19 @@ pub enum StateResponse {
 
 #[derive(Error, Debug, Serialize, Deserialize)]
 pub enum StateError {
-    #[error("kernel_state: rocksdb internal error: {error}")]
+    #[error("rocksdb internal error: {error}")]
     RocksDBError { action: String, error: String },
-    #[error("kernel_state: startup error")]
+    #[error("startup error")]
     StartupError { action: String },
-    #[error("kernel_state: bytes blob required for {action}")]
+    #[error("bytes blob required for {action}")]
     BadBytes { action: String },
-    #[error("kernel_state: bad request error: {error}")]
+    #[error("bad request error: {error}")]
     BadRequest { error: String },
-    #[error("kernel_state: Bad JSON blob: {error}")]
+    #[error("Bad JSON blob: {error}")]
     BadJson { error: String },
-    #[error("kernel_state: state not found for ProcessId {process_id}")]
+    #[error("state not found for ProcessId {process_id}")]
     NotFound { process_id: ProcessId },
-    #[error("kernel_state: IO error: {error}")]
+    #[error("IO error: {error}")]
     IOError { error: String },
 }
@@ -1601,23 +1646,23 @@ pub enum VfsResponse {
 
 #[derive(Error, Debug, Serialize, Deserialize)]
 pub enum VfsError {
-    #[error("vfs: No capability for action {action} at path {path}")]
+    #[error("No capability for action {action} at path {path}")]
     NoCap { action: String, path: String },
-    #[error("vfs: Bytes blob required for {action} at path {path}")]
+    #[error("Bytes blob required for {action} at path {path}")]
     BadBytes { action: String, path: String },
-    #[error("vfs: bad request error: {error}")]
+    #[error("bad request error: {error}")]
     BadRequest { error: String },
-    #[error("vfs: error parsing path: {path}: {error}")]
+    #[error("error parsing path: {path}: {error}")]
     ParseError { error: String, path: String },
-    #[error("vfs: IO error: {error}, at path {path}")]
+    #[error("IO error: {error}, at path {path}")]
     IOError { error: String, path: String },
-    #[error("vfs: kernel capability channel error: {error}")]
+    #[error("kernel capability channel error: {error}")]
     CapChannelFail { error: String },
-    #[error("vfs: Bad JSON blob: {error}")]
+    #[error("Bad JSON blob: {error}")]
     BadJson { error: String },
-    #[error("vfs: File not found at path {path}")]
+    #[error("File not found at path {path}")]
     NotFound { path: String },
-    #[error("vfs: Creating directory failed at path: {path}: {error}")]
+    #[error("Creating directory failed at path: {path}: {error}")]
     CreateDirError { path: String, error: String },
 }
@@ -1667,19 +1712,19 @@ pub enum KvResponse {
 
 #[derive(Debug, Serialize, Deserialize, Error)]
 pub enum KvError {
-    #[error("kv: DbDoesNotExist")]
+    #[error("DbDoesNotExist")]
     NoDb,
-    #[error("kv: KeyNotFound")]
+    #[error("KeyNotFound")]
     KeyNotFound,
-    #[error("kv: no Tx found")]
+    #[error("no Tx found")]
     NoTx,
-    #[error("kv: No capability: {error}")]
+    #[error("No capability: {error}")]
     NoCap { error: String },
-    #[error("kv: rocksdb internal error: {error}")]
+    #[error("rocksdb internal error: {error}")]
     RocksDBError { action: String, error: String },
-    #[error("kv: input bytes/json/key error: {error}")]
+    #[error("input bytes/json/key error: {error}")]
     InputError { error: String },
-    #[error("kv: IO error: {error}")]
+    #[error("IO error: {error}")]
     IOError { error: String },
 }
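A minimal standalone sketch of the JSON-normalized `Capability` equality introduced above, for readers reviewing the `lib/src/core.rs` hunk. The `Cap` struct and the issuer string below are illustrative stand-ins rather than the crate's real types; the point is only that `params` is compared as parsed JSON, so two capabilities whose params differ only in key order or whitespace compare equal.

    use serde_json::Value;

    // illustrative stand-in mirroring the patched PartialEq for Capability
    #[derive(Debug)]
    struct Cap {
        issuer: String, // stand-in for Address
        params: String, // JSON string
    }

    impl PartialEq for Cap {
        fn eq(&self, other: &Self) -> bool {
            // parse both params strings; anything unparseable normalizes to JSON null
            let a: Value = serde_json::from_str(&self.params).unwrap_or_default();
            let b: Value = serde_json::from_str(&other.params).unwrap_or_default();
            self.issuer == other.issuer && a == b
        }
    }

    fn main() {
        let x = Cap {
            issuer: "example-issuer".into(),
            params: r#"{"kind":"read","drive":"/foo"}"#.into(),
        };
        let y = Cap {
            issuer: "example-issuer".into(),
            params: r#"{ "drive": "/foo", "kind": "read" }"#.into(),
        };
        // equal despite different key order and whitespace in params
        assert_eq!(x, y);
    }

One consequence of the `unwrap_or_default()` fallback, as in the patched impl, is that two capabilities whose params both fail to parse as JSON also compare equal, since both normalize to null.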