mirror of
https://github.com/uqbar-dao/nectar.git
synced 2024-11-26 11:53:31 +03:00
kernel: adjust message queueing to only ingest responses/errors for which we have an outstanding request in context map
This commit is contained in:
parent
b4c44cc79f
commit
1d7043600e
2
Cargo.lock
generated
2
Cargo.lock
generated
@ -3227,7 +3227,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "kit"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/kinode-dao/kit?rev=25b098f#25b098fab136387065d6058162d33c727d277ab8"
|
||||
source = "git+https://github.com/kinode-dao/kit?rev=0c43430#0c434306fdce55e11d3309959fc4a0fe6ae28def"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"base64 0.21.7",
|
||||
|
@ -112,11 +112,7 @@ impl ProcessState {
|
||||
) -> Result<(wit::Address, wit::Message), (wit::SendError, Option<wit::Context>)> {
|
||||
let res = match self.message_queue.pop_front() {
|
||||
Some(message_from_queue) => message_from_queue,
|
||||
None => self
|
||||
.recv_in_process
|
||||
.recv()
|
||||
.await
|
||||
.expect("fatal: process couldn't receive next message"),
|
||||
None => self.ingest_message().await,
|
||||
};
|
||||
self.kernel_message_to_process_receive(res)
|
||||
}
|
||||
@ -138,11 +134,7 @@ impl ProcessState {
|
||||
}
|
||||
// next, wait for the awaited message to arrive
|
||||
loop {
|
||||
let res = self
|
||||
.recv_in_process
|
||||
.recv()
|
||||
.await
|
||||
.expect("fatal: process couldn't receive next message");
|
||||
let res = self.ingest_message().await;
|
||||
let id = match &res {
|
||||
Ok(km) => km.id,
|
||||
Err(e) => e.id,
|
||||
@ -155,6 +147,131 @@ impl ProcessState {
|
||||
}
|
||||
}
|
||||
|
||||
/// ingest next valid message from kernel.
|
||||
/// cancel any timeout task associated with this message.
|
||||
/// if the message is a response, only enqueue if we have an outstanding request for it.
|
||||
async fn ingest_message(&mut self) -> Result<t::KernelMessage, t::WrappedSendError> {
|
||||
loop {
|
||||
let message = self
|
||||
.recv_in_process
|
||||
.recv()
|
||||
.await
|
||||
.expect("fatal: process couldn't receive next message");
|
||||
|
||||
match &message {
|
||||
Ok(km) => match &km.message {
|
||||
t::Message::Response(_) => {
|
||||
if let Some((_context, timeout_handle)) = self.contexts.get_mut(&km.id) {
|
||||
timeout_handle.abort();
|
||||
return message;
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
return message;
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
if let Some((_context, timeout_handle)) = self.contexts.get_mut(&e.id) {
|
||||
timeout_handle.abort();
|
||||
return message;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Convert a message from the main event loop into a result for the process to receive.
|
||||
/// If the message is a response or error, get context if we have one.
|
||||
fn kernel_message_to_process_receive(
|
||||
&mut self,
|
||||
incoming: Result<t::KernelMessage, t::WrappedSendError>,
|
||||
) -> Result<(wit::Address, wit::Message), (wit::SendError, Option<wit::Context>)> {
|
||||
let (mut km, context) = match incoming {
|
||||
Ok(mut km) => match km.message {
|
||||
t::Message::Request(_) => {
|
||||
self.last_blob = km.lazy_load_blob;
|
||||
km.lazy_load_blob = None;
|
||||
self.prompting_message = Some(km.clone());
|
||||
(km, None)
|
||||
}
|
||||
t::Message::Response(_) => match self.contexts.remove(&km.id) {
|
||||
Some((context, _timeout_handle)) => {
|
||||
self.last_blob = km.lazy_load_blob;
|
||||
km.lazy_load_blob = None;
|
||||
self.prompting_message = context.prompting_message;
|
||||
(km, context.context)
|
||||
}
|
||||
None => {
|
||||
self.last_blob = km.lazy_load_blob;
|
||||
km.lazy_load_blob = None;
|
||||
self.prompting_message = Some(km.clone());
|
||||
(km, None)
|
||||
}
|
||||
},
|
||||
},
|
||||
Err(e) => match self.contexts.remove(&e.id) {
|
||||
None => return Err((t::en_wit_send_error(e.error), None)),
|
||||
Some((context, _timeout_handle)) => {
|
||||
self.prompting_message = context.prompting_message;
|
||||
return Err((t::en_wit_send_error(e.error), context.context));
|
||||
}
|
||||
},
|
||||
};
|
||||
|
||||
let pk = signature::UnparsedPublicKey::new(
|
||||
&signature::ED25519,
|
||||
self.keypair.as_ref().public_key(),
|
||||
);
|
||||
|
||||
// prune any invalid capabilities before handing to process
|
||||
// where invalid = supposedly issued by us, but not signed properly by us
|
||||
match &mut km.message {
|
||||
t::Message::Request(request) => {
|
||||
request.capabilities.retain(|(cap, sig)| {
|
||||
// The only time we verify a cap's signature is when a foreign node
|
||||
// sends us a cap that we (allegedly) issued
|
||||
if km.source.node != self.metadata.our.node
|
||||
&& cap.issuer.node == self.metadata.our.node
|
||||
{
|
||||
match pk.verify(&rmp_serde::to_vec(&cap).unwrap_or_default(), sig) {
|
||||
Ok(_) => true,
|
||||
Err(_) => false,
|
||||
}
|
||||
} else {
|
||||
return true;
|
||||
}
|
||||
});
|
||||
}
|
||||
t::Message::Response((response, _)) => {
|
||||
response.capabilities.retain(|(cap, sig)| {
|
||||
// The only time we verify a cap's signature is when a foreign node
|
||||
// sends us a cap that we (allegedly) issued
|
||||
if km.source.node != self.metadata.our.node
|
||||
&& cap.issuer.node == self.metadata.our.node
|
||||
{
|
||||
match pk.verify(&rmp_serde::to_vec(&cap).unwrap_or_default(), sig) {
|
||||
Ok(_) => true,
|
||||
Err(_) => false,
|
||||
}
|
||||
} else {
|
||||
return true;
|
||||
}
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
Ok((
|
||||
km.source.en_wit(),
|
||||
match km.message {
|
||||
t::Message::Request(request) => wit::Message::Request(t::en_wit_request(request)),
|
||||
// NOTE: we throw away whatever context came from the sender, that's not ours
|
||||
t::Message::Response((response, _sent_context)) => {
|
||||
wit::Message::Response((t::en_wit_response(response), context))
|
||||
}
|
||||
},
|
||||
))
|
||||
}
|
||||
|
||||
/// takes Request generated by a process and sends it to the main event loop.
|
||||
/// will only fail if process does not have capability to send to target.
|
||||
/// if the request has a timeout (expects response), start a task to track
|
||||
@ -362,99 +479,6 @@ impl ProcessState {
|
||||
.await
|
||||
.expect("fatal: kernel couldn't send response");
|
||||
}
|
||||
|
||||
/// Convert a message from the main event loop into a result for the process to receive.
|
||||
/// If the message is a response or error, get context if we have one.
|
||||
fn kernel_message_to_process_receive(
|
||||
&mut self,
|
||||
incoming: Result<t::KernelMessage, t::WrappedSendError>,
|
||||
) -> Result<(wit::Address, wit::Message), (wit::SendError, Option<wit::Context>)> {
|
||||
let (mut km, context) = match incoming {
|
||||
Ok(mut km) => match km.message {
|
||||
t::Message::Request(_) => {
|
||||
self.last_blob = km.lazy_load_blob;
|
||||
km.lazy_load_blob = None;
|
||||
self.prompting_message = Some(km.clone());
|
||||
(km, None)
|
||||
}
|
||||
t::Message::Response(_) => {
|
||||
if let Some((context, timeout_handle)) = self.contexts.remove(&km.id) {
|
||||
timeout_handle.abort();
|
||||
self.last_blob = km.lazy_load_blob;
|
||||
km.lazy_load_blob = None;
|
||||
self.prompting_message = context.prompting_message;
|
||||
(km, context.context)
|
||||
} else {
|
||||
self.last_blob = km.lazy_load_blob;
|
||||
km.lazy_load_blob = None;
|
||||
self.prompting_message = Some(km.clone());
|
||||
(km, None)
|
||||
}
|
||||
}
|
||||
},
|
||||
Err(e) => match self.contexts.remove(&e.id) {
|
||||
None => return Err((t::en_wit_send_error(e.error), None)),
|
||||
Some((context, timeout_handle)) => {
|
||||
timeout_handle.abort();
|
||||
self.prompting_message = context.prompting_message;
|
||||
return Err((t::en_wit_send_error(e.error), context.context));
|
||||
}
|
||||
},
|
||||
};
|
||||
|
||||
let pk = signature::UnparsedPublicKey::new(
|
||||
&signature::ED25519,
|
||||
self.keypair.as_ref().public_key(),
|
||||
);
|
||||
|
||||
// prune any invalid capabilities before handing to process
|
||||
// where invalid = supposedly issued by us, but not signed properly by us
|
||||
match &mut km.message {
|
||||
t::Message::Request(request) => {
|
||||
request.capabilities.retain(|(cap, sig)| {
|
||||
// The only time we verify a cap's signature is when a foreign node
|
||||
// sends us a cap that we (allegedly) issued
|
||||
if km.source.node != self.metadata.our.node
|
||||
&& cap.issuer.node == self.metadata.our.node
|
||||
{
|
||||
match pk.verify(&rmp_serde::to_vec(&cap).unwrap_or_default(), sig) {
|
||||
Ok(_) => true,
|
||||
Err(_) => false,
|
||||
}
|
||||
} else {
|
||||
return true;
|
||||
}
|
||||
});
|
||||
}
|
||||
t::Message::Response((response, _)) => {
|
||||
response.capabilities.retain(|(cap, sig)| {
|
||||
// The only time we verify a cap's signature is when a foreign node
|
||||
// sends us a cap that we (allegedly) issued
|
||||
if km.source.node != self.metadata.our.node
|
||||
&& cap.issuer.node == self.metadata.our.node
|
||||
{
|
||||
match pk.verify(&rmp_serde::to_vec(&cap).unwrap_or_default(), sig) {
|
||||
Ok(_) => true,
|
||||
Err(_) => false,
|
||||
}
|
||||
} else {
|
||||
return true;
|
||||
}
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
Ok((
|
||||
km.source.en_wit(),
|
||||
match km.message {
|
||||
t::Message::Request(request) => wit::Message::Request(t::en_wit_request(request)),
|
||||
// NOTE: we throw away whatever context came from the sender, that's not ours
|
||||
t::Message::Response((response, _sent_context)) => {
|
||||
wit::Message::Response((t::en_wit_response(response), context))
|
||||
}
|
||||
},
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
/// create a specific process, and generate a task that will run it.
|
||||
|
Loading…
Reference in New Issue
Block a user