app_store: sqlite storage backend

This commit is contained in:
bitful-pannul 2024-12-10 21:54:52 +02:00
parent 3259f279ea
commit 9fddb2cedd
2 changed files with 433 additions and 185 deletions

View File

@ -33,14 +33,14 @@ use alloy_primitives::keccak256;
use alloy_sol_types::SolEvent;
use kinode::process::chain::ChainResponses;
use kinode_process_lib::{
await_message, call_init, eth, get_blob, get_state, http, kernel_types as kt, kimap,
await_message, call_init, eth, http, kimap, get_blob,
print_to_terminal, println, timer, Address, Message, PackageId, Request, Response,
sqlite::{self, Sqlite},
kernel_types as kt,
};
use serde::{Deserialize, Serialize};
use std::{
collections::{HashMap, HashSet},
str::FromStr,
};
use std::str::FromStr;
use std::collections::HashMap;
wit_bindgen::generate!({
path: "target/wit",
@ -63,7 +63,6 @@ const KIMAP_ADDRESS: &str = "0x9CE8cCD2932DC727c70f9ae4f8C2b68E6Abed58C";
const DELAY_MS: u64 = 1_000; // 1s
#[derive(Debug, Serialize, Deserialize)]
pub struct State {
/// the kimap helper we are using
pub kimap: kimap::Kimap,
@ -71,10 +70,8 @@ pub struct State {
/// when we boot, we can read logs starting from this block and
/// rebuild latest state.
pub last_saved_block: u64,
/// onchain listings
pub listings: HashMap<PackageId, PackageListing>,
/// set of packages that we have published
pub published: HashSet<PackageId>,
/// tables: listings: <packade_id, listing>, published: vec<package_id>
pub db: DB,
}
/// listing information derived from metadata hash in listing event
@ -83,10 +80,9 @@ pub struct PackageListing {
pub tba: eth::Address,
pub metadata_uri: String,
pub metadata_hash: String,
// should this even be optional?
// relegate to only valid apps maybe?
pub metadata: Option<kt::Erc721Metadata>,
pub auto_update: bool,
pub block: u64
}
#[derive(Debug, Serialize, Deserialize, process_macros::SerdeJsonInto)]
@ -96,18 +92,270 @@ pub enum Req {
Request(ChainRequests),
}
pub struct DB {
inner: Sqlite,
}
impl DB {
pub fn connect(our: &Address) -> anyhow::Result<Self> {
let inner = sqlite::open(our.package_id(), "app_store_chain.sqlite", Some(10))?;
// create tables
inner.write(CREATE_META_TABLE.into(), vec![], None)?;
inner.write(CREATE_LISTINGS_TABLE.into(), vec![], None)?;
inner.write(CREATE_PUBLISHED_TABLE.into(), vec![], None)?;
Ok(Self { inner })
}
pub fn get_last_saved_block(&self) -> anyhow::Result<u64> {
let query = "SELECT value FROM meta WHERE key = 'last_saved_block'";
let rows = self.inner.read(query.into(), vec![])?;
if let Some(row) = rows.get(0) {
if let Some(val_str) = row.get("value").and_then(|v| v.as_str()) {
if let Ok(block) = val_str.parse::<u64>() {
return Ok(block);
}
}
}
Ok(0)
}
pub fn set_last_saved_block(&self, block: u64) -> anyhow::Result<()> {
let query = "INSERT INTO meta (key, value) VALUES ('last_saved_block', ?)
ON CONFLICT(key) DO UPDATE SET value=excluded.value";
let params = vec![block.to_string().into()];
self.inner.write(query.into(), params, None)?;
Ok(())
}
pub fn insert_or_update_listing(&self, package_id: &PackageId, listing: &PackageListing) -> anyhow::Result<()> {
let metadata_json = if let Some(m) = &listing.metadata {
serde_json::to_string(m)?
} else {
"".to_string()
};
let query = "INSERT INTO listings (package_name, publisher_node, tba, metadata_uri, metadata_hash, metadata_json, auto_update, block)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(package_name, publisher_node)
DO UPDATE SET
tba=excluded.tba,
metadata_uri=excluded.metadata_uri,
metadata_hash=excluded.metadata_hash,
metadata_json=excluded.metadata_json,
auto_update=excluded.auto_update,
block=excluded.block";
let params = vec![
package_id.package_name.clone().into(),
package_id.publisher_node.clone().into(),
listing.tba.to_string().into(),
listing.metadata_uri.clone().into(),
listing.metadata_hash.clone().into(),
metadata_json.into(),
(if listing.auto_update {1} else {0}).into(),
listing.block.into()
];
self.inner.write(query.into(), params, None)?;
Ok(())
}
pub fn delete_listing(&self, package_id: &PackageId) -> anyhow::Result<()> {
let query = "DELETE FROM listings WHERE package_name = ? AND publisher_node = ?";
let params = vec![
package_id.package_name.clone().into(),
package_id.publisher_node.clone().into(),
];
self.inner.write(query.into(), params, None)?;
Ok(())
}
pub fn get_listing(&self, package_id: &PackageId) -> anyhow::Result<Option<PackageListing>> {
let query = "SELECT tba, metadata_uri, metadata_hash, metadata_json, auto_update, block FROM listings WHERE package_name = ? AND publisher_node = ?";
let params = vec![
package_id.package_name.clone().into(),
package_id.publisher_node.clone().into(),
];
let rows = self.inner.read(query.into(), params)?;
if let Some(row) = rows.get(0) {
Ok(Some(self.row_to_listing(row)?))
} else {
Ok(None)
}
}
pub fn get_all_listings(&self) -> anyhow::Result<Vec<(PackageId, PackageListing)>> {
let query = "SELECT package_name, publisher_node, tba, metadata_uri, metadata_hash, metadata_json, auto_update, block FROM listings";
let rows = self.inner.read(query.into(), vec![])?;
let mut listings = Vec::new();
for row in rows {
let pid = PackageId {
package_name: row["package_name"].as_str().unwrap_or("").to_string(),
publisher_node: row["publisher_node"].as_str().unwrap_or("").to_string(),
};
let listing = self.row_to_listing(&row)?;
listings.push((pid, listing));
}
Ok(listings)
}
pub fn get_listings_batch(&self, limit: u64, offset: u64) -> anyhow::Result<Vec<(PackageId, PackageListing)>> {
let query = format!(
"SELECT package_name, publisher_node, tba, metadata_uri, metadata_hash, metadata_json, auto_update, block
FROM listings
ORDER BY package_name, publisher_node
LIMIT {} OFFSET {}",
limit, offset
);
let rows = self.inner.read(query, vec![])?;
let mut listings = Vec::new();
for row in rows {
let pid = PackageId {
package_name: row["package_name"].as_str().unwrap_or("").to_string(),
publisher_node: row["publisher_node"].as_str().unwrap_or("").to_string(),
};
let listing = self.row_to_listing(&row)?;
listings.push((pid, listing));
}
Ok(listings)
}
pub fn get_listings_since_block(&self, block_number: u64) -> anyhow::Result<Vec<(PackageId, PackageListing)>> {
let query = "SELECT package_name, publisher_node, tba, metadata_uri, metadata_hash, metadata_json, auto_update, block
FROM listings
WHERE block > ?";
let params = vec![block_number.into()];
let rows = self.inner.read(query.into(), params)?;
let mut listings = Vec::new();
for row in rows {
let pid = PackageId {
package_name: row["package_name"].as_str().unwrap_or("").to_string(),
publisher_node: row["publisher_node"].as_str().unwrap_or("").to_string(),
};
let listing = self.row_to_listing(&row)?;
listings.push((pid, listing));
}
Ok(listings)
}
pub fn row_to_listing(&self, row: &HashMap<String, serde_json::Value>) -> anyhow::Result<PackageListing> {
let tba_str = row["tba"].as_str().ok_or_else(|| anyhow::anyhow!("Invalid tba"))?;
let tba = tba_str.parse::<eth::Address>()?;
let metadata_uri = row["metadata_uri"].as_str().unwrap_or("").to_string();
let metadata_hash = row["metadata_hash"].as_str().unwrap_or("").to_string();
let metadata_json = row["metadata_json"].as_str().unwrap_or("");
let metadata: Option<kinode_process_lib::kernel_types::Erc721Metadata> =
if metadata_json.is_empty() {
None
} else {
serde_json::from_str(metadata_json)?
};
let auto_update = row["auto_update"].as_i64().unwrap_or(0) == 1;
let block = row["block"].as_i64().unwrap_or(0) as u64;
Ok(PackageListing {
tba,
metadata_uri,
metadata_hash,
metadata,
auto_update,
block,
})
}
pub fn get_published(&self, package_id: &PackageId) -> anyhow::Result<bool> {
let query = "SELECT 1 FROM published WHERE package_name = ? AND publisher_node = ?";
let params = vec![
package_id.package_name.clone().into(),
package_id.publisher_node.clone().into(),
];
let rows = self.inner.read(query.into(), params)?;
Ok(!rows.is_empty())
}
pub fn insert_published(&self, package_id: &PackageId) -> anyhow::Result<()> {
let query = "INSERT INTO published (package_name, publisher_node) VALUES (?, ?) ON CONFLICT DO NOTHING";
let params = vec![
package_id.package_name.clone().into(),
package_id.publisher_node.clone().into(),
];
self.inner.write(query.into(), params, None)?;
Ok(())
}
pub fn delete_published(&self, package_id: &PackageId) -> anyhow::Result<()> {
let query = "DELETE FROM published WHERE package_name = ? AND publisher_node = ?";
let params = vec![
package_id.package_name.clone().into(),
package_id.publisher_node.clone().into(),
];
self.inner.write(query.into(), params, None)?;
Ok(())
}
pub fn get_all_published(&self) -> anyhow::Result<Vec<PackageId>> {
let query = "SELECT package_name, publisher_node FROM published";
let rows = self.inner.read(query.into(), vec![])?;
let mut result = Vec::new();
for row in rows {
let pid = PackageId {
package_name: row["package_name"].as_str().unwrap_or("").to_string(),
publisher_node: row["publisher_node"].as_str().unwrap_or("").to_string(),
};
result.push(pid);
}
Ok(result)
}
}
const CREATE_META_TABLE: &str = "
CREATE TABLE IF NOT EXISTS meta (
key TEXT PRIMARY KEY,
value TEXT
);";
const CREATE_LISTINGS_TABLE: &str = "
CREATE TABLE IF NOT EXISTS listings (
package_name TEXT NOT NULL,
publisher_node TEXT NOT NULL,
tba TEXT NOT NULL,
metadata_uri TEXT,
metadata_hash TEXT,
metadata_json TEXT,
auto_update INTEGER NOT NULL DEFAULT 0,
block INTEGER NOT NULL DEFAULT 0,
PRIMARY KEY (package_name, publisher_node)
);";
const CREATE_PUBLISHED_TABLE: &str = "
CREATE TABLE IF NOT EXISTS published (
package_name TEXT NOT NULL,
publisher_node TEXT NOT NULL,
PRIMARY KEY (package_name, publisher_node)
);";
call_init!(init);
fn init(our: Address) {
println!(
"chain started, indexing on contract address {}",
KIMAP_ADDRESS
);
// create new provider with request-timeout of 60s
// can change, log requests can take quite a long time.
let eth_provider: eth::Provider = eth::Provider::new(CHAIN_ID, CHAIN_TIMEOUT);
let mut state = fetch_state(eth_provider);
fetch_and_subscribe_logs(&our, &mut state);
let db = DB::connect(&our).expect("failed to open DB");
let kimap_helper = kimap::Kimap::new(eth_provider, eth::Address::from_str(KIMAP_ADDRESS).unwrap());
let last_saved_block = db.get_last_saved_block().unwrap_or(0);
println!(
"chain started, indexing on kimap address {} at block {}",
KIMAP_ADDRESS, last_saved_block
);
let mut state = State {
kimap: kimap_helper,
last_saved_block,
db,
};
fetch_and_subscribe_logs(&our, &mut state, last_saved_block);
loop {
match await_message() {
@ -126,17 +374,15 @@ fn init(our: Address) {
fn handle_message(our: &Address, state: &mut State, message: &Message) -> anyhow::Result<()> {
if !message.is_request() {
if message.is_local(&our) && message.source().process == "timer:distro:sys" {
// handling of ETH RPC subscriptions delayed by DELAY_MS
// to allow kns to have a chance to process block: handle now
let Some(context) = message.context() else {
return Err(anyhow::anyhow!("foo"));
return Err(anyhow::anyhow!("No context in timer message"));
};
let log = serde_json::from_slice(context)?;
handle_eth_log(our, state, log, false)?;
return Ok(());
}
} else {
match message.body().try_into()? {
match serde_json::from_slice::<Req>(message.body())? {
Req::Eth(eth_result) => {
if !message.is_local(our) || message.source().process != "eth:distro:sys" {
return Err(anyhow::anyhow!(
@ -147,16 +393,11 @@ fn handle_message(our: &Address, state: &mut State, message: &Message) -> anyhow
if let Ok(eth::EthSub { result, .. }) = eth_result {
if let eth::SubscriptionResult::Log(ref log) = result {
// delay handling of ETH RPC subscriptions by DELAY_MS
// to allow kns to have a chance to process block
timer::set_timer(DELAY_MS, Some(serde_json::to_vec(log)?));
}
} else {
// attempt to resubscribe
state
.kimap
.provider
.subscribe_loop(1, app_store_filter(state));
// re-subscribe if error
state.kimap.provider.subscribe_loop(1, app_store_filter(state));
}
}
Req::Request(chains) => {
@ -171,48 +412,37 @@ fn handle_message(our: &Address, state: &mut State, message: &Message) -> anyhow
fn handle_local_request(state: &mut State, req: ChainRequests) -> anyhow::Result<()> {
match req {
ChainRequests::GetApp(package_id) => {
let onchain_app = state
.listings
.get(&package_id.clone().to_process_lib())
.map(|app| OnchainApp {
package_id: package_id,
tba: app.tba.to_string(),
metadata_uri: app.metadata_uri.clone(),
metadata_hash: app.metadata_hash.clone(),
metadata: app.metadata.as_ref().map(|m| m.clone().into()),
auto_update: app.auto_update,
});
let pid = package_id.clone().to_process_lib();
let listing = state.db.get_listing(&pid)?;
let onchain_app = listing.map(|app| app.to_onchain_app(&pid));
let response = ChainResponses::GetApp(onchain_app);
Response::new().body(&response).send()?;
}
ChainRequests::GetApps => {
let apps: Vec<OnchainApp> = state
.listings
.iter()
.map(|(id, listing)| listing.to_onchain_app(id))
let listings = state.db.get_all_listings()?;
let apps: Vec<OnchainApp> = listings
.into_iter()
.map(|(pid, listing)| listing.to_onchain_app(&pid))
.collect();
let response = ChainResponses::GetApps(apps);
Response::new().body(&response).send()?;
}
ChainRequests::GetOurApps => {
let apps: Vec<OnchainApp> = state
.published
.iter()
.filter_map(|id| {
state
.listings
.get(id)
.map(|listing| listing.to_onchain_app(id))
})
.collect();
let published_list = state.db.get_all_published()?;
let mut apps = Vec::new();
for pid in published_list {
if let Some(listing) = state.db.get_listing(&pid)? {
apps.push(listing.to_onchain_app(&pid));
}
}
let response = ChainResponses::GetOurApps(apps);
Response::new().body(&response).send()?;
}
ChainRequests::StartAutoUpdate(package_id) => {
if let Some(listing) = state.listings.get_mut(&package_id.to_process_lib()) {
let pid = package_id.to_process_lib();
if let Some(mut listing) = state.db.get_listing(&pid)? {
listing.auto_update = true;
state.db.insert_or_update_listing(&pid, &listing)?;
let response = ChainResponses::AutoUpdateStarted;
Response::new().body(&response).send()?;
} else {
@ -221,8 +451,10 @@ fn handle_local_request(state: &mut State, req: ChainRequests) -> anyhow::Result
}
}
ChainRequests::StopAutoUpdate(package_id) => {
if let Some(listing) = state.listings.get_mut(&package_id.to_process_lib()) {
let pid = package_id.to_process_lib();
if let Some(mut listing) = state.db.get_listing(&pid)? {
listing.auto_update = false;
state.db.insert_or_update_listing(&pid, &listing)?;
let response = ChainResponses::AutoUpdateStopped;
Response::new().body(&response).send()?;
} else {
@ -256,7 +488,7 @@ fn handle_eth_log(
if package.is_empty() || publisher.is_empty() {
Err(anyhow::anyhow!("invalid publisher name"))
} else {
Ok(PackageId::new(&package, &publisher))
Ok(PackageId::new(package, publisher))
}
})?;
@ -265,8 +497,8 @@ fn handle_eth_log(
// at the URI.
let metadata_uri = String::from_utf8_lossy(&note.data).to_string();
let is_our_package = &package_id.publisher() == &our.node();
let is_our_package = package_id.publisher() == our.node();
println!("we got a log: {block_number} {package_id} {metadata_uri}");
let (tba, metadata_hash) = if !startup {
// generate ~metadata-hash full-path
let hash_note = format!("~metadata-hash.{}", note.parent_path);
@ -290,10 +522,12 @@ fn handle_eth_log(
match data {
None => {
// if ~metadata-uri is also empty, this is an unpublish action!
// unpublish if metadata_uri empty
if metadata_uri.is_empty() {
state.published.remove(&package_id);
state.listings.remove(&package_id);
state.db.delete_published(&package_id)?;
state.db.delete_listing(&package_id)?;
state.last_saved_block = block_number;
state.db.set_last_saved_block(block_number)?;
return Ok(());
}
return Err(anyhow::anyhow!(
@ -307,7 +541,7 @@ fn handle_eth_log(
};
if is_our_package {
state.published.insert(package_id.clone());
state.db.insert_published(&package_id)?;
}
// if this is a startup event, we don't need to fetch metadata from the URI --
@ -320,97 +554,27 @@ fn handle_eth_log(
None
};
match state.listings.entry(package_id.clone()) {
std::collections::hash_map::Entry::Occupied(mut listing) => {
let listing = listing.get_mut();
listing.metadata_uri = metadata_uri;
listing.tba = tba;
listing.metadata_hash = metadata_hash;
listing.metadata = metadata.clone();
}
std::collections::hash_map::Entry::Vacant(listing) => {
listing.insert(PackageListing {
let mut listing = state
.db
.get_listing(&package_id)?
.unwrap_or(PackageListing {
tba,
metadata_uri,
metadata_hash,
metadata_uri: metadata_uri.clone(),
metadata_hash: metadata_hash.clone(),
metadata: metadata.clone(),
auto_update: false,
block: block_number,
});
}
}
if !startup {
// if auto_update is enabled, send a message to downloads to kick off the update.
if let Some(listing) = state.listings.get(&package_id) {
if listing.auto_update {
print_to_terminal(0, &format!("kicking off auto-update for: {}", package_id));
Request::to(("our", "downloads", "app_store", "sys"))
.body(&DownloadRequests::AutoUpdate(AutoUpdateRequest {
package_id: crate::kinode::process::main::PackageId::from_process_lib(
package_id,
),
metadata: metadata.unwrap().into(),
}))
.send()
.unwrap();
}
}
}
state.last_saved_block = block_number;
Ok(())
}
/// after startup, fetch metadata for all listings
/// we do this as a separate step to not repeatedly fetch outdated metadata
/// as we process logs.
fn update_all_metadata(state: &mut State) {
state.listings.retain(|package_id, listing| {
let (tba, metadata_hash) = {
// generate ~metadata-hash full-path
let hash_note = format!(
"~metadata-hash.{}.{}",
package_id.package(),
package_id.publisher()
);
// owner can change which we don't track (yet?) so don't save, need to get when desired
let Ok((tba, _owner, data)) = (match state.kimap.get(&hash_note) {
Ok(gr) => Ok(gr),
Err(e) => match e {
eth::EthError::RpcError(_) => {
// retry on RpcError after DELAY_MS sleep
// sleep here rather than with, e.g., a message to
// `timer:distro:sys` so that events are processed in
// order of receipt
std::thread::sleep(std::time::Duration::from_millis(DELAY_MS));
state.kimap.get(&hash_note)
}
_ => Err(e),
},
}) else {
return false;
};
match data {
None => {
// if ~metadata-uri is also empty, this is an unpublish action!
if listing.metadata_uri.is_empty() {
state.published.remove(package_id);
}
return false;
}
Some(hash_note) => (tba, String::from_utf8_lossy(&hash_note).to_string()),
}
};
// update fields
listing.tba = tba;
listing.metadata_uri = metadata_uri;
listing.metadata_hash = metadata_hash;
let metadata =
fetch_metadata_from_url(&listing.metadata_uri, &listing.metadata_hash, 30).ok();
listing.metadata = metadata.clone();
if listing.auto_update {
print_to_terminal(0, &format!("kicking off auto-update for: {}", package_id));
state.db.insert_or_update_listing(&package_id, &listing)?;
if !startup && listing.auto_update {
println!("kicking off auto-update for: {}", package_id);
Request::to(("our", "downloads", "app_store", "sys"))
.body(&DownloadRequests::AutoUpdate(AutoUpdateRequest {
package_id: crate::kinode::process::main::PackageId::from_process_lib(
@ -421,8 +585,112 @@ fn update_all_metadata(state: &mut State) {
.send()
.unwrap();
}
true
});
if !startup {
state.last_saved_block = block_number;
state.db.set_last_saved_block(block_number)?;
}
Ok(())
}
/// after startup, fetch metadata for all listings
/// we do this as a separate step to not repeatedly fetch outdated metadata
/// as we process logs.
fn update_all_metadata(state: &mut State, last_saved_block: u64) {
let updated_listings = match state.db.get_listings_since_block(last_saved_block) {
Ok(listings) => listings,
Err(e) => {
print_to_terminal(1, &format!("error fetching updated listings since block {last_saved_block}: {e}"));
return;
}
};
for (pid, mut listing) in updated_listings {
let hash_note = format!("~metadata-hash.{}.{}", pid.package(), pid.publisher());
let (tba, metadata_hash) = match state.kimap.get(&hash_note) {
Ok((t, _o, data)) => {
match data {
None => {
// If metadata_uri empty, unpublish
if listing.metadata_uri.is_empty() {
if let Err(e) = state.db.delete_published(&pid) {
print_to_terminal(1, &format!("error deleting published: {e}"));
}
}
if let Err(e) = state.db.delete_listing(&pid) {
print_to_terminal(1, &format!("error deleting listing: {e}"));
}
continue;
}
Some(hash_note) => (t, String::from_utf8_lossy(&hash_note).to_string()),
}
}
Err(e) => {
// If RpcError, retry once after delay
if let eth::EthError::RpcError(_) = e {
std::thread::sleep(std::time::Duration::from_millis(DELAY_MS));
match state.kimap.get(&hash_note) {
Ok((t, _o, data)) => {
if let Some(hash_note) = data {
(t, String::from_utf8_lossy(&hash_note).to_string())
} else {
// no data again after retry
if listing.metadata_uri.is_empty() {
if let Err(e) = state.db.delete_published(&pid) {
print_to_terminal(1, &format!("error deleting published: {e}"));
}
}
if let Err(e) = state.db.delete_listing(&pid) {
print_to_terminal(1, &format!("error deleting listing: {e}"));
}
continue;
}
}
Err(e2) => {
print_to_terminal(1, &format!("error retrieving metadata-hash after retry: {e2:?}"));
continue;
}
}
} else {
print_to_terminal(1, &format!("error retrieving metadata-hash: {e:?} for {pid}"));
continue;
}
}
};
// Update listing fields
listing.tba = tba;
listing.metadata_hash = metadata_hash;
let metadata = match fetch_metadata_from_url(&listing.metadata_uri, &listing.metadata_hash, 30) {
Ok(md) => Some(md),
Err(err) => {
print_to_terminal(1, &format!("error fetching metadata for {}: {err}", pid));
None
}
};
listing.metadata = metadata.clone();
if let Err(e) = state.db.insert_or_update_listing(&pid, &listing) {
print_to_terminal(1, &format!("error updating listing {}: {e}", pid));
}
if listing.auto_update {
if let Some(md) = metadata {
print_to_terminal(0, &format!("kicking off auto-update for: {}", pid));
if let Err(e) = Request::to(("our", "downloads", "app_store", "sys"))
.body(&DownloadRequests::AutoUpdate(AutoUpdateRequest {
package_id: crate::kinode::process::main::PackageId::from_process_lib(pid.clone()),
metadata: md.into(),
}))
.send()
{
print_to_terminal(1, &format!("error sending auto-update request: {e}"));
}
}
}
}
}
/// create the filter used for app store getLogs and subscription.
@ -441,21 +709,25 @@ pub fn app_store_filter(state: &State) -> eth::Filter {
}
/// create a filter to fetch app store event logs from chain and subscribe to new events
pub fn fetch_and_subscribe_logs(our: &Address, state: &mut State) {
pub fn fetch_and_subscribe_logs(our: &Address, state: &mut State, last_saved_block: u64) {
let filter = app_store_filter(state);
// get past logs, subscribe to new ones.
// subscribe first so we don't miss any logs
println!("subscribing...");
state.kimap.provider.subscribe_loop(1, filter.clone());
for log in fetch_logs(
&state.kimap.provider,
&filter.from_block(state.last_saved_block),
) {
println!("fetching old logs from block {last_saved_block}");
for log in fetch_logs(&state.kimap.provider, &filter.from_block(last_saved_block)) {
if let Err(e) = handle_eth_log(our, state, log, true) {
print_to_terminal(1, &format!("error ingesting log: {e}"));
};
}
update_all_metadata(state);
update_all_metadata(state, last_saved_block);
// save updated last_saved_block
if let Ok(block_number) = state.kimap.provider.get_block_number() {
state.last_saved_block = block_number;
state.db.set_last_saved_block(block_number).unwrap();
}
println!("up to date to block {}", state.last_saved_block);
}
/// fetch logs from the chain with a given filter
@ -504,32 +776,6 @@ pub fn keccak_256_hash(bytes: &[u8]) -> String {
format!("0x{:x}", hasher.finalize())
}
/// fetch state from disk or create a new one if that fails
pub fn fetch_state(provider: eth::Provider) -> State {
if let Some(state_bytes) = get_state() {
match serde_json::from_slice::<State>(&state_bytes) {
Ok(state) => {
if state.kimap.address().to_string() == KIMAP_ADDRESS {
return state;
} else {
println!(
"state contract address mismatch. rebuilding state! expected {}, got {}",
KIMAP_ADDRESS,
state.kimap.address().to_string()
);
}
}
Err(e) => println!("failed to deserialize saved state, rebuilding: {e}"),
}
}
State {
kimap: kimap::Kimap::new(provider, eth::Address::from_str(KIMAP_ADDRESS).unwrap()),
last_saved_block: 0,
listings: HashMap::new(),
published: HashSet::new(),
}
}
// quite annoyingly, we must convert from our gen'd version of PackageId
// to the process_lib's gen'd version. this is in order to access custom
// Impls that we want to use

View File

@ -39,6 +39,7 @@
"eth:distro:sys",
"http_server:distro:sys",
"http_client:distro:sys",
"sqlite:distro:sys",
{
"process": "vfs:distro:sys",
"params": {
@ -52,6 +53,7 @@
"vfs:distro:sys",
"http_client:distro:sys",
"eth:distro:sys",
"sqlite:distro:sys",
"timer:distro:sys"
],
"public": false