downloads:app_store: init

This commit is contained in:
bitful-pannul 2024-08-08 14:26:44 +03:00
parent 5be42420ca
commit f86c408089
7 changed files with 474 additions and 121 deletions

20
Cargo.lock generated
View File

@ -2074,6 +2074,26 @@ dependencies = [
"wit-bindgen",
]
[[package]]
name = "downloads"
version = "0.5.0"
dependencies = [
"alloy-primitives",
"alloy-sol-types",
"anyhow",
"bincode",
"kinode_process_lib 0.9.0",
"rand 0.8.5",
"serde",
"serde_json",
"sha2",
"sha3",
"url",
"urlencoding 2.1.3",
"wit-bindgen",
"zip 1.1.4",
]
[[package]]
name = "dunce"
version = "1.0.5"

View File

@ -15,7 +15,7 @@ lib = { path = "lib" }
members = [
"lib", "kinode",
"kinode/packages/app_store/app_store", "kinode/packages/app_store/ft_worker",
"kinode/packages/app_store/download", "kinode/packages/app_store/install", "kinode/packages/app_store/uninstall",
"kinode/packages/app_store/download", "kinode/packages/app_store/install", "kinode/packages/app_store/uninstall", "kinode/packages/app_store/downloads",
"kinode/packages/chess/chess",
"kinode/packages/homepage/homepage",
"kinode/packages/kino_updates/blog", "kinode/packages/kino_updates/globe",

View File

@ -18,9 +18,6 @@ use crate::kinode::process::main::{
NewPackageResponse, Reason, RebuildIndexResponse, RemoteDownloadRequest, RemoteRequest,
RemoteResponse, UninstallResponse,
};
use ft_worker_lib::{
spawn_receive_transfer, spawn_transfer, FTWorkerCommand, FTWorkerResult, FileTransferContext,
};
use kinode_process_lib::{
await_message, call_init, eth, get_blob, http, kimap, println, vfs, Address, LazyLoadBlob,
Message, PackageId, Request, Response,
@ -36,7 +33,6 @@ wit_bindgen::generate!({
additional_derives: [serde::Deserialize, serde::Serialize],
});
mod ft_worker_lib;
mod http_api;
pub mod state;
pub mod utils;
@ -67,8 +63,6 @@ const KIMAP_FIRST_BLOCK: u64 = 1;
pub enum Req {
LocalRequest(LocalRequest),
RemoteRequest(RemoteRequest),
FTWorkerCommand(FTWorkerCommand),
FTWorkerResult(FTWorkerResult),
Eth(eth::EthSubResult),
Http(http::server::HttpServerRequest),
}
@ -78,7 +72,6 @@ pub enum Req {
pub enum Resp {
LocalResponse(LocalResponse),
RemoteResponse(RemoteResponse),
FTWorkerResult(FTWorkerResult),
HttpClient(Result<http::client::HttpClientResponse, http::client::HttpClientError>),
}
@ -136,44 +129,31 @@ fn handle_message(
response.send()?;
}
}
Req::RemoteRequest(remote_request) => {
let resp = handle_remote_request(state, message.source(), remote_request);
Response::new().body(serde_json::to_vec(&resp)?).send()?;
}
Req::FTWorkerCommand(_) => {
spawn_receive_transfer(&state.our, message.body())?;
}
Req::FTWorkerResult(FTWorkerResult::ReceiveSuccess(name)) => {
handle_receive_download(state, &name)?;
}
Req::FTWorkerResult(FTWorkerResult::ProgressUpdate {
file_name,
chunks_received,
total_chunks,
}) => {
// forward progress to UI
http_server.ws_push_all_channels(
"/",
http::server::WsMessageType::Text,
LazyLoadBlob {
mime: Some("application/json".to_string()),
bytes: serde_json::json!({
"kind": "progress",
"data": {
"file_name": file_name,
"chunks_received": chunks_received,
"total_chunks": total_chunks,
}
})
.to_string()
.as_bytes()
.to_vec(),
},
);
}
Req::FTWorkerResult(r) => {
println!("got weird ft_worker result: {r:?}");
}
// Req::FTWorkerResult(FTWorkerResult::ProgressUpdate {
// file_name,
// chunks_received,
// total_chunks,
// }) => {
// // forward progress to UI
// http_server.ws_push_all_channels(
// "/",
// http::server::WsMessageType::Text,
// LazyLoadBlob {
// mime: Some("application/json".to_string()),
// bytes: serde_json::json!({
// "kind": "progress",
// "data": {
// "file_name": file_name,
// "chunks_received": chunks_received,
// "total_chunks": total_chunks,
// }
// })
// .to_string()
// .as_bytes()
// .to_vec(),
// },
// );
// }
Req::Eth(eth_result) => {
if !message.is_local(&state.our) || message.source().process != "eth:distro:sys" {
return Err(anyhow::anyhow!(
@ -205,6 +185,7 @@ fn handle_message(
},
);
}
_ => {}
}
} else {
match serde_json::from_slice::<Resp>(message.body())? {
@ -225,9 +206,6 @@ fn handle_message(
println!("got http_client error: {resp:?}");
}
}
Resp::FTWorkerResult(ft_worker_result) => {
handle_ft_worker_result(ft_worker_result, message.context().unwrap_or(&vec![]))?;
}
Resp::LocalResponse(_) | Resp::RemoteResponse(_) => {
// don't need to handle these at the moment
}
@ -237,55 +215,6 @@ fn handle_message(
}
/// fielding requests to download packages and APIs from us
fn handle_remote_request(state: &mut State, source: &Address, request: RemoteRequest) -> Resp {
let (package_id, desired_version_hash) = match request {
RemoteRequest::Download(RemoteDownloadRequest {
package_id,
desired_version_hash,
}) => (package_id.to_process_lib(), desired_version_hash),
};
let Some(listing) = state.packages.get(&package_id) else {
return Resp::RemoteResponse(RemoteResponse::DownloadDenied(Reason::NoPackage));
};
let Some(ref package_state) = listing.state else {
return Resp::RemoteResponse(RemoteResponse::DownloadDenied(Reason::NoPackage));
};
if !package_state.mirroring {
return Resp::RemoteResponse(RemoteResponse::DownloadDenied(Reason::NotMirroring));
}
if let Some(hash) = desired_version_hash {
if package_state.our_version_hash != hash {
return Resp::RemoteResponse(RemoteResponse::DownloadDenied(Reason::HashMismatch(
HashMismatch {
requested: hash,
have: package_state.our_version_hash.clone(),
},
)));
}
}
let file_name = format!("/{package_id}.zip");
// get the .zip from VFS and attach as blob to response
let Ok(Ok(_)) = utils::vfs_request(
format!("/{package_id}/pkg{file_name}"),
vfs::VfsAction::Read,
)
.send_and_await_response(VFS_TIMEOUT) else {
return Resp::RemoteResponse(RemoteResponse::DownloadDenied(Reason::FileNotFound));
};
// transfer will *inherit* the blob bytes we receive from VFS
match spawn_transfer(&state.our, &file_name, None, APP_SHARE_TIMEOUT, &source) {
Ok(()) => Resp::RemoteResponse(RemoteResponse::DownloadApproved),
Err(_e) => Resp::RemoteResponse(RemoteResponse::DownloadDenied(Reason::WorkerSpawnFailed)),
}
}
/// only `our.node` can call this
fn handle_local_request(
state: &mut State,
@ -583,27 +512,6 @@ fn handle_receive_download_package(
Ok(())
}
fn handle_ft_worker_result(ft_worker_result: FTWorkerResult, context: &[u8]) -> anyhow::Result<()> {
let context = serde_json::from_slice::<FileTransferContext>(context)?;
if let FTWorkerResult::SendSuccess = ft_worker_result {
println!(
"successfully shared {} in {:.4}s",
context.file_name,
std::time::SystemTime::now()
.duration_since(context.start_time)
.unwrap()
.as_secs_f64(),
);
Ok(())
} else if let FTWorkerResult::Err(e) = ft_worker_result {
Err(anyhow::anyhow!("failed to share app: {e:?}"))
} else {
Err(anyhow::anyhow!(
"failed to share app: unknown FTWorkerResult {ft_worker_result:?}"
))
}
}
fn handle_eth_sub_event(
state: &mut State,
event: eth::SubscriptionResult,

View File

@ -0,0 +1,29 @@
[package]
name = "downloads"
version = "0.5.0"
edition = "2021"
[features]
simulation-mode = []
[dependencies]
alloy-primitives = "0.7.6"
alloy-sol-types = "0.7.6"
anyhow = "1.0"
bincode = "1.3.3"
kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", branch = "develop" }
rand = "0.8"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
sha2 = "0.10.8"
sha3 = "0.10.8"
url = "2.4.1"
urlencoding = "2.1.0"
wit-bindgen = "0.24.0"
zip = { version = "1.1.1", default-features = false }
[lib]
crate-type = ["cdylib"]
[package.metadata.component]
package = "kinode:process"

View File

@ -0,0 +1,74 @@
use crate::kinode::process::downloads::{DownloadRequest, PackageId};
use kinode_process_lib::*;
#[allow(dead_code)]
pub fn spawn_send_transfer(
our: &Address,
package_id: &PackageId,
version_hash: &str,
timeout: u64,
to_addr: &Address,
) -> anyhow::Result<()> {
let transfer_id: u64 = rand::random();
let Ok(worker_process_id) = spawn(
Some(&transfer_id.to_string()),
&format!("{}/pkg/ft_worker.wasm", our.package_id()),
OnExit::None,
our_capabilities(),
vec![],
false,
) else {
return Err(anyhow::anyhow!("failed to spawn ft_worker!"));
};
let req = Request::new()
.target((&our.node, worker_process_id))
.expects_response(timeout + 1)
.body(
serde_json::to_vec(&DownloadRequest {
package_id: package_id.clone(),
desired_version_hash: version_hash.to_string(),
download_from: Some(to_addr.to_string()),
})
.unwrap(),
);
req.send()?;
Ok(())
}
#[allow(dead_code)]
pub fn spawn_receive_transfer(
our: &Address,
package_id: &PackageId,
version_hash: &str,
timeout: u64,
) -> anyhow::Result<Address> {
let transfer_id: u64 = rand::random();
let Ok(worker_process_id) = spawn(
Some(&transfer_id.to_string()),
&format!("{}/pkg/ft_worker.wasm", our.package_id()),
OnExit::None,
our_capabilities(),
vec![],
false,
) else {
return Err(anyhow::anyhow!("failed to spawn ft_worker!"));
};
let req = Request::new()
.target((&our.node, worker_process_id.clone()))
.expects_response(timeout + 1)
.body(
serde_json::to_vec(&DownloadRequest {
package_id: package_id.clone(),
desired_version_hash: version_hash.to_string(),
download_from: None,
})
.unwrap(),
);
req.send()?;
Ok(Address::new(&our.node, worker_process_id))
}

View File

@ -0,0 +1,299 @@
#![feature(let_chains)]
//! downloads:app_store:sys
//! manages downloading and sharing of versioned packages.
//!
use std::{collections::HashSet, str::FromStr};
use crate::kinode::process::downloads::{
DownloadRequest, DownloadResponse, Downloads, ProgressUpdate,
};
use ft_worker_lib::{spawn_receive_transfer, spawn_send_transfer};
use kinode_process_lib::{
await_message, call_init, eth, get_blob, http, kimap, println, vfs, Address, Message,
PackageId, ProcessId, Request, Response,
};
use serde::{Deserialize, Serialize};
wit_bindgen::generate!({
path: "target/wit",
generate_unused_types: true,
world: "app-store-sys-v0",
additional_derives: [serde::Deserialize, serde::Serialize],
});
mod ft_worker_lib;
pub const VFS_TIMEOUT: u64 = 5; // 5s
pub const APP_SHARE_TIMEOUT: u64 = 120; // 120s
#[derive(Debug, Serialize, Deserialize)]
#[serde(untagged)] // untagged as a meta-type for all incoming responses
pub enum Resp {
Download(DownloadResponse),
HttpClient(Result<http::client::HttpClientResponse, http::client::HttpClientError>),
}
// downloads:app_store:sys State. read in from vfs drive: /app_store:sys/downloads
#[derive(Debug, Serialize, Deserialize)]
pub struct State {
mirroring: HashSet<(PackageId, String)>, // (package_id, version_hash)
// Other implicit state needed?
// we could do something like a logfile for errors, downloads succeeded etc!
}
call_init!(init);
fn init(our: Address) {
println!("started");
// state should be a simple struct, which can be serialized (parts of it)
// it contains which packages we are mirroring
// // logfile
// load from state, okay to decouple from vfs. it's "app-state" for downloads.
let mut state = State {
mirroring: HashSet::new(),
};
loop {
match await_message() {
Err(send_error) => {
// TODO handle these based on what they are triggered by
println!("got network error: {send_error}");
}
Ok(message) => {
if let Err(e) = handle_message(&our, &mut state, &message) {
println!("error handling message: {:?}", e);
}
}
}
}
}
/// message router: parse into our Req and Resp types, then pass to
/// function defined for each kind of message. check whether the source
/// of the message is allowed to send that kind of message to us.
/// finally, fire a response if expected from a request.
fn handle_message(our: &Address, state: &mut State, message: &Message) -> anyhow::Result<()> {
if message.is_request() {
match serde_json::from_slice::<Downloads>(message.body())? {
Downloads::Download(download_request) => {
let is_local = message.is_local(our);
handle_download_request(our, state, download_request, is_local)?;
}
Downloads::Progress(ProgressUpdate {
package_id,
version_hash,
downloaded,
total,
}) => {
// forward progress to UI
// http_server.ws_push_all_channels(
// "/",
// http::server::WsMessageType::Text,
// LazyLoadBlob {
// mime: Some("application/json".to_string()),
// bytes: serde_json::json!({
// "kind": "progress",
// "data": {
// "file_name": file_name,
// "chunks_received": chunks_received,
// "total_chunks": total_chunks,
// }
// })
// .to_string()
// .as_bytes()
// .to_vec(),
// },
// );
}
_ => {}
}
} else {
match serde_json::from_slice::<Resp>(message.body())? {
Resp::Download(download_response) => {
// TODO handle download response
}
Resp::HttpClient(resp) => {
let name = match message.context() {
Some(context) => std::str::from_utf8(context).unwrap_or_default(),
None => return Err(anyhow::anyhow!("http_client response without context")),
};
if let Ok(http::client::HttpClientResponse::Http(http::client::HttpResponse {
status,
..
})) = resp
{
if status == 200 {
handle_receive_http_download(state, &name)?;
}
} else {
println!("got http_client error: {resp:?}");
}
}
}
}
Ok(())
}
fn handle_download_request(
our: &Address,
state: &mut State,
download_request: DownloadRequest,
is_local: bool,
) -> anyhow::Result<()> {
let DownloadRequest {
package_id,
desired_version_hash,
download_from,
} = download_request;
match is_local {
true => {
// we are requesting this: (handle http types in main?), forwarding here?
if let Some(node_or_url) = download_from {
if node_or_url.starts_with("http") {
// use http_client to GET it
let Ok(url) = url::Url::parse(&node_or_url) else {
return Err(anyhow::anyhow!("bad url: {node_or_url}"));
};
// TODO: need context in this to get it back.
http::client::send_request(http::Method::GET, url, None, Some(60), vec![]);
}
// go download from the node or url
// spawn a worker, and send a downlaod to the node.
let our_worker = spawn_receive_transfer(
our,
&package_id,
&desired_version_hash,
APP_SHARE_TIMEOUT,
)?;
let target_node = Address::new(
node_or_url,
ProcessId::new(Some("downloads"), "app_store", "sys"),
);
Request::new()
.target(target_node)
.body(
serde_json::to_vec(&DownloadRequest {
package_id,
desired_version_hash,
download_from: Some(our_worker.to_string()),
})
.unwrap(),
)
.send()?;
// ok, now technically everything is ze ready. let's see what awaits and updates we send upstream/to the frontend.
}
}
false => {
// Someone wants to download something from us!
// // check mirrors first! :]
// handle the errors that come from spawning.
if let Some(worker) = download_from {
// handle this error too.
let target_worker = Address::from_str(&worker)?;
let _ = spawn_send_transfer(
our,
&package_id,
&desired_version_hash,
APP_SHARE_TIMEOUT,
&target_worker,
)?;
}
// bam, transfer should happen. again, handle errors.
}
}
// Update state to reflect that we're handling this download
// fix wit things
// state.mirroring.insert((package_id, desired_version_hash));
Ok(())
}
fn handle_receive_http_download(state: &mut State, name: &str) -> anyhow::Result<()> {
println!("Received HTTP download for: {}", name);
// Parse the name to extract package_id and version_hash
let parts: Vec<&str> = name.split('-').collect();
if parts.len() != 2 {
return Err(anyhow::anyhow!("Invalid download name format"));
}
let package_id = PackageId::from_str(parts[0])?;
let version_hash = parts[1].to_string();
// Move the downloaded file to the correct location
let source_path = format!("/tmp/{}", name);
let dest_path = format!(
"/app_store:sys/downloads/{}:{}-{}.zip",
package_id.package_name, package_id.publisher_node, version_hash
);
// vfs::rename(&source_path, &dest_path)?;
// Update state to reflect that we're now mirroring this package
state.mirroring.insert((package_id, version_hash.clone()));
// TODO: Verify the integrity of the downloaded file (e.g., checksum)
// TODO: Notify any waiting processes that the download is complete
println!("Successfully processed HTTP download for: {}", name);
Ok(())
}
// Some useful comments for the immediate future:
// NOTE: we should handle NewPackage, kit start-package should just work
// (
// match utils::new_package(
// &package_id.to_process_lib(),
// state,
// metadata.to_erc721_metadata(),
// mirror,
// blob.bytes,
// ) {
// Ok(()) => LocalResponse::NewPackageResponse(NewPackageResponse::Success),
// Err(_) => LocalResponse::NewPackageResponse(NewPackageResponse::InstallFailed),
// },
// None,
// )
// Need start/stop mirror commands here too.
// Auto updates... ?
// I'd imagine this would be triggered on the chain side almost right?
// that's where we hear about autoupdates first.
// then Apis.. we could do a get_apis, dependent on versions.
// but I'm going to punt for now, api_api can be moved to main_install section, 1 api per installed system: )
// that's actually a good system, because we don't really unzip unless we install (maybe in the future for inspecting files etc.)
// but that really should be done on remote
// LocalRequest::StartMirroring(package_id) => (
// match state.start_mirroring(&package_id.to_process_lib()) {
// true => LocalResponse::MirrorResponse(MirrorResponse::Success),
// false => LocalResponse::MirrorResponse(MirrorResponse::Failure),
// },
// None,
// ),
// LocalRequest::StopMirroring(package_id) => (
// match state.stop_mirroring(&package_id.to_process_lib()) {
// true => LocalResponse::MirrorResponse(MirrorResponse::Success),
// false => LocalResponse::MirrorResponse(MirrorResponse::Failure),
// },
// None,
//
// note this, might be tricky:
// let wit_version = match metadata {
// Some(metadata) => metadata.properties.wit_version,
// None => Some(0),
// };

View File

@ -1,4 +1,27 @@
[
{
"process_name": "downloads",
"process_wasm_path": "/downloads.wasm",
"on_exit": "Restart",
"request_networking": true,
"request_capabilities": [
"http_client:distro:sys",
"http_server:distro:sys",
"vfs:distro:sys",
{
"process": "vfs:distro:sys",
"params": {
"root": true
}
}
],
"grant_capabilities": [
"http_server:distro:sys",
"vfs:distro:sys",
"http_client:distro:sys"
],
"public": false
},
{
"process_name": "main",
"process_wasm_path": "/app_store.wasm",
@ -33,7 +56,7 @@
],
"grant_capabilities": [
"eth:distro:sys",
"net:distro:sys",
"net:distro:sys",
"http_client:distro:sys",
"http_server:distro:sys",
"kns_indexer:kns_indexer:sys",
@ -42,4 +65,4 @@
],
"public": false
}
]
]