From 8f57b67f3650ed7b82158ad064c4e8914b9f408f Mon Sep 17 00:00:00 2001 From: dr-frmr Date: Mon, 29 Jan 2024 01:57:37 -0300 Subject: [PATCH] WIP everything working EXCEPT need to figure out kns_indexer syncing before app_store tries to resolve namehashes... --- modules/app_store/app_store/src/api.rs | 2 +- .../app_store/app_store/src/ft_worker_lib.rs | 136 ++++++++++++- modules/app_store/app_store/src/http_api.rs | 25 ++- modules/app_store/app_store/src/lib.rs | 48 ++--- modules/app_store/app_store/src/types.rs | 191 ++++++++++-------- modules/app_store/download/src/lib.rs | 2 +- modules/app_store/pkg/manifest.json | 3 + 7 files changed, 282 insertions(+), 125 deletions(-) mode change 120000 => 100644 modules/app_store/app_store/src/ft_worker_lib.rs diff --git a/modules/app_store/app_store/src/api.rs b/modules/app_store/app_store/src/api.rs index cc982245..10fba38b 100644 --- a/modules/app_store/app_store/src/api.rs +++ b/modules/app_store/app_store/src/api.rs @@ -45,7 +45,7 @@ pub enum LocalRequest { /// No blob is expected. Download { package: PackageId, - install_from: NodeId, + download_from: NodeId, /// Sets whether we will mirror this package for others mirror: bool, /// Sets whether we will try to automatically update this package diff --git a/modules/app_store/app_store/src/ft_worker_lib.rs b/modules/app_store/app_store/src/ft_worker_lib.rs deleted file mode 120000 index 1fe915a3..00000000 --- a/modules/app_store/app_store/src/ft_worker_lib.rs +++ /dev/null @@ -1 +0,0 @@ -../../ft_worker/src/ft_worker_lib.rs \ No newline at end of file diff --git a/modules/app_store/app_store/src/ft_worker_lib.rs b/modules/app_store/app_store/src/ft_worker_lib.rs new file mode 100644 index 00000000..2a892cba --- /dev/null +++ b/modules/app_store/app_store/src/ft_worker_lib.rs @@ -0,0 +1,135 @@ +use kinode_process_lib::*; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Serialize, Deserialize)] +pub struct FileTransferContext { + pub file_name: String, + pub file_size: Option, + pub start_time: std::time::SystemTime, +} + +/// sent as first Request to a newly spawned worker +/// the Receive command will be sent out to target +/// in order to prompt them to spawn a worker +#[derive(Debug, Serialize, Deserialize)] +pub enum FTWorkerCommand { + /// make sure to attach file itself as blob + Send { + target: Address, + file_name: String, + timeout: u64, + }, + Receive { + transfer_id: u64, + file_name: String, + file_size: u64, + total_chunks: u64, + timeout: u64, + }, +} + +/// sent as Response by worker to its parent +#[derive(Debug, Serialize, Deserialize)] +pub enum FTWorkerResult { + SendSuccess, + /// string is name of file. bytes in blob + ReceiveSuccess(String), + Err(TransferError), +} + +/// the possible errors that can be returned to the parent inside `FTWorkerResult` +#[derive(Debug, Serialize, Deserialize)] +pub enum TransferError { + TargetOffline, + TargetTimeout, + TargetRejected, + SourceFailed, +} + +/// A helper function to spawn a worker and initialize a file transfer. +/// The outcome will be sent as an [`FTWorkerResult`] to the caller process. +/// +/// if `file_bytes` is None, expects to inherit blob! +#[allow(dead_code)] +pub fn spawn_transfer( + our: &Address, + file_name: &str, + file_bytes: Option>, + timeout: u64, + to_addr: &Address, +) -> anyhow::Result<()> { + let transfer_id: u64 = rand::random(); + // spawn a worker and tell it to send the file + let Ok(worker_process_id) = spawn( + Some(&transfer_id.to_string()), + &format!("{}/pkg/ft_worker.wasm", our.package_id()), + OnExit::None, // can set message-on-panic here + our_capabilities(), + vec![], + false, // not public + ) else { + return Err(anyhow::anyhow!("failed to spawn ft_worker!")); + }; + // tell the worker what to do + let blob_or_inherit = match file_bytes { + Some(bytes) => Some(LazyLoadBlob { mime: None, bytes }), + None => None, + }; + let mut req = Request::new() + .target((our.node.as_ref(), worker_process_id)) + .inherit(!blob_or_inherit.is_some()) + .expects_response(timeout + 1) // don't call with 2^64 lol + .body( + serde_json::to_vec(&FTWorkerCommand::Send { + target: to_addr.clone(), + file_name: file_name.into(), + timeout, + }) + .unwrap(), + ) + .context( + serde_json::to_vec(&FileTransferContext { + file_name: file_name.into(), + file_size: match &blob_or_inherit { + Some(p) => Some(p.bytes.len() as u64), + None => None, // TODO + }, + start_time: std::time::SystemTime::now(), + }) + .unwrap(), + ); + + if let Some(blob) = blob_or_inherit { + req = req.blob(blob); + } + req.send() +} + +/// A helper function to allow a process to easily handle an incoming transfer +/// from an ft_worker. Call this when you get the initial [`FTWorkerCommand::Receive`] +/// and let it do the rest. The outcome will be sent as an [`FTWorkerResult`] inside +/// a Response to the caller. +#[allow(dead_code)] +pub fn spawn_receive_transfer(our: &Address, body: &[u8]) -> anyhow::Result<()> { + let Ok(FTWorkerCommand::Receive { transfer_id, .. }) = serde_json::from_slice(body) else { + return Err(anyhow::anyhow!( + "spawn_receive_transfer: got malformed request" + )); + }; + let Ok(worker_process_id) = spawn( + Some(&transfer_id.to_string()), + &format!("{}/pkg/ft_worker.wasm", our.package_id()), + OnExit::None, // can set message-on-panic here + our_capabilities(), + vec![], + false, // not public + ) else { + return Err(anyhow::anyhow!("failed to spawn ft_worker!")); + }; + // forward receive command to worker + Request::new() + .target((our.node.as_ref(), worker_process_id)) + .inherit(true) + .body(body) + .send() +} diff --git a/modules/app_store/app_store/src/http_api.rs b/modules/app_store/app_store/src/http_api.rs index 2795e2e0..e3a5653b 100644 --- a/modules/app_store/app_store/src/http_api.rs +++ b/modules/app_store/app_store/src/http_api.rs @@ -1,4 +1,4 @@ -use crate::{OnchainPackageMetadata, PackageListing, State}; +use crate::{OnchainPackageMetadata, PackageListing, PackageState, State}; use kinode_process_lib::{ eth::EthAddress, http::{send_response, IncomingHttpRequest, Method, StatusCode}, @@ -30,12 +30,18 @@ pub fn handle_http_request( req: &IncomingHttpRequest, ) -> anyhow::Result<()> { match serve_paths(state, req) { - Ok((status_code, headers, body)) => send_response(status_code, headers, body), - Err(e) => send_response( - StatusCode::INTERNAL_SERVER_ERROR, - None, - e.to_string().into_bytes(), + Ok((status_code, headers, body)) => send_response( + status_code, + Some(HashMap::from([( + String::from("Content-Type"), + String::from("application/json"), + )])), + body, ), + Err(e) => { + crate::print_to_terminal(1, &format!("http error: {:?}", e)); + send_response(StatusCode::INTERNAL_SERVER_ERROR, None, vec![]) + } } } @@ -58,7 +64,12 @@ fn serve_paths( return Ok(( StatusCode::OK, None, - serde_json::to_vec(&state.downloaded_packages)?, + serde_json::to_vec( + &state + .downloaded_packages + .iter() + .collect::>(), + )?, )); } // GET all listed apps diff --git a/modules/app_store/app_store/src/lib.rs b/modules/app_store/app_store/src/lib.rs index 6cb687f3..5438ae0a 100644 --- a/modules/app_store/app_store/src/lib.rs +++ b/modules/app_store/app_store/src/lib.rs @@ -66,39 +66,31 @@ call_init!(init); fn init(our: Address) { println!("{}: started", our.package()); - bind_http_path("/apps", true, false).expect("failed to bind http path"); - bind_http_path("/apps/:id", true, false).expect("failed to bind http path"); - bind_http_path("/apps/caps/:id", true, false).expect("failed to bind http path"); - bind_http_path("/apps/latest", true, false).expect("failed to bind http path"); - bind_http_path("/apps/search/:query", true, false).expect("failed to bind http path"); + for path in [ + "/apps", + "/apps/listed", + "/apps/:id", + "/apps/listed/:id", + "/apps/:id/caps", + "/apps/:id/mirror", + "/apps/:id/auto-update", + ] { + bind_http_path(path, true, false).expect("failed to bind http path"); + } // serve_ui(&our, "ui", true, false).expect("failed to serve static UI"); // load in our saved state or initalize a new one if none exists let mut state = get_typed_state(|bytes| Ok(bincode::deserialize(bytes)?)) - .unwrap_or(State::new(&our).unwrap()); + .unwrap_or(State::new(CONTRACT_ADDRESS.to_string()).unwrap()); - // TODO: first, await a message from the kernel which will contain the - // contract address for the KNS version we want to track. - let contract_address: Option = Some(CONTRACT_ADDRESS.to_string()); - // loop { - // let Ok(Message::Request { source, body, .. }) = await_message() else { - // continue; - // }; - // if source.process != "kernel:distro:sys" { - // continue; - // } - // contract_address = Some(std::str::from_utf8(&body).unwrap().to_string()); - // break; - // } - - // if contract address changed from a previous run, reset state - if state.contract_address != contract_address { - state = State::new(&our).unwrap(); + if state.contract_address != CONTRACT_ADDRESS { + println!("app store: warning: contract address mismatch--overwriting saved state"); + state = State::new(CONTRACT_ADDRESS.to_string()).unwrap(); } println!( "app store: indexing on contract address {}", - contract_address.as_ref().unwrap() + state.contract_address ); crate::print_to_terminal(1, &format!("starting state: {state:?}")); @@ -107,7 +99,7 @@ fn init(our: Address) { // subscribe to events on the app store contract SubscribeLogsRequest::new(1) // subscription id 1 - .address(EthAddress::from_str(contract_address.unwrap().as_str()).unwrap()) + .address(EthAddress::from_str(&state.contract_address).unwrap()) .from_block(state.last_saved_block - 1) .events(vec![ "AppRegistered(bytes32,uint256,string,string,bytes32)", @@ -284,12 +276,12 @@ fn handle_local_request( } LocalRequest::Download { package: package_id, - install_from, + download_from, mirror, auto_update, desired_version_hash, } => LocalResponse::DownloadResponse( - match Request::to((install_from.as_str(), our.process.clone())) + match Request::to((download_from.as_str(), our.process.clone())) .inherit(true) .body( serde_json::to_vec(&RemoteRequest::Download { @@ -306,7 +298,7 @@ fn handle_local_request( requested_packages.insert( package_id.clone(), RequestedPackage { - from: install_from.clone(), + from: download_from.clone(), mirror: *mirror, auto_update: *auto_update, desired_version_hash: desired_version_hash.clone(), diff --git a/modules/app_store/app_store/src/types.rs b/modules/app_store/app_store/src/types.rs index d70fcda7..c931b39d 100644 --- a/modules/app_store/app_store/src/types.rs +++ b/modules/app_store/app_store/src/types.rs @@ -1,3 +1,4 @@ +use alloy_primitives::FixedBytes; use alloy_rpc_types::Log; use alloy_sol_types::{sol, SolEvent}; use kinode_process_lib::kernel_types as kt; @@ -6,9 +7,23 @@ use serde::{Deserialize, Serialize}; use std::collections::HashMap; sol! { - event AppRegistered(bytes32,uint256,string,string,bytes32); - event AppMetadataUpdated(uint256,string,bytes32); - event Transfer(address,address,uint256); + event AppRegistered( + bytes32 indexed publisherKnsNodeId, + uint256 indexed package, + string packageName, + string metadataUrl, + bytes32 metadataHash + ); + event AppMetadataUpdated( + uint256 indexed package, + string metadataUrl, + bytes32 metadataHash + ); + event Transfer( + address, + address, + uint256 + ); } // from kns_indexer @@ -85,8 +100,7 @@ pub struct PackageState { #[derive(Debug, Serialize, Deserialize)] pub struct State { /// the address of the contract we are using to read package listings - /// this is set by runtime distro at boot-time - pub contract_address: Option, + pub contract_address: String, /// the last block at which we saved the state of the listings to disk. /// we don't want to save the state every time we get a new listing, /// so we only save it every so often and then mark the block at which @@ -96,6 +110,7 @@ pub struct State { /// we keep the full state of the package manager here, calculated from /// the listings contract logs. in the future, we'll offload this and /// only track a certain number of packages... + pub package_hashes: HashMap, // TODO migrate to sqlite db pub listed_packages: HashMap, // TODO migrate to sqlite db /// we keep the full state of the packages we have downloaded here. /// in order to keep this synchronized with our filesystem, we will @@ -107,22 +122,25 @@ pub struct State { impl State { /// To create a new state, we populate the downloaded_packages map /// with all packages parseable from our filesystem. - pub fn new(our: &Address) -> anyhow::Result { + pub fn new(contract_address: String) -> anyhow::Result { + crate::print_to_terminal(1, "app store: producing new state"); let mut state = State { - contract_address: None, + contract_address, last_saved_block: 1, + package_hashes: HashMap::new(), listed_packages: HashMap::new(), downloaded_packages: HashMap::new(), }; - // state.populate_packages_from_filesystem()?; + crate::print_to_terminal( + 1, + &format!("populate: {:?}", state.populate_packages_from_filesystem()), + ); Ok(state) } pub fn get_listing(&self, package_id: &PackageId) -> Option<&PackageListing> { - self.listed_packages.get(&generate_package_hash( - package_id.package(), - &generate_namehash(package_id.publisher()), - )) + self.listed_packages + .get(self.package_hashes.get(package_id)?) } fn get_listing_with_hash_mut( @@ -133,16 +151,20 @@ impl State { } /// Done in response to any new onchain listing update other than 'delete' - fn insert_listing(&mut self, listing: PackageListing) { - self.listed_packages.insert( - generate_package_hash(&listing.name, &generate_namehash(&listing.publisher)), - listing, + fn insert_listing(&mut self, package_hash: PackageHash, listing: PackageListing) { + self.package_hashes.insert( + PackageId::new(&listing.name, &listing.publisher), + package_hash.clone(), ); + self.listed_packages.insert(package_hash, listing); } /// Done in response to an onchain listing update of 'delete' fn delete_listing(&mut self, package_hash: &PackageHash) { - self.listed_packages.remove(package_hash); + if let Some(old) = self.listed_packages.remove(package_hash) { + self.package_hashes + .remove(&PackageId::new(&old.name, &old.publisher)); + } } pub fn get_downloaded_package(&self, package_id: &PackageId) -> Option { @@ -194,55 +216,56 @@ impl State { } /// saves state - /// TODO need root dir access to do this operation - // pub fn populate_packages_from_filesystem(&mut self) -> anyhow::Result<()> { - // let Message::Response { body, .. } = Request::to(("our", "vfs", "distro", "sys")) - // .body(serde_json::to_vec(&vfs::VfsRequest { - // path: "/".to_string(), - // action: vfs::VfsAction::ReadDir, - // })?) - // .send_and_await_response(3)?? - // else { - // return Err(anyhow::anyhow!("vfs: bad response")); - // }; - // let response = serde_json::from_slice::(&body)?; - // let vfs::VfsResponse::ReadDir(entries) = response else { - // return Err(anyhow::anyhow!("vfs: unexpected response: {:?}", response)); - // }; - // for entry in entries { - // // ignore non-package dirs - // let Ok(package_id) = entry.path[1..].parse::() else { - // continue; - // }; - // if entry.file_type == vfs::FileType::Directory { - // let zip_file = vfs::File { - // path: format!("/{}/pkg/{}.zip", package_id, package_id), - // }; - // let Ok(zip_file_bytes) = zip_file.read() else { - // continue; - // }; - // // generate entry from this data - // // for the version hash, take the SHA-256 hash of the zip file - // let our_version = generate_version_hash(&zip_file_bytes); - // // the user will need to turn mirroring and auto-update back on if they - // // have to reset the state of their app store for some reason. the apps - // // themselves will remain on disk unless explicitly deleted. - // self.add_downloaded_package( - // &package_id, - // PackageState { - // mirrored_from: None, - // our_version, - // source_zip: None, // since it's already installed - // caps_approved: true, // since it's already installed this must be true - // mirroring: false, - // auto_update: false, - // metadata: None, - // }, - // ) - // } - // } - // Ok(()) - // } + pub fn populate_packages_from_filesystem(&mut self) -> anyhow::Result<()> { + let Message::Response { body, .. } = Request::to(("our", "vfs", "distro", "sys")) + .body(serde_json::to_vec(&vfs::VfsRequest { + path: "/".to_string(), + action: vfs::VfsAction::ReadDir, + })?) + .send_and_await_response(3)?? + else { + return Err(anyhow::anyhow!("vfs: bad response")); + }; + let response = serde_json::from_slice::(&body)?; + crate::print_to_terminal(1, &format!("vfs response: {:?}", response)); + let vfs::VfsResponse::ReadDir(entries) = response else { + return Err(anyhow::anyhow!("vfs: unexpected response: {:?}", response)); + }; + for entry in entries { + crate::print_to_terminal(1, &format!("entry: {:?}", entry)); + // ignore non-package dirs + let Ok(package_id) = entry.path.parse::() else { + continue; + }; + if entry.file_type == vfs::FileType::Directory { + let zip_file = vfs::File { + path: format!("/{}/pkg/{}.zip", package_id, package_id), + }; + let Ok(zip_file_bytes) = zip_file.read() else { + continue; + }; + // generate entry from this data + // for the version hash, take the SHA-256 hash of the zip file + let our_version = generate_version_hash(&zip_file_bytes); + // the user will need to turn mirroring and auto-update back on if they + // have to reset the state of their app store for some reason. the apps + // themselves will remain on disk unless explicitly deleted. + self.add_downloaded_package( + &package_id, + PackageState { + mirrored_from: None, + our_version, + source_zip: None, // since it's already installed + caps_approved: true, // since it's already installed this must be true + mirroring: false, + auto_update: false, + metadata: None, + }, + ) + } + } + Ok(()) + } pub fn install_downloaded_package(&mut self, package_id: &PackageId) -> anyhow::Result<()> { let Some(mut package_state) = self.get_downloaded_package(package_id) else { @@ -351,10 +374,10 @@ impl State { match log.topics[0] { AppRegistered::SIGNATURE_HASH => { - let (publisher_namehash, package_hash, package_name, metadata_url, metadata_hash) = - AppRegistered::abi_decode_data(&log.data, false)?; - - let publisher_namehash = publisher_namehash.to_string(); + let publisher_namehash = log.topics[1]; + let package_hash = log.topics[2]; + let (package_name, metadata_url, metadata_hash) = + AppRegistered::abi_decode_data(&log.data, true)?; let metadata_hash = metadata_hash.to_string(); crate::print_to_terminal( @@ -365,7 +388,7 @@ impl State { ) ); - if generate_package_hash(&package_name, &publisher_namehash) + if generate_package_hash(&package_name, publisher_namehash.as_slice()) != package_hash.to_string() { return Err(anyhow::anyhow!( @@ -375,7 +398,7 @@ impl State { let Ok(Ok(Message::Response { body, .. })) = Request::to(("our", "kns_indexer", "kns_indexer", "sys")) .body(serde_json::to_vec(&IndexerRequests::NamehashToName( - publisher_namehash, + publisher_namehash.to_string(), ))?) .send_and_await_response(3) else { return Err(anyhow::anyhow!("got invalid response from kns_indexer")); @@ -395,12 +418,12 @@ impl State { metadata_hash, metadata, }; - self.insert_listing(listing); + self.insert_listing(package_hash.to_string(), listing); } AppMetadataUpdated::SIGNATURE_HASH => { - let (package_hash, metadata_url, metadata_hash) = + let package_hash = log.topics[1].to_string(); + let (metadata_url, metadata_hash) = AppMetadataUpdated::abi_decode_data(&log.data, false)?; - let metadata_hash = metadata_hash.to_string(); crate::print_to_terminal( @@ -417,10 +440,10 @@ impl State { "app store: got log with no matching listing" ))?; - let metadata = fetch_metadata(&metadata_url, &metadata_hash)?; + let metadata = fetch_metadata(&metadata_url, &metadata_hash).ok(); current_listing.metadata_hash = metadata_hash; - current_listing.metadata = Some(metadata); + current_listing.metadata = metadata; } Transfer::SIGNATURE_HASH => { let from = alloy_primitives::Address::from_word(log.topics[1]); @@ -480,13 +503,6 @@ pub fn fetch_metadata( } } -pub fn generate_namehash(name: &str) -> String { - use sha3::{Digest, Keccak256}; - let mut hasher = Keccak256::new(); - hasher.update(name); - format!("{:x}", hasher.finalize()) -} - pub fn generate_metadata_hash(metadata: &[u8]) -> String { use sha3::{Digest, Keccak256}; let mut hasher = Keccak256::new(); @@ -494,11 +510,12 @@ pub fn generate_metadata_hash(metadata: &[u8]) -> String { format!("{:x}", hasher.finalize()) } -pub fn generate_package_hash(name: &str, publisher_namehash: &str) -> String { +pub fn generate_package_hash(name: &str, publisher_namehash: &[u8]) -> String { use sha3::{Digest, Keccak256}; let mut hasher = Keccak256::new(); - hasher.update([name, publisher_namehash].concat()); - format!("{:x}", hasher.finalize()) + hasher.update([name.as_bytes(), publisher_namehash].concat()); + let hash = hasher.finalize(); + format!("0x{:x}", hash) } pub fn generate_version_hash(zip_bytes: &[u8]) -> String { diff --git a/modules/app_store/download/src/lib.rs b/modules/app_store/download/src/lib.rs index 536ffdc4..9a5e246f 100644 --- a/modules/app_store/download/src/lib.rs +++ b/modules/app_store/download/src/lib.rs @@ -42,7 +42,7 @@ fn init(our: Address) { .body( serde_json::to_vec(&LocalRequest::Download { package: package_id.clone(), - install_from: download_from.clone(), + download_from: download_from.clone(), mirror: true, auto_update: true, desired_version_hash: None, diff --git a/modules/app_store/pkg/manifest.json b/modules/app_store/pkg/manifest.json index 4d56d88b..5112319d 100644 --- a/modules/app_store/pkg/manifest.json +++ b/modules/app_store/pkg/manifest.json @@ -25,7 +25,10 @@ } ], "grant_capabilities": [ + "eth:distro:sys", + "http_client:distro:sys", "http_server:distro:sys", + "kns_indexer:kns_indexer:sys", "terminal:terminal:sys", "vfs:distro:sys" ],