Merge pull request #231 from kinode-dao/dr/kernel-nested-requests

kernel: refactor for clarity
doria 2024-02-08 22:44:55 -03:00 committed by GitHub
commit 622f0eb5a0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 250 additions and 248 deletions

View File

@@ -43,7 +43,6 @@ enum ProcessSender {
}
/// persist kernel's process_map state for next bootup
/// and (TODO) wait for filesystem to respond in the affirmative
async fn persist_state(
our_name: &str,
send_to_loop: &t::MessageSender,
@@ -76,7 +75,7 @@ async fn persist_state(
Ok(())
}
/// handle messages sent directly to kernel. source is always our own node.
/// handle commands inside messages sent directly to kernel. source is always our own node.
async fn handle_kernel_request(
our_name: String,
keypair: Arc<signature::Ed25519KeyPair>,
@@ -566,7 +565,7 @@ async fn handle_kernel_request(
}
}
// double check immediate run
/// spawn a process loop and insert the process in the relevant kernel state maps
async fn start_process(
our_name: String,
keypair: Arc<signature::Ed25519KeyPair>,
@@ -1164,23 +1163,15 @@ pub async fn kernel(
match process_map.get(&on) {
None => vec![],
Some(p) => {
caps.iter().filter_map(|cap| {
caps.into_iter().filter_map(|cap| {
// if issuer is message source, then sign the cap
if cap.issuer.process == on {
Some((
cap.clone(),
keypair
.sign(&rmp_serde::to_vec(&cap).unwrap())
.as_ref()
.to_vec()
))
let sig = keypair.sign(&rmp_serde::to_vec(&cap).unwrap());
Some((cap, sig.as_ref().to_vec()))
// otherwise, only attach previously saved caps
// NOTE we don't need to verify the sigs!
} else {
match p.capabilities.get(cap) {
None => None,
Some(sig) => Some((cap.clone(), sig.clone()))
}
p.capabilities.get(&cap).map(|sig| (cap, sig.clone()))
}
}).collect()
},
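The hunk above signs any capability issued by the message source with the node keypair, and otherwise only attaches previously saved signatures. A minimal standalone sketch of that sign-then-verify round trip over an rmp_serde-encoded capability, using ring as the kernel does; the Cap struct, its fields, and the node name are hypothetical stand-ins for the kernel's Capability type, not kernel code:

use ring::rand::SystemRandom;
use ring::signature::{self, Ed25519KeyPair, KeyPair};
use serde::{Deserialize, Serialize};

// hypothetical stand-in for the kernel's Capability type
#[derive(Serialize, Deserialize)]
struct Cap {
    issuer: String,
    params: String,
}

fn main() {
    // the kernel holds a long-lived Ed25519KeyPair; here we generate a throwaway one
    let pkcs8 = Ed25519KeyPair::generate_pkcs8(&SystemRandom::new()).unwrap();
    let keypair = Ed25519KeyPair::from_pkcs8(pkcs8.as_ref()).unwrap();

    let cap = Cap {
        issuer: "our-node".to_string(),
        params: "\"messaging\"".to_string(),
    };

    // sign the rmp_serde-encoded capability, as the hunk above does
    let sig = keypair.sign(&rmp_serde::to_vec(&cap).unwrap());

    // a holder of only the public key can later check that we issued it,
    // which is what the process-side verification further down relies on
    let pk = signature::UnparsedPublicKey::new(
        &signature::ED25519,
        keypair.public_key().as_ref(),
    );
    assert!(pk
        .verify(&rmp_serde::to_vec(&cap).unwrap(), sig.as_ref())
        .is_ok());
}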

View File

@@ -1,8 +1,10 @@
use crate::kernel::{ProcessMessageReceiver, ProcessMessageSender};
use crate::KERNEL_PROCESS_ID;
use anyhow::Result;
//pub use kinode::process::standard as wit;
//pub use kinode::process::standard::Host as StandardHost;
use lib::types::core as t;
pub use lib::wit;
pub use lib::wit::Host as StandardHost;
pub use lib::Process;
use ring::signature::{self, KeyPair};
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
@@ -11,28 +13,42 @@ use wasmtime::component::*;
use wasmtime::{Engine, Store};
use wasmtime_wasi::preview2::{pipe::MemoryOutputPipe, Table, WasiCtx, WasiCtxBuilder, WasiView};
use lib::types::core as t;
pub use lib::wit;
pub use lib::wit::Host as StandardHost;
pub use lib::Process;
const STACK_TRACE_SIZE: usize = 5000;
// bindgen!({
// path: "wit",
// world: "process",
// async: true,
// });
pub struct ProcessContext {
// store predecessor in order to set prompting message when popped
pub prompting_message: Option<t::KernelMessage>,
// can be empty if a request doesn't set context, but still needs to inherit
pub context: Option<t::Context>,
}
pub struct ProcessState {
/// our node's networking keypair
pub keypair: Arc<signature::Ed25519KeyPair>,
/// information about ourself
pub metadata: t::ProcessMetadata,
/// pipe from which we get messages from the main event loop
pub recv_in_process: ProcessMessageReceiver,
/// pipe to send messages to ourself (received in `recv_in_process`)
pub self_sender: ProcessMessageSender,
/// pipe for sending messages to the main event loop
pub send_to_loop: t::MessageSender,
/// pipe for sending [`t::Printout`]s to the terminal
pub send_to_terminal: t::PrintSender,
/// store the current incoming message that we've gotten from receive(), if it
/// is a request. if it is a response, the context map will be used to restore
/// the message that was prompting when the outgoing request for that response was made.
/// the prompting_message never carries a blob itself: the blob of the last message
/// received from the event loop is **always** stored separately in `last_blob`.
pub prompting_message: Option<t::KernelMessage>,
pub last_blob: Option<t::LazyLoadBlob>,
pub contexts: HashMap<u64, (t::ProcessContext, JoinHandle<()>)>,
/// store the contexts and timeout task of all outstanding requests
pub contexts: HashMap<u64, (ProcessContext, JoinHandle<()>)>,
/// store the messages that we've gotten from event loop but haven't processed yet
/// TODO make this an ordered map for O(1) retrieval by ID
pub message_queue: VecDeque<Result<t::KernelMessage, t::WrappedSendError>>,
/// pipe for getting info about capabilities
pub caps_oracle: t::CapMessageSender,
}
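To make the `contexts` bookkeeping described above concrete, here is a minimal sketch of the pattern the struct implies, with plain Strings and byte vectors standing in for KernelMessage and Context: each outstanding request id maps to the saved predecessor plus a timeout task, and the pair is removed and the timer aborted when the matching response arrives. The types and the 5-second timeout are illustrative assumptions, not kernel code:

use std::collections::HashMap;
use tokio::task::JoinHandle;

// hypothetical stand-ins: a String for the prompting KernelMessage,
// a Vec<u8> for the opaque Context
struct Ctx {
    prompting_message: Option<String>,
    context: Option<Vec<u8>>,
}

struct State {
    contexts: HashMap<u64, (Ctx, JoinHandle<()>)>,
}

impl State {
    // when a request expecting a response goes out: remember the predecessor
    // and start a timer that would otherwise deliver a timeout error
    fn on_outgoing_request(&mut self, id: u64, prompting: Option<String>, context: Option<Vec<u8>>) {
        let timeout_handle = tokio::spawn(async move {
            tokio::time::sleep(std::time::Duration::from_secs(5)).await;
            // the kernel would send itself a timeout SendError here
        });
        self.contexts.insert(
            id,
            (Ctx { prompting_message: prompting, context }, timeout_handle),
        );
    }

    // when the matching response arrives: cancel the timer and restore
    // whatever was prompting when the request was made
    fn on_incoming_response(&mut self, id: u64) -> Option<Ctx> {
        let (ctx, timeout_handle) = self.contexts.remove(&id)?;
        timeout_handle.abort();
        Some(ctx)
    }
}

#[tokio::main]
async fn main() {
    let mut state = State { contexts: HashMap::new() };
    state.on_outgoing_request(42, Some("earlier request".into()), Some(b"ctx".to_vec()));
    let restored = state.on_incoming_response(42).expect("context was saved");
    assert_eq!(restored.prompting_message.as_deref(), Some("earlier request"));
}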
@@ -57,8 +73,6 @@ impl WasiView for ProcessWasi {
}
}
const STACK_TRACE_SIZE: usize = 5000;
pub async fn send_and_await_response(
process: &mut ProcessWasi,
source: Option<t::Address>,
@@ -91,41 +105,78 @@ pub async fn send_and_await_response(
}
impl ProcessState {
/// Ingest latest message directed to this process, and mark it as the prompting message.
/// Ingest latest message directed to this process, and save it as the current message.
/// If there is no message in the queue, wait async until one is received.
/// The message will only be saved as the prompting-message if it's a Request.
pub async fn get_next_message_for_process(
&mut self,
) -> Result<(wit::Address, wit::Message), (wit::SendError, Option<wit::Context>)> {
let res = match self.message_queue.pop_front() {
Some(message_from_queue) => message_from_queue,
None => self.recv_in_process.recv().await.unwrap(),
None => self
.recv_in_process
.recv()
.await
.expect("fatal: process couldn't receive next message"),
};
self.kernel_message_to_process_receive(res)
}
/// instead of ingesting latest, wait for a specific ID and queue all others
async fn get_specific_message_for_process(
&mut self,
awaited_message_id: u64,
) -> Result<(wit::Address, wit::Message), (wit::SendError, Option<wit::Context>)> {
// first, check if the awaited message is already in the queue and handle if so
for (i, message) in self.message_queue.iter().enumerate() {
match message {
Ok(ref km) if km.id == awaited_message_id => {
let km = self.message_queue.remove(i).unwrap();
return self.kernel_message_to_process_receive(km);
}
_ => continue,
}
}
// next, wait for the awaited message to arrive
loop {
let res = self
.recv_in_process
.recv()
.await
.expect("fatal: process couldn't receive next message");
let id = match &res {
Ok(km) => km.id,
Err(e) => e.id,
};
if id == awaited_message_id {
return self.kernel_message_to_process_receive(res);
} else {
self.message_queue.push_back(res);
}
}
}
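A toy illustration of the hold-back behaviour above, with bare u64 ids standing in for kernel messages: while waiting for one id, everything else is pushed onto the queue so that get_next_message_for_process can drain it later. This is a sketch of the idea only, not the kernel's types:

use std::collections::VecDeque;

// bare u64 ids stand in for full kernel messages
fn take_awaited(queue: &mut VecDeque<u64>, incoming: Vec<u64>, awaited: u64) -> Option<u64> {
    // first, check whether the awaited message is already queued
    if let Some(i) = queue.iter().position(|id| *id == awaited) {
        return queue.remove(i);
    }
    // then drain new arrivals, queueing everything that is not the one we want
    for id in incoming {
        if id == awaited {
            return Some(id);
        }
        queue.push_back(id);
    }
    None
}

fn main() {
    let mut queue = VecDeque::new();
    assert_eq!(take_awaited(&mut queue, vec![7, 3, 9], 9), Some(9));
    // 7 and 3 remain queued for ordinary ingestion later
    assert_eq!(queue, VecDeque::from(vec![7, 3]));
}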
/// takes Request generated by a process and sends it to the main event loop.
/// will only fail if process does not have capability to send to target.
/// if the request has a timeout (expects response), start a task to track
/// that timeout and return timeout error if it expires.
pub async fn send_request(
&mut self,
fake_source: Option<t::Address>, // only used when kernel steps in to get/set state
// only used when kernel steps in to get/set state
fake_source: Option<t::Address>,
target: wit::Address,
request: wit::Request,
new_context: Option<wit::Context>,
blob: Option<wit::LazyLoadBlob>,
) -> Result<u64> {
let source = match &fake_source {
Some(_) => fake_source.unwrap(),
None => self.metadata.our.clone(),
};
// if request chooses to inherit context, match id to prompting_message
let source = fake_source.unwrap_or(self.metadata.our.clone());
let mut request = t::de_wit_request(request);
// if request chooses to inherit, it means to take the ID and lazy_load_blob,
// if any, from the last message it ingested
// if request chooses to inherit, match id to predecessor
// otherwise, id is generated randomly
let request_id: u64 = if request.inherit
&& request.expects_response.is_none()
&& self.prompting_message.is_some()
{
let request_id: u64 = if request.inherit && self.prompting_message.is_some() {
self.prompting_message.as_ref().unwrap().id
} else {
loop {
@@ -136,6 +187,8 @@ impl ProcessState {
}
};
// if a blob is provided, it will be used; otherwise, if inherit is true,
// and a predecessor exists, its blob will be used; otherwise, no blob will be used.
let blob = match blob {
Some(p) => Some(t::LazyLoadBlob {
mime: p.mime,
@@ -147,70 +200,46 @@ impl ProcessState {
},
};
let mut inner_request = t::de_wit_request(request.clone());
if !request.capabilities.is_empty() {
request.capabilities = {
let (tx, rx) = tokio::sync::oneshot::channel();
self.caps_oracle
.send(t::CapMessage::FilterCaps {
on: self.metadata.our.process.clone(),
caps: request
.capabilities
.into_iter()
.map(|(cap, _)| cap)
.collect(),
responder: tx,
})
.await
.expect("fatal: process couldn't access capabilities oracle");
rx.await
.expect("fatal: process couldn't receive capabilities")
};
}
inner_request.capabilities = {
let (tx, rx) = tokio::sync::oneshot::channel();
self.caps_oracle
.send(t::CapMessage::FilterCaps {
on: self.metadata.our.process.clone(),
caps: request
.capabilities
.iter()
.map(|cap| t::de_wit_capability(cap.clone()).0)
.collect(),
responder: tx,
})
.await?;
rx.await?
};
// rsvp is set if there was a Request expecting Response
// followed by inheriting Request(s) not expecting Response;
// this is done such that the ultimate request handler knows that,
// in fact, a Response *is* expected.
// could also be None if the entire chain of Requests
// is not expecting a Response
let kernel_message = t::KernelMessage {
id: request_id,
source: source.clone(),
target: t::Address::de_wit(target.clone()),
rsvp: match (
request.inherit,
request.expects_response,
&self.prompting_message,
) {
// this request expects response, so receives any response
// make sure to use the real source, not a fake injected-by-kernel source
(_, Some(_), _) => Some(self.metadata.our.clone()),
// this request inherits, so response will be routed to prompting message
(true, None, Some(ref prompt)) => prompt.rsvp.clone(),
// this request doesn't inherit, and doesn't itself want a response
(false, None, _) => None,
// no rsvp because neither prompting message nor this request wants a response
(_, None, None) => None,
},
message: t::Message::Request(inner_request),
lazy_load_blob: blob.clone(),
};
// modify the process' context map as needed.
// if there is a prompting message, we need to store the ultimate
// even if there is no new context string.
// TODO optimize this significantly
// if the request expects a response, modify the process' context map as needed
// and set a timer.
// TODO optimize this SIGNIFICANTLY: stop spawning tasks
// and use a global clock + garbage collect step to check for timeouts
if let Some(timeout_secs) = request.expects_response {
let this_request = request.clone();
let this_blob = blob.clone();
let self_sender = self.self_sender.clone();
let original_target = t::Address::de_wit(target.clone());
let timeout_handle = tokio::spawn(async move {
tokio::time::sleep(std::time::Duration::from_secs(timeout_secs)).await;
let _ = self_sender
.send(Err(t::WrappedSendError {
id: request_id,
source: t::Address::de_wit(target.clone()), // TODO check this
source: original_target.clone(),
error: t::SendError {
kind: t::SendErrorKind::Timeout,
target: t::Address::de_wit(target),
message: t::Message::Request(t::de_wit_request(request.clone())),
lazy_load_blob: blob,
target: original_target,
message: t::Message::Request(this_request),
lazy_load_blob: this_blob,
},
}))
.await;
@@ -218,12 +247,8 @@ impl ProcessState {
self.contexts.insert(
request_id,
(
t::ProcessContext {
prompting_message: if self.prompting_message.is_some() {
self.prompting_message.clone()
} else {
None
},
ProcessContext {
prompting_message: self.prompting_message.clone(),
context: new_context,
},
timeout_handle,
@@ -231,6 +256,34 @@ impl ProcessState {
);
}
// rsvp is set based on this priority:
// 1. whether this request expects a response -- if so, rsvp = our address, always
// 2. whether this request inherits -- if so, rsvp = prompting message's rsvp
// 3. if neither, rsvp = None
let kernel_message = t::KernelMessage {
id: request_id,
source,
target: t::Address::de_wit(target),
rsvp: match (
request.expects_response,
request.inherit,
&self.prompting_message,
) {
(Some(_), _, _) => {
// this request expects response, so receives any response
// make sure to use the real source, not a fake injected-by-kernel source
Some(self.metadata.our.clone())
}
(None, true, Some(ref prompt)) => {
// this request inherits, so response will be routed to prompting message
prompt.rsvp.clone()
}
_ => None,
},
message: t::Message::Request(request),
lazy_load_blob: blob,
};
self.send_to_loop
.send(kernel_message)
.await
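The three rsvp rules above are easiest to see on a chain of processes. Below is a standalone sketch, with plain strings standing in for t::Address and the decision logic lifted out as a pure function, showing how a response from the end of an inheriting chain is routed back to the original requester; it is an illustration, not kernel code:

// addresses are plain &str stand-ins for t::Address
type Addr = &'static str;

// the rsvp decision described above, as a pure function;
// prompting_rsvp is Some(rsvp) when a prompting message exists
fn resolve_rsvp(
    expects_response: Option<u64>,
    inherit: bool,
    prompting_rsvp: Option<Option<Addr>>,
    our: Addr,
) -> Option<Addr> {
    match (expects_response, inherit, prompting_rsvp) {
        // 1. we expect the response ourselves
        (Some(_), _, _) => Some(our),
        // 2. we inherit: route the eventual response wherever the prompting message said
        (None, true, Some(rsvp)) => rsvp,
        // 3. nobody is waiting for a response
        _ => None,
    }
}

fn main() {
    // A sends B a request expecting a response: rsvp points back at A
    let a_to_b = resolve_rsvp(Some(30), false, None, "A");
    assert_eq!(a_to_b, Some("A"));
    // B forwards to C with inherit = true and no expects_response:
    // A's rsvp is carried along, so C's eventual response lands at A
    let b_to_c = resolve_rsvp(None, true, Some(a_to_b), "B");
    assert_eq!(b_to_c, Some("A"));
}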
@@ -245,43 +298,53 @@ impl ProcessState {
response: wit::Response,
blob: Option<wit::LazyLoadBlob>,
) {
let (id, target) = match self.make_response_id_target().await {
Some(r) => r,
None => {
let _ = self
.send_to_terminal
.send(t::Printout {
verbosity: 2,
content: format!("kernel: dropping Response {:?}", response),
})
.await;
return;
}
let mut response = t::de_wit_response(response);
// the process requires a prompting_message in order to issue a response
let Some(ref prompting_message) = self.prompting_message else {
print(
&self.send_to_terminal,
0,
format!("kernel: need non-None prompting_message to handle Response {response:?}"),
)
.await;
return;
};
// given the current process state, produce the id and target that
// a response it emits should have.
let (id, target) = (
prompting_message.id,
match &prompting_message.rsvp {
None => prompting_message.source.clone(),
Some(rsvp) => rsvp.clone(),
},
);
let blob = match response.inherit {
true => self.last_blob.clone(),
false => t::de_wit_blob(blob),
};
let mut inner_response = t::de_wit_response(response.clone());
inner_response.capabilities = {
let (tx, rx) = tokio::sync::oneshot::channel();
let _ = self
.caps_oracle
.send(t::CapMessage::FilterCaps {
on: self.metadata.our.process.clone(),
caps: response
.capabilities
.iter()
.map(|cap| t::de_wit_capability(cap.clone()).0)
.collect(),
responder: tx,
})
.await;
rx.await.expect("fatal: process couldn't get caps")
};
if !response.capabilities.is_empty() {
response.capabilities = {
let (tx, rx) = tokio::sync::oneshot::channel();
self.caps_oracle
.send(t::CapMessage::FilterCaps {
on: self.metadata.our.process.clone(),
caps: response
.capabilities
.into_iter()
.map(|(cap, _)| cap)
.collect(),
responder: tx,
})
.await
.expect("fatal: process couldn't access capabilities oracle");
rx.await
.expect("fatal: process couldn't receive capabilities")
};
}
self.send_to_loop
.send(t::KernelMessage {
@@ -290,7 +353,7 @@ impl ProcessState {
target,
rsvp: None,
message: t::Message::Response((
inner_response,
response,
// the context will be set by the process receiving this Response.
None,
)),
@@ -300,63 +363,32 @@ impl ProcessState {
.expect("fatal: kernel couldn't send response");
}
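Both send_request and send_response above filter capabilities through the caps oracle by sending a message that carries a oneshot responder and then awaiting the reply. A minimal sketch of that ask-and-await pattern with tokio channels; the OracleMsg enum and its filtering rule are made up for illustration, only the channel shape mirrors CapMessage::FilterCaps:

use tokio::sync::{mpsc, oneshot};

// hypothetical oracle message mirroring the shape of CapMessage::FilterCaps:
// the request carries a oneshot sender, and the caller awaits the reply on it
enum OracleMsg {
    FilterCaps {
        caps: Vec<String>,
        responder: oneshot::Sender<Vec<String>>,
    },
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<OracleMsg>(32);

    // the "oracle" task; the filtering rule here is invented for the example
    tokio::spawn(async move {
        while let Some(OracleMsg::FilterCaps { caps, responder }) = rx.recv().await {
            let kept = caps.into_iter().filter(|c| c.starts_with("valid")).collect();
            let _ = responder.send(kept);
        }
    });

    // the caller side, as in send_request / send_response above
    let (responder, response) = oneshot::channel();
    tx.send(OracleMsg::FilterCaps {
        caps: vec!["valid:messaging".to_string(), "forged".to_string()],
        responder,
    })
    .await
    .expect("oracle task died");
    let kept = response.await.expect("oracle dropped the responder");
    assert_eq!(kept, vec!["valid:messaging".to_string()]);
}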
/// instead of ingesting latest, wait for a specific ID and queue all others
async fn get_specific_message_for_process(
&mut self,
awaited_message_id: u64,
) -> Result<(wit::Address, wit::Message), (wit::SendError, Option<wit::Context>)> {
// first, check if the awaited message is already in the queue and handle if so
for (i, message) in self.message_queue.iter().enumerate() {
match message {
Ok(ref km) if km.id == awaited_message_id => {
let km = self.message_queue.remove(i).unwrap();
return self.kernel_message_to_process_receive(km.clone());
}
_ => continue,
}
}
// next, wait for the awaited message to arrive
loop {
let res = self.recv_in_process.recv().await.unwrap();
match res {
Ok(ref km) if km.id == awaited_message_id => {
return self.kernel_message_to_process_receive(Ok(km.clone()))
}
Ok(km) => self.message_queue.push_back(Ok(km)),
Err(e) if e.id == awaited_message_id => {
return self.kernel_message_to_process_receive(Err(e))
}
Err(e) => self.message_queue.push_back(Err(e)),
}
}
}
/// convert a message from the main event loop into a result for the process to receive
/// if the message is a response or error, get context if we have one
/// Convert a message from the main event loop into a result for the process to receive.
/// If the message is a response or error, get context if we have one.
fn kernel_message_to_process_receive(
&mut self,
res: Result<t::KernelMessage, t::WrappedSendError>,
incoming: Result<t::KernelMessage, t::WrappedSendError>,
) -> Result<(wit::Address, wit::Message), (wit::SendError, Option<wit::Context>)> {
let (context, km) = match res {
Ok(km) => match &km.message {
let (mut km, context) = match incoming {
Ok(mut km) => match km.message {
t::Message::Request(_) => {
self.last_blob = km.lazy_load_blob.clone();
self.last_blob = km.lazy_load_blob;
km.lazy_load_blob = None;
self.prompting_message = Some(km.clone());
(None, km)
(km, None)
}
t::Message::Response(_) => {
if let Some((context, timeout_handle)) = self.contexts.remove(&km.id) {
timeout_handle.abort();
self.last_blob = km.lazy_load_blob.clone();
self.prompting_message = match context.prompting_message {
None => Some(km.clone()),
Some(prompting_message) => Some(prompting_message),
};
(context.context, km)
self.last_blob = km.lazy_load_blob;
km.lazy_load_blob = None;
self.prompting_message = context.prompting_message;
(km, context.context)
} else {
self.last_blob = km.lazy_load_blob.clone();
self.last_blob = km.lazy_load_blob;
km.lazy_load_blob = None;
self.prompting_message = Some(km.clone());
(None, km)
(km, None)
}
}
},
@@ -375,74 +407,54 @@ impl ProcessState {
self.keypair.as_ref().public_key(),
);
// prune any invalid capabilities before handing to process
// where invalid = supposedly issued by us, but not signed properly by us
match &mut km.message {
t::Message::Request(request) => {
request.capabilities.retain(|(cap, sig)| {
// The only time we verify a cap's signature is when a foreign node
// sends us a cap that we (allegedly) issued
if km.source.node != self.metadata.our.node
&& cap.issuer.node == self.metadata.our.node
{
match pk.verify(&rmp_serde::to_vec(&cap).unwrap_or_default(), sig) {
Ok(_) => true,
Err(_) => false,
}
} else {
return true;
}
});
}
t::Message::Response((response, _)) => {
response.capabilities.retain(|(cap, sig)| {
// The only time we verify a cap's signature is when a foreign node
// sends us a cap that we (allegedly) issued
if km.source.node != self.metadata.our.node
&& cap.issuer.node == self.metadata.our.node
{
match pk.verify(&rmp_serde::to_vec(&cap).unwrap_or_default(), sig) {
Ok(_) => true,
Err(_) => false,
}
} else {
return true;
}
});
}
};
Ok((
km.source.en_wit(),
match km.message {
t::Message::Request(mut request) => {
// prune any invalid caps before sending
request.capabilities = request
.capabilities
.iter()
.filter_map(|(cap, sig)| {
// The only time we verify a cap's signature is when a foreign node
// sends us a cap that we (allegedly) issued
if km.source.node != self.metadata.our.node
&& cap.issuer.node == self.metadata.our.node
{
match pk.verify(&rmp_serde::to_vec(&cap).unwrap_or_default(), sig) {
Ok(_) => Some((cap.clone(), sig.clone())),
Err(_) => None,
}
} else {
return Some((cap.clone(), sig.clone()));
}
})
.collect::<Vec<(t::Capability, Vec<u8>)>>();
wit::Message::Request(t::en_wit_request(request))
}
t::Message::Request(request) => wit::Message::Request(t::en_wit_request(request)),
// NOTE: we throw away whatever context came from the sender, that's not ours
t::Message::Response((mut response, _context)) => {
// prune any invalid caps before sending
response.capabilities = response
.capabilities
.iter()
.filter_map(|(cap, sig)| {
// The only time we verify a cap's signature is when a foreign node
// sends us a cap that we (allegedly) issued
if km.source.node != self.metadata.our.node
&& cap.issuer.node == self.metadata.our.node
{
match pk.verify(&rmp_serde::to_vec(&cap).unwrap_or_default(), sig) {
Ok(_) => Some((cap.clone(), sig.clone())),
Err(_) => None,
}
} else {
return Some((cap.clone(), sig.clone()));
}
})
.collect::<Vec<(t::Capability, Vec<u8>)>>();
t::Message::Response((response, _sent_context)) => {
wit::Message::Response((t::en_wit_response(response), context))
}
},
))
}
/// Given the current process state, return the id and target that
/// a response it emits should have. This takes into
/// account the `rsvp` of the prompting message, if any.
async fn make_response_id_target(&self) -> Option<(u64, t::Address)> {
let Some(ref prompting_message) = self.prompting_message else {
println!("need non-None prompting_message to handle Response");
return None;
};
Some((
prompting_message.id,
match &prompting_message.rsvp {
None => prompting_message.source.clone(),
Some(address) => address.clone(),
},
))
}
}
/// create a specific process, and generate a task that will run it.
@@ -763,3 +775,10 @@ pub async fn make_process_loop(
}
Ok(())
}
async fn print(sender: &t::PrintSender, verbosity: u8, content: String) {
let _ = sender
.send(t::Printout { verbosity, content })
.await
.expect("fatal: kernel terminal print pipe died!");
}

View File

@@ -1,6 +1,4 @@
use crate::kernel::process;
//use crate::kernel::process::kinode::process::standard as wit;
//use crate::kernel::process::StandardHost;
use crate::KERNEL_PROCESS_ID;
use crate::VFS_PROCESS_ID;
use anyhow::Result;
@@ -27,6 +25,7 @@ impl StandardHost for process::ProcessWasi {
//
// system utils:
//
async fn print_to_terminal(&mut self, verbosity: u8, content: String) -> Result<()> {
self.process
.send_to_terminal
@@ -376,6 +375,7 @@ impl StandardHost for process::ProcessWasi {
print_debug(&self.process, "spawned a new process").await;
Ok(Ok(new_process_id.en_wit().to_owned()))
}
//
// capabilities management
//

View File

@@ -1042,14 +1042,6 @@ impl std::fmt::Display for PersistedProcess {
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ProcessContext {
// store ultimate in order to set prompting message if needed
pub prompting_message: Option<KernelMessage>,
// can be empty if a request doesn't set context, but still needs to inherit
pub context: Option<Context>,
}
pub type PackageVersion = (u32, u32, u32);
/// the type that gets deserialized from `metadata.json` in a package