Merge pull request #249 from kinode-dao/dr/drop-unexpected-responses

kernel: in a process, only ingest responses/errors which have an outstanding request in contexts map
This commit is contained in:
doria 2024-02-13 15:56:38 -03:00 committed by GitHub
commit 6672c28b0c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -112,11 +112,7 @@ impl ProcessState {
) -> Result<(wit::Address, wit::Message), (wit::SendError, Option<wit::Context>)> {
let res = match self.message_queue.pop_front() {
Some(message_from_queue) => message_from_queue,
None => self
.recv_in_process
.recv()
.await
.expect("fatal: process couldn't receive next message"),
None => self.ingest_message().await,
};
self.kernel_message_to_process_receive(res)
}
@ -138,11 +134,7 @@ impl ProcessState {
}
// next, wait for the awaited message to arrive
loop {
let res = self
.recv_in_process
.recv()
.await
.expect("fatal: process couldn't receive next message");
let res = self.ingest_message().await;
let id = match &res {
Ok(km) => km.id,
Err(e) => e.id,
@ -155,6 +147,131 @@ impl ProcessState {
}
}
/// ingest next valid message from kernel.
/// cancel any timeout task associated with this message.
/// if the message is a response, only enqueue if we have an outstanding request for it.
async fn ingest_message(&mut self) -> Result<t::KernelMessage, t::WrappedSendError> {
loop {
let message = self
.recv_in_process
.recv()
.await
.expect("fatal: process couldn't receive next message");
match &message {
Ok(km) => match &km.message {
t::Message::Response(_) => {
if let Some((_context, timeout_handle)) = self.contexts.get_mut(&km.id) {
timeout_handle.abort();
return message;
}
}
_ => {
return message;
}
},
Err(e) => {
if let Some((_context, timeout_handle)) = self.contexts.get_mut(&e.id) {
timeout_handle.abort();
return message;
}
}
}
}
}
/// Convert a message from the main event loop into a result for the process to receive.
/// If the message is a response or error, get context if we have one.
fn kernel_message_to_process_receive(
&mut self,
incoming: Result<t::KernelMessage, t::WrappedSendError>,
) -> Result<(wit::Address, wit::Message), (wit::SendError, Option<wit::Context>)> {
let (mut km, context) = match incoming {
Ok(mut km) => match km.message {
t::Message::Request(_) => {
self.last_blob = km.lazy_load_blob;
km.lazy_load_blob = None;
self.prompting_message = Some(km.clone());
(km, None)
}
t::Message::Response(_) => match self.contexts.remove(&km.id) {
Some((context, _timeout_handle)) => {
self.last_blob = km.lazy_load_blob;
km.lazy_load_blob = None;
self.prompting_message = context.prompting_message;
(km, context.context)
}
None => {
self.last_blob = km.lazy_load_blob;
km.lazy_load_blob = None;
self.prompting_message = Some(km.clone());
(km, None)
}
},
},
Err(e) => match self.contexts.remove(&e.id) {
None => return Err((t::en_wit_send_error(e.error), None)),
Some((context, _timeout_handle)) => {
self.prompting_message = context.prompting_message;
return Err((t::en_wit_send_error(e.error), context.context));
}
},
};
let pk = signature::UnparsedPublicKey::new(
&signature::ED25519,
self.keypair.as_ref().public_key(),
);
// prune any invalid capabilities before handing to process
// where invalid = supposedly issued by us, but not signed properly by us
match &mut km.message {
t::Message::Request(request) => {
request.capabilities.retain(|(cap, sig)| {
// The only time we verify a cap's signature is when a foreign node
// sends us a cap that we (allegedly) issued
if km.source.node != self.metadata.our.node
&& cap.issuer.node == self.metadata.our.node
{
match pk.verify(&rmp_serde::to_vec(&cap).unwrap_or_default(), sig) {
Ok(_) => true,
Err(_) => false,
}
} else {
return true;
}
});
}
t::Message::Response((response, _)) => {
response.capabilities.retain(|(cap, sig)| {
// The only time we verify a cap's signature is when a foreign node
// sends us a cap that we (allegedly) issued
if km.source.node != self.metadata.our.node
&& cap.issuer.node == self.metadata.our.node
{
match pk.verify(&rmp_serde::to_vec(&cap).unwrap_or_default(), sig) {
Ok(_) => true,
Err(_) => false,
}
} else {
return true;
}
});
}
};
Ok((
km.source.en_wit(),
match km.message {
t::Message::Request(request) => wit::Message::Request(t::en_wit_request(request)),
// NOTE: we throw away whatever context came from the sender, that's not ours
t::Message::Response((response, _sent_context)) => {
wit::Message::Response((t::en_wit_response(response), context))
}
},
))
}
/// takes Request generated by a process and sends it to the main event loop.
/// will only fail if process does not have capability to send to target.
/// if the request has a timeout (expects response), start a task to track
@ -362,99 +479,6 @@ impl ProcessState {
.await
.expect("fatal: kernel couldn't send response");
}
/// Convert a message from the main event loop into a result for the process to receive.
/// If the message is a response or error, get context if we have one.
fn kernel_message_to_process_receive(
&mut self,
incoming: Result<t::KernelMessage, t::WrappedSendError>,
) -> Result<(wit::Address, wit::Message), (wit::SendError, Option<wit::Context>)> {
let (mut km, context) = match incoming {
Ok(mut km) => match km.message {
t::Message::Request(_) => {
self.last_blob = km.lazy_load_blob;
km.lazy_load_blob = None;
self.prompting_message = Some(km.clone());
(km, None)
}
t::Message::Response(_) => {
if let Some((context, timeout_handle)) = self.contexts.remove(&km.id) {
timeout_handle.abort();
self.last_blob = km.lazy_load_blob;
km.lazy_load_blob = None;
self.prompting_message = context.prompting_message;
(km, context.context)
} else {
self.last_blob = km.lazy_load_blob;
km.lazy_load_blob = None;
self.prompting_message = Some(km.clone());
(km, None)
}
}
},
Err(e) => match self.contexts.remove(&e.id) {
None => return Err((t::en_wit_send_error(e.error), None)),
Some((context, timeout_handle)) => {
timeout_handle.abort();
self.prompting_message = context.prompting_message;
return Err((t::en_wit_send_error(e.error), context.context));
}
},
};
let pk = signature::UnparsedPublicKey::new(
&signature::ED25519,
self.keypair.as_ref().public_key(),
);
// prune any invalid capabilities before handing to process
// where invalid = supposedly issued by us, but not signed properly by us
match &mut km.message {
t::Message::Request(request) => {
request.capabilities.retain(|(cap, sig)| {
// The only time we verify a cap's signature is when a foreign node
// sends us a cap that we (allegedly) issued
if km.source.node != self.metadata.our.node
&& cap.issuer.node == self.metadata.our.node
{
match pk.verify(&rmp_serde::to_vec(&cap).unwrap_or_default(), sig) {
Ok(_) => true,
Err(_) => false,
}
} else {
return true;
}
});
}
t::Message::Response((response, _)) => {
response.capabilities.retain(|(cap, sig)| {
// The only time we verify a cap's signature is when a foreign node
// sends us a cap that we (allegedly) issued
if km.source.node != self.metadata.our.node
&& cap.issuer.node == self.metadata.our.node
{
match pk.verify(&rmp_serde::to_vec(&cap).unwrap_or_default(), sig) {
Ok(_) => true,
Err(_) => false,
}
} else {
return true;
}
});
}
};
Ok((
km.source.en_wit(),
match km.message {
t::Message::Request(request) => wit::Message::Request(t::en_wit_request(request)),
// NOTE: we throw away whatever context came from the sender, that's not ours
t::Message::Response((response, _sent_context)) => {
wit::Message::Response((t::en_wit_response(response), context))
}
},
))
}
}
/// create a specific process, and generate a task that will run it.