mirror of
https://github.com/uqbar-dao/nectar.git
synced 2024-11-22 03:04:35 +03:00
chain:app_store: init
This commit is contained in:
parent
da6ca5e48b
commit
500364e1c5
23
Cargo.lock
generated
23
Cargo.lock
generated
@ -1341,6 +1341,26 @@ dependencies = [
|
||||
"zeroize",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "chain"
|
||||
version = "0.5.0"
|
||||
dependencies = [
|
||||
"alloy-primitives",
|
||||
"alloy-sol-types",
|
||||
"anyhow",
|
||||
"bincode",
|
||||
"kinode_process_lib 0.9.0",
|
||||
"rand 0.8.5",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"sha2",
|
||||
"sha3",
|
||||
"url",
|
||||
"urlencoding 2.1.3",
|
||||
"wit-bindgen",
|
||||
"zip 1.1.4",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "chess"
|
||||
version = "0.2.1"
|
||||
@ -2078,10 +2098,7 @@ dependencies = [
|
||||
name = "downloads"
|
||||
version = "0.5.0"
|
||||
dependencies = [
|
||||
"alloy-primitives",
|
||||
"alloy-sol-types",
|
||||
"anyhow",
|
||||
"bincode",
|
||||
"kinode_process_lib 0.9.0",
|
||||
"rand 0.8.5",
|
||||
"serde",
|
||||
|
@ -15,7 +15,7 @@ lib = { path = "lib" }
|
||||
members = [
|
||||
"lib", "kinode",
|
||||
"kinode/packages/app_store/app_store", "kinode/packages/app_store/ft_worker",
|
||||
"kinode/packages/app_store/download", "kinode/packages/app_store/install", "kinode/packages/app_store/uninstall", "kinode/packages/app_store/downloads",
|
||||
"kinode/packages/app_store/download", "kinode/packages/app_store/install", "kinode/packages/app_store/uninstall", "kinode/packages/app_store/downloads", "kinode/packages/app_store/chain",
|
||||
"kinode/packages/chess/chess",
|
||||
"kinode/packages/homepage/homepage",
|
||||
"kinode/packages/kino_updates/blog", "kinode/packages/kino_updates/globe",
|
||||
|
@ -52,15 +52,31 @@ interface chain {
|
||||
|
||||
use standard.{package-id};
|
||||
|
||||
variant chains {
|
||||
get-app(package-id),
|
||||
start-auto-update(package-id),
|
||||
stop-auto-update(package-id),
|
||||
}
|
||||
|
||||
record on-chain-app {
|
||||
// include package_id?
|
||||
metadata-uri: string,
|
||||
metadata-hash: string,
|
||||
// this might be a thing internally, but could also just be fetched on demand
|
||||
// would negate them becoming outdated
|
||||
metadata: option<metadata-file>,
|
||||
metadata: option<on-chain-metadata>,
|
||||
}
|
||||
|
||||
record metadata-file {
|
||||
record on-chain-metadata {
|
||||
name: option<string>,
|
||||
description: option<string>,
|
||||
image: option<string>,
|
||||
external-url: option<string>,
|
||||
animation-url: option<string>,
|
||||
properties: on-chain-properties,
|
||||
}
|
||||
|
||||
record on-chain-properties {
|
||||
package-name: string,
|
||||
publisher: string,
|
||||
current-version: string,
|
||||
@ -72,21 +88,9 @@ interface chain {
|
||||
dependencies: option<list<string>>,
|
||||
}
|
||||
|
||||
record get-app {
|
||||
app: package-id,
|
||||
}
|
||||
|
||||
record get-app-response {
|
||||
app: on-chain-app,
|
||||
app: option<on-chain-app>,
|
||||
}
|
||||
|
||||
// add variant
|
||||
// and these:
|
||||
// start-mirroring(package-id),
|
||||
// stop-mirroring(package-id),
|
||||
// start-auto-update(package-id),
|
||||
// stop-auto-update(package-id),
|
||||
|
||||
}
|
||||
|
||||
interface main {
|
||||
|
@ -1 +0,0 @@
|
||||
../../ft_worker/src/ft_worker_lib.rs
|
@ -213,28 +213,14 @@ fn get_package_id(url_params: &HashMap<String, String>) -> anyhow::Result<Packag
|
||||
}
|
||||
|
||||
fn gen_package_info(id: &PackageId, listing: &PackageListing) -> serde_json::Value {
|
||||
// fixxx
|
||||
json!({
|
||||
"tba": listing.tba,
|
||||
"package": id.package().to_string(),
|
||||
"publisher": id.publisher(),
|
||||
"installed": match &listing.state {
|
||||
Some(state) => state.installed,
|
||||
None => false,
|
||||
},
|
||||
"metadata_hash": listing.metadata_hash,
|
||||
"metadata_uri": listing.metadata_uri,
|
||||
"metadata": listing.metadata,
|
||||
"state": match &listing.state {
|
||||
Some(state) => json!({
|
||||
"mirrored_from": state.mirrored_from,
|
||||
"our_version": state.our_version_hash,
|
||||
"caps_approved": state.caps_approved,
|
||||
"mirroring": state.mirroring,
|
||||
"auto_update": state.auto_update,
|
||||
"verified": state.verified,
|
||||
}),
|
||||
None => json!(null),
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
@ -474,7 +460,7 @@ fn serve_paths(
|
||||
package_id,
|
||||
download_from,
|
||||
false, // Don't mirror during update
|
||||
pkg_listing.state.as_ref().map_or(false, |s| s.auto_update),
|
||||
false, // fixxx
|
||||
desired_version_hash,
|
||||
) {
|
||||
DownloadResponse::Started => {
|
||||
|
@ -86,7 +86,7 @@ fn init(our: Address) {
|
||||
|
||||
// create new provider with request-timeout of 60s
|
||||
// can change, log requests can take quite a long time.
|
||||
let eth_provider = eth::Provider::new(CHAIN_ID, CHAIN_TIMEOUT);
|
||||
let eth_provider: eth::Provider = eth::Provider::new(CHAIN_ID, CHAIN_TIMEOUT);
|
||||
|
||||
let mut state = fetch_state(our, eth_provider);
|
||||
fetch_and_subscribe_logs(&mut state);
|
||||
@ -471,14 +471,14 @@ fn handle_receive_download_package(
|
||||
},
|
||||
};
|
||||
|
||||
let old_manifest_hash = match state.packages.get(package_id) {
|
||||
Some(listing) => listing
|
||||
.state
|
||||
.as_ref()
|
||||
.and_then(|state| state.manifest_hash.clone())
|
||||
.unwrap_or("OLD".to_string()),
|
||||
_ => "OLD".to_string(),
|
||||
};
|
||||
// let old_manifest_hash = match state.packages.get(package_id) {
|
||||
// Some(listing) => listing
|
||||
// .metadata
|
||||
// .as_ref()
|
||||
// .and_then(|metadata| metadata.properties.manifest_hash.clone())
|
||||
// .unwrap_or("OLD".to_string()),
|
||||
// _ => "OLD".to_string(),
|
||||
// };
|
||||
|
||||
state.add_downloaded_package(
|
||||
package_id,
|
||||
@ -495,20 +495,20 @@ fn handle_receive_download_package(
|
||||
Some(blob.bytes),
|
||||
)?;
|
||||
|
||||
let new_manifest_hash = match state.packages.get(package_id) {
|
||||
Some(listing) => listing
|
||||
.state
|
||||
.as_ref()
|
||||
.and_then(|state| state.manifest_hash.clone())
|
||||
.unwrap_or("NEW".to_string()),
|
||||
_ => "NEW".to_string(),
|
||||
};
|
||||
// let new_manifest_hash = match state.packages.get(package_id) {
|
||||
// Some(listing) => listing
|
||||
// .state
|
||||
// .as_ref()
|
||||
// .and_then(|state| state.manifest_hash.clone())
|
||||
// .unwrap_or("NEW".to_string()),
|
||||
// _ => "NEW".to_string(),
|
||||
// };
|
||||
|
||||
// lastly, if auto_update is true, AND the manifest has NOT changed,
|
||||
// trigger install!
|
||||
if requested_package.auto_update && old_manifest_hash == new_manifest_hash {
|
||||
handle_install(state, package_id)?;
|
||||
}
|
||||
// // lastly, if auto_update is true, AND the manifest has NOT changed,
|
||||
// // trigger install!
|
||||
// if requested_package.auto_update && old_manifest_hash == new_manifest_hash {
|
||||
// handle_install(state, package_id)?;
|
||||
// }
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@ -71,8 +71,6 @@ pub struct PackageListing {
|
||||
pub metadata_uri: String,
|
||||
pub metadata_hash: String,
|
||||
pub metadata: Option<kt::Erc721Metadata>,
|
||||
/// if we have downloaded the package, this is populated
|
||||
pub state: Option<PackageState>,
|
||||
}
|
||||
|
||||
/// state of an individual package we have downloaded
|
||||
@ -178,7 +176,6 @@ impl State {
|
||||
metadata_uri: "".to_string(),
|
||||
metadata_hash: utils::sha_256_hash(&serde_json::to_vec(&metadata).unwrap()),
|
||||
metadata: Some(metadata),
|
||||
state: None,
|
||||
},
|
||||
);
|
||||
}
|
||||
@ -214,7 +211,6 @@ impl State {
|
||||
self.downloaded_apis.insert(package_id.to_owned());
|
||||
}
|
||||
}
|
||||
listing.state = Some(package_state);
|
||||
// kinode_process_lib::set_state(&serde_json::to_vec(self)?);
|
||||
Ok(())
|
||||
}
|
||||
@ -228,14 +224,7 @@ impl State {
|
||||
let res = self
|
||||
.packages
|
||||
.get_mut(package_id)
|
||||
.map(|listing| {
|
||||
if let Some(package_state) = &mut listing.state {
|
||||
fn_(package_state);
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
})
|
||||
.map(|listing| true)
|
||||
.unwrap_or(false);
|
||||
// kinode_process_lib::set_state(&serde_json::to_vec(self).unwrap());
|
||||
res
|
||||
@ -324,7 +313,6 @@ impl State {
|
||||
metadata_uri: "".to_string(),
|
||||
metadata_hash: "".to_string(),
|
||||
metadata: None,
|
||||
state: None,
|
||||
},
|
||||
);
|
||||
self.add_downloaded_package(
|
||||
@ -357,7 +345,6 @@ impl State {
|
||||
let Some(listing) = self.packages.get_mut(package_id) else {
|
||||
return Err(anyhow::anyhow!("package not found"));
|
||||
};
|
||||
listing.state = None;
|
||||
// kinode_process_lib::set_state(&serde_json::to_vec(self)?);
|
||||
println!("uninstalled {package_id}");
|
||||
Ok(())
|
||||
@ -443,7 +430,6 @@ impl State {
|
||||
metadata_uri,
|
||||
metadata_hash,
|
||||
metadata,
|
||||
state: None,
|
||||
});
|
||||
}
|
||||
};
|
||||
@ -486,9 +472,7 @@ impl State {
|
||||
if let Ok(metadata) =
|
||||
utils::fetch_metadata_from_url(&listing.metadata_uri, &metadata_hash, 30)
|
||||
{
|
||||
if let Some(package_state) = &listing.state {
|
||||
auto_update(&self.our, package_id, &metadata, package_state);
|
||||
}
|
||||
// auto_update(&self.our, package_id, &metadata, package_state);
|
||||
listing.metadata = Some(metadata);
|
||||
}
|
||||
|
||||
|
@ -7,7 +7,8 @@ use {
|
||||
alloy_primitives::keccak256,
|
||||
alloy_sol_types::SolEvent,
|
||||
kinode_process_lib::{
|
||||
eth, get_blob, get_state, http, kernel_types as kt, kimap, print_to_terminal, println, vfs,
|
||||
eth, get_blob, get_state, http, kernel_types as kt, kimap, print_to_terminal, println,
|
||||
vfs::{self, File},
|
||||
Address, LazyLoadBlob, PackageId, ProcessId, Request,
|
||||
},
|
||||
std::collections::HashSet,
|
||||
@ -556,3 +557,8 @@ where
|
||||
.expect("failed to serialize VfsRequest"),
|
||||
)
|
||||
}
|
||||
|
||||
pub fn print_and_log(logfile: &mut File, message: &str, verbosity: u8) {
|
||||
print_to_terminal(verbosity, message);
|
||||
let _ = logfile.write_all(message.as_bytes());
|
||||
}
|
||||
|
29
kinode/packages/app_store/chain/Cargo.toml
Normal file
29
kinode/packages/app_store/chain/Cargo.toml
Normal file
@ -0,0 +1,29 @@
|
||||
[package]
|
||||
name = "chain"
|
||||
version = "0.5.0"
|
||||
edition = "2021"
|
||||
|
||||
[features]
|
||||
simulation-mode = []
|
||||
|
||||
[dependencies]
|
||||
alloy-primitives = "0.7.6"
|
||||
alloy-sol-types = "0.7.6"
|
||||
anyhow = "1.0"
|
||||
bincode = "1.3.3"
|
||||
kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", branch = "develop" }
|
||||
rand = "0.8"
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
serde_json = "1.0"
|
||||
sha2 = "0.10.8"
|
||||
sha3 = "0.10.8"
|
||||
url = "2.4.1"
|
||||
urlencoding = "2.1.0"
|
||||
wit-bindgen = "0.24.0"
|
||||
zip = { version = "1.1.1", default-features = false }
|
||||
|
||||
[lib]
|
||||
crate-type = ["cdylib"]
|
||||
|
||||
[package.metadata.component]
|
||||
package = "kinode:process"
|
382
kinode/packages/app_store/chain/src/lib.rs
Normal file
382
kinode/packages/app_store/chain/src/lib.rs
Normal file
@ -0,0 +1,382 @@
|
||||
#![feature(let_chains)]
|
||||
//! chain:app_store:sys
|
||||
//! manages indexing relevant packages and their versions from the kimap.
|
||||
//! keeps eth subscriptions open, keeps data updated.
|
||||
//!
|
||||
use std::{collections::HashMap, str::FromStr};
|
||||
|
||||
use crate::kinode::process::chain::{
|
||||
Chains, GetAppResponse, OnChainApp, OnChainMetadata, OnChainProperties,
|
||||
};
|
||||
use alloy_primitives::keccak256;
|
||||
use alloy_sol_types::SolEvent;
|
||||
use kinode_process_lib::{
|
||||
await_message, call_init, eth, get_blob, get_state, http, kernel_types as kt, kimap,
|
||||
print_to_terminal, println, Address, Message, PackageId, Response,
|
||||
};
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
wit_bindgen::generate!({
|
||||
path: "target/wit",
|
||||
generate_unused_types: true,
|
||||
world: "app-store-sys-v0",
|
||||
additional_derives: [serde::Deserialize, serde::Serialize],
|
||||
});
|
||||
|
||||
#[cfg(not(feature = "simulation-mode"))]
|
||||
const CHAIN_ID: u64 = kimap::KIMAP_CHAIN_ID;
|
||||
#[cfg(feature = "simulation-mode")]
|
||||
const CHAIN_ID: u64 = 31337; // local
|
||||
|
||||
const CHAIN_TIMEOUT: u64 = 60; // 60s
|
||||
|
||||
#[cfg(not(feature = "simulation-mode"))]
|
||||
const KIMAP_ADDRESS: &str = kimap::KIMAP_ADDRESS;
|
||||
#[cfg(feature = "simulation-mode")]
|
||||
const KIMAP_ADDRESS: &str = "0xEce71a05B36CA55B895427cD9a440eEF7Cf3669D";
|
||||
|
||||
#[cfg(not(feature = "simulation-mode"))]
|
||||
const KIMAP_FIRST_BLOCK: u64 = kimap::KIMAP_FIRST_BLOCK;
|
||||
#[cfg(feature = "simulation-mode")]
|
||||
const KIMAP_FIRST_BLOCK: u64 = 1;
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct State {
|
||||
/// the kimap helper we are using
|
||||
pub kimap: kimap::Kimap,
|
||||
/// the last block at which we saved the state of the listings to disk.
|
||||
/// when we boot, we can read logs starting from this block and
|
||||
/// rebuild latest state.
|
||||
pub last_saved_block: u64,
|
||||
/// onchain listings
|
||||
pub listings: HashMap<PackageId, PackageListing>,
|
||||
// auto-update, maybe... -> means, when you get a new thing on this packge,
|
||||
// tell download to go fetch it, and tell main to go install it
|
||||
}
|
||||
|
||||
/// listing information derived from metadata hash in listing event
|
||||
#[derive(Clone, Debug, Deserialize, Serialize)]
|
||||
pub struct PackageListing {
|
||||
pub tba: eth::Address,
|
||||
pub metadata_uri: String,
|
||||
pub metadata_hash: String,
|
||||
// should this even be optional?
|
||||
// relegate to only valid apps maybe?
|
||||
pub metadata: Option<kt::Erc721Metadata>,
|
||||
pub auto_update: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
#[serde(untagged)] // untagged as a meta-type for all incoming requests
|
||||
pub enum Req {
|
||||
Eth(eth::EthSubResult),
|
||||
Request(Chains),
|
||||
}
|
||||
|
||||
call_init!(init);
|
||||
fn init(our: Address) {
|
||||
println!("started, indexing on contract address {}", KIMAP_ADDRESS);
|
||||
// create new provider with request-timeout of 60s
|
||||
// can change, log requests can take quite a long time.
|
||||
let eth_provider: eth::Provider = eth::Provider::new(CHAIN_ID, CHAIN_TIMEOUT);
|
||||
|
||||
let mut state = fetch_state(eth_provider);
|
||||
fetch_and_subscribe_logs(&mut state);
|
||||
|
||||
loop {
|
||||
match await_message() {
|
||||
Err(send_error) => {
|
||||
// TODO handle these based on what they are triggered by
|
||||
println!("got network error: {send_error}");
|
||||
}
|
||||
Ok(message) => {
|
||||
if let Err(e) = handle_message(&our, &mut state, &message) {
|
||||
println!("error handling message: {:?}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_message(our: &Address, state: &mut State, message: &Message) -> anyhow::Result<()> {
|
||||
if message.is_request() {
|
||||
let req: Req = serde_json::from_slice(message.body())?;
|
||||
match req {
|
||||
Req::Eth(eth_result) => {
|
||||
if !message.is_local(our) || message.source().process != "eth:distro:sys" {
|
||||
return Err(anyhow::anyhow!(
|
||||
"eth sub event from unexpected address: {}",
|
||||
message.source()
|
||||
));
|
||||
}
|
||||
|
||||
if let Ok(eth::EthSub { result, .. }) = eth_result {
|
||||
if let eth::SubscriptionResult::Log(log) = result {
|
||||
handle_eth_log(state, *log)?;
|
||||
}
|
||||
} else {
|
||||
// attempt to resubscribe
|
||||
state
|
||||
.kimap
|
||||
.provider
|
||||
.subscribe_loop(1, app_store_filter(state));
|
||||
}
|
||||
}
|
||||
Req::Request(chains) => {
|
||||
handle_local_request(our, state, chains)?;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
return Err(anyhow::anyhow!("not a request"));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn handle_local_request(our: &Address, state: &mut State, chains: Chains) -> anyhow::Result<()> {
|
||||
match chains {
|
||||
Chains::GetApp(package_id) => {
|
||||
let onchain_app =
|
||||
state
|
||||
.listings
|
||||
.get(&package_id.to_process_lib())
|
||||
.map(|app| OnChainApp {
|
||||
metadata_uri: app.metadata_uri.clone(),
|
||||
metadata_hash: app.metadata_hash.clone(),
|
||||
metadata: app.metadata.as_ref().map(|m| m.clone().into()),
|
||||
});
|
||||
let response = GetAppResponse { app: onchain_app };
|
||||
Response::new()
|
||||
.body(serde_json::to_vec(&response)?)
|
||||
.send()?;
|
||||
}
|
||||
Chains::StartAutoUpdate(package_id) => {
|
||||
state
|
||||
.listings
|
||||
.get_mut(&package_id.to_process_lib())
|
||||
.ok_or(anyhow::anyhow!("package not found"))?
|
||||
.auto_update = true;
|
||||
// send responses in these too.
|
||||
}
|
||||
Chains::StopAutoUpdate(package_id) => {
|
||||
state
|
||||
.listings
|
||||
.get_mut(&package_id.to_process_lib())
|
||||
.ok_or(anyhow::anyhow!("package not found"))?
|
||||
.auto_update = false;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn handle_eth_log(state: &mut State, log: eth::Log) -> anyhow::Result<()> {
|
||||
let block_number: u64 = log.block_number.ok_or(anyhow::anyhow!("blocknumbaerror"))?;
|
||||
let note: kimap::Note =
|
||||
kimap::decode_note_log(&log).map_err(|e| anyhow::anyhow!("decodelogerror: {e:?}"))?;
|
||||
|
||||
let package_id = note
|
||||
.parent_path
|
||||
.split_once('.')
|
||||
.ok_or(anyhow::anyhow!("invalid publisher name"))
|
||||
.and_then(|(package, publisher)| {
|
||||
if package.is_empty() || publisher.is_empty() {
|
||||
Err(anyhow::anyhow!("invalid publisher name"))
|
||||
} else {
|
||||
Ok(PackageId::new(&package, &publisher))
|
||||
}
|
||||
})?;
|
||||
|
||||
// the app store exclusively looks for ~metadata-uri postings: if one is
|
||||
// observed, we then *query* for ~metadata-hash to verify the content
|
||||
// at the URI.
|
||||
//
|
||||
|
||||
let metadata_uri = String::from_utf8_lossy(¬e.data).to_string();
|
||||
|
||||
let (tba, metadata_hash) = {
|
||||
// generate ~metadata-hash full-path
|
||||
let hash_note = format!("~metadata-hash.{}", note.parent_path);
|
||||
|
||||
// owner can change which we don't track (yet?) so don't save, need to get when desired
|
||||
let (tba, _owner, data) = state.kimap.get(&hash_note).map_err(|e| {
|
||||
println!("Couldn't find {hash_note}: {e:?}");
|
||||
anyhow::anyhow!("metadata hash mismatch")
|
||||
})?;
|
||||
|
||||
let Some(hash_note) = data else {
|
||||
return Err(anyhow::anyhow!("metadata not found"));
|
||||
};
|
||||
|
||||
(tba, String::from_utf8_lossy(&hash_note).to_string())
|
||||
};
|
||||
|
||||
// fetch metadata from the URI (currently only handling HTTP(S) URLs!)
|
||||
// assert that the metadata hash matches the fetched data
|
||||
let metadata = fetch_metadata_from_url(&metadata_uri, &metadata_hash, 30)?;
|
||||
|
||||
match state.listings.entry(package_id) {
|
||||
std::collections::hash_map::Entry::Occupied(mut listing) => {
|
||||
let listing = listing.get_mut();
|
||||
listing.metadata_uri = metadata_uri;
|
||||
listing.tba = tba;
|
||||
listing.metadata_hash = metadata_hash;
|
||||
listing.metadata = Some(metadata);
|
||||
}
|
||||
std::collections::hash_map::Entry::Vacant(listing) => {
|
||||
listing.insert(PackageListing {
|
||||
tba,
|
||||
metadata_uri,
|
||||
metadata_hash,
|
||||
metadata: Some(metadata),
|
||||
auto_update: false,
|
||||
});
|
||||
}
|
||||
}
|
||||
state.last_saved_block = block_number;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// create the filter used for app store getLogs and subscription.
|
||||
/// the app store exclusively looks for ~metadata-uri postings: if one is
|
||||
/// observed, we then *query* for ~metadata-hash to verify the content
|
||||
/// at the URI.
|
||||
///
|
||||
/// this means that ~metadata-hash should be *posted before or at the same time* as ~metadata-uri!
|
||||
pub fn app_store_filter(state: &State) -> eth::Filter {
|
||||
let notes = vec![keccak256("~metadata-uri")];
|
||||
|
||||
eth::Filter::new()
|
||||
.address(*state.kimap.address())
|
||||
.events([kimap::contract::Note::SIGNATURE])
|
||||
.topic3(notes)
|
||||
}
|
||||
|
||||
/// create a filter to fetch app store event logs from chain and subscribe to new events
|
||||
pub fn fetch_and_subscribe_logs(state: &mut State) {
|
||||
let filter = app_store_filter(state);
|
||||
// get past logs, subscribe to new ones.
|
||||
// subscribe first so we don't miss any logs
|
||||
state.kimap.provider.subscribe_loop(1, filter.clone());
|
||||
for log in fetch_logs(
|
||||
&state.kimap.provider,
|
||||
&filter.from_block(state.last_saved_block),
|
||||
) {
|
||||
if let Err(e) = handle_eth_log(state, log) {
|
||||
print_to_terminal(1, &format!("error ingesting log: {e}"));
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
/// fetch logs from the chain with a given filter
|
||||
fn fetch_logs(eth_provider: ð::Provider, filter: ð::Filter) -> Vec<eth::Log> {
|
||||
loop {
|
||||
match eth_provider.get_logs(filter) {
|
||||
Ok(res) => return res,
|
||||
Err(_) => {
|
||||
println!("failed to fetch logs! trying again in 5s...");
|
||||
std::thread::sleep(std::time::Duration::from_secs(5));
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// fetch metadata from url and verify it matches metadata_hash
|
||||
pub fn fetch_metadata_from_url(
|
||||
metadata_url: &str,
|
||||
metadata_hash: &str,
|
||||
timeout: u64,
|
||||
) -> Result<kt::Erc721Metadata, anyhow::Error> {
|
||||
if let Ok(url) = url::Url::parse(metadata_url) {
|
||||
if let Ok(_) =
|
||||
http::client::send_request_await_response(http::Method::GET, url, None, timeout, vec![])
|
||||
{
|
||||
if let Some(body) = get_blob() {
|
||||
let hash = keccak_256_hash(&body.bytes);
|
||||
if &hash == metadata_hash {
|
||||
return Ok(serde_json::from_slice::<kt::Erc721Metadata>(&body.bytes)
|
||||
.map_err(|_| anyhow::anyhow!("metadata not found"))?);
|
||||
} else {
|
||||
return Err(anyhow::anyhow!("metadata hash mismatch"));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(anyhow::anyhow!("metadata not found"))
|
||||
}
|
||||
|
||||
/// generate a Keccak-256 hash string (with 0x prefix) of the metadata bytes
|
||||
pub fn keccak_256_hash(bytes: &[u8]) -> String {
|
||||
use sha3::{Digest, Keccak256};
|
||||
let mut hasher = Keccak256::new();
|
||||
hasher.update(bytes);
|
||||
format!("0x{:x}", hasher.finalize())
|
||||
}
|
||||
|
||||
/// fetch state from disk or create a new one if that fails
|
||||
pub fn fetch_state(provider: eth::Provider) -> State {
|
||||
if let Some(state_bytes) = get_state() {
|
||||
match serde_json::from_slice::<State>(&state_bytes) {
|
||||
Ok(state) => {
|
||||
if state.kimap.address().to_string() == KIMAP_ADDRESS {
|
||||
return state;
|
||||
} else {
|
||||
println!(
|
||||
"state contract address mismatch. rebuilding state! expected {}, got {}",
|
||||
KIMAP_ADDRESS,
|
||||
state.kimap.address().to_string()
|
||||
);
|
||||
}
|
||||
}
|
||||
Err(e) => println!("failed to deserialize saved state, rebuilding: {e}"),
|
||||
}
|
||||
}
|
||||
State {
|
||||
kimap: kimap::Kimap::new(provider, eth::Address::from_str(KIMAP_ADDRESS).unwrap()),
|
||||
last_saved_block: 0,
|
||||
listings: HashMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
// quite annoyingly, we must convert from our gen'd version of PackageId
|
||||
// to the process_lib's gen'd version. this is in order to access custom
|
||||
// Impls that we want to use
|
||||
impl crate::kinode::process::main::PackageId {
|
||||
pub fn to_process_lib(self) -> PackageId {
|
||||
PackageId {
|
||||
package_name: self.package_name,
|
||||
publisher_node: self.publisher_node,
|
||||
}
|
||||
}
|
||||
pub fn from_process_lib(package_id: PackageId) -> Self {
|
||||
Self {
|
||||
package_name: package_id.package_name,
|
||||
publisher_node: package_id.publisher_node,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<kt::Erc721Metadata> for OnChainMetadata {
|
||||
fn from(erc: kt::Erc721Metadata) -> Self {
|
||||
OnChainMetadata {
|
||||
name: erc.name,
|
||||
description: erc.description,
|
||||
image: erc.image,
|
||||
external_url: erc.external_url,
|
||||
animation_url: erc.animation_url,
|
||||
properties: OnChainProperties {
|
||||
package_name: erc.properties.package_name,
|
||||
publisher: erc.properties.publisher,
|
||||
current_version: erc.properties.current_version,
|
||||
mirrors: erc.properties.mirrors,
|
||||
code_hashes: erc.properties.code_hashes.into_iter().collect(),
|
||||
license: erc.properties.license,
|
||||
screenshots: erc.properties.screenshots,
|
||||
wit_version: erc.properties.wit_version,
|
||||
dependencies: erc.properties.dependencies,
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
@ -7,10 +7,7 @@ edition = "2021"
|
||||
simulation-mode = []
|
||||
|
||||
[dependencies]
|
||||
alloy-primitives = "0.7.6"
|
||||
alloy-sol-types = "0.7.6"
|
||||
anyhow = "1.0"
|
||||
bincode = "1.3.3"
|
||||
kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", branch = "develop" }
|
||||
rand = "0.8"
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
|
@ -9,9 +9,13 @@ use crate::kinode::process::downloads::{
|
||||
};
|
||||
|
||||
use ft_worker_lib::{spawn_receive_transfer, spawn_send_transfer};
|
||||
use kinode::process::standard::print_to_terminal;
|
||||
use kinode_process_lib::{
|
||||
await_message, call_init, eth, get_blob, http, kimap, println, vfs, Address, Message,
|
||||
PackageId, ProcessId, Request, Response,
|
||||
await_message, call_init, eth, get_blob, http,
|
||||
http::client::{HttpClientError, HttpClientResponse},
|
||||
kimap, println,
|
||||
vfs::{self, File, SeekFrom},
|
||||
Address, Message, PackageId, ProcessId, Request, Response,
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
@ -31,10 +35,9 @@ pub const APP_SHARE_TIMEOUT: u64 = 120; // 120s
|
||||
#[serde(untagged)] // untagged as a meta-type for all incoming responses
|
||||
pub enum Resp {
|
||||
Download(DownloadResponse),
|
||||
HttpClient(Result<http::client::HttpClientResponse, http::client::HttpClientError>),
|
||||
HttpClient(Result<HttpClientResponse, HttpClientError>),
|
||||
}
|
||||
|
||||
// downloads:app_store:sys State. read in from vfs drive: /app_store:sys/downloads
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct State {
|
||||
mirroring: HashSet<(PackageId, String)>, // (package_id, version_hash)
|
||||
@ -49,13 +52,19 @@ fn init(our: Address) {
|
||||
// state should be a simple struct, which can be serialized (parts of it)
|
||||
// it contains which packages we are mirroring
|
||||
|
||||
// // logfile
|
||||
// logfile
|
||||
|
||||
// load from state, okay to decouple from vfs. it's "app-state" for downloads.
|
||||
let mut state = State {
|
||||
mirroring: HashSet::new(),
|
||||
};
|
||||
|
||||
let mut logfile =
|
||||
vfs::open_file("/app_store:sys/.log", true, Some(5)).expect("could not open logfile");
|
||||
logfile
|
||||
.seek(SeekFrom::End(0))
|
||||
.expect("could not seek to end of logfile");
|
||||
|
||||
loop {
|
||||
match await_message() {
|
||||
Err(send_error) => {
|
||||
@ -63,7 +72,7 @@ fn init(our: Address) {
|
||||
println!("got network error: {send_error}");
|
||||
}
|
||||
Ok(message) => {
|
||||
if let Err(e) = handle_message(&our, &mut state, &message) {
|
||||
if let Err(e) = handle_message(&our, &mut state, &message, &mut logfile) {
|
||||
println!("error handling message: {:?}", e);
|
||||
}
|
||||
}
|
||||
@ -75,12 +84,36 @@ fn init(our: Address) {
|
||||
/// function defined for each kind of message. check whether the source
|
||||
/// of the message is allowed to send that kind of message to us.
|
||||
/// finally, fire a response if expected from a request.
|
||||
fn handle_message(our: &Address, state: &mut State, message: &Message) -> anyhow::Result<()> {
|
||||
fn handle_message(
|
||||
our: &Address,
|
||||
state: &mut State,
|
||||
message: &Message,
|
||||
logfile: &mut File,
|
||||
) -> anyhow::Result<()> {
|
||||
if message.is_request() {
|
||||
match serde_json::from_slice::<Downloads>(message.body())? {
|
||||
Downloads::Download(download_request) => {
|
||||
let is_local = message.is_local(our);
|
||||
handle_download_request(our, state, download_request, is_local)?;
|
||||
match handle_download_request(our, state, download_request, is_local) {
|
||||
Ok(()) => {
|
||||
Response::new()
|
||||
.body(serde_json::to_vec(&Resp::Download(DownloadResponse {
|
||||
success: true,
|
||||
error: None,
|
||||
}))?)
|
||||
.send()?;
|
||||
}
|
||||
Err(e) => {
|
||||
// make function, print and log!
|
||||
print_and_log(logfile, &format!("error handling download request: {e}"), 1);
|
||||
Response::new()
|
||||
.body(serde_json::to_vec(&Resp::Download(DownloadResponse {
|
||||
success: false,
|
||||
error: Some(e.to_string()),
|
||||
}))?)
|
||||
.send()?;
|
||||
}
|
||||
}
|
||||
}
|
||||
Downloads::Progress(ProgressUpdate {
|
||||
package_id,
|
||||
@ -88,7 +121,23 @@ fn handle_message(our: &Address, state: &mut State, message: &Message) -> anyhow
|
||||
downloaded,
|
||||
total,
|
||||
}) => {
|
||||
// forward progress to UI
|
||||
// forward progress to main:app_store:sys,
|
||||
// which then pushes to the frontend.
|
||||
let target =
|
||||
Address::new(&our.node, ProcessId::new(Some("main"), "app_store", "sys"));
|
||||
let _ = Request::new()
|
||||
.target(target)
|
||||
.body(
|
||||
serde_json::to_vec(&ProgressUpdate {
|
||||
package_id,
|
||||
version_hash,
|
||||
downloaded,
|
||||
total,
|
||||
})
|
||||
.unwrap(),
|
||||
)
|
||||
.send();
|
||||
|
||||
// http_server.ws_push_all_channels(
|
||||
// "/",
|
||||
// http::server::WsMessageType::Text,
|
||||
@ -114,6 +163,7 @@ fn handle_message(our: &Address, state: &mut State, message: &Message) -> anyhow
|
||||
match serde_json::from_slice::<Resp>(message.body())? {
|
||||
Resp::Download(download_response) => {
|
||||
// TODO handle download response
|
||||
// send_and_awaits? this might not be needed.
|
||||
}
|
||||
Resp::HttpClient(resp) => {
|
||||
let name = match message.context() {
|
||||
@ -219,6 +269,7 @@ fn handle_download_request(
|
||||
}
|
||||
|
||||
fn handle_receive_http_download(state: &mut State, name: &str) -> anyhow::Result<()> {
|
||||
// use context here instead, verify bytes immediately.
|
||||
println!("Received HTTP download for: {}", name);
|
||||
|
||||
// Parse the name to extract package_id and version_hash
|
||||
@ -297,3 +348,8 @@ fn handle_receive_http_download(state: &mut State, name: &str) -> anyhow::Result
|
||||
// Some(metadata) => metadata.properties.wit_version,
|
||||
// None => Some(0),
|
||||
// };
|
||||
|
||||
pub fn print_and_log(logfile: &mut File, message: &str, verbosity: u8) {
|
||||
print_to_terminal(verbosity, message);
|
||||
let _ = logfile.write_all(message.as_bytes());
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user