add wit v1 support to kernel

This commit is contained in:
dr-frmr 2024-12-09 15:53:31 -05:00
parent 4aa35be1a4
commit 475aea17fb
No known key found for this signature in database
6 changed files with 1289 additions and 22 deletions

View File

@ -17,6 +17,8 @@ pub mod process;
mod standard_host;
/// Implement the functions served to processes by `wit-v0.8.0/kinode.wit`.
mod standard_host_v0;
/// Implement the functions served to processes by `wit-v1.0.0/kinode.wit`.
mod standard_host_v1;
pub const LATEST_WIT_VERSION: u32 = 0;
const PROCESS_CHANNEL_CAPACITY: usize = 100;

View File

@ -1,5 +1,5 @@
use crate::KERNEL_PROCESS_ID;
use lib::{types::core as t, v0::ProcessV0, Process};
use lib::{types::core as t, v0::ProcessV0, v1::ProcessV1, Process};
use std::{
collections::{HashMap, VecDeque},
path::PathBuf,
@ -71,6 +71,7 @@ impl WasiView for ProcessWasi {
}
}
/// **can be removed in 1.0.0**
pub struct ProcessWasiV0 {
pub process: ProcessState,
table: Table,
@ -86,6 +87,21 @@ impl WasiView for ProcessWasiV0 {
}
}
pub struct ProcessWasiV1 {
pub process: ProcessState,
table: Table,
wasi: WasiCtx,
}
impl WasiView for ProcessWasiV1 {
fn table(&mut self) -> &mut Table {
&mut self.table
}
fn ctx(&mut self) -> &mut WasiCtx {
&mut self.wasi
}
}
async fn make_table_and_wasi(
home_directory_path: PathBuf,
process_state: &ProcessState,
@ -134,6 +150,7 @@ async fn make_table_and_wasi(
(table, wasi.stderr(wasi_stderr.clone()).build(), wasi_stderr)
}
/// **can be removed in 1.0.0**
async fn make_component(
engine: Engine,
wasm_bytes: &[u8],
@ -177,6 +194,7 @@ async fn make_component(
Ok((bindings, store, wasi_stderr))
}
/// **can be removed in 1.0.0**
async fn make_component_v0(
engine: Engine,
wasm_bytes: &[u8],
@ -220,6 +238,49 @@ async fn make_component_v0(
Ok((bindings, store, wasi_stderr))
}
async fn make_component_v1(
engine: Engine,
wasm_bytes: &[u8],
home_directory_path: PathBuf,
process_state: ProcessState,
) -> anyhow::Result<(ProcessV1, Store<ProcessWasiV1>, MemoryOutputPipe)> {
let component =
Component::new(&engine, wasm_bytes.to_vec()).expect("make_component: couldn't read file");
let mut linker = Linker::new(&engine);
ProcessV1::add_to_linker(&mut linker, |state: &mut ProcessWasiV1| state).unwrap();
let (table, wasi, wasi_stderr) = make_table_and_wasi(home_directory_path, &process_state).await;
wasmtime_wasi::command::add_to_linker(&mut linker).unwrap();
let our_process_id = process_state.metadata.our.process.clone();
let send_to_terminal = process_state.send_to_terminal.clone();
let mut store = Store::new(
&engine,
ProcessWasiV1 {
process: process_state,
table,
wasi,
},
);
let (bindings, _bindings) =
match ProcessV1::instantiate_async(&mut store, &component, &linker).await {
Ok(b) => b,
Err(e) => {
t::Printout::new(
0,
format!("kernel: process {our_process_id} failed to instantiate: {e:?}"),
)
.send(&send_to_terminal)
.await;
return Err(e);
}
};
Ok((bindings, store, wasi_stderr))
}
/// create a specific process, and generate a task that will run it.
pub async fn make_process_loop(
keypair: Arc<ring::signature::Ed25519KeyPair>,
@ -287,6 +348,7 @@ pub async fn make_process_loop(
let metadata = match wit_version {
// assume missing version is oldest wit version
// **can be removed in 1.0.0**
None => {
let (bindings, mut store, wasi_stderr) =
make_component(engine, &wasm_bytes, home_directory_path, process_state).await?;
@ -319,8 +381,8 @@ pub async fn make_process_loop(
store.data().process.metadata.to_owned()
}
// match version numbers
// assume higher uncovered version number is latest version
Some(0) | _ => {
// **can be removed in 1.0.0**
Some(0) => {
let (bindings, mut store, wasi_stderr) =
make_component_v0(engine, &wasm_bytes, home_directory_path, process_state).await?;
@ -348,6 +410,37 @@ pub async fn make_process_loop(
}
};
// update metadata to what was mutated by process in store
store.data().process.metadata.to_owned()
}
Some(1) | _ => {
let (bindings, mut store, wasi_stderr) =
make_component_v1(engine, &wasm_bytes, home_directory_path, process_state).await?;
// the process will run until it returns from init() or crashes
match bindings.call_init(&mut store, &our.to_string()).await {
Ok(()) => {
t::Printout::new(1, format!("process {our} returned without error"))
.send(&send_to_terminal)
.await;
}
Err(e) => {
let stderr = wasi_stderr.contents().into();
let stderr = String::from_utf8(stderr)?;
let output = if stderr != String::new() {
stderr
} else {
format!("{}", e.root_cause())
};
t::Printout::new(
0,
format!("\x1b[38;5;196mprocess {our} ended with error:\x1b[0m\n{output}"),
)
.send(&send_to_terminal)
.await;
}
};
// update metadata to what was mutated by process in store
store.data().process.metadata.to_owned()
}

View File

@ -0,0 +1,958 @@
use crate::kernel::process;
use anyhow::Result;
use lib::types::core::{self as t, KERNEL_PROCESS_ID, STATE_PROCESS_ID, VFS_PROCESS_ID};
use lib::v1::wit;
use lib::v1::wit::Host as StandardHost;
use ring::signature::{self, KeyPair};
async fn print_debug(proc: &process::ProcessState, content: &str) {
let _ = proc
.send_to_terminal
.send(t::Printout {
verbosity: 2,
content: format!("{}: {}", proc.metadata.our.process, content),
})
.await;
}
impl process::ProcessState {
/// Ingest latest message directed to this process, and save it as the current message.
/// If there is no message in the queue, wait async until one is received.
async fn get_next_message_for_process_v1(
&mut self,
) -> Result<(wit::Address, wit::Message), (wit::SendError, Option<wit::Context>)> {
let res = match self.message_queue.pop_front() {
Some(message_from_queue) => message_from_queue,
None => self.ingest_message_v1().await,
};
self.kernel_message_to_process_receive_v1(res)
}
/// instead of ingesting latest, wait for a specific ID and queue all others
async fn get_specific_message_for_process_v1(
&mut self,
awaited_message_id: u64,
) -> Result<(wit::Address, wit::Message), (wit::SendError, Option<wit::Context>)> {
// first, check if the awaited message is already in the queue and handle if so
for (i, message) in self.message_queue.iter().enumerate() {
match message {
Ok(ref km) if km.id == awaited_message_id => {
let km = self.message_queue.remove(i).unwrap();
return self.kernel_message_to_process_receive_v1(km);
}
_ => continue,
}
}
// next, wait for the awaited message to arrive
loop {
let res = self.ingest_message_v1().await;
let id = match &res {
Ok(km) => km.id,
Err(e) => e.id,
};
if id == awaited_message_id {
return self.kernel_message_to_process_receive_v1(res);
} else {
self.message_queue.push_back(res);
}
}
}
/// ingest next valid message from kernel.
/// cancel any timeout task associated with this message.
/// if the message is a response, only enqueue if we have an outstanding request for it.
async fn ingest_message_v1(&mut self) -> Result<t::KernelMessage, t::WrappedSendError> {
loop {
let message = self
.recv_in_process
.recv()
.await
.expect("fatal: process couldn't receive next message");
match &message {
Ok(km) => match &km.message {
t::Message::Response(_) => {
if let Some((_context, timeout_handle)) = self.contexts.get_mut(&km.id) {
timeout_handle.abort();
return message;
}
}
_ => {
return message;
}
},
Err(e) => {
if let Some((_context, timeout_handle)) = self.contexts.get_mut(&e.id) {
timeout_handle.abort();
return message;
}
}
}
}
}
/// Convert a message from the main event loop into a result for the process to receive.
/// If the message is a response or error, get context if we have one.
fn kernel_message_to_process_receive_v1(
&mut self,
incoming: Result<t::KernelMessage, t::WrappedSendError>,
) -> Result<(wit::Address, wit::Message), (wit::SendError, Option<wit::Context>)> {
let (mut km, context) = match incoming {
Ok(mut km) => match km.message {
t::Message::Request(t::Request {
ref expects_response,
..
}) => {
if km.lazy_load_blob.is_some() {
self.last_blob = km.lazy_load_blob;
km.lazy_load_blob = None;
}
if expects_response.is_some() || km.rsvp.is_some() {
// update prompting_message iff there is someone to reply to
self.prompting_message = Some(km.clone());
}
(km, None)
}
t::Message::Response(_) => match self.contexts.remove(&km.id) {
Some((context, _timeout_handle)) => {
if km.lazy_load_blob.is_some() {
self.last_blob = km.lazy_load_blob;
km.lazy_load_blob = None;
}
self.prompting_message = context.prompting_message;
(km, context.context)
}
None => {
if km.lazy_load_blob.is_some() {
self.last_blob = km.lazy_load_blob;
km.lazy_load_blob = None;
}
self.prompting_message = Some(km.clone());
(km, None)
}
},
},
Err(e) => match self.contexts.remove(&e.id) {
None => return Err((t::en_wit_send_error_v1(e.error), None)),
Some((context, _timeout_handle)) => {
self.prompting_message = context.prompting_message;
return Err((t::en_wit_send_error_v1(e.error), context.context));
}
},
};
let pk = signature::UnparsedPublicKey::new(
&signature::ED25519,
self.keypair.as_ref().public_key(),
);
// prune any invalid capabilities before handing to process
// where invalid = supposedly issued by us, but not signed properly by us
match &mut km.message {
t::Message::Request(request) => {
request.capabilities.retain(|(cap, sig)| {
// The only time we verify a cap's signature is when a foreign node
// sends us a cap that we (allegedly) issued
if km.source.node != self.metadata.our.node
&& cap.issuer.node == self.metadata.our.node
{
match pk.verify(&rmp_serde::to_vec(&cap).unwrap_or_default(), sig) {
Ok(_) => true,
Err(_) => false,
}
} else {
return true;
}
});
}
t::Message::Response((response, _)) => {
response.capabilities.retain(|(cap, sig)| {
// The only time we verify a cap's signature is when a foreign node
// sends us a cap that we (allegedly) issued
if km.source.node != self.metadata.our.node
&& cap.issuer.node == self.metadata.our.node
{
match pk.verify(&rmp_serde::to_vec(&cap).unwrap_or_default(), sig) {
Ok(_) => true,
Err(_) => false,
}
} else {
return true;
}
});
}
};
Ok((
km.source.en_wit_v1(),
match km.message {
t::Message::Request(request) => {
wit::Message::Request(t::en_wit_request_v1(request))
}
// NOTE: we throw away whatever context came from the sender, that's not ours
t::Message::Response((response, _sent_context)) => {
wit::Message::Response((t::en_wit_response_v1(response), context))
}
},
))
}
/// takes Request generated by a process and sends it to the main event loop.
/// will only fail if process does not have capability to send to target.
/// if the request has a timeout (expects response), start a task to track
/// that timeout and return timeout error if it expires.
async fn send_request_v1(
&mut self,
// only used when kernel steps in to get/set state
fake_source: Option<t::Address>,
target: wit::Address,
request: wit::Request,
new_context: Option<wit::Context>,
blob: Option<wit::LazyLoadBlob>,
) -> Result<u64> {
let source = fake_source.unwrap_or(self.metadata.our.clone());
let mut request = t::de_wit_request_v1(request);
// if request chooses to inherit, it means to take the ID and lazy_load_blob,
// if any, from the last message it ingested
// if request chooses to inherit, match id to precedessor
// otherwise, id is generated randomly
let request_id: u64 = if request.inherit && self.prompting_message.is_some() {
self.prompting_message.as_ref().unwrap().id
} else {
loop {
let id = rand::random();
if !self.contexts.contains_key(&id) {
break id;
}
}
};
// if a blob is provided, it will be used; otherwise, if inherit is true,
// and a predecessor exists, its blob will be used; otherwise, no blob will be used.
let blob = match blob {
Some(p) => Some(t::LazyLoadBlob {
mime: p.mime,
bytes: p.bytes,
}),
None => match request.inherit {
true => self.last_blob.clone(),
false => None,
},
};
if !request.capabilities.is_empty() {
request.capabilities = {
let (tx, rx) = tokio::sync::oneshot::channel();
self.caps_oracle
.send(t::CapMessage::FilterCaps {
on: self.metadata.our.process.clone(),
caps: request
.capabilities
.into_iter()
.map(|(cap, _)| cap)
.collect(),
responder: tx,
})
.await
.expect("fatal: process couldn't access capabilities oracle");
rx.await
.expect("fatal: process couldn't receive capabilities")
};
}
// if the request expects a response, modify the process' context map as needed
// and set a timer.
// TODO optimize this SIGNIFICANTLY: stop spawning tasks
// and use a global clock + garbage collect step to check for timeouts
if let Some(timeout_secs) = request.expects_response {
let this_request = request.clone();
let this_blob = blob.clone();
let self_sender = self.self_sender.clone();
let original_target = t::Address::de_wit_v1(target.clone());
let timeout_handle = tokio::spawn(async move {
tokio::time::sleep(std::time::Duration::from_secs(timeout_secs)).await;
let _ = self_sender
.send(Err(t::WrappedSendError {
id: request_id,
source: original_target.clone(),
error: t::SendError {
kind: t::SendErrorKind::Timeout,
target: original_target,
message: t::Message::Request(this_request),
lazy_load_blob: this_blob,
},
}))
.await;
});
self.contexts.insert(
request_id,
(
process::ProcessContext {
prompting_message: self.prompting_message.clone(),
context: new_context,
},
timeout_handle,
),
);
}
// rsvp is set based on this priority:
// 1. whether this request expects a response -- if so, rsvp = our address, always
// 2. whether this request inherits -- if so, rsvp = prompting message's rsvp
// 3. if neither, rsvp = None
let kernel_message = t::KernelMessage {
id: request_id,
source,
target: t::Address::de_wit_v1(target),
rsvp: match (
request.expects_response,
request.inherit,
&self.prompting_message,
) {
(Some(_), _, _) => {
// this request expects response, so receives any response
// make sure to use the real source, not a fake injected-by-kernel source
Some(self.metadata.our.clone())
}
(None, true, Some(ref prompt)) => {
// this request inherits, so response will be routed to prompting message
prompt.rsvp.clone()
}
_ => None,
},
message: t::Message::Request(request),
lazy_load_blob: blob,
};
self.send_to_loop
.send(kernel_message)
.await
.expect("fatal: kernel couldn't send request");
Ok(request_id)
}
/// takes Response generated by a process and sends it to the main event loop.
async fn send_response_v1(&mut self, response: wit::Response, blob: Option<wit::LazyLoadBlob>) {
let mut response = t::de_wit_response_v1(response);
// the process requires a prompting_message in order to issue a response
let Some(ref prompting_message) = self.prompting_message else {
t::Printout::new(
0,
format!("kernel: need non-None prompting_message to handle Response {response:?}"),
)
.send(&self.send_to_terminal)
.await;
return;
};
// given the current process state, produce the id and target that
// a response it emits should have.
let (id, target) = (
prompting_message.id,
match &prompting_message.rsvp {
None => prompting_message.source.clone(),
Some(rsvp) => rsvp.clone(),
},
);
let blob = match response.inherit {
true => self.last_blob.clone(),
false => t::de_wit_blob_v1(blob),
};
if !response.capabilities.is_empty() {
response.capabilities = {
let (tx, rx) = tokio::sync::oneshot::channel();
self.caps_oracle
.send(t::CapMessage::FilterCaps {
on: self.metadata.our.process.clone(),
caps: response
.capabilities
.into_iter()
.map(|(cap, _)| cap)
.collect(),
responder: tx,
})
.await
.expect("fatal: process couldn't access capabilities oracle");
rx.await
.expect("fatal: process couldn't receive capabilities")
};
}
self.send_to_loop
.send(t::KernelMessage {
id,
source: self.metadata.our.clone(),
target,
rsvp: None,
message: t::Message::Response((
response,
// the context will be set by the process receiving this Response.
None,
)),
lazy_load_blob: blob,
})
.await
.expect("fatal: kernel couldn't send response");
}
}
async fn send_and_await_response(
process: &mut process::ProcessWasiV1,
source: Option<t::Address>,
target: wit::Address,
request: wit::Request,
blob: Option<wit::LazyLoadBlob>,
) -> Result<Result<(wit::Address, wit::Message), wit::SendError>> {
if request.expects_response.is_none() {
return Err(anyhow::anyhow!(
"kernel: got invalid send_and_await_response() Request from {:?}: must expect response",
process.process.metadata.our.process
));
}
if t::Address::de_wit_v1(target.clone()) == process.process.metadata.our {
return Err(anyhow::anyhow!(
"kernel: got invalid send_and_await_response() Request from and to {}: cannot await a Request to `our`: will deadlock",
process.process.metadata.our,
));
}
let id = process
.process
.send_request_v1(source, target, request, None, blob)
.await;
match id {
Ok(id) => match process
.process
.get_specific_message_for_process_v1(id)
.await
{
Ok((address, wit::Message::Response(response))) => {
Ok(Ok((address, wit::Message::Response(response))))
}
Ok((_address, wit::Message::Request(_))) => Err(anyhow::anyhow!(
"fatal: received Request instead of Response"
)),
Err((net_err, _context)) => Ok(Err(net_err)),
},
Err(e) => Err(e),
}
}
///
/// create the process API. this is where the functions that a process can use live.
///
#[async_trait::async_trait]
impl StandardHost for process::ProcessWasiV1 {
//
// system utils:
//
/// Print a message to the runtime terminal. Add the name of the process to the
/// beginning of the string, so user can verify source.
async fn print_to_terminal(&mut self, verbosity: u8, content: String) -> Result<()> {
self.process
.send_to_terminal
.send(t::Printout {
verbosity,
content: format!(
"{}:{}: {}",
self.process.metadata.our.process.package(),
self.process.metadata.our.process.publisher(),
content
),
})
.await
.map_err(|e| anyhow::anyhow!("fatal: couldn't send to terminal: {e:?}"))
}
async fn our(&mut self) -> Result<wit::Address> {
Ok(self.process.metadata.our.en_wit_v1())
}
//
// process management:
//
/// TODO critical: move to kernel logic to enable persistence of choice made here
async fn set_on_exit(&mut self, on_exit: wit::OnExit) -> Result<()> {
self.process.metadata.on_exit = t::OnExit::de_wit_v1(on_exit);
print_debug(&self.process, "set new on-exit behavior").await;
Ok(())
}
async fn get_on_exit(&mut self) -> Result<wit::OnExit> {
Ok(self.process.metadata.on_exit.en_wit_v1())
}
/// create a message from the *kernel* to the filesystem,
/// asking it to fetch the current state saved under this process
async fn get_state(&mut self) -> Result<Option<Vec<u8>>> {
let old_last_blob = self.process.last_blob.clone();
let res = match send_and_await_response(
self,
Some(t::Address {
node: self.process.metadata.our.node.clone(),
process: KERNEL_PROCESS_ID.clone(),
}),
wit::Address {
node: self.process.metadata.our.node.clone(),
process: STATE_PROCESS_ID.en_wit_v1(),
},
wit::Request {
inherit: false,
expects_response: Some(5),
body: serde_json::to_vec(&t::StateAction::GetState(
self.process.metadata.our.process.clone(),
))
.unwrap(),
metadata: Some(self.process.metadata.our.process.to_string()),
capabilities: vec![],
},
None,
)
.await
{
Ok(Ok(_resp)) => {
// basically assuming filesystem responding properly here
match &self.process.last_blob {
None => Ok(None),
Some(blob) => Ok(Some(blob.bytes.clone())),
}
}
_ => Ok(None),
};
self.process.last_blob = old_last_blob;
return res;
}
/// create a message from the *kernel* to the filesystem,
/// asking it to replace the current state saved under
/// this process with these bytes
async fn set_state(&mut self, bytes: Vec<u8>) -> Result<()> {
let old_last_blob = self.process.last_blob.clone();
let res = match send_and_await_response(
self,
Some(t::Address {
node: self.process.metadata.our.node.clone(),
process: KERNEL_PROCESS_ID.clone(),
}),
wit::Address {
node: self.process.metadata.our.node.clone(),
process: STATE_PROCESS_ID.en_wit_v1(),
},
wit::Request {
inherit: false,
expects_response: Some(5),
body: serde_json::to_vec(&t::StateAction::SetState(
self.process.metadata.our.process.clone(),
))
.unwrap(),
metadata: Some(self.process.metadata.our.process.to_string()),
capabilities: vec![],
},
Some(wit::LazyLoadBlob { mime: None, bytes }),
)
.await
{
Ok(Ok(_resp)) => {
// basically assuming filesystem responding properly here
Ok(())
}
_ => Err(anyhow::anyhow!(
"filesystem did not respond properly to SetState!!"
)),
};
self.process.last_blob = old_last_blob;
print_debug(&self.process, "persisted state").await;
return res;
}
/// create a message from the *kernel* to the filesystem,
/// asking it to delete the current state saved under this process
async fn clear_state(&mut self) -> Result<()> {
let old_last_blob = self.process.last_blob.clone();
let res = match send_and_await_response(
self,
Some(t::Address {
node: self.process.metadata.our.node.clone(),
process: KERNEL_PROCESS_ID.clone(),
}),
wit::Address {
node: self.process.metadata.our.node.clone(),
process: STATE_PROCESS_ID.en_wit_v1(),
},
wit::Request {
inherit: false,
expects_response: Some(5),
body: serde_json::to_vec(&t::StateAction::DeleteState(
self.process.metadata.our.process.clone(),
))
.unwrap(),
metadata: None,
capabilities: vec![],
},
None,
)
.await
{
Ok(Ok(_resp)) => {
// basically assuming filesystem responding properly here
Ok(())
}
_ => Err(anyhow::anyhow!(
"filesystem did not respond properly to ClearState!!"
)),
};
self.process.last_blob = old_last_blob;
print_debug(&self.process, "cleared persisted state").await;
return res;
}
/// shortcut to spawn a new process. the child process will automatically
/// be able to send messages to the parent process, and vice versa.
/// the .wasm file for the process must already be in VFS.
async fn spawn(
&mut self,
name: Option<String>,
wasm_path: String, // must be located within package's drive
on_exit: wit::OnExit,
request_capabilities: Vec<wit::Capability>,
grant_capabilities: Vec<(wit::ProcessId, wit::Json)>,
public: bool,
) -> Result<Result<wit::ProcessId, wit::SpawnError>> {
// save existing blob to restore later
let old_last_blob = self.process.last_blob.clone();
let vfs_address = wit::Address {
node: self.process.metadata.our.node.clone(),
process: VFS_PROCESS_ID.en_wit_v1(),
};
let Ok(Ok((_, hash_response))) = send_and_await_response(
self,
None,
vfs_address.clone(),
wit::Request {
inherit: false,
expects_response: Some(5),
body: serde_json::to_vec(&t::VfsRequest {
path: wasm_path.clone(),
action: t::VfsAction::Read,
})
.unwrap(),
metadata: None,
capabilities: vec![],
},
None,
)
.await
else {
println!("spawn: GetHash fail");
// reset blob to what it was
self.process.last_blob = old_last_blob;
return Ok(Err(wit::SpawnError::NoFileAtPath));
};
let wit::Message::Response((wit::Response { body, .. }, _)) = hash_response else {
// reset blob to what it was
self.process.last_blob = old_last_blob;
return Ok(Err(wit::SpawnError::NoFileAtPath));
};
let t::VfsResponse::Read = serde_json::from_slice(&body).unwrap() else {
// reset blob to what it was
self.process.last_blob = old_last_blob;
return Ok(Err(wit::SpawnError::NoFileAtPath));
};
let Some(t::LazyLoadBlob { mime: _, ref bytes }) = self.process.last_blob else {
// reset blob to what it was
self.process.last_blob = old_last_blob;
return Ok(Err(wit::SpawnError::NoFileAtPath));
};
let name = match name {
Some(name) => name,
None => rand::random::<u64>().to_string(),
};
let new_process_id = t::ProcessId::new(
Some(&name),
self.process.metadata.our.process.package(),
self.process.metadata.our.process.publisher(),
)
.check()?;
let request_capabilities_filtered = {
let (tx, rx) = tokio::sync::oneshot::channel();
self.process
.caps_oracle
.send(t::CapMessage::FilterCaps {
on: self.process.metadata.our.process.clone(),
caps: request_capabilities
.into_iter()
.map(|cap| t::de_wit_capability_v1(cap).0)
.collect(),
responder: tx,
})
.await
.expect("fatal: process couldn't access capabilities oracle");
rx.await
.expect("fatal: process couldn't receive capabilities")
};
let Ok(Ok((_, _response))) = send_and_await_response(
self,
Some(t::Address {
node: self.process.metadata.our.node.clone(),
process: KERNEL_PROCESS_ID.clone(),
}),
wit::Address {
node: self.process.metadata.our.node.clone(),
process: KERNEL_PROCESS_ID.en_wit_v1(),
},
wit::Request {
inherit: false,
expects_response: Some(5), // TODO evaluate
body: serde_json::to_vec(&t::KernelCommand::InitializeProcess {
id: new_process_id.clone(),
wasm_bytes_handle: wasm_path,
wit_version: self.process.metadata.wit_version,
on_exit: t::OnExit::de_wit_v1(on_exit),
initial_capabilities: request_capabilities_filtered
.into_iter()
.map(|(cap, _sig)| cap)
.collect(),
public,
})
.unwrap(),
metadata: None,
capabilities: vec![],
},
Some(wit::LazyLoadBlob {
mime: None,
bytes: bytes.to_vec(),
}),
)
.await
else {
// reset blob to what it was
self.process.last_blob = old_last_blob;
return Ok(Err(wit::SpawnError::NameTaken));
};
// insert messaging capabilities into requested processes
for (process_id, params) in grant_capabilities {
let (tx, rx) = tokio::sync::oneshot::channel();
self.process
.caps_oracle
.send(t::CapMessage::Add {
on: t::ProcessId::de_wit_v1(process_id),
caps: vec![t::Capability::new(
self.process.metadata.our.clone(),
params,
)],
responder: Some(tx),
})
.await
.unwrap();
let _ = rx.await.unwrap();
}
// finally, send the command to run the new process
let Ok(Ok((_, response))) = send_and_await_response(
self,
Some(t::Address {
node: self.process.metadata.our.node.clone(),
process: KERNEL_PROCESS_ID.clone(),
}),
wit::Address {
node: self.process.metadata.our.node.clone(),
process: KERNEL_PROCESS_ID.en_wit_v1(),
},
wit::Request {
inherit: false,
expects_response: Some(5), // TODO evaluate
body: serde_json::to_vec(&t::KernelCommand::RunProcess(new_process_id.clone()))
.unwrap(),
metadata: None,
capabilities: vec![],
},
None,
)
.await
else {
// reset blob to what it was
self.process.last_blob = old_last_blob;
return Ok(Err(wit::SpawnError::NameTaken));
};
// reset blob to what it was
self.process.last_blob = old_last_blob;
let wit::Message::Response((wit::Response { body, .. }, _)) = response else {
return Ok(Err(wit::SpawnError::NoFileAtPath));
};
let t::KernelResponse::StartedProcess = serde_json::from_slice(&body).unwrap() else {
return Ok(Err(wit::SpawnError::NoFileAtPath));
};
// child processes are always able to Message parent
let (tx, rx) = tokio::sync::oneshot::channel();
self.process
.caps_oracle
.send(t::CapMessage::Add {
on: new_process_id.clone(),
caps: vec![t::Capability::messaging(self.process.metadata.our.clone())],
responder: Some(tx),
})
.await
.unwrap();
rx.await.unwrap();
// parent process is always able to Message child
let (tx, rx) = tokio::sync::oneshot::channel();
self.process
.caps_oracle
.send(t::CapMessage::Add {
on: self.process.metadata.our.process.clone(),
caps: vec![t::Capability::messaging((
self.process.metadata.our.node.clone(),
&new_process_id,
))],
responder: Some(tx),
})
.await
.unwrap();
rx.await.unwrap();
print_debug(&self.process, "spawned a new process").await;
Ok(Ok(new_process_id.en_wit_v1().to_owned()))
}
//
// capabilities management
//
async fn save_capabilities(&mut self, caps: Vec<wit::Capability>) -> Result<()> {
let (tx, rx) = tokio::sync::oneshot::channel();
let _ = self
.process
.caps_oracle
.send(t::CapMessage::Add {
on: self.process.metadata.our.process.clone(),
caps: caps
.iter()
.map(|cap| t::de_wit_capability_v1(cap.clone()).0)
.collect(),
responder: Some(tx),
})
.await?;
let _ = rx.await?;
Ok(())
}
async fn drop_capabilities(&mut self, caps: Vec<wit::Capability>) -> Result<()> {
let (tx, rx) = tokio::sync::oneshot::channel();
let _ = self
.process
.caps_oracle
.send(t::CapMessage::Drop {
on: self.process.metadata.our.process.clone(),
caps: caps
.iter()
.map(|cap| t::de_wit_capability_v1(cap.clone()).0)
.collect(),
responder: Some(tx),
})
.await?;
let _ = rx.await?;
Ok(())
}
async fn our_capabilities(&mut self) -> Result<Vec<wit::Capability>> {
let (tx, rx) = tokio::sync::oneshot::channel();
let _ = self
.process
.caps_oracle
.send(t::CapMessage::GetAll {
on: self.process.metadata.our.process.clone(),
responder: tx,
})
.await?;
let caps = rx.await?;
Ok(caps
.into_iter()
.map(|cap| t::en_wit_capability_v1(cap))
.collect())
}
//
// message I/O:
//
/// from a process: receive the next incoming message. will wait async until a message is received.
/// the incoming message can be a Request or a Response, or an Error of the Network variety.
async fn receive(
&mut self,
) -> Result<Result<(wit::Address, wit::Message), (wit::SendError, Option<wit::Context>)>> {
Ok(self.process.get_next_message_for_process_v1().await)
}
/// from a process: grab the blob part of the current prompting message.
/// if the prompting message did not have a blob, will return None.
/// will also return None if there is no prompting message.
async fn get_blob(&mut self) -> Result<Option<wit::LazyLoadBlob>> {
Ok(t::en_wit_blob_v1(self.process.last_blob.clone()))
}
async fn send_request(
&mut self,
target: wit::Address,
request: wit::Request,
context: Option<wit::Context>,
blob: Option<wit::LazyLoadBlob>,
) -> Result<()> {
let id = self
.process
.send_request_v1(None, target, request, context, blob)
.await;
match id {
Ok(_id) => Ok(()),
Err(e) => Err(e),
}
}
async fn send_requests(
&mut self,
requests: Vec<(
wit::Address,
wit::Request,
Option<wit::Context>,
Option<wit::LazyLoadBlob>,
)>,
) -> Result<()> {
for request in requests {
let id = self
.process
.send_request_v1(None, request.0, request.1, request.2, request.3)
.await;
match id {
Ok(_id) => continue,
Err(e) => return Err(e),
}
}
Ok(())
}
async fn send_response(
&mut self,
response: wit::Response,
blob: Option<wit::LazyLoadBlob>,
) -> Result<()> {
self.process.send_response_v1(response, blob).await;
Ok(())
}
async fn send_and_await_response(
&mut self,
target: wit::Address,
request: wit::Request,
blob: Option<wit::LazyLoadBlob>,
) -> Result<Result<(wit::Address, wit::Message), wit::SendError>> {
send_and_await_response(self, None, target, request, blob).await
}
}

View File

@ -8,6 +8,8 @@ const KINODE_WIT_0_7_0_URL: &str =
"https://raw.githubusercontent.com/kinode-dao/kinode-wit/aa2c8b11c9171b949d1991c32f58591c0e881f85/kinode.wit";
const KINODE_WIT_0_8_0_URL: &str =
"https://raw.githubusercontent.com/kinode-dao/kinode-wit/v0.8/kinode.wit";
const KINODE_WIT_1_0_0_URL: &str =
"https://raw.githubusercontent.com/kinode-dao/kinode-wit/1.0/kinode.wit";
/// copied from `kit`
async fn download_file(url: &str, path: &Path) -> anyhow::Result<()> {
@ -77,5 +79,14 @@ fn main() {
download_file(KINODE_WIT_0_8_0_URL, &wit_file)
.await
.expect("Failed to download WIT 0.8");
})
});
let wit_file = pwd.join("wit-v1.0.0").join("kinode.wit");
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
download_file(KINODE_WIT_1_0_0_URL, &wit_file)
.await
.expect("Failed to download WIT 1.0");
});
}

View File

@ -145,6 +145,13 @@ impl ProcessId {
publisher_node: self.publisher_node.clone(),
}
}
pub fn en_wit_v1(&self) -> crate::v1::wit::ProcessId {
crate::v1::wit::ProcessId {
process_name: self.process_name.clone(),
package_name: self.package_name.clone(),
publisher_node: self.publisher_node.clone(),
}
}
pub fn de_wit(wit: wit::ProcessId) -> Self {
ProcessId {
process_name: wit.process_name,
@ -159,6 +166,13 @@ impl ProcessId {
publisher_node: wit.publisher_node,
}
}
pub fn de_wit_v1(wit: crate::v1::wit::ProcessId) -> Self {
ProcessId {
process_name: wit.process_name,
package_name: wit.package_name,
publisher_node: wit.publisher_node,
}
}
pub fn check(self) -> Result<Self, AddressParseError> {
check_process_id_kimap_safe(&self)?;
Ok(self)
@ -335,6 +349,12 @@ impl Address {
process: self.process.en_wit_v0(),
}
}
pub fn en_wit_v1(&self) -> crate::v1::wit::Address {
crate::v1::wit::Address {
node: self.node.clone(),
process: self.process.en_wit_v1(),
}
}
pub fn de_wit(wit: wit::Address) -> Address {
Address {
node: wit.node,
@ -355,6 +375,16 @@ impl Address {
},
}
}
pub fn de_wit_v1(wit: crate::v1::wit::Address) -> Address {
Address {
node: wit.node,
process: ProcessId {
process_name: wit.process.process_name,
package_name: wit.process.package_name,
publisher_node: wit.process.publisher_node,
},
}
}
pub fn check(self) -> Result<Self, AddressParseError> {
if !is_kimap_safe(&self.node) {
return Err(AddressParseError::NodeNotKimapSafe(self.node.clone()));
@ -641,6 +671,24 @@ impl OnExit {
}
}
pub fn en_wit_v1(&self) -> crate::v1::wit::OnExit {
match self {
OnExit::None => crate::v1::wit::OnExit::None,
OnExit::Restart => crate::v1::wit::OnExit::Restart,
OnExit::Requests(reqs) => crate::v1::wit::OnExit::Requests(
reqs.iter()
.map(|(address, request, blob)| {
(
address.en_wit_v1(),
en_wit_request_v1(request.clone()),
en_wit_blob_v1(blob.clone()),
)
})
.collect(),
),
}
}
pub fn de_wit(wit: wit::OnExit) -> Self {
match wit {
wit::OnExit::None => OnExit::None,
@ -676,6 +724,24 @@ impl OnExit {
),
}
}
pub fn de_wit_v1(wit: crate::v1::wit::OnExit) -> Self {
match wit {
crate::v1::wit::OnExit::None => OnExit::None,
crate::v1::wit::OnExit::Restart => OnExit::Restart,
crate::v1::wit::OnExit::Requests(reqs) => OnExit::Requests(
reqs.into_iter()
.map(|(address, request, blob)| {
(
Address::de_wit_v1(address),
de_wit_request_v1(request),
de_wit_blob_v1(blob),
)
})
.collect(),
),
}
}
}
impl std::fmt::Display for OnExit {
@ -783,8 +849,8 @@ pub fn de_wit_request(wit: wit::Request) -> Request {
metadata: wit.metadata,
capabilities: wit
.capabilities
.iter()
.map(|cap| de_wit_capability(cap.clone()))
.into_iter()
.map(|cap| de_wit_capability(cap))
.collect(),
}
}
@ -797,8 +863,22 @@ pub fn de_wit_request_v0(wit: crate::v0::wit::Request) -> Request {
metadata: wit.metadata,
capabilities: wit
.capabilities
.iter()
.map(|cap| de_wit_capability_v0(cap.clone()))
.into_iter()
.map(|cap| de_wit_capability_v0(cap))
.collect(),
}
}
pub fn de_wit_request_v1(wit: crate::v1::wit::Request) -> Request {
Request {
inherit: wit.inherit,
expects_response: wit.expects_response,
body: wit.body,
metadata: wit.metadata,
capabilities: wit
.capabilities
.into_iter()
.map(|cap| de_wit_capability_v1(cap))
.collect(),
}
}
@ -811,8 +891,8 @@ pub fn en_wit_request(request: Request) -> wit::Request {
metadata: request.metadata,
capabilities: request
.capabilities
.iter()
.map(|cap| en_wit_capability(cap.clone()))
.into_iter()
.map(|cap| en_wit_capability(cap))
.collect(),
}
}
@ -825,8 +905,22 @@ pub fn en_wit_request_v0(request: Request) -> crate::v0::wit::Request {
metadata: request.metadata,
capabilities: request
.capabilities
.iter()
.map(|cap| en_wit_capability_v0(cap.clone()))
.into_iter()
.map(|cap| en_wit_capability_v0(cap))
.collect(),
}
}
pub fn en_wit_request_v1(request: Request) -> crate::v1::wit::Request {
crate::v1::wit::Request {
inherit: request.inherit,
expects_response: request.expects_response,
body: request.body,
metadata: request.metadata,
capabilities: request
.capabilities
.into_iter()
.map(|cap| en_wit_capability_v1(cap))
.collect(),
}
}
@ -838,8 +932,8 @@ pub fn de_wit_response(wit: wit::Response) -> Response {
metadata: wit.metadata,
capabilities: wit
.capabilities
.iter()
.map(|cap| de_wit_capability(cap.clone()))
.into_iter()
.map(|cap| de_wit_capability(cap))
.collect(),
}
}
@ -851,8 +945,21 @@ pub fn de_wit_response_v0(wit: crate::v0::wit::Response) -> Response {
metadata: wit.metadata,
capabilities: wit
.capabilities
.iter()
.map(|cap| de_wit_capability_v0(cap.clone()))
.into_iter()
.map(|cap| de_wit_capability_v0(cap))
.collect(),
}
}
pub fn de_wit_response_v1(wit: crate::v1::wit::Response) -> Response {
Response {
inherit: wit.inherit,
body: wit.body,
metadata: wit.metadata,
capabilities: wit
.capabilities
.into_iter()
.map(|cap| de_wit_capability_v1(cap))
.collect(),
}
}
@ -864,8 +971,8 @@ pub fn en_wit_response(response: Response) -> wit::Response {
metadata: response.metadata,
capabilities: response
.capabilities
.iter()
.map(|cap| en_wit_capability(cap.clone()))
.into_iter()
.map(|cap| en_wit_capability(cap))
.collect(),
}
}
@ -877,8 +984,21 @@ pub fn en_wit_response_v0(response: Response) -> crate::v0::wit::Response {
metadata: response.metadata,
capabilities: response
.capabilities
.iter()
.map(|cap| en_wit_capability_v0(cap.clone()))
.into_iter()
.map(|cap| en_wit_capability_v0(cap))
.collect(),
}
}
pub fn en_wit_response_v1(response: Response) -> crate::v1::wit::Response {
crate::v1::wit::Response {
inherit: response.inherit,
body: response.body,
metadata: response.metadata,
capabilities: response
.capabilities
.into_iter()
.map(|cap| en_wit_capability_v1(cap))
.collect(),
}
}
@ -903,6 +1023,16 @@ pub fn de_wit_blob_v0(wit: Option<crate::v0::wit::LazyLoadBlob>) -> Option<LazyL
}
}
pub fn de_wit_blob_v1(wit: Option<crate::v1::wit::LazyLoadBlob>) -> Option<LazyLoadBlob> {
match wit {
None => None,
Some(wit) => Some(LazyLoadBlob {
mime: wit.mime,
bytes: wit.bytes,
}),
}
}
pub fn en_wit_blob(load: Option<LazyLoadBlob>) -> Option<wit::LazyLoadBlob> {
match load {
None => None,
@ -923,6 +1053,16 @@ pub fn en_wit_blob_v0(load: Option<LazyLoadBlob>) -> Option<crate::v0::wit::Lazy
}
}
pub fn en_wit_blob_v1(load: Option<LazyLoadBlob>) -> Option<crate::v1::wit::LazyLoadBlob> {
match load {
None => None,
Some(load) => Some(crate::v1::wit::LazyLoadBlob {
mime: load.mime,
bytes: load.bytes,
}),
}
}
pub fn de_wit_capability(wit: wit::Capability) -> (Capability, Vec<u8>) {
(
Capability {
@ -957,17 +1097,41 @@ pub fn de_wit_capability_v0(wit: crate::v0::wit::Capability) -> (Capability, Vec
)
}
pub fn de_wit_capability_v1(wit: crate::v1::wit::Capability) -> (Capability, Vec<u8>) {
(
Capability {
issuer: Address {
node: wit.issuer.node,
process: ProcessId {
process_name: wit.issuer.process.process_name,
package_name: wit.issuer.process.package_name,
publisher_node: wit.issuer.process.publisher_node,
},
},
params: wit.params,
},
vec![],
)
}
pub fn en_wit_capability(cap: (Capability, Vec<u8>)) -> wit::Capability {
wit::Capability {
issuer: cap.0.issuer.en_wit(),
params: cap.0.params.to_string(),
params: cap.0.params,
}
}
pub fn en_wit_capability_v0(cap: (Capability, Vec<u8>)) -> crate::v0::wit::Capability {
crate::v0::wit::Capability {
issuer: cap.0.issuer.en_wit_v0(),
params: cap.0.params.to_string(),
params: cap.0.params,
}
}
pub fn en_wit_capability_v1(cap: (Capability, Vec<u8>)) -> crate::v1::wit::Capability {
crate::v1::wit::Capability {
issuer: cap.0.issuer.en_wit_v1(),
params: cap.0.params,
}
}
@ -989,6 +1153,15 @@ pub fn en_wit_message_v0(message: Message) -> crate::v0::wit::Message {
}
}
pub fn en_wit_message_v1(message: Message) -> crate::v1::wit::Message {
match message {
Message::Request(request) => crate::v1::wit::Message::Request(en_wit_request_v1(request)),
Message::Response((response, context)) => {
crate::v1::wit::Message::Response((en_wit_response_v1(response), context))
}
}
}
pub fn en_wit_send_error(error: SendError) -> wit::SendError {
wit::SendError {
kind: en_wit_send_error_kind(error.kind),
@ -1006,6 +1179,15 @@ pub fn en_wit_send_error_v0(error: SendError) -> crate::v0::wit::SendError {
}
}
pub fn en_wit_send_error_v1(error: SendError) -> crate::v1::wit::SendError {
crate::v1::wit::SendError {
kind: en_wit_send_error_kind_v1(error.kind),
target: error.target.en_wit_v1(),
message: en_wit_message_v1(error.message),
lazy_load_blob: en_wit_blob_v1(error.lazy_load_blob),
}
}
pub fn en_wit_send_error_kind(kind: SendErrorKind) -> wit::SendErrorKind {
match kind {
SendErrorKind::Offline => wit::SendErrorKind::Offline,
@ -1020,6 +1202,13 @@ pub fn en_wit_send_error_kind_v0(kind: SendErrorKind) -> crate::v0::wit::SendErr
}
}
pub fn en_wit_send_error_kind_v1(kind: SendErrorKind) -> crate::v1::wit::SendErrorKind {
match kind {
SendErrorKind::Offline => crate::v1::wit::SendErrorKind::Offline,
SendErrorKind::Timeout => crate::v1::wit::SendErrorKind::Timeout,
}
}
//
// END SYNC with process_lib
//

View File

@ -12,12 +12,16 @@ pub mod types {
pub use kinode::process;
pub use kinode::process::standard as wit;
// can remove in 1.0!
wasmtime::component::bindgen!({
path: "wit-v0.7.0",
world: "process",
async: true,
});
// can remove in 1.0!
pub mod v0 {
pub use kinode::process;
pub use kinode::process::standard as wit;
@ -27,3 +31,13 @@ pub mod v0 {
async: true,
});
}
pub mod v1 {
pub use kinode::process;
pub use kinode::process::standard as wit;
wasmtime::component::bindgen!({
path: "wit-v1.0.0",
world: "process-v1",
async: true,
});
}