Merge branch 'develop' into dr/app-store-auto-install

This commit is contained in:
doria 2024-02-14 13:30:38 -03:00 committed by GitHub
commit 67bde5b504
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 188 additions and 148 deletions

View File

@ -18,7 +18,7 @@
content="width=device-width, initial-scale=1, minimum-scale=1, maximum-scale=1.00001, viewport-fit=cover" content="width=device-width, initial-scale=1, minimum-scale=1, maximum-scale=1.00001, viewport-fit=cover"
/> />
<link href='https://fonts.googleapis.com/css?family=Montserrat' rel='stylesheet'> <link href='https://fonts.googleapis.com/css?family=Montserrat' rel='stylesheet'>
<script type="module" crossorigin src="/main:app_store:sys/assets/index-pkTLhk2L.js"></script> <script type="module" crossorigin src="/main:app_store:sys/assets/index-A09g5OKk.js"></script>
<link rel="stylesheet" crossorigin href="/main:app_store:sys/assets/index-aUqPNadJ.css"> <link rel="stylesheet" crossorigin href="/main:app_store:sys/assets/index-aUqPNadJ.css">
</head> </head>
<body> <body>

View File

@ -112,11 +112,7 @@ impl ProcessState {
) -> Result<(wit::Address, wit::Message), (wit::SendError, Option<wit::Context>)> { ) -> Result<(wit::Address, wit::Message), (wit::SendError, Option<wit::Context>)> {
let res = match self.message_queue.pop_front() { let res = match self.message_queue.pop_front() {
Some(message_from_queue) => message_from_queue, Some(message_from_queue) => message_from_queue,
None => self None => self.ingest_message().await,
.recv_in_process
.recv()
.await
.expect("fatal: process couldn't receive next message"),
}; };
self.kernel_message_to_process_receive(res) self.kernel_message_to_process_receive(res)
} }
@ -138,11 +134,7 @@ impl ProcessState {
} }
// next, wait for the awaited message to arrive // next, wait for the awaited message to arrive
loop { loop {
let res = self let res = self.ingest_message().await;
.recv_in_process
.recv()
.await
.expect("fatal: process couldn't receive next message");
let id = match &res { let id = match &res {
Ok(km) => km.id, Ok(km) => km.id,
Err(e) => e.id, Err(e) => e.id,
@ -155,6 +147,131 @@ impl ProcessState {
} }
} }
/// ingest next valid message from kernel.
/// cancel any timeout task associated with this message.
/// if the message is a response, only enqueue if we have an outstanding request for it.
async fn ingest_message(&mut self) -> Result<t::KernelMessage, t::WrappedSendError> {
loop {
let message = self
.recv_in_process
.recv()
.await
.expect("fatal: process couldn't receive next message");
match &message {
Ok(km) => match &km.message {
t::Message::Response(_) => {
if let Some((_context, timeout_handle)) = self.contexts.get_mut(&km.id) {
timeout_handle.abort();
return message;
}
}
_ => {
return message;
}
},
Err(e) => {
if let Some((_context, timeout_handle)) = self.contexts.get_mut(&e.id) {
timeout_handle.abort();
return message;
}
}
}
}
}
/// Convert a message from the main event loop into a result for the process to receive.
/// If the message is a response or error, get context if we have one.
fn kernel_message_to_process_receive(
&mut self,
incoming: Result<t::KernelMessage, t::WrappedSendError>,
) -> Result<(wit::Address, wit::Message), (wit::SendError, Option<wit::Context>)> {
let (mut km, context) = match incoming {
Ok(mut km) => match km.message {
t::Message::Request(_) => {
self.last_blob = km.lazy_load_blob;
km.lazy_load_blob = None;
self.prompting_message = Some(km.clone());
(km, None)
}
t::Message::Response(_) => match self.contexts.remove(&km.id) {
Some((context, _timeout_handle)) => {
self.last_blob = km.lazy_load_blob;
km.lazy_load_blob = None;
self.prompting_message = context.prompting_message;
(km, context.context)
}
None => {
self.last_blob = km.lazy_load_blob;
km.lazy_load_blob = None;
self.prompting_message = Some(km.clone());
(km, None)
}
},
},
Err(e) => match self.contexts.remove(&e.id) {
None => return Err((t::en_wit_send_error(e.error), None)),
Some((context, _timeout_handle)) => {
self.prompting_message = context.prompting_message;
return Err((t::en_wit_send_error(e.error), context.context));
}
},
};
let pk = signature::UnparsedPublicKey::new(
&signature::ED25519,
self.keypair.as_ref().public_key(),
);
// prune any invalid capabilities before handing to process
// where invalid = supposedly issued by us, but not signed properly by us
match &mut km.message {
t::Message::Request(request) => {
request.capabilities.retain(|(cap, sig)| {
// The only time we verify a cap's signature is when a foreign node
// sends us a cap that we (allegedly) issued
if km.source.node != self.metadata.our.node
&& cap.issuer.node == self.metadata.our.node
{
match pk.verify(&rmp_serde::to_vec(&cap).unwrap_or_default(), sig) {
Ok(_) => true,
Err(_) => false,
}
} else {
return true;
}
});
}
t::Message::Response((response, _)) => {
response.capabilities.retain(|(cap, sig)| {
// The only time we verify a cap's signature is when a foreign node
// sends us a cap that we (allegedly) issued
if km.source.node != self.metadata.our.node
&& cap.issuer.node == self.metadata.our.node
{
match pk.verify(&rmp_serde::to_vec(&cap).unwrap_or_default(), sig) {
Ok(_) => true,
Err(_) => false,
}
} else {
return true;
}
});
}
};
Ok((
km.source.en_wit(),
match km.message {
t::Message::Request(request) => wit::Message::Request(t::en_wit_request(request)),
// NOTE: we throw away whatever context came from the sender, that's not ours
t::Message::Response((response, _sent_context)) => {
wit::Message::Response((t::en_wit_response(response), context))
}
},
))
}
/// takes Request generated by a process and sends it to the main event loop. /// takes Request generated by a process and sends it to the main event loop.
/// will only fail if process does not have capability to send to target. /// will only fail if process does not have capability to send to target.
/// if the request has a timeout (expects response), start a task to track /// if the request has a timeout (expects response), start a task to track
@ -362,99 +479,6 @@ impl ProcessState {
.await .await
.expect("fatal: kernel couldn't send response"); .expect("fatal: kernel couldn't send response");
} }
/// Convert a message from the main event loop into a result for the process to receive.
/// If the message is a response or error, get context if we have one.
fn kernel_message_to_process_receive(
&mut self,
incoming: Result<t::KernelMessage, t::WrappedSendError>,
) -> Result<(wit::Address, wit::Message), (wit::SendError, Option<wit::Context>)> {
let (mut km, context) = match incoming {
Ok(mut km) => match km.message {
t::Message::Request(_) => {
self.last_blob = km.lazy_load_blob;
km.lazy_load_blob = None;
self.prompting_message = Some(km.clone());
(km, None)
}
t::Message::Response(_) => {
if let Some((context, timeout_handle)) = self.contexts.remove(&km.id) {
timeout_handle.abort();
self.last_blob = km.lazy_load_blob;
km.lazy_load_blob = None;
self.prompting_message = context.prompting_message;
(km, context.context)
} else {
self.last_blob = km.lazy_load_blob;
km.lazy_load_blob = None;
self.prompting_message = Some(km.clone());
(km, None)
}
}
},
Err(e) => match self.contexts.remove(&e.id) {
None => return Err((t::en_wit_send_error(e.error), None)),
Some((context, timeout_handle)) => {
timeout_handle.abort();
self.prompting_message = context.prompting_message;
return Err((t::en_wit_send_error(e.error), context.context));
}
},
};
let pk = signature::UnparsedPublicKey::new(
&signature::ED25519,
self.keypair.as_ref().public_key(),
);
// prune any invalid capabilities before handing to process
// where invalid = supposedly issued by us, but not signed properly by us
match &mut km.message {
t::Message::Request(request) => {
request.capabilities.retain(|(cap, sig)| {
// The only time we verify a cap's signature is when a foreign node
// sends us a cap that we (allegedly) issued
if km.source.node != self.metadata.our.node
&& cap.issuer.node == self.metadata.our.node
{
match pk.verify(&rmp_serde::to_vec(&cap).unwrap_or_default(), sig) {
Ok(_) => true,
Err(_) => false,
}
} else {
return true;
}
});
}
t::Message::Response((response, _)) => {
response.capabilities.retain(|(cap, sig)| {
// The only time we verify a cap's signature is when a foreign node
// sends us a cap that we (allegedly) issued
if km.source.node != self.metadata.our.node
&& cap.issuer.node == self.metadata.our.node
{
match pk.verify(&rmp_serde::to_vec(&cap).unwrap_or_default(), sig) {
Ok(_) => true,
Err(_) => false,
}
} else {
return true;
}
});
}
};
Ok((
km.source.en_wit(),
match km.message {
t::Message::Request(request) => wit::Message::Request(t::en_wit_request(request)),
// NOTE: we throw away whatever context came from the sender, that's not ours
t::Message::Response((response, _sent_context)) => {
wit::Message::Response((t::en_wit_response(response), context))
}
},
))
}
} }
/// create a specific process, and generate a task that will run it. /// create a specific process, and generate a task that will run it.

