From 9fddb2cedd9796ebb87ca34770fed5cae4c0e952 Mon Sep 17 00:00:00 2001 From: bitful-pannul Date: Tue, 10 Dec 2024 21:54:52 +0200 Subject: [PATCH] app_store: sqlite storage backend --- kinode/packages/app_store/chain/src/lib.rs | 616 ++++++++++++++------ kinode/packages/app_store/pkg/manifest.json | 2 + 2 files changed, 433 insertions(+), 185 deletions(-) diff --git a/kinode/packages/app_store/chain/src/lib.rs b/kinode/packages/app_store/chain/src/lib.rs index 66db5995..44c29e9b 100644 --- a/kinode/packages/app_store/chain/src/lib.rs +++ b/kinode/packages/app_store/chain/src/lib.rs @@ -33,14 +33,14 @@ use alloy_primitives::keccak256; use alloy_sol_types::SolEvent; use kinode::process::chain::ChainResponses; use kinode_process_lib::{ - await_message, call_init, eth, get_blob, get_state, http, kernel_types as kt, kimap, + await_message, call_init, eth, http, kimap, get_blob, print_to_terminal, println, timer, Address, Message, PackageId, Request, Response, + sqlite::{self, Sqlite}, + kernel_types as kt, }; use serde::{Deserialize, Serialize}; -use std::{ - collections::{HashMap, HashSet}, - str::FromStr, -}; +use std::str::FromStr; +use std::collections::HashMap; wit_bindgen::generate!({ path: "target/wit", @@ -63,7 +63,6 @@ const KIMAP_ADDRESS: &str = "0x9CE8cCD2932DC727c70f9ae4f8C2b68E6Abed58C"; const DELAY_MS: u64 = 1_000; // 1s -#[derive(Debug, Serialize, Deserialize)] pub struct State { /// the kimap helper we are using pub kimap: kimap::Kimap, @@ -71,10 +70,8 @@ pub struct State { /// when we boot, we can read logs starting from this block and /// rebuild latest state. pub last_saved_block: u64, - /// onchain listings - pub listings: HashMap, - /// set of packages that we have published - pub published: HashSet, + /// tables: listings: , published: vec + pub db: DB, } /// listing information derived from metadata hash in listing event @@ -83,10 +80,9 @@ pub struct PackageListing { pub tba: eth::Address, pub metadata_uri: String, pub metadata_hash: String, - // should this even be optional? - // relegate to only valid apps maybe? pub metadata: Option, pub auto_update: bool, + pub block: u64 } #[derive(Debug, Serialize, Deserialize, process_macros::SerdeJsonInto)] @@ -96,18 +92,270 @@ pub enum Req { Request(ChainRequests), } +pub struct DB { + inner: Sqlite, +} + +impl DB { + pub fn connect(our: &Address) -> anyhow::Result { + let inner = sqlite::open(our.package_id(), "app_store_chain.sqlite", Some(10))?; + // create tables + inner.write(CREATE_META_TABLE.into(), vec![], None)?; + inner.write(CREATE_LISTINGS_TABLE.into(), vec![], None)?; + inner.write(CREATE_PUBLISHED_TABLE.into(), vec![], None)?; + + Ok(Self { inner }) + } + + pub fn get_last_saved_block(&self) -> anyhow::Result { + let query = "SELECT value FROM meta WHERE key = 'last_saved_block'"; + let rows = self.inner.read(query.into(), vec![])?; + if let Some(row) = rows.get(0) { + if let Some(val_str) = row.get("value").and_then(|v| v.as_str()) { + if let Ok(block) = val_str.parse::() { + return Ok(block); + } + } + } + Ok(0) + } + + pub fn set_last_saved_block(&self, block: u64) -> anyhow::Result<()> { + let query = "INSERT INTO meta (key, value) VALUES ('last_saved_block', ?) + ON CONFLICT(key) DO UPDATE SET value=excluded.value"; + let params = vec![block.to_string().into()]; + self.inner.write(query.into(), params, None)?; + Ok(()) + } + + pub fn insert_or_update_listing(&self, package_id: &PackageId, listing: &PackageListing) -> anyhow::Result<()> { + let metadata_json = if let Some(m) = &listing.metadata { + serde_json::to_string(m)? + } else { + "".to_string() + }; + + let query = "INSERT INTO listings (package_name, publisher_node, tba, metadata_uri, metadata_hash, metadata_json, auto_update, block) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(package_name, publisher_node) + DO UPDATE SET + tba=excluded.tba, + metadata_uri=excluded.metadata_uri, + metadata_hash=excluded.metadata_hash, + metadata_json=excluded.metadata_json, + auto_update=excluded.auto_update, + block=excluded.block"; + let params = vec![ + package_id.package_name.clone().into(), + package_id.publisher_node.clone().into(), + listing.tba.to_string().into(), + listing.metadata_uri.clone().into(), + listing.metadata_hash.clone().into(), + metadata_json.into(), + (if listing.auto_update {1} else {0}).into(), + listing.block.into() + ]; + + self.inner.write(query.into(), params, None)?; + Ok(()) + } + + + pub fn delete_listing(&self, package_id: &PackageId) -> anyhow::Result<()> { + let query = "DELETE FROM listings WHERE package_name = ? AND publisher_node = ?"; + let params = vec![ + package_id.package_name.clone().into(), + package_id.publisher_node.clone().into(), + ]; + self.inner.write(query.into(), params, None)?; + Ok(()) + } + + pub fn get_listing(&self, package_id: &PackageId) -> anyhow::Result> { + let query = "SELECT tba, metadata_uri, metadata_hash, metadata_json, auto_update, block FROM listings WHERE package_name = ? AND publisher_node = ?"; + let params = vec![ + package_id.package_name.clone().into(), + package_id.publisher_node.clone().into(), + ]; + let rows = self.inner.read(query.into(), params)?; + if let Some(row) = rows.get(0) { + Ok(Some(self.row_to_listing(row)?)) + } else { + Ok(None) + } + } + + pub fn get_all_listings(&self) -> anyhow::Result> { + let query = "SELECT package_name, publisher_node, tba, metadata_uri, metadata_hash, metadata_json, auto_update, block FROM listings"; + let rows = self.inner.read(query.into(), vec![])?; + let mut listings = Vec::new(); + for row in rows { + let pid = PackageId { + package_name: row["package_name"].as_str().unwrap_or("").to_string(), + publisher_node: row["publisher_node"].as_str().unwrap_or("").to_string(), + }; + let listing = self.row_to_listing(&row)?; + listings.push((pid, listing)); + } + Ok(listings) + } + + pub fn get_listings_batch(&self, limit: u64, offset: u64) -> anyhow::Result> { + let query = format!( + "SELECT package_name, publisher_node, tba, metadata_uri, metadata_hash, metadata_json, auto_update, block + FROM listings + ORDER BY package_name, publisher_node + LIMIT {} OFFSET {}", + limit, offset + ); + + let rows = self.inner.read(query, vec![])?; + let mut listings = Vec::new(); + for row in rows { + let pid = PackageId { + package_name: row["package_name"].as_str().unwrap_or("").to_string(), + publisher_node: row["publisher_node"].as_str().unwrap_or("").to_string(), + }; + let listing = self.row_to_listing(&row)?; + listings.push((pid, listing)); + } + Ok(listings) + } + + pub fn get_listings_since_block(&self, block_number: u64) -> anyhow::Result> { + let query = "SELECT package_name, publisher_node, tba, metadata_uri, metadata_hash, metadata_json, auto_update, block + FROM listings + WHERE block > ?"; + let params = vec![block_number.into()]; + let rows = self.inner.read(query.into(), params)?; + let mut listings = Vec::new(); + for row in rows { + let pid = PackageId { + package_name: row["package_name"].as_str().unwrap_or("").to_string(), + publisher_node: row["publisher_node"].as_str().unwrap_or("").to_string(), + }; + let listing = self.row_to_listing(&row)?; + listings.push((pid, listing)); + } + Ok(listings) + } + + + pub fn row_to_listing(&self, row: &HashMap) -> anyhow::Result { + let tba_str = row["tba"].as_str().ok_or_else(|| anyhow::anyhow!("Invalid tba"))?; + let tba = tba_str.parse::()?; + let metadata_uri = row["metadata_uri"].as_str().unwrap_or("").to_string(); + let metadata_hash = row["metadata_hash"].as_str().unwrap_or("").to_string(); + let metadata_json = row["metadata_json"].as_str().unwrap_or(""); + let metadata: Option = + if metadata_json.is_empty() { + None + } else { + serde_json::from_str(metadata_json)? + }; + let auto_update = row["auto_update"].as_i64().unwrap_or(0) == 1; + let block = row["block"].as_i64().unwrap_or(0) as u64; + + Ok(PackageListing { + tba, + metadata_uri, + metadata_hash, + metadata, + auto_update, + block, + }) + } + + pub fn get_published(&self, package_id: &PackageId) -> anyhow::Result { + let query = "SELECT 1 FROM published WHERE package_name = ? AND publisher_node = ?"; + let params = vec![ + package_id.package_name.clone().into(), + package_id.publisher_node.clone().into(), + ]; + let rows = self.inner.read(query.into(), params)?; + Ok(!rows.is_empty()) + } + + pub fn insert_published(&self, package_id: &PackageId) -> anyhow::Result<()> { + let query = "INSERT INTO published (package_name, publisher_node) VALUES (?, ?) ON CONFLICT DO NOTHING"; + let params = vec![ + package_id.package_name.clone().into(), + package_id.publisher_node.clone().into(), + ]; + self.inner.write(query.into(), params, None)?; + Ok(()) + } + + pub fn delete_published(&self, package_id: &PackageId) -> anyhow::Result<()> { + let query = "DELETE FROM published WHERE package_name = ? AND publisher_node = ?"; + let params = vec![ + package_id.package_name.clone().into(), + package_id.publisher_node.clone().into(), + ]; + self.inner.write(query.into(), params, None)?; + Ok(()) + } + + pub fn get_all_published(&self) -> anyhow::Result> { + let query = "SELECT package_name, publisher_node FROM published"; + let rows = self.inner.read(query.into(), vec![])?; + let mut result = Vec::new(); + for row in rows { + let pid = PackageId { + package_name: row["package_name"].as_str().unwrap_or("").to_string(), + publisher_node: row["publisher_node"].as_str().unwrap_or("").to_string(), + }; + result.push(pid); + } + Ok(result) + } +} + +const CREATE_META_TABLE: &str = " +CREATE TABLE IF NOT EXISTS meta ( + key TEXT PRIMARY KEY, + value TEXT +);"; + +const CREATE_LISTINGS_TABLE: &str = " +CREATE TABLE IF NOT EXISTS listings ( + package_name TEXT NOT NULL, + publisher_node TEXT NOT NULL, + tba TEXT NOT NULL, + metadata_uri TEXT, + metadata_hash TEXT, + metadata_json TEXT, + auto_update INTEGER NOT NULL DEFAULT 0, + block INTEGER NOT NULL DEFAULT 0, + PRIMARY KEY (package_name, publisher_node) +);"; + +const CREATE_PUBLISHED_TABLE: &str = " +CREATE TABLE IF NOT EXISTS published ( + package_name TEXT NOT NULL, + publisher_node TEXT NOT NULL, + PRIMARY KEY (package_name, publisher_node) +);"; + call_init!(init); fn init(our: Address) { - println!( - "chain started, indexing on contract address {}", - KIMAP_ADDRESS - ); - // create new provider with request-timeout of 60s - // can change, log requests can take quite a long time. let eth_provider: eth::Provider = eth::Provider::new(CHAIN_ID, CHAIN_TIMEOUT); - let mut state = fetch_state(eth_provider); - fetch_and_subscribe_logs(&our, &mut state); + let db = DB::connect(&our).expect("failed to open DB"); + let kimap_helper = kimap::Kimap::new(eth_provider, eth::Address::from_str(KIMAP_ADDRESS).unwrap()); + let last_saved_block = db.get_last_saved_block().unwrap_or(0); + + println!( + "chain started, indexing on kimap address {} at block {}", + KIMAP_ADDRESS, last_saved_block + ); + let mut state = State { + kimap: kimap_helper, + last_saved_block, + db, + }; + + fetch_and_subscribe_logs(&our, &mut state, last_saved_block); loop { match await_message() { @@ -126,17 +374,15 @@ fn init(our: Address) { fn handle_message(our: &Address, state: &mut State, message: &Message) -> anyhow::Result<()> { if !message.is_request() { if message.is_local(&our) && message.source().process == "timer:distro:sys" { - // handling of ETH RPC subscriptions delayed by DELAY_MS - // to allow kns to have a chance to process block: handle now let Some(context) = message.context() else { - return Err(anyhow::anyhow!("foo")); + return Err(anyhow::anyhow!("No context in timer message")); }; let log = serde_json::from_slice(context)?; handle_eth_log(our, state, log, false)?; return Ok(()); } } else { - match message.body().try_into()? { + match serde_json::from_slice::(message.body())? { Req::Eth(eth_result) => { if !message.is_local(our) || message.source().process != "eth:distro:sys" { return Err(anyhow::anyhow!( @@ -147,16 +393,11 @@ fn handle_message(our: &Address, state: &mut State, message: &Message) -> anyhow if let Ok(eth::EthSub { result, .. }) = eth_result { if let eth::SubscriptionResult::Log(ref log) = result { - // delay handling of ETH RPC subscriptions by DELAY_MS - // to allow kns to have a chance to process block timer::set_timer(DELAY_MS, Some(serde_json::to_vec(log)?)); } } else { - // attempt to resubscribe - state - .kimap - .provider - .subscribe_loop(1, app_store_filter(state)); + // re-subscribe if error + state.kimap.provider.subscribe_loop(1, app_store_filter(state)); } } Req::Request(chains) => { @@ -171,48 +412,37 @@ fn handle_message(our: &Address, state: &mut State, message: &Message) -> anyhow fn handle_local_request(state: &mut State, req: ChainRequests) -> anyhow::Result<()> { match req { ChainRequests::GetApp(package_id) => { - let onchain_app = state - .listings - .get(&package_id.clone().to_process_lib()) - .map(|app| OnchainApp { - package_id: package_id, - tba: app.tba.to_string(), - metadata_uri: app.metadata_uri.clone(), - metadata_hash: app.metadata_hash.clone(), - metadata: app.metadata.as_ref().map(|m| m.clone().into()), - auto_update: app.auto_update, - }); + let pid = package_id.clone().to_process_lib(); + let listing = state.db.get_listing(&pid)?; + let onchain_app = listing.map(|app| app.to_onchain_app(&pid)); let response = ChainResponses::GetApp(onchain_app); Response::new().body(&response).send()?; } ChainRequests::GetApps => { - let apps: Vec = state - .listings - .iter() - .map(|(id, listing)| listing.to_onchain_app(id)) + let listings = state.db.get_all_listings()?; + let apps: Vec = listings + .into_iter() + .map(|(pid, listing)| listing.to_onchain_app(&pid)) .collect(); - let response = ChainResponses::GetApps(apps); Response::new().body(&response).send()?; } ChainRequests::GetOurApps => { - let apps: Vec = state - .published - .iter() - .filter_map(|id| { - state - .listings - .get(id) - .map(|listing| listing.to_onchain_app(id)) - }) - .collect(); - + let published_list = state.db.get_all_published()?; + let mut apps = Vec::new(); + for pid in published_list { + if let Some(listing) = state.db.get_listing(&pid)? { + apps.push(listing.to_onchain_app(&pid)); + } + } let response = ChainResponses::GetOurApps(apps); Response::new().body(&response).send()?; } ChainRequests::StartAutoUpdate(package_id) => { - if let Some(listing) = state.listings.get_mut(&package_id.to_process_lib()) { + let pid = package_id.to_process_lib(); + if let Some(mut listing) = state.db.get_listing(&pid)? { listing.auto_update = true; + state.db.insert_or_update_listing(&pid, &listing)?; let response = ChainResponses::AutoUpdateStarted; Response::new().body(&response).send()?; } else { @@ -221,8 +451,10 @@ fn handle_local_request(state: &mut State, req: ChainRequests) -> anyhow::Result } } ChainRequests::StopAutoUpdate(package_id) => { - if let Some(listing) = state.listings.get_mut(&package_id.to_process_lib()) { + let pid = package_id.to_process_lib(); + if let Some(mut listing) = state.db.get_listing(&pid)? { listing.auto_update = false; + state.db.insert_or_update_listing(&pid, &listing)?; let response = ChainResponses::AutoUpdateStopped; Response::new().body(&response).send()?; } else { @@ -256,7 +488,7 @@ fn handle_eth_log( if package.is_empty() || publisher.is_empty() { Err(anyhow::anyhow!("invalid publisher name")) } else { - Ok(PackageId::new(&package, &publisher)) + Ok(PackageId::new(package, publisher)) } })?; @@ -265,8 +497,8 @@ fn handle_eth_log( // at the URI. let metadata_uri = String::from_utf8_lossy(¬e.data).to_string(); - let is_our_package = &package_id.publisher() == &our.node(); - + let is_our_package = package_id.publisher() == our.node(); + println!("we got a log: {block_number} {package_id} {metadata_uri}"); let (tba, metadata_hash) = if !startup { // generate ~metadata-hash full-path let hash_note = format!("~metadata-hash.{}", note.parent_path); @@ -290,10 +522,12 @@ fn handle_eth_log( match data { None => { - // if ~metadata-uri is also empty, this is an unpublish action! + // unpublish if metadata_uri empty if metadata_uri.is_empty() { - state.published.remove(&package_id); - state.listings.remove(&package_id); + state.db.delete_published(&package_id)?; + state.db.delete_listing(&package_id)?; + state.last_saved_block = block_number; + state.db.set_last_saved_block(block_number)?; return Ok(()); } return Err(anyhow::anyhow!( @@ -307,7 +541,7 @@ fn handle_eth_log( }; if is_our_package { - state.published.insert(package_id.clone()); + state.db.insert_published(&package_id)?; } // if this is a startup event, we don't need to fetch metadata from the URI -- @@ -320,109 +554,143 @@ fn handle_eth_log( None }; - match state.listings.entry(package_id.clone()) { - std::collections::hash_map::Entry::Occupied(mut listing) => { - let listing = listing.get_mut(); - listing.metadata_uri = metadata_uri; - listing.tba = tba; - listing.metadata_hash = metadata_hash; - listing.metadata = metadata.clone(); - } - std::collections::hash_map::Entry::Vacant(listing) => { - listing.insert(PackageListing { - tba, - metadata_uri, - metadata_hash, - metadata: metadata.clone(), - auto_update: false, - }); - } + let mut listing = state + .db + .get_listing(&package_id)? + .unwrap_or(PackageListing { + tba, + metadata_uri: metadata_uri.clone(), + metadata_hash: metadata_hash.clone(), + metadata: metadata.clone(), + auto_update: false, + block: block_number, + }); + // update fields + listing.tba = tba; + listing.metadata_uri = metadata_uri; + listing.metadata_hash = metadata_hash; + listing.metadata = metadata.clone(); + + state.db.insert_or_update_listing(&package_id, &listing)?; + + if !startup && listing.auto_update { + println!("kicking off auto-update for: {}", package_id); + Request::to(("our", "downloads", "app_store", "sys")) + .body(&DownloadRequests::AutoUpdate(AutoUpdateRequest { + package_id: crate::kinode::process::main::PackageId::from_process_lib( + package_id.clone(), + ), + metadata: metadata.unwrap().into(), + })) + .send() + .unwrap(); } if !startup { - // if auto_update is enabled, send a message to downloads to kick off the update. - if let Some(listing) = state.listings.get(&package_id) { - if listing.auto_update { - print_to_terminal(0, &format!("kicking off auto-update for: {}", package_id)); - Request::to(("our", "downloads", "app_store", "sys")) - .body(&DownloadRequests::AutoUpdate(AutoUpdateRequest { - package_id: crate::kinode::process::main::PackageId::from_process_lib( - package_id, - ), - metadata: metadata.unwrap().into(), - })) - .send() - .unwrap(); - } - } + state.last_saved_block = block_number; + state.db.set_last_saved_block(block_number)?; } - state.last_saved_block = block_number; - Ok(()) } /// after startup, fetch metadata for all listings /// we do this as a separate step to not repeatedly fetch outdated metadata /// as we process logs. -fn update_all_metadata(state: &mut State) { - state.listings.retain(|package_id, listing| { - let (tba, metadata_hash) = { - // generate ~metadata-hash full-path - let hash_note = format!( - "~metadata-hash.{}.{}", - package_id.package(), - package_id.publisher() - ); +fn update_all_metadata(state: &mut State, last_saved_block: u64) { + let updated_listings = match state.db.get_listings_since_block(last_saved_block) { + Ok(listings) => listings, + Err(e) => { + print_to_terminal(1, &format!("error fetching updated listings since block {last_saved_block}: {e}")); + return; + } + }; - // owner can change which we don't track (yet?) so don't save, need to get when desired - let Ok((tba, _owner, data)) = (match state.kimap.get(&hash_note) { - Ok(gr) => Ok(gr), - Err(e) => match e { - eth::EthError::RpcError(_) => { - // retry on RpcError after DELAY_MS sleep - // sleep here rather than with, e.g., a message to - // `timer:distro:sys` so that events are processed in - // order of receipt - std::thread::sleep(std::time::Duration::from_millis(DELAY_MS)); - state.kimap.get(&hash_note) + for (pid, mut listing) in updated_listings { + let hash_note = format!("~metadata-hash.{}.{}", pid.package(), pid.publisher()); + let (tba, metadata_hash) = match state.kimap.get(&hash_note) { + Ok((t, _o, data)) => { + match data { + None => { + // If metadata_uri empty, unpublish + if listing.metadata_uri.is_empty() { + if let Err(e) = state.db.delete_published(&pid) { + print_to_terminal(1, &format!("error deleting published: {e}")); + } + } + if let Err(e) = state.db.delete_listing(&pid) { + print_to_terminal(1, &format!("error deleting listing: {e}")); + } + continue; } - _ => Err(e), - }, - }) else { - return false; - }; - - match data { - None => { - // if ~metadata-uri is also empty, this is an unpublish action! - if listing.metadata_uri.is_empty() { - state.published.remove(package_id); - } - return false; + Some(hash_note) => (t, String::from_utf8_lossy(&hash_note).to_string()), + } + } + Err(e) => { + // If RpcError, retry once after delay + if let eth::EthError::RpcError(_) = e { + std::thread::sleep(std::time::Duration::from_millis(DELAY_MS)); + match state.kimap.get(&hash_note) { + Ok((t, _o, data)) => { + if let Some(hash_note) = data { + (t, String::from_utf8_lossy(&hash_note).to_string()) + } else { + // no data again after retry + if listing.metadata_uri.is_empty() { + if let Err(e) = state.db.delete_published(&pid) { + print_to_terminal(1, &format!("error deleting published: {e}")); + } + } + if let Err(e) = state.db.delete_listing(&pid) { + print_to_terminal(1, &format!("error deleting listing: {e}")); + } + continue; + } + } + Err(e2) => { + print_to_terminal(1, &format!("error retrieving metadata-hash after retry: {e2:?}")); + continue; + } + } + } else { + print_to_terminal(1, &format!("error retrieving metadata-hash: {e:?} for {pid}")); + continue; } - Some(hash_note) => (tba, String::from_utf8_lossy(&hash_note).to_string()), } }; + + // Update listing fields listing.tba = tba; listing.metadata_hash = metadata_hash; - let metadata = - fetch_metadata_from_url(&listing.metadata_uri, &listing.metadata_hash, 30).ok(); + + let metadata = match fetch_metadata_from_url(&listing.metadata_uri, &listing.metadata_hash, 30) { + Ok(md) => Some(md), + Err(err) => { + print_to_terminal(1, &format!("error fetching metadata for {}: {err}", pid)); + None + } + }; listing.metadata = metadata.clone(); - if listing.auto_update { - print_to_terminal(0, &format!("kicking off auto-update for: {}", package_id)); - Request::to(("our", "downloads", "app_store", "sys")) - .body(&DownloadRequests::AutoUpdate(AutoUpdateRequest { - package_id: crate::kinode::process::main::PackageId::from_process_lib( - package_id.clone(), - ), - metadata: metadata.unwrap().into(), - })) - .send() - .unwrap(); + + if let Err(e) = state.db.insert_or_update_listing(&pid, &listing) { + print_to_terminal(1, &format!("error updating listing {}: {e}", pid)); } - true - }); + + if listing.auto_update { + if let Some(md) = metadata { + print_to_terminal(0, &format!("kicking off auto-update for: {}", pid)); + if let Err(e) = Request::to(("our", "downloads", "app_store", "sys")) + .body(&DownloadRequests::AutoUpdate(AutoUpdateRequest { + package_id: crate::kinode::process::main::PackageId::from_process_lib(pid.clone()), + metadata: md.into(), + })) + .send() + { + print_to_terminal(1, &format!("error sending auto-update request: {e}")); + } + } + } + } } /// create the filter used for app store getLogs and subscription. @@ -441,21 +709,25 @@ pub fn app_store_filter(state: &State) -> eth::Filter { } /// create a filter to fetch app store event logs from chain and subscribe to new events -pub fn fetch_and_subscribe_logs(our: &Address, state: &mut State) { +pub fn fetch_and_subscribe_logs(our: &Address, state: &mut State, last_saved_block: u64) { let filter = app_store_filter(state); // get past logs, subscribe to new ones. // subscribe first so we don't miss any logs - println!("subscribing..."); - state.kimap.provider.subscribe_loop(1, filter.clone()); - for log in fetch_logs( - &state.kimap.provider, - &filter.from_block(state.last_saved_block), - ) { + state.kimap.provider.subscribe_loop(1, filter.clone()); + println!("fetching old logs from block {last_saved_block}"); + for log in fetch_logs(&state.kimap.provider, &filter.from_block(last_saved_block)) { if let Err(e) = handle_eth_log(our, state, log, true) { print_to_terminal(1, &format!("error ingesting log: {e}")); }; } - update_all_metadata(state); + + update_all_metadata(state, last_saved_block); + // save updated last_saved_block + if let Ok(block_number) = state.kimap.provider.get_block_number() { + state.last_saved_block = block_number; + state.db.set_last_saved_block(block_number).unwrap(); + } + println!("up to date to block {}", state.last_saved_block); } /// fetch logs from the chain with a given filter @@ -504,32 +776,6 @@ pub fn keccak_256_hash(bytes: &[u8]) -> String { format!("0x{:x}", hasher.finalize()) } -/// fetch state from disk or create a new one if that fails -pub fn fetch_state(provider: eth::Provider) -> State { - if let Some(state_bytes) = get_state() { - match serde_json::from_slice::(&state_bytes) { - Ok(state) => { - if state.kimap.address().to_string() == KIMAP_ADDRESS { - return state; - } else { - println!( - "state contract address mismatch. rebuilding state! expected {}, got {}", - KIMAP_ADDRESS, - state.kimap.address().to_string() - ); - } - } - Err(e) => println!("failed to deserialize saved state, rebuilding: {e}"), - } - } - State { - kimap: kimap::Kimap::new(provider, eth::Address::from_str(KIMAP_ADDRESS).unwrap()), - last_saved_block: 0, - listings: HashMap::new(), - published: HashSet::new(), - } -} - // quite annoyingly, we must convert from our gen'd version of PackageId // to the process_lib's gen'd version. this is in order to access custom // Impls that we want to use @@ -584,4 +830,4 @@ impl From for OnchainMetadata { }, } } -} +} \ No newline at end of file diff --git a/kinode/packages/app_store/pkg/manifest.json b/kinode/packages/app_store/pkg/manifest.json index d409e265..b01c2aa1 100644 --- a/kinode/packages/app_store/pkg/manifest.json +++ b/kinode/packages/app_store/pkg/manifest.json @@ -39,6 +39,7 @@ "eth:distro:sys", "http_server:distro:sys", "http_client:distro:sys", + "sqlite:distro:sys", { "process": "vfs:distro:sys", "params": { @@ -52,6 +53,7 @@ "vfs:distro:sys", "http_client:distro:sys", "eth:distro:sys", + "sqlite:distro:sys", "timer:distro:sys" ], "public": false