Mirror of https://github.com/uqbar-dao/nectar.git, synced 2024-12-29 03:21:33 +03:00

Merge branch 'develop' into da/metadata

Commit 6af15a0f26
@@ -189,8 +189,38 @@ fn serve_paths(
         }
         Method::PUT => {
             // update an app
-            // TODO
-            Ok((StatusCode::NO_CONTENT, None, format!("TODO").into_bytes()))
+            let pkg_listing: &PackageListing = state
+                .get_listing(&package_id)
+                .ok_or(anyhow::anyhow!("No package"))?;
+            let pkg_state: &PackageState = state
+                .downloaded_packages
+                .get(&package_id)
+                .ok_or(anyhow::anyhow!("No package"))?;
+            let download_from = pkg_state
+                .mirrored_from
+                .as_ref()
+                .ok_or(anyhow::anyhow!("No mirror for package {package_id}"))?
+                .to_string();
+            match crate::start_download(
+                our,
+                requested_packages,
+                &package_id,
+                &download_from,
+                pkg_state.mirroring,
+                pkg_state.auto_update,
+                &None,
+            ) {
+                DownloadResponse::Started => Ok((
+                    StatusCode::CREATED,
+                    None,
+                    format!("Downloading").into_bytes(),
+                )),
+                DownloadResponse::Failure => Ok((
+                    StatusCode::SERVICE_UNAVAILABLE,
+                    None,
+                    format!("Failed to download").into_bytes(),
+                )),
+            }
         }
         Method::DELETE => {
             // uninstall an app
@@ -236,22 +266,34 @@ fn serve_paths(
         }
         Method::POST => {
             // download an app
-            // TODO get fields from POST body
             let pkg_listing: &PackageListing = state
                 .get_listing(&package_id)
                 .ok_or(anyhow::anyhow!("No package"))?;
+            // from POST body, look for download_from field and use that as the mirror
+            let body = crate::get_blob()
+                .ok_or(anyhow::anyhow!("missing blob"))?
+                .bytes;
+            let body_json: serde_json::Value =
+                serde_json::from_slice(&body).unwrap_or_default();
             let mirrors: &Vec<NodeId> = pkg_listing
                 .metadata
                 .as_ref()
                 .expect("Package does not have metadata")
                 .properties
                 .mirrors
-                .as_ref();
-            // TODO select on FE
-            let download_from = mirrors
-                .first()
+                .as_ref()
                 .ok_or(anyhow::anyhow!("No mirrors for package {package_id}"))?;
+
             // TODO select on FE
+            let download_from = body_json
+                .get("download_from")
+                .unwrap_or(&json!(mirrors
+                    .first()
+                    .ok_or(anyhow::anyhow!("No mirrors for package {package_id}"))?))
+                .as_str()
+                .ok_or(anyhow::anyhow!("download_from not a string"))?
+                .to_string();
+            // TODO select on FE? or after download but before install?
             let mirror = false;
             let auto_update = false;
             let desired_version_hash = None;
@@ -259,7 +301,7 @@ fn serve_paths(
                 our,
                 requested_packages,
                 &package_id,
-                download_from,
+                &download_from,
                 mirror,
                 auto_update,
                 &desired_version_hash,
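The POST handler above now takes the mirror from an optional "download_from" field in the request body, falling back to the first mirror listed in the package's metadata. A minimal sketch of how a caller might build that body; the node name and helper function here are placeholders invented for illustration, not part of the repo:

use serde_json::json;

// Hypothetical helper: build the JSON body the POST handler above expects.
// Omitting "download_from" makes the handler fall back to the first listed mirror.
fn download_request_body(preferred_mirror: Option<&str>) -> Vec<u8> {
    let body = match preferred_mirror {
        Some(node) => json!({ "download_from": node }),
        None => json!({}),
    };
    serde_json::to_vec(&body).expect("serializing a small JSON object cannot fail")
}

fn main() {
    // "mirror-node.os" is a made-up node name used only for this sketch.
    let with_mirror = download_request_body(Some("mirror-node.os"));
    let without_mirror = download_request_body(None);
    assert!(with_mirror.len() > without_mirror.len());
}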
@@ -178,7 +178,7 @@ fn handle_message(
             if source.node() != our.node() || source.process != "eth:distro:sys" {
                 return Err(anyhow::anyhow!("eth sub event from weird addr: {source}"));
             }
-            handle_eth_sub_event(&mut state, e)?;
+            handle_eth_sub_event(our, &mut state, e)?;
         }
         Req::Http(incoming) => {
             if source.node() != our.node()
@@ -187,7 +187,7 @@ fn handle_message(
                 return Err(anyhow::anyhow!("http_server from non-local node"));
             }
             if let HttpServerRequest::Http(req) = incoming {
-                http_api::handle_http_request(&our, &mut state, requested_packages, &req)?;
+                http_api::handle_http_request(our, &mut state, requested_packages, &req)?;
             }
         }
     },
@@ -270,6 +270,7 @@ fn handle_local_request(
             installed: false,
             verified: true, // side loaded apps are implicitly verified because there is no "source" to verify against
             caps_approved: true, // TODO see if we want to auto-approve local installs
+            manifest_hash: None, // generated in the add fn
             mirroring: *mirror,
             auto_update: false, // can't auto-update a local package
             metadata: None, // TODO
@@ -414,7 +415,7 @@ fn handle_receive_download(
                 );
             } else {
                 return Err(anyhow::anyhow!(
-                    "downloaded package is not latest version--rejecting download!"
+                    "app store: downloaded package is not desired version--rejecting download! download hash: {download_hash}, desired hash: {hash}"
                 ));
             }
         } else {
@@ -449,7 +450,7 @@ fn handle_receive_download(
                 );
             } else {
                 return Err(anyhow::anyhow!(
-                    "downloaded package is not latest version--rejecting download!"
+                    "app store: downloaded package is not latest version--rejecting download! download hash: {download_hash}, latest hash: {latest_hash}"
                 ));
             }
         } else {
@@ -458,6 +459,14 @@ fn handle_receive_download(
         }
     }

+    let old_manifest_hash = match state.downloaded_packages.get(&package_id) {
+        Some(package_state) => package_state
+            .manifest_hash
+            .clone()
+            .unwrap_or("OLD".to_string()),
+        _ => "OLD".to_string(),
+    };
+
     state.add_downloaded_package(
         &package_id,
         PackageState {
@@ -466,12 +475,28 @@ fn handle_receive_download(
             installed: false,
             verified,
             caps_approved: false,
+            manifest_hash: None, // generated in the add fn
             mirroring: requested_package.mirror,
             auto_update: requested_package.auto_update,
             metadata: None, // TODO
         },
         Some(blob.bytes),
-    )
+    )?;
+
+    let new_manifest_hash = match state.downloaded_packages.get(&package_id) {
+        Some(package_state) => package_state
+            .manifest_hash
+            .clone()
+            .unwrap_or("NEW".to_string()),
+        _ => "NEW".to_string(),
+    };
+
+    // lastly, if auto_update is true, AND the caps_hash has NOT changed,
+    // trigger install!
+    if requested_package.auto_update && old_manifest_hash == new_manifest_hash {
+        handle_install(our, state, &package_id)?;
+    }
+    Ok(())
 }

 fn handle_ft_worker_result(body: &[u8], context: &[u8]) -> anyhow::Result<()> {
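The "OLD"/"NEW" fallbacks above act as deliberately unequal sentinels: if the manifest hash is missing on either side, the equality check fails and the automatic install is skipped rather than run against an unknown capability set. A standalone illustration of that property (not code from the repo):

// Standalone sketch of the sentinel comparison that gates the auto-install.
fn should_auto_install(auto_update: bool, old: Option<&str>, new: Option<&str>) -> bool {
    let old = old.unwrap_or("OLD");
    let new = new.unwrap_or("NEW");
    auto_update && old == new
}

fn main() {
    assert!(should_auto_install(true, Some("abc"), Some("abc")));  // manifest unchanged
    assert!(!should_auto_install(true, Some("abc"), Some("def"))); // manifest changed
    assert!(!should_auto_install(true, None, None));               // hash missing on both sides
    assert!(!should_auto_install(false, Some("abc"), Some("abc"))); // auto_update off
}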
@@ -493,11 +518,15 @@ fn handle_ft_worker_result(body: &[u8], context: &[u8]) -> anyhow::Result<()> {
     Ok(())
 }

-fn handle_eth_sub_event(state: &mut State, event: EthSubEvent) -> anyhow::Result<()> {
+fn handle_eth_sub_event(
+    our: &Address,
+    state: &mut State,
+    event: EthSubEvent,
+) -> anyhow::Result<()> {
     let EthSubEvent::Log(log) = event else {
         return Err(anyhow::anyhow!("app store: got non-log event"));
     };
-    state.ingest_listings_contract_event(log)
+    state.ingest_listings_contract_event(our, log)
 }

 fn fetch_package_manifest(package: &PackageId) -> anyhow::Result<Vec<kt::PackageManifestEntry>> {
@@ -1,3 +1,4 @@
+use crate::LocalRequest;
 use alloy_rpc_types::Log;
 use alloy_sol_types::{sol, SolEvent};
 use kinode_process_lib::kernel_types as kt;
@@ -72,6 +73,7 @@ pub struct PackageState {
     pub installed: bool,
     pub verified: bool,
     pub caps_approved: bool,
+    pub manifest_hash: Option<String>,
     /// are we serving this package to others?
     pub mirroring: bool,
     /// if we get a listing data update, will we try to download it?
@@ -157,7 +159,7 @@ impl State {
     pub fn add_downloaded_package(
         &mut self,
         package_id: &PackageId,
-        package_state: PackageState,
+        mut package_state: PackageState,
         package_bytes: Option<Vec<u8>>,
     ) -> anyhow::Result<()> {
         if let Some(package_bytes) = package_bytes {
@@ -201,6 +203,13 @@ impl State {
             })?)
             .blob(blob)
             .send_and_await_response(5)??;
+
+            let manifest_file = vfs::File {
+                path: format!("/{}/pkg/manifest.json", package_id),
+            };
+            let manifest_bytes = manifest_file.read()?;
+            let manifest_hash = generate_metadata_hash(&manifest_bytes);
+            package_state.manifest_hash = Some(manifest_hash);
         }
         self.downloaded_packages
             .insert(package_id.to_owned(), package_state);
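The diff never shows generate_metadata_hash itself; a nearby comment only says the version hash is the SHA-256 of the zip file. Assuming the manifest hash follows the same convention (a plain SHA-256 over the file bytes, hex-encoded, which is an assumption rather than something this diff confirms), it would look roughly like this:

use sha2::{Digest, Sha256};

// Hedged sketch: a content hash over raw bytes, hex-encoded. The real
// generate_metadata_hash may differ; this only mirrors the "SHA-256 of the
// bytes" convention described for version hashes.
fn generate_metadata_hash(bytes: &[u8]) -> String {
    let mut hasher = Sha256::new();
    hasher.update(bytes);
    // render each byte as two lowercase hex digits
    hasher
        .finalize()
        .iter()
        .map(|b| format!("{:02x}", b))
        .collect()
}

fn main() {
    // identical manifests hash identically, which is what the auto-update gate relies on
    assert_eq!(generate_metadata_hash(b"{}"), generate_metadata_hash(b"{}"));
    assert_ne!(generate_metadata_hash(b"{}"), generate_metadata_hash(b"[]"));
}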
@@ -282,6 +291,10 @@ impl State {
         // generate entry from this data
         // for the version hash, take the SHA-256 hash of the zip file
         let our_version = generate_version_hash(&zip_file_bytes);
+        let manifest_file = vfs::File {
+            path: format!("/{}/pkg/manifest.json", package_id),
+        };
+        let manifest_bytes = manifest_file.read()?;
         // the user will need to turn mirroring and auto-update back on if they
         // have to reset the state of their app store for some reason. the apps
         // themselves will remain on disk unless explicitly deleted.
@@ -293,6 +306,7 @@ impl State {
             installed: true,
             verified: true, // implicity verified
             caps_approved: true, // since it's already installed this must be true
+            manifest_hash: Some(generate_metadata_hash(&manifest_bytes)),
             mirroring: false,
             auto_update: false,
             metadata: None,
@@ -349,7 +363,11 @@ impl State {
     }

     /// only saves state if last_saved_block is more than 1000 blocks behind
-    pub fn ingest_listings_contract_event(&mut self, log: Log) -> anyhow::Result<()> {
+    pub fn ingest_listings_contract_event(
+        &mut self,
+        our: &Address,
+        log: Log,
+    ) -> anyhow::Result<()> {
         let block_number: u64 = log
             .block_number
             .ok_or(anyhow::anyhow!("app store: got log with no block number"))?
@@ -458,6 +476,33 @@ impl State {

                 current_listing.metadata_hash = metadata_hash;
                 current_listing.metadata = metadata;
+
+                let package_id = PackageId::new(&current_listing.name, &current_listing.publisher);
+
+                // if we have this app installed, and we have auto_update set to true,
+                // we should try to download new version from the mirrored_from node
+                // and install it if successful.
+                if let Some(package_state) = self.downloaded_packages.get(&package_id) {
+                    if package_state.auto_update {
+                        if let Some(mirrored_from) = &package_state.mirrored_from {
+                            crate::print_to_terminal(
+                                1,
+                                &format!(
+                                    "app store: auto-updating package {package_id} from {mirrored_from}"
+                                ),
+                            );
+                            Request::to(our)
+                                .body(serde_json::to_vec(&LocalRequest::Download {
+                                    package: package_id,
+                                    download_from: mirrored_from.clone(),
+                                    mirror: package_state.mirroring,
+                                    auto_update: package_state.auto_update,
+                                    desired_version_hash: None,
+                                })?)
+                                .send()?;
+                        }
+                    }
+                }
             }
             Transfer::SIGNATURE_HASH => {
                 let from = alloy_primitives::Address::from_word(log.topics[1]);
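Rather than downloading in place, the event handler re-enters the normal flow by sending itself a LocalRequest::Download, so auto-updates and user-initiated downloads share one code path. The variant's shape below is inferred from the fields used above; it is a reconstruction for illustration, and the type aliases are placeholders, not the repo's real definitions:

use serde::{Deserialize, Serialize};

// Placeholder aliases so this sketch compiles on its own; the real types come
// from the process library.
type PackageId = String;
type NodeId = String;

// Shape of the Download variant as implied by the construction above.
#[derive(Debug, Serialize, Deserialize)]
pub enum LocalRequest {
    Download {
        package: PackageId,
        download_from: NodeId,
        mirror: bool,
        auto_update: bool,
        desired_version_hash: Option<String>,
    },
}

fn main() {
    let req = LocalRequest::Download {
        package: "app:publisher.os".to_string(),      // placeholder package id
        download_from: "mirror-node.os".to_string(),  // placeholder node
        mirror: false,
        auto_update: true,
        desired_version_hash: None,
    };
    println!("{}", serde_json::to_string(&req).unwrap());
}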
90 kinode/packages/app_store/pkg/ui/assets/index-A09g5OKk.js (new file)
File diff suppressed because one or more lines are too long
@@ -112,11 +112,7 @@ impl ProcessState {
     ) -> Result<(wit::Address, wit::Message), (wit::SendError, Option<wit::Context>)> {
         let res = match self.message_queue.pop_front() {
             Some(message_from_queue) => message_from_queue,
-            None => self
-                .recv_in_process
-                .recv()
-                .await
-                .expect("fatal: process couldn't receive next message"),
+            None => self.ingest_message().await,
         };
         self.kernel_message_to_process_receive(res)
     }
@@ -138,11 +134,7 @@ impl ProcessState {
         }
         // next, wait for the awaited message to arrive
         loop {
-            let res = self
-                .recv_in_process
-                .recv()
-                .await
-                .expect("fatal: process couldn't receive next message");
+            let res = self.ingest_message().await;
             let id = match &res {
                 Ok(km) => km.id,
                 Err(e) => e.id,
@@ -155,6 +147,131 @@ impl ProcessState {
         }
     }

+    /// ingest next valid message from kernel.
+    /// cancel any timeout task associated with this message.
+    /// if the message is a response, only enqueue if we have an outstanding request for it.
+    async fn ingest_message(&mut self) -> Result<t::KernelMessage, t::WrappedSendError> {
+        loop {
+            let message = self
+                .recv_in_process
+                .recv()
+                .await
+                .expect("fatal: process couldn't receive next message");
+
+            match &message {
+                Ok(km) => match &km.message {
+                    t::Message::Response(_) => {
+                        if let Some((_context, timeout_handle)) = self.contexts.get_mut(&km.id) {
+                            timeout_handle.abort();
+                            return message;
+                        }
+                    }
+                    _ => {
+                        return message;
+                    }
+                },
+                Err(e) => {
+                    if let Some((_context, timeout_handle)) = self.contexts.get_mut(&e.id) {
+                        timeout_handle.abort();
+                        return message;
+                    }
+                }
+            }
+        }
+    }
+
+    /// Convert a message from the main event loop into a result for the process to receive.
+    /// If the message is a response or error, get context if we have one.
+    fn kernel_message_to_process_receive(
+        &mut self,
+        incoming: Result<t::KernelMessage, t::WrappedSendError>,
+    ) -> Result<(wit::Address, wit::Message), (wit::SendError, Option<wit::Context>)> {
+        let (mut km, context) = match incoming {
+            Ok(mut km) => match km.message {
+                t::Message::Request(_) => {
+                    self.last_blob = km.lazy_load_blob;
+                    km.lazy_load_blob = None;
+                    self.prompting_message = Some(km.clone());
+                    (km, None)
+                }
+                t::Message::Response(_) => match self.contexts.remove(&km.id) {
+                    Some((context, _timeout_handle)) => {
+                        self.last_blob = km.lazy_load_blob;
+                        km.lazy_load_blob = None;
+                        self.prompting_message = context.prompting_message;
+                        (km, context.context)
+                    }
+                    None => {
+                        self.last_blob = km.lazy_load_blob;
+                        km.lazy_load_blob = None;
+                        self.prompting_message = Some(km.clone());
+                        (km, None)
+                    }
+                },
+            },
+            Err(e) => match self.contexts.remove(&e.id) {
+                None => return Err((t::en_wit_send_error(e.error), None)),
+                Some((context, _timeout_handle)) => {
+                    self.prompting_message = context.prompting_message;
+                    return Err((t::en_wit_send_error(e.error), context.context));
+                }
+            },
+        };
+
+        let pk = signature::UnparsedPublicKey::new(
+            &signature::ED25519,
+            self.keypair.as_ref().public_key(),
+        );
+
+        // prune any invalid capabilities before handing to process
+        // where invalid = supposedly issued by us, but not signed properly by us
+        match &mut km.message {
+            t::Message::Request(request) => {
+                request.capabilities.retain(|(cap, sig)| {
+                    // The only time we verify a cap's signature is when a foreign node
+                    // sends us a cap that we (allegedly) issued
+                    if km.source.node != self.metadata.our.node
+                        && cap.issuer.node == self.metadata.our.node
+                    {
+                        match pk.verify(&rmp_serde::to_vec(&cap).unwrap_or_default(), sig) {
+                            Ok(_) => true,
+                            Err(_) => false,
+                        }
+                    } else {
+                        return true;
+                    }
+                });
+            }
+            t::Message::Response((response, _)) => {
+                response.capabilities.retain(|(cap, sig)| {
+                    // The only time we verify a cap's signature is when a foreign node
+                    // sends us a cap that we (allegedly) issued
+                    if km.source.node != self.metadata.our.node
+                        && cap.issuer.node == self.metadata.our.node
+                    {
+                        match pk.verify(&rmp_serde::to_vec(&cap).unwrap_or_default(), sig) {
+                            Ok(_) => true,
+                            Err(_) => false,
+                        }
+                    } else {
+                        return true;
+                    }
+                });
+            }
+        };
+
+        Ok((
+            km.source.en_wit(),
+            match km.message {
+                t::Message::Request(request) => wit::Message::Request(t::en_wit_request(request)),
+                // NOTE: we throw away whatever context came from the sender, that's not ours
+                t::Message::Response((response, _sent_context)) => {
+                    wit::Message::Response((t::en_wit_response(response), context))
+                }
+            },
+        ))
+    }
+
     /// takes Request generated by a process and sends it to the main event loop.
     /// will only fail if process does not have capability to send to target.
     /// if the request has a timeout (expects response), start a task to track
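A note on this refactor: ingest_message now owns the "abort the timeout task" step, which is why the rewritten kernel_message_to_process_receive can ignore the handle it pops from contexts (_timeout_handle). It also changes behavior slightly: a response or send error whose id has no outstanding context is skipped by the loop instead of being delivered. A tiny standalone sketch of that filtering rule, with the kernel's types stubbed out:

use std::collections::HashMap;

// Stand-ins for the kernel's message types, just to show the filtering rule.
#[derive(Debug, PartialEq)]
enum Msg {
    Request(u64),  // request id
    Response(u64), // id of the request it answers
}

// Accept requests unconditionally; accept a response only if we still hold a
// context (an outstanding request) for its id.
fn accept(msg: &Msg, contexts: &HashMap<u64, &'static str>) -> bool {
    match msg {
        Msg::Request(_) => true,
        Msg::Response(id) => contexts.contains_key(id),
    }
}

fn main() {
    let mut contexts = HashMap::new();
    contexts.insert(7, "awaiting response to request 7");

    assert!(accept(&Msg::Request(1), &contexts));
    assert!(accept(&Msg::Response(7), &contexts));   // matches an outstanding request
    assert!(!accept(&Msg::Response(99), &contexts)); // orphan response: skipped
}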
@@ -362,99 +479,6 @@ impl ProcessState {
             .await
             .expect("fatal: kernel couldn't send response");
     }
-
-    /// Convert a message from the main event loop into a result for the process to receive.
-    /// If the message is a response or error, get context if we have one.
-    fn kernel_message_to_process_receive(
-        &mut self,
-        incoming: Result<t::KernelMessage, t::WrappedSendError>,
-    ) -> Result<(wit::Address, wit::Message), (wit::SendError, Option<wit::Context>)> {
-        let (mut km, context) = match incoming {
-            Ok(mut km) => match km.message {
-                t::Message::Request(_) => {
-                    self.last_blob = km.lazy_load_blob;
-                    km.lazy_load_blob = None;
-                    self.prompting_message = Some(km.clone());
-                    (km, None)
-                }
-                t::Message::Response(_) => {
-                    if let Some((context, timeout_handle)) = self.contexts.remove(&km.id) {
-                        timeout_handle.abort();
-                        self.last_blob = km.lazy_load_blob;
-                        km.lazy_load_blob = None;
-                        self.prompting_message = context.prompting_message;
-                        (km, context.context)
-                    } else {
-                        self.last_blob = km.lazy_load_blob;
-                        km.lazy_load_blob = None;
-                        self.prompting_message = Some(km.clone());
-                        (km, None)
-                    }
-                }
-            },
-            Err(e) => match self.contexts.remove(&e.id) {
-                None => return Err((t::en_wit_send_error(e.error), None)),
-                Some((context, timeout_handle)) => {
-                    timeout_handle.abort();
-                    self.prompting_message = context.prompting_message;
-                    return Err((t::en_wit_send_error(e.error), context.context));
-                }
-            },
-        };
-
-        let pk = signature::UnparsedPublicKey::new(
-            &signature::ED25519,
-            self.keypair.as_ref().public_key(),
-        );
-
-        // prune any invalid capabilities before handing to process
-        // where invalid = supposedly issued by us, but not signed properly by us
-        match &mut km.message {
-            t::Message::Request(request) => {
-                request.capabilities.retain(|(cap, sig)| {
-                    // The only time we verify a cap's signature is when a foreign node
-                    // sends us a cap that we (allegedly) issued
-                    if km.source.node != self.metadata.our.node
-                        && cap.issuer.node == self.metadata.our.node
-                    {
-                        match pk.verify(&rmp_serde::to_vec(&cap).unwrap_or_default(), sig) {
-                            Ok(_) => true,
-                            Err(_) => false,
-                        }
-                    } else {
-                        return true;
-                    }
-                });
-            }
-            t::Message::Response((response, _)) => {
-                response.capabilities.retain(|(cap, sig)| {
-                    // The only time we verify a cap's signature is when a foreign node
-                    // sends us a cap that we (allegedly) issued
-                    if km.source.node != self.metadata.our.node
-                        && cap.issuer.node == self.metadata.our.node
-                    {
-                        match pk.verify(&rmp_serde::to_vec(&cap).unwrap_or_default(), sig) {
-                            Ok(_) => true,
-                            Err(_) => false,
-                        }
-                    } else {
-                        return true;
-                    }
-                });
-            }
-        };
-
-        Ok((
-            km.source.en_wit(),
-            match km.message {
-                t::Message::Request(request) => wit::Message::Request(t::en_wit_request(request)),
-                // NOTE: we throw away whatever context came from the sender, that's not ours
-                t::Message::Response((response, _sent_context)) => {
-                    wit::Message::Response((t::en_wit_response(response), context))
-                }
-            },
-        ))
-    }
 }

 /// create a specific process, and generate a task that will run it.
@@ -1,7 +1,7 @@
 use anyhow::Result;
 use lib::types::core::{
     Address, KernelMessage, Message, MessageReceiver, MessageSender, PrintSender, Printout,
-    Response, TIMER_PROCESS_ID,
+    Response, TimerAction, TIMER_PROCESS_ID,
 };
 use serde::{Deserialize, Serialize};

@@ -11,8 +11,8 @@ use serde::{Deserialize, Serialize};
/// requests made by other nodes.
///
/// The interface of the timer module is as follows:
-/// One kind of request is accepted: the IPC must be a little-endian byte-representation
-/// of an unsigned 64-bit integer, in milliseconds. This request should always expect a Response.
+/// One kind of request is accepted: TimerAction::SetTimer(u64), where the u64 is the time to wait
+/// in milliseconds. This request should always expect a Response.
/// If the request does not expect a Response, the timer will not be set.
///
/// A proper Request will trigger the timer module to send a Response. The Response will be
@@ -39,7 +39,15 @@ pub async fn timer_service(
             // we only handle Requests which contain a little-endian u64 as IPC,
             // except for a special "debug" message, which prints the current state
             let Message::Request(req) = km.message else { continue };
-            if req.body == "debug".as_bytes() {
+            let Ok(timer_action) = serde_json::from_slice::<TimerAction>(&req.body) else {
+                let _ = print_tx.send(Printout {
+                    verbosity: 1,
+                    content: "timer service received a request with an invalid body".to_string(),
+                }).await;
+                continue
+            };
+            match timer_action {
+                TimerAction::Debug => {
                     let _ = print_tx.send(Printout {
                         verbosity: 0,
                         content: format!("timer service active timers ({}):", timer_map.timers.len()),
@@ -52,8 +60,7 @@ pub async fn timer_service(
                     }
                     continue
                 }
-            let Ok(bytes): Result<[u8; 8], _> = req.body.try_into() else { continue };
-            let timer_millis = u64::from_le_bytes(bytes);
+                TimerAction::SetTimer(timer_millis) => {
                     // if the timer is set to pop in 0 millis, we immediately respond
                     // otherwise, store in our persisted map, and spawn a task that
                     // sleeps for the given time, then sends the response
@@ -78,6 +85,8 @@ pub async fn timer_service(
                     }
                     timer_map.insert(pop_time, km.id, km.rsvp.unwrap_or(km.source));
                 }
+                }
+            }
             Some(Ok(time)) = timer_tasks.join_next() => {
                 // when a timer pops, we send the response to the process(es) that set
                 // the timer(s), and then remove it from our persisted map
@@ -1496,3 +1496,9 @@ impl From<tokio::sync::mpsc::error::SendError<CapMessage>> for SqliteError {
         }
     }
 }
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub enum TimerAction {
+    Debug,
+    SetTimer(u64),
+}
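With this change a caller sets a timer by sending a JSON-serialized TimerAction::SetTimer(ms) instead of eight little-endian bytes. A minimal standalone sketch of building the new-style body alongside the old-style one (the enum is copied from the hunk above; the 5-second value is arbitrary):

use serde::{Deserialize, Serialize};

// Mirrors the enum added in the hunk above.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum TimerAction {
    Debug,
    SetTimer(u64),
}

fn main() {
    // new request body: JSON-encoded TimerAction
    let new_body = serde_json::to_vec(&TimerAction::SetTimer(5_000)).unwrap();
    assert_eq!(String::from_utf8(new_body).unwrap(), r#"{"SetTimer":5000}"#);

    // old request body: raw little-endian u64 milliseconds
    let old_body = 5_000u64.to_le_bytes().to_vec();
    assert_eq!(old_body.len(), 8);
}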