Merge branch 'develop' into bp/new-eth

This commit is contained in:
bitful-pannul 2024-02-14 16:15:47 -03:00
commit 147ab09523
11 changed files with 323 additions and 170 deletions

1
Cargo.lock generated
View File

@ -3338,6 +3338,7 @@ dependencies = [
"chrono",
"clap",
"crossterm",
"curve25519-dalek",
"dashmap",
"digest 0.10.7",
"elliptic-curve",

View File

@ -42,6 +42,7 @@ chacha20poly1305 = "0.10.1"
chrono = "0.4.31"
clap = { version = "4.4", features = ["derive"] }
crossterm = { version = "0.26.1", features = ["event-stream", "bracketed-paste"] }
curve25519-dalek = "^4.1.2"
dashmap = "5.5.3"
digest = "0.10"
elliptic-curve = { version = "0.13.8", features = ["ecdh"] }

View File

@ -188,8 +188,38 @@ fn serve_paths(
}
Method::PUT => {
// update an app
// TODO
Ok((StatusCode::NO_CONTENT, None, format!("TODO").into_bytes()))
let pkg_listing: &PackageListing = state
.get_listing(&package_id)
.ok_or(anyhow::anyhow!("No package"))?;
let pkg_state: &PackageState = state
.downloaded_packages
.get(&package_id)
.ok_or(anyhow::anyhow!("No package"))?;
let download_from = pkg_state
.mirrored_from
.as_ref()
.ok_or(anyhow::anyhow!("No mirror for package {package_id}"))?
.to_string();
match crate::start_download(
our,
requested_packages,
&package_id,
&download_from,
pkg_state.mirroring,
pkg_state.auto_update,
&None,
) {
DownloadResponse::Started => Ok((
StatusCode::CREATED,
None,
format!("Downloading").into_bytes(),
)),
DownloadResponse::Failure => Ok((
StatusCode::SERVICE_UNAVAILABLE,
None,
format!("Failed to download").into_bytes(),
)),
}
}
Method::DELETE => {
// uninstall an app
@ -235,10 +265,15 @@ fn serve_paths(
}
Method::POST => {
// download an app
// TODO get fields from POST body
let pkg_listing: &PackageListing = state
.get_listing(&package_id)
.ok_or(anyhow::anyhow!("No package"))?;
// from POST body, look for download_from field and use that as the mirror
let body = crate::get_blob()
.ok_or(anyhow::anyhow!("missing blob"))?
.bytes;
let body_json: serde_json::Value =
serde_json::from_slice(&body).unwrap_or_default();
let mirrors: &Vec<NodeId> = pkg_listing
.metadata
.as_ref()
@ -246,11 +281,15 @@ fn serve_paths(
.mirrors
.as_ref()
.ok_or(anyhow::anyhow!("No mirrors for package {package_id}"))?;
// TODO select on FE
let download_from = mirrors
let download_from = body_json
.get("download_from")
.unwrap_or(&json!(mirrors
.first()
.ok_or(anyhow::anyhow!("No mirrors for package {package_id}"))?;
// TODO select on FE
.ok_or(anyhow::anyhow!("No mirrors for package {package_id}"))?))
.as_str()
.ok_or(anyhow::anyhow!("download_from not a string"))?
.to_string();
// TODO select on FE? or after download but before install?
let mirror = false;
let auto_update = false;
let desired_version_hash = None;
@ -258,7 +297,7 @@ fn serve_paths(
our,
requested_packages,
&package_id,
download_from,
&download_from,
mirror,
auto_update,
&desired_version_hash,

View File

@ -1,6 +1,5 @@
use kinode_process_lib::eth::{
get_logs, subscribe, unsubscribe, Address as EthAddress, EthAction, EthSub, Filter, Params,
SubscriptionResult,
get_logs, subscribe, unsubscribe, Address as EthAddress, EthSub, Filter, SubscriptionResult,
};
use kinode_process_lib::http::{bind_http_path, serve_ui, HttpServerRequest};
use kinode_process_lib::kernel_types as kt;
@ -121,7 +120,7 @@ fn init(our: Address) {
if let Ok(logs) = logs {
for log in logs {
state.ingest_listings_contract_event(log);
state.ingest_listings_contract_event(&our, log);
}
}
@ -189,7 +188,7 @@ fn handle_message(
if source.node() != our.node() || source.process != "eth:distro:sys" {
return Err(anyhow::anyhow!("eth sub event from weird addr: {source}"));
}
handle_eth_sub_event(&mut state, sub.result)?;
handle_eth_sub_event(our, &mut state, sub.result)?;
}
Req::Http(incoming) => {
if source.node() != our.node()
@ -198,7 +197,7 @@ fn handle_message(
return Err(anyhow::anyhow!("http_server from non-local node"));
}
if let HttpServerRequest::Http(req) = incoming {
http_api::handle_http_request(&our, &mut state, requested_packages, &req)?;
http_api::handle_http_request(our, &mut state, requested_packages, &req)?;
}
}
},
@ -280,6 +279,7 @@ fn handle_local_request(
our_version,
installed: false,
caps_approved: true, // TODO see if we want to auto-approve local installs
manifest_hash: None, // generated in the add fn
mirroring: *mirror,
auto_update: false, // can't auto-update a local package
metadata: None, // TODO
@ -343,7 +343,7 @@ fn handle_local_request(
if let Ok(logs) = logs {
for log in logs {
state.ingest_listings_contract_event(log);
state.ingest_listings_contract_event(our, log);
}
}
subscribe(1, filter).unwrap();
@ -424,7 +424,7 @@ fn handle_receive_download(
Some(hash) => {
if download_hash != hash {
return Err(anyhow::anyhow!(
"app store: downloaded package is not latest version--rejecting download!"
"app store: downloaded package is not desired version--rejecting download! download hash: {download_hash}, desired hash: {hash}"
));
}
}
@ -439,7 +439,7 @@ fn handle_receive_download(
if let Some(latest_hash) = metadata.versions.clone().unwrap_or(vec![]).last() {
if &download_hash != latest_hash {
return Err(anyhow::anyhow!(
"app store: downloaded package is not latest version--rejecting download!"
"app store: downloaded package is not latest version--rejecting download! download hash: {download_hash}, latest hash: {latest_hash}"
));
}
} else {
@ -453,6 +453,14 @@ fn handle_receive_download(
}
}
let old_manifest_hash = match state.downloaded_packages.get(&package_id) {
Some(package_state) => package_state
.manifest_hash
.clone()
.unwrap_or("OLD".to_string()),
_ => "OLD".to_string(),
};
state.add_downloaded_package(
&package_id,
PackageState {
@ -460,12 +468,28 @@ fn handle_receive_download(
our_version: download_hash,
installed: false,
caps_approved: false,
manifest_hash: None, // generated in the add fn
mirroring: requested_package.mirror,
auto_update: requested_package.auto_update,
metadata: None, // TODO
},
Some(blob.bytes),
)
)?;
let new_manifest_hash = match state.downloaded_packages.get(&package_id) {
Some(package_state) => package_state
.manifest_hash
.clone()
.unwrap_or("NEW".to_string()),
_ => "NEW".to_string(),
};
// lastly, if auto_update is true, AND the caps_hash has NOT changed,
// trigger install!
if requested_package.auto_update && old_manifest_hash == new_manifest_hash {
handle_install(our, state, &package_id)?;
}
Ok(())
}
fn handle_ft_worker_result(body: &[u8], context: &[u8]) -> anyhow::Result<()> {
@ -487,11 +511,15 @@ fn handle_ft_worker_result(body: &[u8], context: &[u8]) -> anyhow::Result<()> {
Ok(())
}
fn handle_eth_sub_event(state: &mut State, event: SubscriptionResult) -> anyhow::Result<()> {
fn handle_eth_sub_event(
our: &Address,
state: &mut State,
event: SubscriptionResult,
) -> anyhow::Result<()> {
let SubscriptionResult::Log(log) = event else {
return Err(anyhow::anyhow!("app store: got non-log event"));
};
state.ingest_listings_contract_event(*log)
state.ingest_listings_contract_event(our, *log)
}
fn fetch_package_manifest(package: &PackageId) -> anyhow::Result<Vec<kt::PackageManifestEntry>> {

View File

@ -86,6 +86,7 @@ pub struct PackageState {
pub our_version: String,
pub installed: bool,
pub caps_approved: bool,
pub manifest_hash: Option<String>,
/// are we serving this package to others?
pub mirroring: bool,
/// if we get a listing data update, will we try to download it?
@ -171,7 +172,7 @@ impl State {
pub fn add_downloaded_package(
&mut self,
package_id: &PackageId,
package_state: PackageState,
mut package_state: PackageState,
package_bytes: Option<Vec<u8>>,
) -> anyhow::Result<()> {
if let Some(package_bytes) = package_bytes {
@ -215,6 +216,13 @@ impl State {
})?)
.blob(blob)
.send_and_await_response(5)??;
let manifest_file = vfs::File {
path: format!("/{}/pkg/manifest.json", package_id),
};
let manifest_bytes = manifest_file.read()?;
let manifest_hash = generate_metadata_hash(&manifest_bytes);
package_state.manifest_hash = Some(manifest_hash);
}
self.downloaded_packages
.insert(package_id.to_owned(), package_state);
@ -296,6 +304,10 @@ impl State {
// generate entry from this data
// for the version hash, take the SHA-256 hash of the zip file
let our_version = generate_version_hash(&zip_file_bytes);
let manifest_file = vfs::File {
path: format!("/{}/pkg/manifest.json", package_id),
};
let manifest_bytes = manifest_file.read()?;
// the user will need to turn mirroring and auto-update back on if they
// have to reset the state of their app store for some reason. the apps
// themselves will remain on disk unless explicitly deleted.
@ -306,6 +318,7 @@ impl State {
our_version,
installed: true,
caps_approved: true, // since it's already installed this must be true
manifest_hash: Some(generate_metadata_hash(&manifest_bytes)),
mirroring: false,
auto_update: false,
metadata: None,
@ -362,7 +375,11 @@ impl State {
}
/// only saves state if last_saved_block is more than 1000 blocks behind
pub fn ingest_listings_contract_event(&mut self, log: Log) -> anyhow::Result<()> {
pub fn ingest_listings_contract_event(
&mut self,
our: &Address,
log: Log,
) -> anyhow::Result<()> {
let block_number: u64 = log
.block_number
.ok_or(anyhow::anyhow!("app store: got log with no block number"))?
@ -454,6 +471,33 @@ impl State {
current_listing.metadata_hash = metadata_hash;
current_listing.metadata = metadata;
let package_id = PackageId::new(&current_listing.name, &current_listing.publisher);
// if we have this app installed, and we have auto_update set to true,
// we should try to download new version from the mirrored_from node
// and install it if successful.
if let Some(package_state) = self.downloaded_packages.get(&package_id) {
if package_state.auto_update {
if let Some(mirrored_from) = &package_state.mirrored_from {
crate::print_to_terminal(
1,
&format!(
"app store: auto-updating package {package_id} from {mirrored_from}"
),
);
Request::to(our)
.body(serde_json::to_vec(&LocalRequest::Download {
package: package_id,
download_from: mirrored_from.clone(),
mirror: package_state.mirroring,
auto_update: package_state.auto_update,
desired_version_hash: None,
})?)
.send()?;
}
}
}
}
Transfer::SIGNATURE_HASH => {
let from = alloy_primitives::Address::from_word(log.topics[1]);

View File

@ -18,7 +18,7 @@
content="width=device-width, initial-scale=1, minimum-scale=1, maximum-scale=1.00001, viewport-fit=cover"
/>
<link href='https://fonts.googleapis.com/css?family=Montserrat' rel='stylesheet'>
<script type="module" crossorigin src="/main:app_store:sys/assets/index-pkTLhk2L.js"></script>
<script type="module" crossorigin src="/main:app_store:sys/assets/index-A09g5OKk.js"></script>
<link rel="stylesheet" crossorigin href="/main:app_store:sys/assets/index-aUqPNadJ.css">
</head>
<body>

View File

@ -112,11 +112,7 @@ impl ProcessState {
) -> Result<(wit::Address, wit::Message), (wit::SendError, Option<wit::Context>)> {
let res = match self.message_queue.pop_front() {
Some(message_from_queue) => message_from_queue,
None => self
.recv_in_process
.recv()
.await
.expect("fatal: process couldn't receive next message"),
None => self.ingest_message().await,
};
self.kernel_message_to_process_receive(res)
}
@ -138,11 +134,7 @@ impl ProcessState {
}
// next, wait for the awaited message to arrive
loop {
let res = self
.recv_in_process
.recv()
.await
.expect("fatal: process couldn't receive next message");
let res = self.ingest_message().await;
let id = match &res {
Ok(km) => km.id,
Err(e) => e.id,
@ -155,6 +147,131 @@ impl ProcessState {
}
}
/// ingest next valid message from kernel.
/// cancel any timeout task associated with this message.
/// if the message is a response, only enqueue if we have an outstanding request for it.
async fn ingest_message(&mut self) -> Result<t::KernelMessage, t::WrappedSendError> {
loop {
let message = self
.recv_in_process
.recv()
.await
.expect("fatal: process couldn't receive next message");
match &message {
Ok(km) => match &km.message {
t::Message::Response(_) => {
if let Some((_context, timeout_handle)) = self.contexts.get_mut(&km.id) {
timeout_handle.abort();
return message;
}
}
_ => {
return message;
}
},
Err(e) => {
if let Some((_context, timeout_handle)) = self.contexts.get_mut(&e.id) {
timeout_handle.abort();
return message;
}
}
}
}
}
/// Convert a message from the main event loop into a result for the process to receive.
/// If the message is a response or error, get context if we have one.
fn kernel_message_to_process_receive(
&mut self,
incoming: Result<t::KernelMessage, t::WrappedSendError>,
) -> Result<(wit::Address, wit::Message), (wit::SendError, Option<wit::Context>)> {
let (mut km, context) = match incoming {
Ok(mut km) => match km.message {
t::Message::Request(_) => {
self.last_blob = km.lazy_load_blob;
km.lazy_load_blob = None;
self.prompting_message = Some(km.clone());
(km, None)
}
t::Message::Response(_) => match self.contexts.remove(&km.id) {
Some((context, _timeout_handle)) => {
self.last_blob = km.lazy_load_blob;
km.lazy_load_blob = None;
self.prompting_message = context.prompting_message;
(km, context.context)
}
None => {
self.last_blob = km.lazy_load_blob;
km.lazy_load_blob = None;
self.prompting_message = Some(km.clone());
(km, None)
}
},
},
Err(e) => match self.contexts.remove(&e.id) {
None => return Err((t::en_wit_send_error(e.error), None)),
Some((context, _timeout_handle)) => {
self.prompting_message = context.prompting_message;
return Err((t::en_wit_send_error(e.error), context.context));
}
},
};
let pk = signature::UnparsedPublicKey::new(
&signature::ED25519,
self.keypair.as_ref().public_key(),
);
// prune any invalid capabilities before handing to process
// where invalid = supposedly issued by us, but not signed properly by us
match &mut km.message {
t::Message::Request(request) => {
request.capabilities.retain(|(cap, sig)| {
// The only time we verify a cap's signature is when a foreign node
// sends us a cap that we (allegedly) issued
if km.source.node != self.metadata.our.node
&& cap.issuer.node == self.metadata.our.node
{
match pk.verify(&rmp_serde::to_vec(&cap).unwrap_or_default(), sig) {
Ok(_) => true,
Err(_) => false,
}
} else {
return true;
}
});
}
t::Message::Response((response, _)) => {
response.capabilities.retain(|(cap, sig)| {
// The only time we verify a cap's signature is when a foreign node
// sends us a cap that we (allegedly) issued
if km.source.node != self.metadata.our.node
&& cap.issuer.node == self.metadata.our.node
{
match pk.verify(&rmp_serde::to_vec(&cap).unwrap_or_default(), sig) {
Ok(_) => true,
Err(_) => false,
}
} else {
return true;
}
});
}
};
Ok((
km.source.en_wit(),
match km.message {
t::Message::Request(request) => wit::Message::Request(t::en_wit_request(request)),
// NOTE: we throw away whatever context came from the sender, that's not ours
t::Message::Response((response, _sent_context)) => {
wit::Message::Response((t::en_wit_response(response), context))
}
},
))
}
/// takes Request generated by a process and sends it to the main event loop.
/// will only fail if process does not have capability to send to target.
/// if the request has a timeout (expects response), start a task to track
@ -362,99 +479,6 @@ impl ProcessState {
.await
.expect("fatal: kernel couldn't send response");
}
/// Convert a message from the main event loop into a result for the process to receive.
/// If the message is a response or error, get context if we have one.
fn kernel_message_to_process_receive(
&mut self,
incoming: Result<t::KernelMessage, t::WrappedSendError>,
) -> Result<(wit::Address, wit::Message), (wit::SendError, Option<wit::Context>)> {
let (mut km, context) = match incoming {
Ok(mut km) => match km.message {
t::Message::Request(_) => {
self.last_blob = km.lazy_load_blob;
km.lazy_load_blob = None;
self.prompting_message = Some(km.clone());
(km, None)
}
t::Message::Response(_) => {
if let Some((context, timeout_handle)) = self.contexts.remove(&km.id) {
timeout_handle.abort();
self.last_blob = km.lazy_load_blob;
km.lazy_load_blob = None;
self.prompting_message = context.prompting_message;
(km, context.context)
} else {
self.last_blob = km.lazy_load_blob;
km.lazy_load_blob = None;
self.prompting_message = Some(km.clone());
(km, None)
}
}
},
Err(e) => match self.contexts.remove(&e.id) {
None => return Err((t::en_wit_send_error(e.error), None)),
Some((context, timeout_handle)) => {
timeout_handle.abort();
self.prompting_message = context.prompting_message;
return Err((t::en_wit_send_error(e.error), context.context));
}
},
};
let pk = signature::UnparsedPublicKey::new(
&signature::ED25519,
self.keypair.as_ref().public_key(),
);
// prune any invalid capabilities before handing to process
// where invalid = supposedly issued by us, but not signed properly by us
match &mut km.message {
t::Message::Request(request) => {
request.capabilities.retain(|(cap, sig)| {
// The only time we verify a cap's signature is when a foreign node
// sends us a cap that we (allegedly) issued
if km.source.node != self.metadata.our.node
&& cap.issuer.node == self.metadata.our.node
{
match pk.verify(&rmp_serde::to_vec(&cap).unwrap_or_default(), sig) {
Ok(_) => true,
Err(_) => false,
}
} else {
return true;
}
});
}
t::Message::Response((response, _)) => {
response.capabilities.retain(|(cap, sig)| {
// The only time we verify a cap's signature is when a foreign node
// sends us a cap that we (allegedly) issued
if km.source.node != self.metadata.our.node
&& cap.issuer.node == self.metadata.our.node
{
match pk.verify(&rmp_serde::to_vec(&cap).unwrap_or_default(), sig) {
Ok(_) => true,
Err(_) => false,
}
} else {
return true;
}
});
}
};
Ok((
km.source.en_wit(),
match km.message {
t::Message::Request(request) => wit::Message::Request(t::en_wit_request(request)),
// NOTE: we throw away whatever context came from the sender, that's not ours
t::Message::Response((response, _sent_context)) => {
wit::Message::Response((t::en_wit_response(response), context))
}
},
))
}
}
/// create a specific process, and generate a task that will run it.

View File

@ -1,7 +1,7 @@
use anyhow::Result;
use lib::types::core::{
Address, KernelMessage, Message, MessageReceiver, MessageSender, PrintSender, Printout,
Response, TIMER_PROCESS_ID,
Response, TimerAction, TIMER_PROCESS_ID,
};
use serde::{Deserialize, Serialize};
@ -11,8 +11,8 @@ use serde::{Deserialize, Serialize};
/// requests made by other nodes.
///
/// The interface of the timer module is as follows:
/// One kind of request is accepted: the IPC must be a little-endian byte-representation
/// of an unsigned 64-bit integer, in milliseconds. This request should always expect a Response.
/// One kind of request is accepted: TimerAction::SetTimer(u64), where the u64 is the time to wait
/// in milliseconds. This request should always expect a Response.
/// If the request does not expect a Response, the timer will not be set.
///
/// A proper Request will trigger the timer module to send a Response. The Response will be
@ -39,7 +39,15 @@ pub async fn timer_service(
// we only handle Requests which contain a little-endian u64 as IPC,
// except for a special "debug" message, which prints the current state
let Message::Request(req) = km.message else { continue };
if req.body == "debug".as_bytes() {
let Ok(timer_action) = serde_json::from_slice::<TimerAction>(&req.body) else {
let _ = print_tx.send(Printout {
verbosity: 1,
content: "timer service received a request with an invalid body".to_string(),
}).await;
continue
};
match timer_action {
TimerAction::Debug => {
let _ = print_tx.send(Printout {
verbosity: 0,
content: format!("timer service active timers ({}):", timer_map.timers.len()),
@ -52,8 +60,7 @@ pub async fn timer_service(
}
continue
}
let Ok(bytes): Result<[u8; 8], _> = req.body.try_into() else { continue };
let timer_millis = u64::from_le_bytes(bytes);
TimerAction::SetTimer(timer_millis) => {
// if the timer is set to pop in 0 millis, we immediately respond
// otherwise, store in our persisted map, and spawn a task that
// sleeps for the given time, then sends the response
@ -78,6 +85,8 @@ pub async fn timer_service(
}
timer_map.insert(pop_time, km.id, km.rsvp.unwrap_or(km.source));
}
}
}
Some(Ok(time)) = timer_tasks.join_next() => {
// when a timer pops, we send the response to the process(es) that set
// the timer(s), and then remove it from our persisted map

View File

@ -361,7 +361,8 @@ async fn handle_request(
)
}
VfsAction::RemoveFile => {
fs::remove_file(path).await?;
fs::remove_file(&path).await?;
open_files.remove(&path);
(serde_json::to_vec(&VfsResponse::Ok).unwrap(), None)
}
VfsAction::RemoveDir => {

View File

@ -1486,3 +1486,9 @@ impl From<tokio::sync::mpsc::error::SendError<CapMessage>> for SqliteError {
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum TimerAction {
Debug,
SetTimer(u64),
}