fixes: sub first, then get logs, fix to kimap helpers in process_lib, remove vfs print

This commit is contained in:
dr-frmr 2024-08-01 02:56:41 +03:00
parent 160a94bcd2
commit 56fa1530b7
No known key found for this signature in database
6 changed files with 87 additions and 77 deletions

54
Cargo.lock generated
View File

@ -199,7 +199,7 @@ dependencies = [
"itoa", "itoa",
"serde", "serde",
"serde_json", "serde_json",
"winnow 0.6.16", "winnow 0.6.18",
] ]
[[package]] [[package]]
@ -583,7 +583,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cbcba3ca07cf7975f15d871b721fb18031eec8bce51103907f6dcce00b255d98" checksum = "cbcba3ca07cf7975f15d871b721fb18031eec8bce51103907f6dcce00b255d98"
dependencies = [ dependencies = [
"serde", "serde",
"winnow 0.6.16", "winnow 0.6.18",
] ]
[[package]] [[package]]
@ -1394,9 +1394,9 @@ dependencies = [
[[package]] [[package]]
name = "clap" name = "clap"
version = "4.5.11" version = "4.5.13"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "35723e6a11662c2afb578bcf0b88bf6ea8e21282a953428f240574fcc3a2b5b3" checksum = "0fbb260a053428790f3de475e304ff84cdbc4face759ea7a3e64c1edd938a7fc"
dependencies = [ dependencies = [
"clap_builder", "clap_builder",
"clap_derive", "clap_derive",
@ -1404,9 +1404,9 @@ dependencies = [
[[package]] [[package]]
name = "clap_builder" name = "clap_builder"
version = "4.5.11" version = "4.5.13"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49eb96cbfa7cfa35017b7cd548c75b14c3118c98b423041d70562665e07fb0fa" checksum = "64b17d7ea74e9f833c7dbf2cbe4fb12ff26783eda4782a8975b72f895c9b4d99"
dependencies = [ dependencies = [
"anstream", "anstream",
"anstyle", "anstyle",
@ -1416,9 +1416,9 @@ dependencies = [
[[package]] [[package]]
name = "clap_derive" name = "clap_derive"
version = "4.5.11" version = "4.5.13"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5d029b67f89d30bbb547c89fd5161293c0aec155fc691d7924b64550662db93e" checksum = "501d359d5f3dcaf6ecdeee48833ae73ec6e42723a1e52419c79abf9507eec0a0"
dependencies = [ dependencies = [
"heck 0.5.0", "heck 0.5.0",
"proc-macro2", "proc-macro2",
@ -3008,9 +3008,9 @@ checksum = "ce23b50ad8242c51a442f3ff322d56b02f08852c77e4c0b4d3fd684abc89c683"
[[package]] [[package]]
name = "indexmap" name = "indexmap"
version = "2.2.6" version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "168fb715dda47215e360912c096649d23d58bf392ac62f73919e831745e40f26" checksum = "de3fc2e30ba82dd1b3911c8de1ffc143c74a914a14e99514d7637e3099df5ea0"
dependencies = [ dependencies = [
"equivalent", "equivalent",
"hashbrown 0.14.5", "hashbrown 0.14.5",
@ -3313,7 +3313,7 @@ dependencies = [
[[package]] [[package]]
name = "kinode_process_lib" name = "kinode_process_lib"
version = "0.9.0" version = "0.9.0"
source = "git+https://github.com/kinode-dao/process_lib?branch=develop#05ab125d3e9e733f59301253c51bdf1b14f61140" source = "git+https://github.com/kinode-dao/process_lib?branch=develop#51800f9c144b3b69ed52406b4b2ae4c5aa078aec"
dependencies = [ dependencies = [
"alloy", "alloy",
"alloy-primitives", "alloy-primitives",
@ -5708,14 +5708,14 @@ dependencies = [
[[package]] [[package]]
name = "toml" name = "toml"
version = "0.8.17" version = "0.8.19"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a44eede9b727419af8095cb2d72fab15487a541f54647ad4414b34096ee4631" checksum = "a1ed1f98e3fdc28d6d910e6737ae6ab1a93bf1985935a1193e68f93eeb68d24e"
dependencies = [ dependencies = [
"serde", "serde",
"serde_spanned", "serde_spanned",
"toml_datetime", "toml_datetime",
"toml_edit 0.22.18", "toml_edit 0.22.20",
] ]
[[package]] [[package]]
@ -5740,15 +5740,15 @@ dependencies = [
[[package]] [[package]]
name = "toml_edit" name = "toml_edit"
version = "0.22.18" version = "0.22.20"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1490595c74d930da779e944f5ba2ecdf538af67df1a9848cbd156af43c1b7cf0" checksum = "583c44c02ad26b0c3f3066fe629275e50627026c51ac2e595cca4c230ce1ce1d"
dependencies = [ dependencies = [
"indexmap", "indexmap",
"serde", "serde",
"serde_spanned", "serde_spanned",
"toml_datetime", "toml_datetime",
"winnow 0.6.16", "winnow 0.6.18",
] ]
[[package]] [[package]]
@ -6330,9 +6330,9 @@ dependencies = [
[[package]] [[package]]
name = "wasm-encoder" name = "wasm-encoder"
version = "0.214.0" version = "0.215.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff694f02a8d7a50b6922b197ae03883fbf18cdb2ae9fbee7b6148456f5f44041" checksum = "4fb56df3e06b8e6b77e37d2969a50ba51281029a9aeb3855e76b7f49b6418847"
dependencies = [ dependencies = [
"leb128", "leb128",
] ]
@ -6723,24 +6723,24 @@ dependencies = [
[[package]] [[package]]
name = "wast" name = "wast"
version = "214.0.0" version = "215.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "694bcdb24c49c8709bd8713768b71301a11e823923eee355d530f1d8d0a7f8e9" checksum = "1ff1d00d893593249e60720be04a7c1f42f1c4dc3806a2869f4e66ab61eb54cb"
dependencies = [ dependencies = [
"bumpalo", "bumpalo",
"leb128", "leb128",
"memchr", "memchr",
"unicode-width", "unicode-width",
"wasm-encoder 0.214.0", "wasm-encoder 0.215.0",
] ]
[[package]] [[package]]
name = "wat" name = "wat"
version = "1.214.0" version = "1.215.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "347249eb56773fa728df2656cfe3a8c19437ded61a922a0b5e0839d9790e278e" checksum = "670bf4d9c8cf76ae242d70ded47c546525b6dafaa6871f9bcb065344bf2b4e3d"
dependencies = [ dependencies = [
"wast 214.0.0", "wast 215.0.0",
] ]
[[package]] [[package]]
@ -7010,9 +7010,9 @@ dependencies = [
[[package]] [[package]]
name = "winnow" name = "winnow"
version = "0.6.16" version = "0.6.18"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b480ae9340fc261e6be3e95a1ba86d54ae3f9171132a73ce8d4bbaf68339507c" checksum = "68a9bda4691f099d435ad181000724da8e5899daa10713c2d432552b9ccd3a6f"
dependencies = [ dependencies = [
"memchr", "memchr",
] ]

View File

@ -576,7 +576,7 @@ fn handle_eth_sub_event(
event: eth::SubscriptionResult, event: eth::SubscriptionResult,
) -> Result<(), AppStoreLogError> { ) -> Result<(), AppStoreLogError> {
let eth::SubscriptionResult::Log(log) = event else { let eth::SubscriptionResult::Log(log) = event else {
return Err(AppStoreLogError::DecodeLogError); return Ok(()); // not a log event
}; };
state.ingest_contract_event(*log, true) state.ingest_contract_event(*log, true)
} }

View File

@ -1,6 +1,5 @@
use crate::{utils, DownloadRequest, LocalRequest}; use crate::{utils, DownloadRequest, LocalRequest};
use crate::{KIMAP_ADDRESS, VFS_TIMEOUT}; use crate::{KIMAP_ADDRESS, VFS_TIMEOUT};
use alloy_sol_types::SolEvent;
use kinode_process_lib::kernel_types::Erc721Metadata; use kinode_process_lib::kernel_types::Erc721Metadata;
use kinode_process_lib::{ use kinode_process_lib::{
eth, kernel_types as kt, kimap, println, vfs, Address, NodeId, PackageId, Request, eth, kernel_types as kt, kimap, println, vfs, Address, NodeId, PackageId, Request,
@ -17,7 +16,7 @@ use std::str::FromStr;
pub enum AppStoreLogError { pub enum AppStoreLogError {
NoBlockNumber, NoBlockNumber,
GetNameError, GetNameError,
DecodeLogError, DecodeLogError(kimap::DecodeLogError),
PackageHashMismatch, PackageHashMismatch,
InvalidPublisherName, InvalidPublisherName,
MetadataNotFound, MetadataNotFound,
@ -30,7 +29,7 @@ impl std::fmt::Display for AppStoreLogError {
match self { match self {
AppStoreLogError::NoBlockNumber => write!(f, "log with no block number"), AppStoreLogError::NoBlockNumber => write!(f, "log with no block number"),
AppStoreLogError::GetNameError => write!(f, "no corresponding name for namehash found"), AppStoreLogError::GetNameError => write!(f, "no corresponding name for namehash found"),
AppStoreLogError::DecodeLogError => write!(f, "error decoding log data"), AppStoreLogError::DecodeLogError(e) => write!(f, "error decoding log data: {e:?}"),
AppStoreLogError::PackageHashMismatch => write!(f, "mismatched package hash"), AppStoreLogError::PackageHashMismatch => write!(f, "mismatched package hash"),
AppStoreLogError::InvalidPublisherName => write!(f, "invalid publisher name"), AppStoreLogError::InvalidPublisherName => write!(f, "invalid publisher name"),
AppStoreLogError::MetadataNotFound => write!(f, "metadata not found"), AppStoreLogError::MetadataNotFound => write!(f, "metadata not found"),
@ -365,7 +364,7 @@ impl State {
let block_number: u64 = log.block_number.ok_or(AppStoreLogError::NoBlockNumber)?; let block_number: u64 = log.block_number.ok_or(AppStoreLogError::NoBlockNumber)?;
let note: kimap::Note = let note: kimap::Note =
kimap::decode_note_log(&log).ok_or(AppStoreLogError::DecodeLogError)?; kimap::decode_note_log(&log).map_err(AppStoreLogError::DecodeLogError)?;
let package_id = note let package_id = note
.parent_path .parent_path
@ -385,7 +384,8 @@ impl State {
// //
let metadata_uri = String::from_utf8_lossy(&note.data).to_string(); let metadata_uri = String::from_utf8_lossy(&note.data).to_string();
// generate ~metadata-hash notehash let (tba, metadata_hash) = if update_listings {
// generate ~metadata-hash full-path
let hash_note = format!("~metadata-hash.{}", note.parent_path); let hash_note = format!("~metadata-hash.{}", note.parent_path);
// owner can change which we don't track (yet?) so don't save, need to get when desired // owner can change which we don't track (yet?) so don't save, need to get when desired
@ -398,7 +398,10 @@ impl State {
return Err(AppStoreLogError::MetadataNotFound); return Err(AppStoreLogError::MetadataNotFound);
}; };
let metadata_hash = String::from_utf8_lossy(&hash_note).to_string(); (tba, String::from_utf8_lossy(&hash_note).to_string())
} else {
(eth::Address::ZERO, "".to_string())
};
// fetch metadata from the URI (currently only handling HTTP(S) URLs!) // fetch metadata from the URI (currently only handling HTTP(S) URLs!)
// assert that the metadata hash matches the fetched data // assert that the metadata hash matches the fetched data
@ -415,10 +418,10 @@ impl State {
match self.packages.entry(package_id) { match self.packages.entry(package_id) {
std::collections::hash_map::Entry::Occupied(mut listing) => { std::collections::hash_map::Entry::Occupied(mut listing) => {
let listing = listing.get_mut(); let listing = listing.get_mut();
listing.tba = tba;
listing.metadata_uri = metadata_uri; listing.metadata_uri = metadata_uri;
listing.metadata_hash = metadata_hash;
if update_listings { if update_listings {
listing.tba = tba;
listing.metadata_hash = metadata_hash;
listing.metadata = metadata; listing.metadata = metadata;
} }
} }
@ -444,14 +447,41 @@ impl State {
/// of stale metadata. /// of stale metadata.
pub fn update_listings(&mut self) { pub fn update_listings(&mut self) {
for (package_id, listing) in self.packages.iter_mut() { for (package_id, listing) in self.packages.iter_mut() {
// if publisher is `sys`, we can skip
if package_id.publisher() == "sys" {
continue;
}
// generate ~metadata-hash full-path
let hash_note = format!(
"~metadata-hash.{}",
package_id.to_string().replace(":", ".")
);
// owner can change which we don't track (yet?) so don't save, need to get when desired
let Ok((tba, _owner, data)) = self.kimap.get(&hash_note) else {
println!("Couldn't find {hash_note}");
continue;
};
let Some(hash_note) = data else {
println!("No data for {hash_note}");
continue;
};
let metadata_hash = String::from_utf8_lossy(&hash_note).to_string();
if let Ok(metadata) = if let Ok(metadata) =
utils::fetch_metadata_from_url(&listing.metadata_uri, &listing.metadata_hash, 30) utils::fetch_metadata_from_url(&listing.metadata_uri, &metadata_hash, 30)
{ {
if let Some(package_state) = &listing.state { if let Some(package_state) = &listing.state {
auto_update(&self.our, package_id, &metadata, package_state); auto_update(&self.our, package_id, &metadata, package_state);
} }
listing.metadata = Some(metadata); listing.metadata = Some(metadata);
} }
listing.tba = tba;
listing.metadata_hash = metadata_hash;
} }
// kinode_process_lib::set_state(&serde_json::to_vec(self).unwrap()); // kinode_process_lib::set_state(&serde_json::to_vec(self).unwrap());
} }

View File

@ -96,16 +96,17 @@ pub fn app_store_filter(state: &State) -> eth::Filter {
pub fn fetch_and_subscribe_logs(state: &mut State) { pub fn fetch_and_subscribe_logs(state: &mut State) {
let filter = app_store_filter(state); let filter = app_store_filter(state);
// get past logs, subscribe to new ones. // get past logs, subscribe to new ones.
// subscribe first so we don't miss any logs
state.kimap.provider.subscribe_loop(1, filter.clone());
for log in fetch_logs( for log in fetch_logs(
&state.kimap.provider, &state.kimap.provider,
&filter.clone().from_block(state.last_saved_block), &filter.from_block(state.last_saved_block),
) { ) {
if let Err(e) = state.ingest_contract_event(log, false) { if let Err(e) = state.ingest_contract_event(log, false) {
print_to_terminal(1, &format!("error ingesting log: {e}")); print_to_terminal(1, &format!("error ingesting log: {e}"));
}; };
} }
state.update_listings(); state.update_listings();
state.kimap.provider.subscribe_loop(1, filter);
} }
/// fetch logs from the chain with a given filter /// fetch logs from the chain with a given filter

View File

@ -110,22 +110,18 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> {
), ),
); );
// subscribe to logs first, so no logs are missed
println!("subscribing to new logs...");
eth_provider.subscribe_loop(1, mints_filter.clone());
eth_provider.subscribe_loop(2, notes_filter.clone());
listen_to_new_blocks(); // sub_id: 3
// if block in state is < current_block, get logs from that part. // if block in state is < current_block, get logs from that part.
println!("syncing old logs..."); println!("syncing old logs...");
fetch_and_process_logs(&eth_provider, &our, &mut state, mints_filter.clone()); fetch_and_process_logs(&eth_provider, &our, &mut state, mints_filter.clone());
fetch_and_process_logs(&eth_provider, &our, &mut state, notes_filter.clone()); fetch_and_process_logs(&eth_provider, &our, &mut state, notes_filter.clone());
println!("done syncing old logs."); println!("done syncing old logs.");
let current_block = eth_provider.get_block_number().unwrap();
println!("current block: {}", current_block);
state.last_block = current_block;
println!("subscribing to new logs...");
subscribe_to_logs(&eth_provider, mints_filter.clone(), 1);
subscribe_to_logs(&eth_provider, notes_filter.clone(), 2);
listen_to_new_blocks(); // sub_id: 3
println!("subscribed to logs successfully");
let mut pending_requests: BTreeMap<u64, Vec<IndexerRequests>> = BTreeMap::new(); let mut pending_requests: BTreeMap<u64, Vec<IndexerRequests>> = BTreeMap::new();
loop { loop {
@ -227,9 +223,9 @@ fn handle_eth_message(
Ok(Err(e)) => { Ok(Err(e)) => {
println!("got eth subscription error ({e:?}), resubscribing"); println!("got eth subscription error ({e:?}), resubscribing");
if e.id == 1 { if e.id == 1 {
subscribe_to_logs(&eth_provider, mints_filter.clone(), 1); eth_provider.subscribe_loop(1, mints_filter.clone());
} else if e.id == 2 { } else if e.id == 2 {
subscribe_to_logs(&eth_provider, notes_filter.clone(), 2); eth_provider.subscribe_loop(2, notes_filter.clone());
} else if e.id == 3 { } else if e.id == 3 {
listen_to_new_blocks(); listen_to_new_blocks();
} }
@ -527,16 +523,3 @@ fn listen_to_new_blocks() {
.send() .send()
.unwrap(); .unwrap();
} }
fn subscribe_to_logs(eth_provider: &eth::Provider, filter: eth::Filter, sub_id: u64) {
loop {
match eth_provider.subscribe(sub_id, filter.clone()) {
Ok(()) => break,
Err(_) => {
println!("failed to subscribe to chain! trying again in 5s...");
std::thread::sleep(std::time::Duration::from_secs(5));
continue;
}
}
}
}

View File

@ -97,7 +97,6 @@ pub async fn vfs(
// Clone Arcs for the new task // Clone Arcs for the new task
let our_node = our_node.clone(); let our_node = our_node.clone();
let send_to_loop = send_to_loop.clone(); let send_to_loop = send_to_loop.clone();
let send_to_terminal = send_to_terminal.clone();
let send_to_caps_oracle = send_to_caps_oracle.clone(); let send_to_caps_oracle = send_to_caps_oracle.clone();
let open_files = open_files.clone(); let open_files = open_files.clone();
let vfs_path = vfs_path.clone(); let vfs_path = vfs_path.clone();
@ -118,9 +117,6 @@ pub async fn vfs(
) )
.await .await
{ {
Printout::new(1, format!("vfs: {e}"))
.send(&send_to_terminal)
.await;
KernelMessage::builder() KernelMessage::builder()
.id(km_id) .id(km_id)
.source((our_node.as_str(), VFS_PROCESS_ID.clone())) .source((our_node.as_str(), VFS_PROCESS_ID.clone()))