WIP: everything working EXCEPT need to figure out kns_indexer syncing before app_store tries to resolve namehashes...

dr-frmr 2024-01-29 01:57:37 -03:00
parent f488d88ae3
commit 8f57b67f36
7 changed files with 282 additions and 125 deletions

View File

@@ -45,7 +45,7 @@ pub enum LocalRequest {
/// No blob is expected.
Download {
package: PackageId,
install_from: NodeId,
download_from: NodeId,
/// Sets whether we will mirror this package for others
mirror: bool,
/// Sets whether we will try to automatically update this package

View File

@@ -1 +0,0 @@
../../ft_worker/src/ft_worker_lib.rs

View File

@@ -0,0 +1,135 @@
use kinode_process_lib::*;
use serde::{Deserialize, Serialize};
#[derive(Debug, Serialize, Deserialize)]
pub struct FileTransferContext {
pub file_name: String,
pub file_size: Option<u64>,
pub start_time: std::time::SystemTime,
}
/// sent as first Request to a newly spawned worker
/// the Receive command will be sent out to target
/// in order to prompt them to spawn a worker
#[derive(Debug, Serialize, Deserialize)]
pub enum FTWorkerCommand {
/// make sure to attach file itself as blob
Send {
target: Address,
file_name: String,
timeout: u64,
},
Receive {
transfer_id: u64,
file_name: String,
file_size: u64,
total_chunks: u64,
timeout: u64,
},
}
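// An illustrative sketch of the flow implied by the comments above and the
// helpers below (not additional API): the sender's parent spawns a worker
// and hands it `Send` with the file attached as blob; that worker sends
// `Receive` to the target, whose parent spawns its own worker and forwards
// `Receive` to it; the two workers then move the file in `total_chunks`
// pieces, each reporting an `FTWorkerResult` to its parent when done.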
/// sent as Response by worker to its parent
#[derive(Debug, Serialize, Deserialize)]
pub enum FTWorkerResult {
SendSuccess,
/// String is the name of the received file; its bytes are in the blob.
ReceiveSuccess(String),
Err(TransferError),
}
/// the possible errors that can be returned to the parent inside `FTWorkerResult`
#[derive(Debug, Serialize, Deserialize)]
pub enum TransferError {
TargetOffline,
TargetTimeout,
TargetRejected,
SourceFailed,
}
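// For illustration only (hypothetical parent-side handler, assuming `body`
// holds the worker's Response): a caller of `spawn_transfer` below might
// consume these outcome types roughly like so:
//
//     match serde_json::from_slice::<FTWorkerResult>(&body)? {
//         FTWorkerResult::SendSuccess => println!("file sent"),
//         // the received file's bytes arrive in the blob
//         FTWorkerResult::ReceiveSuccess(file_name) => println!("got {file_name}"),
//         FTWorkerResult::Err(e) => println!("transfer failed: {e:?}"),
//     }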
/// A helper function to spawn a worker and initialize a file transfer.
/// The outcome will be sent as an [`FTWorkerResult`] to the caller process.
///
/// If `file_bytes` is None, expects to inherit the blob!
#[allow(dead_code)]
pub fn spawn_transfer(
our: &Address,
file_name: &str,
file_bytes: Option<Vec<u8>>,
timeout: u64,
to_addr: &Address,
) -> anyhow::Result<()> {
let transfer_id: u64 = rand::random();
// spawn a worker and tell it to send the file
let Ok(worker_process_id) = spawn(
Some(&transfer_id.to_string()),
&format!("{}/pkg/ft_worker.wasm", our.package_id()),
OnExit::None, // can set message-on-panic here
our_capabilities(),
vec![],
false, // not public
) else {
return Err(anyhow::anyhow!("failed to spawn ft_worker!"));
};
// tell the worker what to do
let blob_or_inherit = file_bytes.map(|bytes| LazyLoadBlob { mime: None, bytes });
let mut req = Request::new()
.target((our.node.as_ref(), worker_process_id))
.inherit(blob_or_inherit.is_none())
.expects_response(timeout + 1) // don't call with 2^64 lol
.body(
serde_json::to_vec(&FTWorkerCommand::Send {
target: to_addr.clone(),
file_name: file_name.into(),
timeout,
})
.unwrap(),
)
.context(
serde_json::to_vec(&FileTransferContext {
file_name: file_name.into(),
file_size: blob_or_inherit.as_ref().map(|b| b.bytes.len() as u64), // TODO
start_time: std::time::SystemTime::now(),
})
.unwrap(),
);
if let Some(blob) = blob_or_inherit {
req = req.blob(blob);
}
req.send()
}
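// Hypothetical caller-side usage of the helper above: send `my_file.txt`
// to a process on another node with a 60-second timeout, attaching the
// bytes directly rather than inheriting a blob:
//
//     spawn_transfer(&our, "my_file.txt", Some(file_bytes), 60, &target_address)?;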
/// A helper function to allow a process to easily handle an incoming transfer
/// from an ft_worker. Call this when you get the initial [`FTWorkerCommand::Receive`]
/// and let it do the rest. The outcome will be sent as an [`FTWorkerResult`] inside
/// a Response to the caller.
#[allow(dead_code)]
pub fn spawn_receive_transfer(our: &Address, body: &[u8]) -> anyhow::Result<()> {
let Ok(FTWorkerCommand::Receive { transfer_id, .. }) = serde_json::from_slice(body) else {
return Err(anyhow::anyhow!(
"spawn_receive_transfer: got malformed request"
));
};
let Ok(worker_process_id) = spawn(
Some(&transfer_id.to_string()),
&format!("{}/pkg/ft_worker.wasm", our.package_id()),
OnExit::None, // can set message-on-panic here
our_capabilities(),
vec![],
false, // not public
) else {
return Err(anyhow::anyhow!("failed to spawn ft_worker!"));
};
// forward receive command to worker
Request::new()
.target((our.node.as_ref(), worker_process_id))
.inherit(true)
.body(body)
.send()
}
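// A minimal receiver-side sketch (hypothetical, assuming the usual message
// loop): when a Request arrives whose body parses as `FTWorkerCommand::Receive`,
// hand it to the helper above and a local worker takes over the transfer:
//
//     if let Message::Request { ref body, .. } = message {
//         if let Ok(FTWorkerCommand::Receive { .. }) = serde_json::from_slice(body) {
//             spawn_receive_transfer(&our, body)?;
//         }
//     }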

View File

@@ -1,4 +1,4 @@
use crate::{OnchainPackageMetadata, PackageListing, State};
use crate::{OnchainPackageMetadata, PackageListing, PackageState, State};
use kinode_process_lib::{
eth::EthAddress,
http::{send_response, IncomingHttpRequest, Method, StatusCode},
@@ -30,12 +30,18 @@ pub fn handle_http_request(
req: &IncomingHttpRequest,
) -> anyhow::Result<()> {
match serve_paths(state, req) {
Ok((status_code, headers, body)) => send_response(status_code, headers, body),
Err(e) => send_response(
StatusCode::INTERNAL_SERVER_ERROR,
None,
e.to_string().into_bytes(),
Ok((status_code, headers, body)) => send_response(
status_code,
Some(HashMap::from([(
String::from("Content-Type"),
String::from("application/json"),
)])),
body,
),
Err(e) => {
crate::print_to_terminal(1, &format!("http error: {:?}", e));
send_response(StatusCode::INTERNAL_SERVER_ERROR, None, vec![])
}
}
}
@@ -58,7 +64,12 @@ fn serve_paths(
return Ok((
StatusCode::OK,
None,
serde_json::to_vec(&state.downloaded_packages)?,
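// serialize as a list of (PackageId, PackageState) pairs rather than a JSON map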
serde_json::to_vec(
&state
.downloaded_packages
.iter()
.collect::<Vec<(&PackageId, &PackageState)>>(),
)?,
));
}
// GET all listed apps

View File

@@ -66,39 +66,31 @@ call_init!(init);
fn init(our: Address) {
println!("{}: started", our.package());
bind_http_path("/apps", true, false).expect("failed to bind http path");
bind_http_path("/apps/:id", true, false).expect("failed to bind http path");
bind_http_path("/apps/caps/:id", true, false).expect("failed to bind http path");
bind_http_path("/apps/latest", true, false).expect("failed to bind http path");
bind_http_path("/apps/search/:query", true, false).expect("failed to bind http path");
for path in [
"/apps",
"/apps/listed",
"/apps/:id",
"/apps/listed/:id",
"/apps/:id/caps",
"/apps/:id/mirror",
"/apps/:id/auto-update",
] {
bind_http_path(path, true, false).expect("failed to bind http path");
}
// serve_ui(&our, "ui", true, false).expect("failed to serve static UI");
// load in our saved state or initialize a new one if none exists
let mut state = get_typed_state(|bytes| Ok(bincode::deserialize(bytes)?))
.unwrap_or(State::new(&our).unwrap());
.unwrap_or(State::new(CONTRACT_ADDRESS.to_string()).unwrap());
// TODO: first, await a message from the kernel which will contain the
// contract address for the KNS version we want to track.
let contract_address: Option<String> = Some(CONTRACT_ADDRESS.to_string());
// loop {
// let Ok(Message::Request { source, body, .. }) = await_message() else {
// continue;
// };
// if source.process != "kernel:distro:sys" {
// continue;
// }
// contract_address = Some(std::str::from_utf8(&body).unwrap().to_string());
// break;
// }
// if contract address changed from a previous run, reset state
if state.contract_address != contract_address {
state = State::new(&our).unwrap();
if state.contract_address != CONTRACT_ADDRESS {
println!("app store: warning: contract address mismatch--overwriting saved state");
state = State::new(CONTRACT_ADDRESS.to_string()).unwrap();
}
println!(
"app store: indexing on contract address {}",
contract_address.as_ref().unwrap()
state.contract_address
);
crate::print_to_terminal(1, &format!("starting state: {state:?}"));
@@ -107,7 +99,7 @@ fn init(our: Address) {
// subscribe to events on the app store contract
SubscribeLogsRequest::new(1) // subscription id 1
.address(EthAddress::from_str(contract_address.unwrap().as_str()).unwrap())
.address(EthAddress::from_str(&state.contract_address).unwrap())
.from_block(state.last_saved_block - 1)
.events(vec![
"AppRegistered(bytes32,uint256,string,string,bytes32)",
@@ -284,12 +276,12 @@ fn handle_local_request(
}
LocalRequest::Download {
package: package_id,
install_from,
download_from,
mirror,
auto_update,
desired_version_hash,
} => LocalResponse::DownloadResponse(
match Request::to((install_from.as_str(), our.process.clone()))
match Request::to((download_from.as_str(), our.process.clone()))
.inherit(true)
.body(
serde_json::to_vec(&RemoteRequest::Download {
@@ -306,7 +298,7 @@
requested_packages.insert(
package_id.clone(),
RequestedPackage {
from: install_from.clone(),
from: download_from.clone(),
mirror: *mirror,
auto_update: *auto_update,
desired_version_hash: desired_version_hash.clone(),

View File

@@ -1,3 +1,4 @@
use alloy_primitives::FixedBytes;
use alloy_rpc_types::Log;
use alloy_sol_types::{sol, SolEvent};
use kinode_process_lib::kernel_types as kt;
@@ -6,9 +7,23 @@ use serde::{Deserialize, Serialize};
use std::collections::HashMap;
sol! {
event AppRegistered(bytes32,uint256,string,string,bytes32);
event AppMetadataUpdated(uint256,string,bytes32);
event Transfer(address,address,uint256);
event AppRegistered(
bytes32 indexed publisherKnsNodeId,
uint256 indexed package,
string packageName,
string metadataUrl,
bytes32 metadataHash
);
event AppMetadataUpdated(
uint256 indexed package,
string metadataUrl,
bytes32 metadataHash
);
event Transfer(
address,
address,
uint256
);
}
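// note: `indexed` event parameters are carried in `log.topics` (topic 0 is
// the event's signature hash), not in the ABI-encoded `log.data`; this is
// why the log handler below reads the publisher namehash and package hash
// from topics[1] and topics[2] and only decodes the remaining fields from data.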
// from kns_indexer
@@ -85,8 +100,7 @@ pub struct PackageState {
#[derive(Debug, Serialize, Deserialize)]
pub struct State {
/// the address of the contract we are using to read package listings
/// this is set by runtime distro at boot-time
pub contract_address: Option<String>,
pub contract_address: String,
/// the last block at which we saved the state of the listings to disk.
/// we don't want to save the state every time we get a new listing,
/// so we only save it every so often and then mark the block at which
@@ -96,6 +110,7 @@ pub struct State {
/// we keep the full state of the package manager here, calculated from
/// the listings contract logs. in the future, we'll offload this and
/// only track a certain number of packages...
pub package_hashes: HashMap<PackageId, PackageHash>, // TODO migrate to sqlite db
pub listed_packages: HashMap<PackageHash, PackageListing>, // TODO migrate to sqlite db
/// we keep the full state of the packages we have downloaded here.
/// in order to keep this synchronized with our filesystem, we will
@@ -107,22 +122,25 @@ pub struct State {
impl State {
/// To create a new state, we populate the downloaded_packages map
/// with all packages parseable from our filesystem.
pub fn new(our: &Address) -> anyhow::Result<Self> {
pub fn new(contract_address: String) -> anyhow::Result<Self> {
crate::print_to_terminal(1, "app store: producing new state");
let mut state = State {
contract_address: None,
contract_address,
last_saved_block: 1,
package_hashes: HashMap::new(),
listed_packages: HashMap::new(),
downloaded_packages: HashMap::new(),
};
// state.populate_packages_from_filesystem()?;
crate::print_to_terminal(
1,
&format!("populate: {:?}", state.populate_packages_from_filesystem()),
);
Ok(state)
}
pub fn get_listing(&self, package_id: &PackageId) -> Option<&PackageListing> {
self.listed_packages.get(&generate_package_hash(
package_id.package(),
&generate_namehash(package_id.publisher()),
))
self.listed_packages
.get(self.package_hashes.get(package_id)?)
}
fn get_listing_with_hash_mut(
@@ -133,16 +151,20 @@
}
/// Done in response to any new onchain listing update other than 'delete'
fn insert_listing(&mut self, listing: PackageListing) {
self.listed_packages.insert(
generate_package_hash(&listing.name, &generate_namehash(&listing.publisher)),
listing,
fn insert_listing(&mut self, package_hash: PackageHash, listing: PackageListing) {
self.package_hashes.insert(
PackageId::new(&listing.name, &listing.publisher),
package_hash.clone(),
);
self.listed_packages.insert(package_hash, listing);
}
/// Done in response to an onchain listing update of 'delete'
fn delete_listing(&mut self, package_hash: &PackageHash) {
self.listed_packages.remove(package_hash);
if let Some(old) = self.listed_packages.remove(package_hash) {
self.package_hashes
.remove(&PackageId::new(&old.name, &old.publisher));
}
}
pub fn get_downloaded_package(&self, package_id: &PackageId) -> Option<PackageState> {
@@ -194,55 +216,56 @@ impl State {
}
/// saves state
/// TODO need root dir access to do this operation
// pub fn populate_packages_from_filesystem(&mut self) -> anyhow::Result<()> {
// let Message::Response { body, .. } = Request::to(("our", "vfs", "distro", "sys"))
// .body(serde_json::to_vec(&vfs::VfsRequest {
// path: "/".to_string(),
// action: vfs::VfsAction::ReadDir,
// })?)
// .send_and_await_response(3)??
// else {
// return Err(anyhow::anyhow!("vfs: bad response"));
// };
// let response = serde_json::from_slice::<vfs::VfsResponse>(&body)?;
// let vfs::VfsResponse::ReadDir(entries) = response else {
// return Err(anyhow::anyhow!("vfs: unexpected response: {:?}", response));
// };
// for entry in entries {
// // ignore non-package dirs
// let Ok(package_id) = entry.path[1..].parse::<PackageId>() else {
// continue;
// };
// if entry.file_type == vfs::FileType::Directory {
// let zip_file = vfs::File {
// path: format!("/{}/pkg/{}.zip", package_id, package_id),
// };
// let Ok(zip_file_bytes) = zip_file.read() else {
// continue;
// };
// // generate entry from this data
// // for the version hash, take the SHA-256 hash of the zip file
// let our_version = generate_version_hash(&zip_file_bytes);
// // the user will need to turn mirroring and auto-update back on if they
// // have to reset the state of their app store for some reason. the apps
// // themselves will remain on disk unless explicitly deleted.
// self.add_downloaded_package(
// &package_id,
// PackageState {
// mirrored_from: None,
// our_version,
// source_zip: None, // since it's already installed
// caps_approved: true, // since it's already installed this must be true
// mirroring: false,
// auto_update: false,
// metadata: None,
// },
// )
// }
// }
// Ok(())
// }
pub fn populate_packages_from_filesystem(&mut self) -> anyhow::Result<()> {
let Message::Response { body, .. } = Request::to(("our", "vfs", "distro", "sys"))
.body(serde_json::to_vec(&vfs::VfsRequest {
path: "/".to_string(),
action: vfs::VfsAction::ReadDir,
})?)
.send_and_await_response(3)??
else {
return Err(anyhow::anyhow!("vfs: bad response"));
};
let response = serde_json::from_slice::<vfs::VfsResponse>(&body)?;
crate::print_to_terminal(1, &format!("vfs response: {:?}", response));
let vfs::VfsResponse::ReadDir(entries) = response else {
return Err(anyhow::anyhow!("vfs: unexpected response: {:?}", response));
};
for entry in entries {
crate::print_to_terminal(1, &format!("entry: {:?}", entry));
// ignore non-package dirs
let Ok(package_id) = entry.path.parse::<PackageId>() else {
continue;
};
if entry.file_type == vfs::FileType::Directory {
let zip_file = vfs::File {
path: format!("/{}/pkg/{}.zip", package_id, package_id),
};
let Ok(zip_file_bytes) = zip_file.read() else {
continue;
};
// generate entry from this data
// for the version hash, take the SHA-256 hash of the zip file
let our_version = generate_version_hash(&zip_file_bytes);
// the user will need to turn mirroring and auto-update back on if they
// have to reset the state of their app store for some reason. the apps
// themselves will remain on disk unless explicitly deleted.
self.add_downloaded_package(
&package_id,
PackageState {
mirrored_from: None,
our_version,
source_zip: None, // since it's already installed
caps_approved: true, // since it's already installed this must be true
mirroring: false,
auto_update: false,
metadata: None,
},
)
}
}
Ok(())
}
pub fn install_downloaded_package(&mut self, package_id: &PackageId) -> anyhow::Result<()> {
let Some(mut package_state) = self.get_downloaded_package(package_id) else {
@@ -351,10 +374,10 @@ impl State {
match log.topics[0] {
AppRegistered::SIGNATURE_HASH => {
let (publisher_namehash, package_hash, package_name, metadata_url, metadata_hash) =
AppRegistered::abi_decode_data(&log.data, false)?;
let publisher_namehash = publisher_namehash.to_string();
let publisher_namehash = log.topics[1];
let package_hash = log.topics[2];
let (package_name, metadata_url, metadata_hash) =
AppRegistered::abi_decode_data(&log.data, true)?;
let metadata_hash = metadata_hash.to_string();
crate::print_to_terminal(
@@ -365,7 +388,7 @@ impl State {
)
);
if generate_package_hash(&package_name, &publisher_namehash)
if generate_package_hash(&package_name, publisher_namehash.as_slice())
!= package_hash.to_string()
{
return Err(anyhow::anyhow!(
@@ -375,7 +398,7 @@ impl State {
let Ok(Ok(Message::Response { body, .. })) =
Request::to(("our", "kns_indexer", "kns_indexer", "sys"))
.body(serde_json::to_vec(&IndexerRequests::NamehashToName(
publisher_namehash,
publisher_namehash.to_string(),
))?)
.send_and_await_response(3) else {
return Err(anyhow::anyhow!("got invalid response from kns_indexer"));
@@ -395,12 +418,12 @@ impl State {
metadata_hash,
metadata,
};
self.insert_listing(listing);
self.insert_listing(package_hash.to_string(), listing);
}
AppMetadataUpdated::SIGNATURE_HASH => {
let (package_hash, metadata_url, metadata_hash) =
let package_hash = log.topics[1].to_string();
let (metadata_url, metadata_hash) =
AppMetadataUpdated::abi_decode_data(&log.data, false)?;
let metadata_hash = metadata_hash.to_string();
crate::print_to_terminal(
@@ -417,10 +440,10 @@ impl State {
"app store: got log with no matching listing"
))?;
let metadata = fetch_metadata(&metadata_url, &metadata_hash)?;
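// tolerate a failed metadata fetch (store None) rather than abort log handling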
let metadata = fetch_metadata(&metadata_url, &metadata_hash).ok();
current_listing.metadata_hash = metadata_hash;
current_listing.metadata = Some(metadata);
current_listing.metadata = metadata;
}
Transfer::SIGNATURE_HASH => {
let from = alloy_primitives::Address::from_word(log.topics[1]);
@@ -480,13 +503,6 @@ pub fn fetch_metadata(
}
}
pub fn generate_namehash(name: &str) -> String {
use sha3::{Digest, Keccak256};
let mut hasher = Keccak256::new();
hasher.update(name);
format!("{:x}", hasher.finalize())
}
pub fn generate_metadata_hash(metadata: &[u8]) -> String {
use sha3::{Digest, Keccak256};
let mut hasher = Keccak256::new();
@@ -494,11 +510,12 @@ pub fn generate_metadata_hash(metadata: &[u8]) -> String {
format!("{:x}", hasher.finalize())
}
pub fn generate_package_hash(name: &str, publisher_namehash: &str) -> String {
pub fn generate_package_hash(name: &str, publisher_namehash: &[u8]) -> String {
use sha3::{Digest, Keccak256};
let mut hasher = Keccak256::new();
hasher.update([name, publisher_namehash].concat());
format!("{:x}", hasher.finalize())
hasher.update([name.as_bytes(), publisher_namehash].concat());
let hash = hasher.finalize();
format!("0x{:x}", hash)
}
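// Hypothetical check mirroring the verification in the log handler above:
// given the 32-byte publisher namehash from log.topics[1], the recomputed
// hash should match the on-chain package hash topic:
//
//     assert_eq!(
//         generate_package_hash(&package_name, publisher_namehash.as_slice()),
//         package_hash.to_string(),
//     );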
pub fn generate_version_hash(zip_bytes: &[u8]) -> String {

View File

@@ -42,7 +42,7 @@ fn init(our: Address) {
.body(
serde_json::to_vec(&LocalRequest::Download {
package: package_id.clone(),
install_from: download_from.clone(),
download_from: download_from.clone(),
mirror: true,
auto_update: true,
desired_version_hash: None,

View File

@@ -25,7 +25,10 @@
}
],
"grant_capabilities": [
"eth:distro:sys",
"http_client:distro:sys",
"http_server:distro:sys",
"kns_indexer:kns_indexer:sys",
"terminal:terminal:sys",
"vfs:distro:sys"
],