diff --git a/kinode/packages/app_store/app_store/src/utils.rs b/kinode/packages/app_store/app_store/src/utils.rs
index 1222930a..bb1904a9 100644
--- a/kinode/packages/app_store/app_store/src/utils.rs
+++ b/kinode/packages/app_store/app_store/src/utils.rs
@@ -7,8 +7,8 @@ use {
     alloy_primitives::keccak256,
     alloy_sol_types::SolEvent,
     kinode_process_lib::{
-        eth, get_blob, get_state, http, kernel_types as kt, kimap, println, vfs, Address,
-        LazyLoadBlob, PackageId, ProcessId, Request,
+        eth, get_blob, get_state, http, kernel_types as kt, kimap, print_to_terminal, println, vfs,
+        Address, LazyLoadBlob, PackageId, ProcessId, Request,
     },
     std::collections::HashSet,
 };
@@ -101,7 +101,7 @@ pub fn fetch_and_subscribe_logs(state: &mut State) {
         &filter.clone().from_block(state.last_saved_block),
     ) {
         if let Err(e) = state.ingest_contract_event(log, false) {
-            println!("error ingesting log: {e:?}");
+            print_to_terminal(1, &format!("error ingesting log: {e}"));
        };
     }
     state.update_listings();
diff --git a/kinode/packages/kns_indexer/kns_indexer/src/lib.rs b/kinode/packages/kns_indexer/kns_indexer/src/lib.rs
index 74c3d55d..d834aff9 100644
--- a/kinode/packages/kns_indexer/kns_indexer/src/lib.rs
+++ b/kinode/packages/kns_indexer/kns_indexer/src/lib.rs
@@ -4,7 +4,7 @@ use crate::kinode::process::kns_indexer::{
 use alloy_sol_types::SolEvent;
 use kinode_process_lib::{
     await_message, call_init, eth, kimap, net, print_to_terminal, println, Address, Message,
-    Request, Response,
+    ProcessId, Request, Response,
 };
 use serde::{Deserialize, Serialize};
 use std::{
@@ -110,13 +110,21 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> {
         ),
     );
 
-    subscribe_to_logs(&eth_provider, mints_filter.clone(), 1);
-    subscribe_to_logs(&eth_provider, notes_filter.clone(), 2);
-    println!("subscribed to logs successfully");
-    // if block in state is < current_block, get logs from that part.
+    println!("syncing old logs...");
     fetch_and_process_logs(&eth_provider, &our, &mut state, mints_filter.clone());
     fetch_and_process_logs(&eth_provider, &our, &mut state, notes_filter.clone());
+    println!("done syncing old logs.");
+
+    let current_block = eth_provider.get_block_number().unwrap();
+    println!("current block: {}", current_block);
+    state.last_block = current_block;
+
+    println!("subscribing to new logs...");
+    subscribe_to_logs(&eth_provider, mints_filter.clone(), 1);
+    subscribe_to_logs(&eth_provider, notes_filter.clone(), 2);
+    listen_to_new_blocks(&our); // sub_id: 3
+    println!("subscribed to logs successfully");
 
     let mut pending_requests: BTreeMap<u64, Vec<IndexerRequests>> = BTreeMap::new();
@@ -209,6 +217,11 @@ fn handle_eth_message(
                     // print errors at verbosity=1
                     print_to_terminal(1, &format!("log-handling error! {e:?}"));
                 }
+            } else if let eth::SubscriptionResult::Header(header) = result {
+                if let Some(block) = header.number {
+                    // risky..
+                    state.last_block = block;
+                }
             }
         }
         Ok(Err(e)) => {
@@ -217,6 +230,8 @@
                 subscribe_to_logs(&eth_provider, mints_filter.clone(), 1);
             } else if e.id == 2 {
                 subscribe_to_logs(&eth_provider, notes_filter.clone(), 2);
+            } else if e.id == 3 {
+                listen_to_new_blocks(&our);
             }
         }
         Err(e) => {
@@ -247,13 +262,17 @@ fn handle_pending_requests(
         match request {
             IndexerRequests::NamehashToName(NamehashToNameRequest { hash, .. }) => {
                 Response::new()
-                    .body(serde_json::to_vec(&state.names.get(hash))?)
+                    .body(serde_json::to_vec(&IndexerResponses::Name(
+                        state.names.get(hash).cloned(),
+                    ))?)
                     .send()
                     .unwrap();
             }
             IndexerRequests::NodeInfo(NodeInfoRequest { name, .. }) => {
                 Response::new()
-                    .body(serde_json::to_vec(&state.nodes.get(name))?)
+                    .body(serde_json::to_vec(&IndexerResponses::NodeInfo(
+                        state.nodes.get(name).cloned(),
+                    ))?)
                     .send()
                     .unwrap();
             }
@@ -491,6 +510,21 @@ pub fn bytes_to_port(bytes: &[u8]) -> anyhow::Result<u16> {
     }
 }
 
+fn listen_to_new_blocks(our: &Address) {
+    let eth_newheads_sub = eth::EthAction::SubscribeLogs {
+        sub_id: 3,
+        chain_id: CHAIN_ID,
+        kind: eth::SubscriptionKind::NewHeads,
+        params: eth::Params::Bool(false),
+    };
+    let our_eth = Address::new(our.node(), ProcessId::new(Some("eth"), "distro", "sys"));
+
+    Request::new()
+        .body(serde_json::to_vec(&eth_newheads_sub).unwrap())
+        .target(our_eth)
+        .send();
+}
+
 fn subscribe_to_logs(eth_provider: &eth::Provider, filter: eth::Filter, sub_id: u64) {
     loop {
         match eth_provider.subscribe(sub_id, filter.clone()) {