View File

@ -1,7 +1,7 @@
use anyhow::Result; use anyhow::Result;
use lib::types::core::{ use lib::types::core::{
Address, KernelMessage, Message, MessageReceiver, MessageSender, PrintSender, Printout, Address, KernelMessage, Message, MessageReceiver, MessageSender, PrintSender, Printout,
Response, TIMER_PROCESS_ID, Response, TimerAction, TIMER_PROCESS_ID,
}; };
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@ -11,8 +11,8 @@ use serde::{Deserialize, Serialize};
/// requests made by other nodes. /// requests made by other nodes.
/// ///
/// The interface of the timer module is as follows: /// The interface of the timer module is as follows:
/// One kind of request is accepted: the IPC must be a little-endian byte-representation /// One kind of request is accepted: TimerAction::SetTimer(u64), where the u64 is the time to wait
/// of an unsigned 64-bit integer, in milliseconds. This request should always expect a Response. /// in milliseconds. This request should always expect a Response.
/// If the request does not expect a Response, the timer will not be set. /// If the request does not expect a Response, the timer will not be set.
/// ///
/// A proper Request will trigger the timer module to send a Response. The Response will be /// A proper Request will trigger the timer module to send a Response. The Response will be
@ -39,44 +39,53 @@ pub async fn timer_service(
// we only handle Requests which contain a little-endian u64 as IPC, // we only handle Requests which contain a little-endian u64 as IPC,
// except for a special "debug" message, which prints the current state // except for a special "debug" message, which prints the current state
let Message::Request(req) = km.message else { continue }; let Message::Request(req) = km.message else { continue };
if req.body == "debug".as_bytes() { let Ok(timer_action) = serde_json::from_slice::<TimerAction>(&req.body) else {
let _ = print_tx.send(Printout { let _ = print_tx.send(Printout {
verbosity: 0, verbosity: 1,
content: format!("timer service active timers ({}):", timer_map.timers.len()), content: "timer service received a request with an invalid body".to_string(),
}).await; }).await;
for (k, v) in timer_map.timers.iter() { continue
};
match timer_action {
TimerAction::Debug => {
let _ = print_tx.send(Printout { let _ = print_tx.send(Printout {
verbosity: 0, verbosity: 0,
content: format!("{}: {:?}", k, v), content: format!("timer service active timers ({}):", timer_map.timers.len()),
}).await; }).await;
for (k, v) in timer_map.timers.iter() {
let _ = print_tx.send(Printout {
verbosity: 0,
content: format!("{}: {:?}", k, v),
}).await;
}
continue
}
TimerAction::SetTimer(timer_millis) => {
// if the timer is set to pop in 0 millis, we immediately respond
// otherwise, store in our persisted map, and spawn a task that
// sleeps for the given time, then sends the response
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
let pop_time = now + timer_millis;
if timer_millis == 0 {
send_response(&our, km.id, km.rsvp.unwrap_or(km.source), &kernel_message_sender).await;
continue
}
let _ = print_tx.send(Printout {
verbosity: 1,
content: format!("set timer to pop in {}ms", timer_millis),
}).await;
if !timer_map.contains(pop_time) {
timer_tasks.spawn(async move {
tokio::time::sleep(std::time::Duration::from_millis(timer_millis - 1)).await;
pop_time
});
}
timer_map.insert(pop_time, km.id, km.rsvp.unwrap_or(km.source));
} }
continue
} }
let Ok(bytes): Result<[u8; 8], _> = req.body.try_into() else { continue };
let timer_millis = u64::from_le_bytes(bytes);
// if the timer is set to pop in 0 millis, we immediately respond
// otherwise, store in our persisted map, and spawn a task that
// sleeps for the given time, then sends the response
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
let pop_time = now + timer_millis;
if timer_millis == 0 {
send_response(&our, km.id, km.rsvp.unwrap_or(km.source), &kernel_message_sender).await;
continue
}
let _ = print_tx.send(Printout {
verbosity: 1,
content: format!("set timer to pop in {}ms", timer_millis),
}).await;
if !timer_map.contains(pop_time) {
timer_tasks.spawn(async move {
tokio::time::sleep(std::time::Duration::from_millis(timer_millis - 1)).await;
pop_time
});
}
timer_map.insert(pop_time, km.id, km.rsvp.unwrap_or(km.source));
} }
Some(Ok(time)) = timer_tasks.join_next() => { Some(Ok(time)) = timer_tasks.join_next() => {
// when a timer pops, we send the response to the process(es) that set // when a timer pops, we send the response to the process(es) that set

View File

@ -361,7 +361,8 @@ async fn handle_request(
) )
} }
VfsAction::RemoveFile => { VfsAction::RemoveFile => {
fs::remove_file(path).await?; fs::remove_file(&path).await?;
open_files.remove(&path);
(serde_json::to_vec(&VfsResponse::Ok).unwrap(), None) (serde_json::to_vec(&VfsResponse::Ok).unwrap(), None)
} }
VfsAction::RemoveDir => { VfsAction::RemoveDir => {

View File

@ -1486,3 +1486,9 @@ impl From<tokio::sync::mpsc::error::SendError<CapMessage>> for SqliteError {
} }
} }
} }
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum TimerAction {
Debug,
SetTimer(u64),
}