From 475aea17fbf347ca74f24f278c102a796859c26c Mon Sep 17 00:00:00 2001 From: dr-frmr Date: Mon, 9 Dec 2024 15:53:31 -0500 Subject: [PATCH] add wit v1 support to kernel --- kinode/src/kernel/mod.rs | 2 + kinode/src/kernel/process.rs | 99 ++- kinode/src/kernel/standard_host_v1.rs | 958 ++++++++++++++++++++++++++ lib/build.rs | 13 +- lib/src/core.rs | 225 +++++- lib/src/lib.rs | 14 + 6 files changed, 1289 insertions(+), 22 deletions(-) create mode 100644 kinode/src/kernel/standard_host_v1.rs diff --git a/kinode/src/kernel/mod.rs b/kinode/src/kernel/mod.rs index 4f7a4658..71c389f8 100644 --- a/kinode/src/kernel/mod.rs +++ b/kinode/src/kernel/mod.rs @@ -17,6 +17,8 @@ pub mod process; mod standard_host; /// Implement the functions served to processes by `wit-v0.8.0/kinode.wit`. mod standard_host_v0; +/// Implement the functions served to processes by `wit-v1.0.0/kinode.wit`. +mod standard_host_v1; pub const LATEST_WIT_VERSION: u32 = 0; const PROCESS_CHANNEL_CAPACITY: usize = 100; diff --git a/kinode/src/kernel/process.rs b/kinode/src/kernel/process.rs index 94aa8ea9..db234ea7 100644 --- a/kinode/src/kernel/process.rs +++ b/kinode/src/kernel/process.rs @@ -1,5 +1,5 @@ use crate::KERNEL_PROCESS_ID; -use lib::{types::core as t, v0::ProcessV0, Process}; +use lib::{types::core as t, v0::ProcessV0, v1::ProcessV1, Process}; use std::{ collections::{HashMap, VecDeque}, path::PathBuf, @@ -71,6 +71,7 @@ impl WasiView for ProcessWasi { } } +/// **can be removed in 1.0.0** pub struct ProcessWasiV0 { pub process: ProcessState, table: Table, @@ -86,6 +87,21 @@ impl WasiView for ProcessWasiV0 { } } +pub struct ProcessWasiV1 { + pub process: ProcessState, + table: Table, + wasi: WasiCtx, +} + +impl WasiView for ProcessWasiV1 { + fn table(&mut self) -> &mut Table { + &mut self.table + } + fn ctx(&mut self) -> &mut WasiCtx { + &mut self.wasi + } +} + async fn make_table_and_wasi( home_directory_path: PathBuf, process_state: &ProcessState, @@ -134,6 +150,7 @@ async fn make_table_and_wasi( (table, wasi.stderr(wasi_stderr.clone()).build(), wasi_stderr) } +/// **can be removed in 1.0.0** async fn make_component( engine: Engine, wasm_bytes: &[u8], @@ -177,6 +194,7 @@ async fn make_component( Ok((bindings, store, wasi_stderr)) } +/// **can be removed in 1.0.0** async fn make_component_v0( engine: Engine, wasm_bytes: &[u8], @@ -220,6 +238,49 @@ async fn make_component_v0( Ok((bindings, store, wasi_stderr)) } +async fn make_component_v1( + engine: Engine, + wasm_bytes: &[u8], + home_directory_path: PathBuf, + process_state: ProcessState, +) -> anyhow::Result<(ProcessV1, Store, MemoryOutputPipe)> { + let component = + Component::new(&engine, wasm_bytes.to_vec()).expect("make_component: couldn't read file"); + + let mut linker = Linker::new(&engine); + ProcessV1::add_to_linker(&mut linker, |state: &mut ProcessWasiV1| state).unwrap(); + let (table, wasi, wasi_stderr) = make_table_and_wasi(home_directory_path, &process_state).await; + wasmtime_wasi::command::add_to_linker(&mut linker).unwrap(); + + let our_process_id = process_state.metadata.our.process.clone(); + let send_to_terminal = process_state.send_to_terminal.clone(); + + let mut store = Store::new( + &engine, + ProcessWasiV1 { + process: process_state, + table, + wasi, + }, + ); + + let (bindings, _bindings) = + match ProcessV1::instantiate_async(&mut store, &component, &linker).await { + Ok(b) => b, + Err(e) => { + t::Printout::new( + 0, + format!("kernel: process {our_process_id} failed to instantiate: {e:?}"), + ) + .send(&send_to_terminal) + .await; + return 
Err(e); + } + }; + + Ok((bindings, store, wasi_stderr)) +} + /// create a specific process, and generate a task that will run it. pub async fn make_process_loop( keypair: Arc, @@ -287,6 +348,7 @@ pub async fn make_process_loop( let metadata = match wit_version { // assume missing version is oldest wit version + // **can be removed in 1.0.0** None => { let (bindings, mut store, wasi_stderr) = make_component(engine, &wasm_bytes, home_directory_path, process_state).await?; @@ -319,8 +381,8 @@ pub async fn make_process_loop( store.data().process.metadata.to_owned() } // match version numbers - // assume higher uncovered version number is latest version - Some(0) | _ => { + // **can be removed in 1.0.0** + Some(0) => { let (bindings, mut store, wasi_stderr) = make_component_v0(engine, &wasm_bytes, home_directory_path, process_state).await?; @@ -348,6 +410,37 @@ pub async fn make_process_loop( } }; + // update metadata to what was mutated by process in store + store.data().process.metadata.to_owned() + } + Some(1) | _ => { + let (bindings, mut store, wasi_stderr) = + make_component_v1(engine, &wasm_bytes, home_directory_path, process_state).await?; + + // the process will run until it returns from init() or crashes + match bindings.call_init(&mut store, &our.to_string()).await { + Ok(()) => { + t::Printout::new(1, format!("process {our} returned without error")) + .send(&send_to_terminal) + .await; + } + Err(e) => { + let stderr = wasi_stderr.contents().into(); + let stderr = String::from_utf8(stderr)?; + let output = if stderr != String::new() { + stderr + } else { + format!("{}", e.root_cause()) + }; + t::Printout::new( + 0, + format!("\x1b[38;5;196mprocess {our} ended with error:\x1b[0m\n{output}"), + ) + .send(&send_to_terminal) + .await; + } + }; + // update metadata to what was mutated by process in store store.data().process.metadata.to_owned() } diff --git a/kinode/src/kernel/standard_host_v1.rs b/kinode/src/kernel/standard_host_v1.rs new file mode 100644 index 00000000..3b9f05ac --- /dev/null +++ b/kinode/src/kernel/standard_host_v1.rs @@ -0,0 +1,958 @@ +use crate::kernel::process; +use anyhow::Result; +use lib::types::core::{self as t, KERNEL_PROCESS_ID, STATE_PROCESS_ID, VFS_PROCESS_ID}; +use lib::v1::wit; +use lib::v1::wit::Host as StandardHost; +use ring::signature::{self, KeyPair}; + +async fn print_debug(proc: &process::ProcessState, content: &str) { + let _ = proc + .send_to_terminal + .send(t::Printout { + verbosity: 2, + content: format!("{}: {}", proc.metadata.our.process, content), + }) + .await; +} + +impl process::ProcessState { + /// Ingest latest message directed to this process, and save it as the current message. + /// If there is no message in the queue, wait async until one is received. 
+    async fn get_next_message_for_process_v1(
+        &mut self,
+    ) -> Result<(wit::Address, wit::Message), (wit::SendError, Option<wit::Context>)> {
+        let res = match self.message_queue.pop_front() {
+            Some(message_from_queue) => message_from_queue,
+            None => self.ingest_message_v1().await,
+        };
+        self.kernel_message_to_process_receive_v1(res)
+    }
+
+    /// instead of ingesting latest, wait for a specific ID and queue all others
+    async fn get_specific_message_for_process_v1(
+        &mut self,
+        awaited_message_id: u64,
+    ) -> Result<(wit::Address, wit::Message), (wit::SendError, Option<wit::Context>)> {
+        // first, check if the awaited message is already in the queue and handle if so
+        for (i, message) in self.message_queue.iter().enumerate() {
+            match message {
+                Ok(ref km) if km.id == awaited_message_id => {
+                    let km = self.message_queue.remove(i).unwrap();
+                    return self.kernel_message_to_process_receive_v1(km);
+                }
+                _ => continue,
+            }
+        }
+        // next, wait for the awaited message to arrive
+        loop {
+            let res = self.ingest_message_v1().await;
+            let id = match &res {
+                Ok(km) => km.id,
+                Err(e) => e.id,
+            };
+            if id == awaited_message_id {
+                return self.kernel_message_to_process_receive_v1(res);
+            } else {
+                self.message_queue.push_back(res);
+            }
+        }
+    }
+
+    /// ingest next valid message from kernel.
+    /// cancel any timeout task associated with this message.
+    /// if the message is a response, only enqueue if we have an outstanding request for it.
+    async fn ingest_message_v1(&mut self) -> Result<t::KernelMessage, t::WrappedSendError> {
+        loop {
+            let message = self
+                .recv_in_process
+                .recv()
+                .await
+                .expect("fatal: process couldn't receive next message");
+
+            match &message {
+                Ok(km) => match &km.message {
+                    t::Message::Response(_) => {
+                        if let Some((_context, timeout_handle)) = self.contexts.get_mut(&km.id) {
+                            timeout_handle.abort();
+                            return message;
+                        }
+                    }
+                    _ => {
+                        return message;
+                    }
+                },
+                Err(e) => {
+                    if let Some((_context, timeout_handle)) = self.contexts.get_mut(&e.id) {
+                        timeout_handle.abort();
+                        return message;
+                    }
+                }
+            }
+        }
+    }
+
+    /// Convert a message from the main event loop into a result for the process to receive.
+    /// If the message is a response or error, get context if we have one.
+    fn kernel_message_to_process_receive_v1(
+        &mut self,
+        incoming: Result<t::KernelMessage, t::WrappedSendError>,
+    ) -> Result<(wit::Address, wit::Message), (wit::SendError, Option<wit::Context>)> {
+        let (mut km, context) = match incoming {
+            Ok(mut km) => match km.message {
+                t::Message::Request(t::Request {
+                    ref expects_response,
+                    ..
+ }) => { + if km.lazy_load_blob.is_some() { + self.last_blob = km.lazy_load_blob; + km.lazy_load_blob = None; + } + if expects_response.is_some() || km.rsvp.is_some() { + // update prompting_message iff there is someone to reply to + self.prompting_message = Some(km.clone()); + } + (km, None) + } + t::Message::Response(_) => match self.contexts.remove(&km.id) { + Some((context, _timeout_handle)) => { + if km.lazy_load_blob.is_some() { + self.last_blob = km.lazy_load_blob; + km.lazy_load_blob = None; + } + self.prompting_message = context.prompting_message; + (km, context.context) + } + None => { + if km.lazy_load_blob.is_some() { + self.last_blob = km.lazy_load_blob; + km.lazy_load_blob = None; + } + self.prompting_message = Some(km.clone()); + (km, None) + } + }, + }, + Err(e) => match self.contexts.remove(&e.id) { + None => return Err((t::en_wit_send_error_v1(e.error), None)), + Some((context, _timeout_handle)) => { + self.prompting_message = context.prompting_message; + return Err((t::en_wit_send_error_v1(e.error), context.context)); + } + }, + }; + + let pk = signature::UnparsedPublicKey::new( + &signature::ED25519, + self.keypair.as_ref().public_key(), + ); + + // prune any invalid capabilities before handing to process + // where invalid = supposedly issued by us, but not signed properly by us + match &mut km.message { + t::Message::Request(request) => { + request.capabilities.retain(|(cap, sig)| { + // The only time we verify a cap's signature is when a foreign node + // sends us a cap that we (allegedly) issued + if km.source.node != self.metadata.our.node + && cap.issuer.node == self.metadata.our.node + { + match pk.verify(&rmp_serde::to_vec(&cap).unwrap_or_default(), sig) { + Ok(_) => true, + Err(_) => false, + } + } else { + return true; + } + }); + } + t::Message::Response((response, _)) => { + response.capabilities.retain(|(cap, sig)| { + // The only time we verify a cap's signature is when a foreign node + // sends us a cap that we (allegedly) issued + if km.source.node != self.metadata.our.node + && cap.issuer.node == self.metadata.our.node + { + match pk.verify(&rmp_serde::to_vec(&cap).unwrap_or_default(), sig) { + Ok(_) => true, + Err(_) => false, + } + } else { + return true; + } + }); + } + }; + + Ok(( + km.source.en_wit_v1(), + match km.message { + t::Message::Request(request) => { + wit::Message::Request(t::en_wit_request_v1(request)) + } + // NOTE: we throw away whatever context came from the sender, that's not ours + t::Message::Response((response, _sent_context)) => { + wit::Message::Response((t::en_wit_response_v1(response), context)) + } + }, + )) + } + + /// takes Request generated by a process and sends it to the main event loop. + /// will only fail if process does not have capability to send to target. + /// if the request has a timeout (expects response), start a task to track + /// that timeout and return timeout error if it expires. 
+    async fn send_request_v1(
+        &mut self,
+        // only used when kernel steps in to get/set state
+        fake_source: Option<t::Address>,
+        target: wit::Address,
+        request: wit::Request,
+        new_context: Option<t::Context>,
+        blob: Option<wit::LazyLoadBlob>,
+    ) -> Result<u64> {
+        let source = fake_source.unwrap_or(self.metadata.our.clone());
+        let mut request = t::de_wit_request_v1(request);
+
+        // if request chooses to inherit, it means to take the ID and lazy_load_blob,
+        // if any, from the last message it ingested
+
+        // if request chooses to inherit, match id to predecessor
+        // otherwise, id is generated randomly
+        let request_id: u64 = if request.inherit && self.prompting_message.is_some() {
+            self.prompting_message.as_ref().unwrap().id
+        } else {
+            loop {
+                let id = rand::random();
+                if !self.contexts.contains_key(&id) {
+                    break id;
+                }
+            }
+        };
+
+        // if a blob is provided, it will be used; otherwise, if inherit is true,
+        // and a predecessor exists, its blob will be used; otherwise, no blob will be used.
+        let blob = match blob {
+            Some(p) => Some(t::LazyLoadBlob {
+                mime: p.mime,
+                bytes: p.bytes,
+            }),
+            None => match request.inherit {
+                true => self.last_blob.clone(),
+                false => None,
+            },
+        };
+
+        if !request.capabilities.is_empty() {
+            request.capabilities = {
+                let (tx, rx) = tokio::sync::oneshot::channel();
+                self.caps_oracle
+                    .send(t::CapMessage::FilterCaps {
+                        on: self.metadata.our.process.clone(),
+                        caps: request
+                            .capabilities
+                            .into_iter()
+                            .map(|(cap, _)| cap)
+                            .collect(),
+                        responder: tx,
+                    })
+                    .await
+                    .expect("fatal: process couldn't access capabilities oracle");
+                rx.await
+                    .expect("fatal: process couldn't receive capabilities")
+            };
+        }
+
+        // if the request expects a response, modify the process' context map as needed
+        // and set a timer.
+        // TODO optimize this SIGNIFICANTLY: stop spawning tasks
+        // and use a global clock + garbage collect step to check for timeouts
+        if let Some(timeout_secs) = request.expects_response {
+            let this_request = request.clone();
+            let this_blob = blob.clone();
+            let self_sender = self.self_sender.clone();
+            let original_target = t::Address::de_wit_v1(target.clone());
+            let timeout_handle = tokio::spawn(async move {
+                tokio::time::sleep(std::time::Duration::from_secs(timeout_secs)).await;
+                let _ = self_sender
+                    .send(Err(t::WrappedSendError {
+                        id: request_id,
+                        source: original_target.clone(),
+                        error: t::SendError {
+                            kind: t::SendErrorKind::Timeout,
+                            target: original_target,
+                            message: t::Message::Request(this_request),
+                            lazy_load_blob: this_blob,
+                        },
+                    }))
+                    .await;
+            });
+            self.contexts.insert(
+                request_id,
+                (
+                    process::ProcessContext {
+                        prompting_message: self.prompting_message.clone(),
+                        context: new_context,
+                    },
+                    timeout_handle,
+                ),
+            );
+        }
+
+        // rsvp is set based on this priority:
+        // 1. whether this request expects a response -- if so, rsvp = our address, always
+        // 2. whether this request inherits -- if so, rsvp = prompting message's rsvp
+        // 3.
if neither, rsvp = None + let kernel_message = t::KernelMessage { + id: request_id, + source, + target: t::Address::de_wit_v1(target), + rsvp: match ( + request.expects_response, + request.inherit, + &self.prompting_message, + ) { + (Some(_), _, _) => { + // this request expects response, so receives any response + // make sure to use the real source, not a fake injected-by-kernel source + Some(self.metadata.our.clone()) + } + (None, true, Some(ref prompt)) => { + // this request inherits, so response will be routed to prompting message + prompt.rsvp.clone() + } + _ => None, + }, + message: t::Message::Request(request), + lazy_load_blob: blob, + }; + + self.send_to_loop + .send(kernel_message) + .await + .expect("fatal: kernel couldn't send request"); + + Ok(request_id) + } + + /// takes Response generated by a process and sends it to the main event loop. + async fn send_response_v1(&mut self, response: wit::Response, blob: Option) { + let mut response = t::de_wit_response_v1(response); + + // the process requires a prompting_message in order to issue a response + let Some(ref prompting_message) = self.prompting_message else { + t::Printout::new( + 0, + format!("kernel: need non-None prompting_message to handle Response {response:?}"), + ) + .send(&self.send_to_terminal) + .await; + return; + }; + + // given the current process state, produce the id and target that + // a response it emits should have. + let (id, target) = ( + prompting_message.id, + match &prompting_message.rsvp { + None => prompting_message.source.clone(), + Some(rsvp) => rsvp.clone(), + }, + ); + + let blob = match response.inherit { + true => self.last_blob.clone(), + false => t::de_wit_blob_v1(blob), + }; + + if !response.capabilities.is_empty() { + response.capabilities = { + let (tx, rx) = tokio::sync::oneshot::channel(); + self.caps_oracle + .send(t::CapMessage::FilterCaps { + on: self.metadata.our.process.clone(), + caps: response + .capabilities + .into_iter() + .map(|(cap, _)| cap) + .collect(), + responder: tx, + }) + .await + .expect("fatal: process couldn't access capabilities oracle"); + rx.await + .expect("fatal: process couldn't receive capabilities") + }; + } + + self.send_to_loop + .send(t::KernelMessage { + id, + source: self.metadata.our.clone(), + target, + rsvp: None, + message: t::Message::Response(( + response, + // the context will be set by the process receiving this Response. 
+                    None,
+                )),
+                lazy_load_blob: blob,
+            })
+            .await
+            .expect("fatal: kernel couldn't send response");
+    }
+}
+
+async fn send_and_await_response(
+    process: &mut process::ProcessWasiV1,
+    source: Option<t::Address>,
+    target: wit::Address,
+    request: wit::Request,
+    blob: Option<wit::LazyLoadBlob>,
+) -> Result<Result<(wit::Address, wit::Message), wit::SendError>> {
+    if request.expects_response.is_none() {
+        return Err(anyhow::anyhow!(
+            "kernel: got invalid send_and_await_response() Request from {:?}: must expect response",
+            process.process.metadata.our.process
+        ));
+    }
+    if t::Address::de_wit_v1(target.clone()) == process.process.metadata.our {
+        return Err(anyhow::anyhow!(
+            "kernel: got invalid send_and_await_response() Request from and to {}: cannot await a Request to `our`: will deadlock",
+            process.process.metadata.our,
+        ));
+    }
+    let id = process
+        .process
+        .send_request_v1(source, target, request, None, blob)
+        .await;
+    match id {
+        Ok(id) => match process
+            .process
+            .get_specific_message_for_process_v1(id)
+            .await
+        {
+            Ok((address, wit::Message::Response(response))) => {
+                Ok(Ok((address, wit::Message::Response(response))))
+            }
+            Ok((_address, wit::Message::Request(_))) => Err(anyhow::anyhow!(
+                "fatal: received Request instead of Response"
+            )),
+            Err((net_err, _context)) => Ok(Err(net_err)),
+        },
+        Err(e) => Err(e),
+    }
+}
+
+///
+/// create the process API. this is where the functions that a process can use live.
+///
+#[async_trait::async_trait]
+impl StandardHost for process::ProcessWasiV1 {
+    //
+    // system utils:
+    //
+
+    /// Print a message to the runtime terminal. Add the name of the process to the
+    /// beginning of the string, so user can verify source.
+    async fn print_to_terminal(&mut self, verbosity: u8, content: String) -> Result<()> {
+        self.process
+            .send_to_terminal
+            .send(t::Printout {
+                verbosity,
+                content: format!(
+                    "{}:{}: {}",
+                    self.process.metadata.our.process.package(),
+                    self.process.metadata.our.process.publisher(),
+                    content
+                ),
+            })
+            .await
+            .map_err(|e| anyhow::anyhow!("fatal: couldn't send to terminal: {e:?}"))
+    }
+
+    async fn our(&mut self) -> Result<wit::Address> {
+        Ok(self.process.metadata.our.en_wit_v1())
+    }
+
+    //
+    // process management:
+    //
+
+    /// TODO critical: move to kernel logic to enable persistence of choice made here
+    async fn set_on_exit(&mut self, on_exit: wit::OnExit) -> Result<()> {
+        self.process.metadata.on_exit = t::OnExit::de_wit_v1(on_exit);
+        print_debug(&self.process, "set new on-exit behavior").await;
+        Ok(())
+    }
+
+    async fn get_on_exit(&mut self) -> Result<wit::OnExit> {
+        Ok(self.process.metadata.on_exit.en_wit_v1())
+    }
+
+    /// create a message from the *kernel* to the filesystem,
+    /// asking it to fetch the current state saved under this process
+    async fn get_state(&mut self) -> Result<Option<Vec<u8>>> {
+        let old_last_blob = self.process.last_blob.clone();
+        let res = match send_and_await_response(
+            self,
+            Some(t::Address {
+                node: self.process.metadata.our.node.clone(),
+                process: KERNEL_PROCESS_ID.clone(),
+            }),
+            wit::Address {
+                node: self.process.metadata.our.node.clone(),
+                process: STATE_PROCESS_ID.en_wit_v1(),
+            },
+            wit::Request {
+                inherit: false,
+                expects_response: Some(5),
+                body: serde_json::to_vec(&t::StateAction::GetState(
+                    self.process.metadata.our.process.clone(),
+                ))
+                .unwrap(),
+                metadata: Some(self.process.metadata.our.process.to_string()),
+                capabilities: vec![],
+            },
+            None,
+        )
+        .await
+        {
+            Ok(Ok(_resp)) => {
+                // basically assuming filesystem responding properly here
+                match &self.process.last_blob {
+                    None => Ok(None),
+                    Some(blob) => Ok(Some(blob.bytes.clone())),
+                }
+            }
+            _ =>
Ok(None), + }; + self.process.last_blob = old_last_blob; + return res; + } + + /// create a message from the *kernel* to the filesystem, + /// asking it to replace the current state saved under + /// this process with these bytes + async fn set_state(&mut self, bytes: Vec) -> Result<()> { + let old_last_blob = self.process.last_blob.clone(); + let res = match send_and_await_response( + self, + Some(t::Address { + node: self.process.metadata.our.node.clone(), + process: KERNEL_PROCESS_ID.clone(), + }), + wit::Address { + node: self.process.metadata.our.node.clone(), + process: STATE_PROCESS_ID.en_wit_v1(), + }, + wit::Request { + inherit: false, + expects_response: Some(5), + body: serde_json::to_vec(&t::StateAction::SetState( + self.process.metadata.our.process.clone(), + )) + .unwrap(), + metadata: Some(self.process.metadata.our.process.to_string()), + capabilities: vec![], + }, + Some(wit::LazyLoadBlob { mime: None, bytes }), + ) + .await + { + Ok(Ok(_resp)) => { + // basically assuming filesystem responding properly here + Ok(()) + } + _ => Err(anyhow::anyhow!( + "filesystem did not respond properly to SetState!!" + )), + }; + self.process.last_blob = old_last_blob; + print_debug(&self.process, "persisted state").await; + return res; + } + + /// create a message from the *kernel* to the filesystem, + /// asking it to delete the current state saved under this process + async fn clear_state(&mut self) -> Result<()> { + let old_last_blob = self.process.last_blob.clone(); + let res = match send_and_await_response( + self, + Some(t::Address { + node: self.process.metadata.our.node.clone(), + process: KERNEL_PROCESS_ID.clone(), + }), + wit::Address { + node: self.process.metadata.our.node.clone(), + process: STATE_PROCESS_ID.en_wit_v1(), + }, + wit::Request { + inherit: false, + expects_response: Some(5), + body: serde_json::to_vec(&t::StateAction::DeleteState( + self.process.metadata.our.process.clone(), + )) + .unwrap(), + metadata: None, + capabilities: vec![], + }, + None, + ) + .await + { + Ok(Ok(_resp)) => { + // basically assuming filesystem responding properly here + Ok(()) + } + _ => Err(anyhow::anyhow!( + "filesystem did not respond properly to ClearState!!" + )), + }; + self.process.last_blob = old_last_blob; + print_debug(&self.process, "cleared persisted state").await; + return res; + } + + /// shortcut to spawn a new process. the child process will automatically + /// be able to send messages to the parent process, and vice versa. + /// the .wasm file for the process must already be in VFS. 
+ async fn spawn( + &mut self, + name: Option, + wasm_path: String, // must be located within package's drive + on_exit: wit::OnExit, + request_capabilities: Vec, + grant_capabilities: Vec<(wit::ProcessId, wit::Json)>, + public: bool, + ) -> Result> { + // save existing blob to restore later + let old_last_blob = self.process.last_blob.clone(); + let vfs_address = wit::Address { + node: self.process.metadata.our.node.clone(), + process: VFS_PROCESS_ID.en_wit_v1(), + }; + let Ok(Ok((_, hash_response))) = send_and_await_response( + self, + None, + vfs_address.clone(), + wit::Request { + inherit: false, + expects_response: Some(5), + body: serde_json::to_vec(&t::VfsRequest { + path: wasm_path.clone(), + action: t::VfsAction::Read, + }) + .unwrap(), + metadata: None, + capabilities: vec![], + }, + None, + ) + .await + else { + println!("spawn: GetHash fail"); + // reset blob to what it was + self.process.last_blob = old_last_blob; + return Ok(Err(wit::SpawnError::NoFileAtPath)); + }; + let wit::Message::Response((wit::Response { body, .. }, _)) = hash_response else { + // reset blob to what it was + self.process.last_blob = old_last_blob; + return Ok(Err(wit::SpawnError::NoFileAtPath)); + }; + let t::VfsResponse::Read = serde_json::from_slice(&body).unwrap() else { + // reset blob to what it was + self.process.last_blob = old_last_blob; + return Ok(Err(wit::SpawnError::NoFileAtPath)); + }; + let Some(t::LazyLoadBlob { mime: _, ref bytes }) = self.process.last_blob else { + // reset blob to what it was + self.process.last_blob = old_last_blob; + return Ok(Err(wit::SpawnError::NoFileAtPath)); + }; + + let name = match name { + Some(name) => name, + None => rand::random::().to_string(), + }; + let new_process_id = t::ProcessId::new( + Some(&name), + self.process.metadata.our.process.package(), + self.process.metadata.our.process.publisher(), + ) + .check()?; + + let request_capabilities_filtered = { + let (tx, rx) = tokio::sync::oneshot::channel(); + self.process + .caps_oracle + .send(t::CapMessage::FilterCaps { + on: self.process.metadata.our.process.clone(), + caps: request_capabilities + .into_iter() + .map(|cap| t::de_wit_capability_v1(cap).0) + .collect(), + responder: tx, + }) + .await + .expect("fatal: process couldn't access capabilities oracle"); + rx.await + .expect("fatal: process couldn't receive capabilities") + }; + + let Ok(Ok((_, _response))) = send_and_await_response( + self, + Some(t::Address { + node: self.process.metadata.our.node.clone(), + process: KERNEL_PROCESS_ID.clone(), + }), + wit::Address { + node: self.process.metadata.our.node.clone(), + process: KERNEL_PROCESS_ID.en_wit_v1(), + }, + wit::Request { + inherit: false, + expects_response: Some(5), // TODO evaluate + body: serde_json::to_vec(&t::KernelCommand::InitializeProcess { + id: new_process_id.clone(), + wasm_bytes_handle: wasm_path, + wit_version: self.process.metadata.wit_version, + on_exit: t::OnExit::de_wit_v1(on_exit), + initial_capabilities: request_capabilities_filtered + .into_iter() + .map(|(cap, _sig)| cap) + .collect(), + public, + }) + .unwrap(), + metadata: None, + capabilities: vec![], + }, + Some(wit::LazyLoadBlob { + mime: None, + bytes: bytes.to_vec(), + }), + ) + .await + else { + // reset blob to what it was + self.process.last_blob = old_last_blob; + return Ok(Err(wit::SpawnError::NameTaken)); + }; + // insert messaging capabilities into requested processes + for (process_id, params) in grant_capabilities { + let (tx, rx) = tokio::sync::oneshot::channel(); + self.process + .caps_oracle + 
.send(t::CapMessage::Add { + on: t::ProcessId::de_wit_v1(process_id), + caps: vec![t::Capability::new( + self.process.metadata.our.clone(), + params, + )], + responder: Some(tx), + }) + .await + .unwrap(); + let _ = rx.await.unwrap(); + } + // finally, send the command to run the new process + let Ok(Ok((_, response))) = send_and_await_response( + self, + Some(t::Address { + node: self.process.metadata.our.node.clone(), + process: KERNEL_PROCESS_ID.clone(), + }), + wit::Address { + node: self.process.metadata.our.node.clone(), + process: KERNEL_PROCESS_ID.en_wit_v1(), + }, + wit::Request { + inherit: false, + expects_response: Some(5), // TODO evaluate + body: serde_json::to_vec(&t::KernelCommand::RunProcess(new_process_id.clone())) + .unwrap(), + metadata: None, + capabilities: vec![], + }, + None, + ) + .await + else { + // reset blob to what it was + self.process.last_blob = old_last_blob; + return Ok(Err(wit::SpawnError::NameTaken)); + }; + // reset blob to what it was + self.process.last_blob = old_last_blob; + let wit::Message::Response((wit::Response { body, .. }, _)) = response else { + return Ok(Err(wit::SpawnError::NoFileAtPath)); + }; + let t::KernelResponse::StartedProcess = serde_json::from_slice(&body).unwrap() else { + return Ok(Err(wit::SpawnError::NoFileAtPath)); + }; + // child processes are always able to Message parent + let (tx, rx) = tokio::sync::oneshot::channel(); + self.process + .caps_oracle + .send(t::CapMessage::Add { + on: new_process_id.clone(), + caps: vec![t::Capability::messaging(self.process.metadata.our.clone())], + responder: Some(tx), + }) + .await + .unwrap(); + rx.await.unwrap(); + + // parent process is always able to Message child + let (tx, rx) = tokio::sync::oneshot::channel(); + self.process + .caps_oracle + .send(t::CapMessage::Add { + on: self.process.metadata.our.process.clone(), + caps: vec![t::Capability::messaging(( + self.process.metadata.our.node.clone(), + &new_process_id, + ))], + responder: Some(tx), + }) + .await + .unwrap(); + rx.await.unwrap(); + print_debug(&self.process, "spawned a new process").await; + Ok(Ok(new_process_id.en_wit_v1().to_owned())) + } + + // + // capabilities management + // + + async fn save_capabilities(&mut self, caps: Vec) -> Result<()> { + let (tx, rx) = tokio::sync::oneshot::channel(); + let _ = self + .process + .caps_oracle + .send(t::CapMessage::Add { + on: self.process.metadata.our.process.clone(), + caps: caps + .iter() + .map(|cap| t::de_wit_capability_v1(cap.clone()).0) + .collect(), + responder: Some(tx), + }) + .await?; + let _ = rx.await?; + Ok(()) + } + + async fn drop_capabilities(&mut self, caps: Vec) -> Result<()> { + let (tx, rx) = tokio::sync::oneshot::channel(); + let _ = self + .process + .caps_oracle + .send(t::CapMessage::Drop { + on: self.process.metadata.our.process.clone(), + caps: caps + .iter() + .map(|cap| t::de_wit_capability_v1(cap.clone()).0) + .collect(), + responder: Some(tx), + }) + .await?; + let _ = rx.await?; + Ok(()) + } + + async fn our_capabilities(&mut self) -> Result> { + let (tx, rx) = tokio::sync::oneshot::channel(); + let _ = self + .process + .caps_oracle + .send(t::CapMessage::GetAll { + on: self.process.metadata.our.process.clone(), + responder: tx, + }) + .await?; + let caps = rx.await?; + Ok(caps + .into_iter() + .map(|cap| t::en_wit_capability_v1(cap)) + .collect()) + } + + // + // message I/O: + // + + /// from a process: receive the next incoming message. will wait async until a message is received. 
+ /// the incoming message can be a Request or a Response, or an Error of the Network variety. + async fn receive( + &mut self, + ) -> Result)>> { + Ok(self.process.get_next_message_for_process_v1().await) + } + + /// from a process: grab the blob part of the current prompting message. + /// if the prompting message did not have a blob, will return None. + /// will also return None if there is no prompting message. + async fn get_blob(&mut self) -> Result> { + Ok(t::en_wit_blob_v1(self.process.last_blob.clone())) + } + + async fn send_request( + &mut self, + target: wit::Address, + request: wit::Request, + context: Option, + blob: Option, + ) -> Result<()> { + let id = self + .process + .send_request_v1(None, target, request, context, blob) + .await; + match id { + Ok(_id) => Ok(()), + Err(e) => Err(e), + } + } + + async fn send_requests( + &mut self, + requests: Vec<( + wit::Address, + wit::Request, + Option, + Option, + )>, + ) -> Result<()> { + for request in requests { + let id = self + .process + .send_request_v1(None, request.0, request.1, request.2, request.3) + .await; + match id { + Ok(_id) => continue, + Err(e) => return Err(e), + } + } + Ok(()) + } + + async fn send_response( + &mut self, + response: wit::Response, + blob: Option, + ) -> Result<()> { + self.process.send_response_v1(response, blob).await; + Ok(()) + } + + async fn send_and_await_response( + &mut self, + target: wit::Address, + request: wit::Request, + blob: Option, + ) -> Result> { + send_and_await_response(self, None, target, request, blob).await + } +} diff --git a/lib/build.rs b/lib/build.rs index e615cbe0..66a46c9c 100644 --- a/lib/build.rs +++ b/lib/build.rs @@ -8,6 +8,8 @@ const KINODE_WIT_0_7_0_URL: &str = "https://raw.githubusercontent.com/kinode-dao/kinode-wit/aa2c8b11c9171b949d1991c32f58591c0e881f85/kinode.wit"; const KINODE_WIT_0_8_0_URL: &str = "https://raw.githubusercontent.com/kinode-dao/kinode-wit/v0.8/kinode.wit"; +const KINODE_WIT_1_0_0_URL: &str = + "https://raw.githubusercontent.com/kinode-dao/kinode-wit/1.0/kinode.wit"; /// copied from `kit` async fn download_file(url: &str, path: &Path) -> anyhow::Result<()> { @@ -77,5 +79,14 @@ fn main() { download_file(KINODE_WIT_0_8_0_URL, &wit_file) .await .expect("Failed to download WIT 0.8"); - }) + }); + + let wit_file = pwd.join("wit-v1.0.0").join("kinode.wit"); + + let rt = tokio::runtime::Runtime::new().unwrap(); + rt.block_on(async { + download_file(KINODE_WIT_1_0_0_URL, &wit_file) + .await + .expect("Failed to download WIT 1.0"); + }); } diff --git a/lib/src/core.rs b/lib/src/core.rs index c6d80bb9..c6017124 100644 --- a/lib/src/core.rs +++ b/lib/src/core.rs @@ -145,6 +145,13 @@ impl ProcessId { publisher_node: self.publisher_node.clone(), } } + pub fn en_wit_v1(&self) -> crate::v1::wit::ProcessId { + crate::v1::wit::ProcessId { + process_name: self.process_name.clone(), + package_name: self.package_name.clone(), + publisher_node: self.publisher_node.clone(), + } + } pub fn de_wit(wit: wit::ProcessId) -> Self { ProcessId { process_name: wit.process_name, @@ -159,6 +166,13 @@ impl ProcessId { publisher_node: wit.publisher_node, } } + pub fn de_wit_v1(wit: crate::v1::wit::ProcessId) -> Self { + ProcessId { + process_name: wit.process_name, + package_name: wit.package_name, + publisher_node: wit.publisher_node, + } + } pub fn check(self) -> Result { check_process_id_kimap_safe(&self)?; Ok(self) @@ -335,6 +349,12 @@ impl Address { process: self.process.en_wit_v0(), } } + pub fn en_wit_v1(&self) -> crate::v1::wit::Address { + crate::v1::wit::Address { 
+ node: self.node.clone(), + process: self.process.en_wit_v1(), + } + } pub fn de_wit(wit: wit::Address) -> Address { Address { node: wit.node, @@ -355,6 +375,16 @@ impl Address { }, } } + pub fn de_wit_v1(wit: crate::v1::wit::Address) -> Address { + Address { + node: wit.node, + process: ProcessId { + process_name: wit.process.process_name, + package_name: wit.process.package_name, + publisher_node: wit.process.publisher_node, + }, + } + } pub fn check(self) -> Result { if !is_kimap_safe(&self.node) { return Err(AddressParseError::NodeNotKimapSafe(self.node.clone())); @@ -641,6 +671,24 @@ impl OnExit { } } + pub fn en_wit_v1(&self) -> crate::v1::wit::OnExit { + match self { + OnExit::None => crate::v1::wit::OnExit::None, + OnExit::Restart => crate::v1::wit::OnExit::Restart, + OnExit::Requests(reqs) => crate::v1::wit::OnExit::Requests( + reqs.iter() + .map(|(address, request, blob)| { + ( + address.en_wit_v1(), + en_wit_request_v1(request.clone()), + en_wit_blob_v1(blob.clone()), + ) + }) + .collect(), + ), + } + } + pub fn de_wit(wit: wit::OnExit) -> Self { match wit { wit::OnExit::None => OnExit::None, @@ -676,6 +724,24 @@ impl OnExit { ), } } + + pub fn de_wit_v1(wit: crate::v1::wit::OnExit) -> Self { + match wit { + crate::v1::wit::OnExit::None => OnExit::None, + crate::v1::wit::OnExit::Restart => OnExit::Restart, + crate::v1::wit::OnExit::Requests(reqs) => OnExit::Requests( + reqs.into_iter() + .map(|(address, request, blob)| { + ( + Address::de_wit_v1(address), + de_wit_request_v1(request), + de_wit_blob_v1(blob), + ) + }) + .collect(), + ), + } + } } impl std::fmt::Display for OnExit { @@ -783,8 +849,8 @@ pub fn de_wit_request(wit: wit::Request) -> Request { metadata: wit.metadata, capabilities: wit .capabilities - .iter() - .map(|cap| de_wit_capability(cap.clone())) + .into_iter() + .map(|cap| de_wit_capability(cap)) .collect(), } } @@ -797,8 +863,22 @@ pub fn de_wit_request_v0(wit: crate::v0::wit::Request) -> Request { metadata: wit.metadata, capabilities: wit .capabilities - .iter() - .map(|cap| de_wit_capability_v0(cap.clone())) + .into_iter() + .map(|cap| de_wit_capability_v0(cap)) + .collect(), + } +} + +pub fn de_wit_request_v1(wit: crate::v1::wit::Request) -> Request { + Request { + inherit: wit.inherit, + expects_response: wit.expects_response, + body: wit.body, + metadata: wit.metadata, + capabilities: wit + .capabilities + .into_iter() + .map(|cap| de_wit_capability_v1(cap)) .collect(), } } @@ -811,8 +891,8 @@ pub fn en_wit_request(request: Request) -> wit::Request { metadata: request.metadata, capabilities: request .capabilities - .iter() - .map(|cap| en_wit_capability(cap.clone())) + .into_iter() + .map(|cap| en_wit_capability(cap)) .collect(), } } @@ -825,8 +905,22 @@ pub fn en_wit_request_v0(request: Request) -> crate::v0::wit::Request { metadata: request.metadata, capabilities: request .capabilities - .iter() - .map(|cap| en_wit_capability_v0(cap.clone())) + .into_iter() + .map(|cap| en_wit_capability_v0(cap)) + .collect(), + } +} + +pub fn en_wit_request_v1(request: Request) -> crate::v1::wit::Request { + crate::v1::wit::Request { + inherit: request.inherit, + expects_response: request.expects_response, + body: request.body, + metadata: request.metadata, + capabilities: request + .capabilities + .into_iter() + .map(|cap| en_wit_capability_v1(cap)) .collect(), } } @@ -838,8 +932,8 @@ pub fn de_wit_response(wit: wit::Response) -> Response { metadata: wit.metadata, capabilities: wit .capabilities - .iter() - .map(|cap| de_wit_capability(cap.clone())) + .into_iter() + 
.map(|cap| de_wit_capability(cap)) .collect(), } } @@ -851,8 +945,21 @@ pub fn de_wit_response_v0(wit: crate::v0::wit::Response) -> Response { metadata: wit.metadata, capabilities: wit .capabilities - .iter() - .map(|cap| de_wit_capability_v0(cap.clone())) + .into_iter() + .map(|cap| de_wit_capability_v0(cap)) + .collect(), + } +} + +pub fn de_wit_response_v1(wit: crate::v1::wit::Response) -> Response { + Response { + inherit: wit.inherit, + body: wit.body, + metadata: wit.metadata, + capabilities: wit + .capabilities + .into_iter() + .map(|cap| de_wit_capability_v1(cap)) .collect(), } } @@ -864,8 +971,8 @@ pub fn en_wit_response(response: Response) -> wit::Response { metadata: response.metadata, capabilities: response .capabilities - .iter() - .map(|cap| en_wit_capability(cap.clone())) + .into_iter() + .map(|cap| en_wit_capability(cap)) .collect(), } } @@ -877,8 +984,21 @@ pub fn en_wit_response_v0(response: Response) -> crate::v0::wit::Response { metadata: response.metadata, capabilities: response .capabilities - .iter() - .map(|cap| en_wit_capability_v0(cap.clone())) + .into_iter() + .map(|cap| en_wit_capability_v0(cap)) + .collect(), + } +} + +pub fn en_wit_response_v1(response: Response) -> crate::v1::wit::Response { + crate::v1::wit::Response { + inherit: response.inherit, + body: response.body, + metadata: response.metadata, + capabilities: response + .capabilities + .into_iter() + .map(|cap| en_wit_capability_v1(cap)) .collect(), } } @@ -903,6 +1023,16 @@ pub fn de_wit_blob_v0(wit: Option) -> Option) -> Option { + match wit { + None => None, + Some(wit) => Some(LazyLoadBlob { + mime: wit.mime, + bytes: wit.bytes, + }), + } +} + pub fn en_wit_blob(load: Option) -> Option { match load { None => None, @@ -923,6 +1053,16 @@ pub fn en_wit_blob_v0(load: Option) -> Option) -> Option { + match load { + None => None, + Some(load) => Some(crate::v1::wit::LazyLoadBlob { + mime: load.mime, + bytes: load.bytes, + }), + } +} + pub fn de_wit_capability(wit: wit::Capability) -> (Capability, Vec) { ( Capability { @@ -957,17 +1097,41 @@ pub fn de_wit_capability_v0(wit: crate::v0::wit::Capability) -> (Capability, Vec ) } +pub fn de_wit_capability_v1(wit: crate::v1::wit::Capability) -> (Capability, Vec) { + ( + Capability { + issuer: Address { + node: wit.issuer.node, + process: ProcessId { + process_name: wit.issuer.process.process_name, + package_name: wit.issuer.process.package_name, + publisher_node: wit.issuer.process.publisher_node, + }, + }, + params: wit.params, + }, + vec![], + ) +} + pub fn en_wit_capability(cap: (Capability, Vec)) -> wit::Capability { wit::Capability { issuer: cap.0.issuer.en_wit(), - params: cap.0.params.to_string(), + params: cap.0.params, } } pub fn en_wit_capability_v0(cap: (Capability, Vec)) -> crate::v0::wit::Capability { crate::v0::wit::Capability { issuer: cap.0.issuer.en_wit_v0(), - params: cap.0.params.to_string(), + params: cap.0.params, + } +} + +pub fn en_wit_capability_v1(cap: (Capability, Vec)) -> crate::v1::wit::Capability { + crate::v1::wit::Capability { + issuer: cap.0.issuer.en_wit_v1(), + params: cap.0.params, } } @@ -989,6 +1153,15 @@ pub fn en_wit_message_v0(message: Message) -> crate::v0::wit::Message { } } +pub fn en_wit_message_v1(message: Message) -> crate::v1::wit::Message { + match message { + Message::Request(request) => crate::v1::wit::Message::Request(en_wit_request_v1(request)), + Message::Response((response, context)) => { + crate::v1::wit::Message::Response((en_wit_response_v1(response), context)) + } + } +} + pub fn 
en_wit_send_error(error: SendError) -> wit::SendError { wit::SendError { kind: en_wit_send_error_kind(error.kind), @@ -1006,6 +1179,15 @@ pub fn en_wit_send_error_v0(error: SendError) -> crate::v0::wit::SendError { } } +pub fn en_wit_send_error_v1(error: SendError) -> crate::v1::wit::SendError { + crate::v1::wit::SendError { + kind: en_wit_send_error_kind_v1(error.kind), + target: error.target.en_wit_v1(), + message: en_wit_message_v1(error.message), + lazy_load_blob: en_wit_blob_v1(error.lazy_load_blob), + } +} + pub fn en_wit_send_error_kind(kind: SendErrorKind) -> wit::SendErrorKind { match kind { SendErrorKind::Offline => wit::SendErrorKind::Offline, @@ -1020,6 +1202,13 @@ pub fn en_wit_send_error_kind_v0(kind: SendErrorKind) -> crate::v0::wit::SendErr } } +pub fn en_wit_send_error_kind_v1(kind: SendErrorKind) -> crate::v1::wit::SendErrorKind { + match kind { + SendErrorKind::Offline => crate::v1::wit::SendErrorKind::Offline, + SendErrorKind::Timeout => crate::v1::wit::SendErrorKind::Timeout, + } +} + // // END SYNC with process_lib // diff --git a/lib/src/lib.rs b/lib/src/lib.rs index 61214b23..b686319f 100644 --- a/lib/src/lib.rs +++ b/lib/src/lib.rs @@ -12,12 +12,16 @@ pub mod types { pub use kinode::process; pub use kinode::process::standard as wit; +// can remove in 1.0! + wasmtime::component::bindgen!({ path: "wit-v0.7.0", world: "process", async: true, }); +// can remove in 1.0! + pub mod v0 { pub use kinode::process; pub use kinode::process::standard as wit; @@ -27,3 +31,13 @@ pub mod v0 { async: true, }); } + +pub mod v1 { + pub use kinode::process; + pub use kinode::process::standard as wit; + wasmtime::component::bindgen!({ + path: "wit-v1.0.0", + world: "process-v1", + async: true, + }); +}
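
For reference, the wit_version dispatch that make_process_loop performs after this patch reduces to the mapping sketched below. This is a standalone illustration, not part of the patch; the helper name `wit_world_dir` is hypothetical.

/// Illustrative sketch only: which bundled WIT directory the kernel's
/// bindings come from for a process's recorded wit_version.
fn wit_world_dir(wit_version: Option<u32>) -> &'static str {
    match wit_version {
        // a missing version is treated as the oldest supported WIT
        None => "wit-v0.7.0",
        Some(0) => "wit-v0.8.0",
        // 1, and any higher or unrecognized version, falls through to the newest world
        _ => "wit-v1.0.0",
    }
}

fn main() {
    assert_eq!(wit_world_dir(None), "wit-v0.7.0");
    assert_eq!(wit_world_dir(Some(1)), "wit-v1.0.0");
    assert_eq!(wit_world_dir(Some(7)), "wit-v1.0.0");
}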