kns & app_store: fix getname race

This commit is contained in:
bitful-pannul 2024-07-30 16:57:49 +03:00
parent 2e6091e205
commit 746bc5f5f7
2 changed files with 44 additions and 10 deletions

View File

@ -7,8 +7,8 @@ use {
alloy_primitives::keccak256, alloy_primitives::keccak256,
alloy_sol_types::SolEvent, alloy_sol_types::SolEvent,
kinode_process_lib::{ kinode_process_lib::{
eth, get_blob, get_state, http, kernel_types as kt, kimap, println, vfs, Address, eth, get_blob, get_state, http, kernel_types as kt, kimap, print_to_terminal, println, vfs,
LazyLoadBlob, PackageId, ProcessId, Request, Address, LazyLoadBlob, PackageId, ProcessId, Request,
}, },
std::collections::HashSet, std::collections::HashSet,
}; };
@ -101,7 +101,7 @@ pub fn fetch_and_subscribe_logs(state: &mut State) {
&filter.clone().from_block(state.last_saved_block), &filter.clone().from_block(state.last_saved_block),
) { ) {
if let Err(e) = state.ingest_contract_event(log, false) { if let Err(e) = state.ingest_contract_event(log, false) {
println!("error ingesting log: {e:?}"); print_to_terminal(1, &format!("error ingesting log: {e}"));
}; };
} }
state.update_listings(); state.update_listings();

View File

@ -4,7 +4,7 @@ use crate::kinode::process::kns_indexer::{
use alloy_sol_types::SolEvent; use alloy_sol_types::SolEvent;
use kinode_process_lib::{ use kinode_process_lib::{
await_message, call_init, eth, kimap, net, print_to_terminal, println, Address, Message, await_message, call_init, eth, kimap, net, print_to_terminal, println, Address, Message,
Request, Response, ProcessId, Request, Response,
}; };
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::{ use std::{
@ -110,13 +110,21 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> {
), ),
); );
subscribe_to_logs(&eth_provider, mints_filter.clone(), 1);
subscribe_to_logs(&eth_provider, notes_filter.clone(), 2);
println!("subscribed to logs successfully");
// if block in state is < current_block, get logs from that part. // if block in state is < current_block, get logs from that part.
println!("syncing old logs...");
fetch_and_process_logs(&eth_provider, &our, &mut state, mints_filter.clone()); fetch_and_process_logs(&eth_provider, &our, &mut state, mints_filter.clone());
fetch_and_process_logs(&eth_provider, &our, &mut state, notes_filter.clone()); fetch_and_process_logs(&eth_provider, &our, &mut state, notes_filter.clone());
println!("done syncing old logs.");
let current_block = eth_provider.get_block_number().unwrap();
println!("current block: {}", current_block);
state.last_block = current_block;
println!("subscribing to new logs...");
subscribe_to_logs(&eth_provider, mints_filter.clone(), 1);
subscribe_to_logs(&eth_provider, notes_filter.clone(), 2);
listen_to_new_blocks(&our); // sub_id: 3
println!("subscribed to logs successfully");
let mut pending_requests: BTreeMap<u64, Vec<IndexerRequests>> = BTreeMap::new(); let mut pending_requests: BTreeMap<u64, Vec<IndexerRequests>> = BTreeMap::new();
@ -209,6 +217,11 @@ fn handle_eth_message(
// print errors at verbosity=1 // print errors at verbosity=1
print_to_terminal(1, &format!("log-handling error! {e:?}")); print_to_terminal(1, &format!("log-handling error! {e:?}"));
} }
} else if let eth::SubscriptionResult::Header(header) = result {
if let Some(block) = header.number {
// risque..
state.last_block = block;
}
} }
} }
Ok(Err(e)) => { Ok(Err(e)) => {
@ -217,6 +230,8 @@ fn handle_eth_message(
subscribe_to_logs(&eth_provider, mints_filter.clone(), 1); subscribe_to_logs(&eth_provider, mints_filter.clone(), 1);
} else if e.id == 2 { } else if e.id == 2 {
subscribe_to_logs(&eth_provider, notes_filter.clone(), 2); subscribe_to_logs(&eth_provider, notes_filter.clone(), 2);
} else if e.id == 3 {
listen_to_new_blocks(&our);
} }
} }
Err(e) => { Err(e) => {
@ -247,13 +262,17 @@ fn handle_pending_requests(
match request { match request {
IndexerRequests::NamehashToName(NamehashToNameRequest { hash, .. }) => { IndexerRequests::NamehashToName(NamehashToNameRequest { hash, .. }) => {
Response::new() Response::new()
.body(serde_json::to_vec(&state.names.get(hash))?) .body(serde_json::to_vec(&IndexerResponses::Name(
state.names.get(hash).cloned(),
))?)
.send() .send()
.unwrap(); .unwrap();
} }
IndexerRequests::NodeInfo(NodeInfoRequest { name, .. }) => { IndexerRequests::NodeInfo(NodeInfoRequest { name, .. }) => {
Response::new() Response::new()
.body(serde_json::to_vec(&state.nodes.get(name))?) .body(serde_json::to_vec(&IndexerResponses::NodeInfo(
state.nodes.get(name).cloned(),
))?)
.send() .send()
.unwrap(); .unwrap();
} }
@ -491,6 +510,21 @@ pub fn bytes_to_port(bytes: &[u8]) -> anyhow::Result<u16> {
} }
} }
fn listen_to_new_blocks(our: &Address) {
let eth_newheads_sub = eth::EthAction::SubscribeLogs {
sub_id: 3,
chain_id: CHAIN_ID,
kind: eth::SubscriptionKind::NewHeads,
params: eth::Params::Bool(false),
};
let our_eth = Address::new(our.node(), ProcessId::new(Some("eth"), "distro", "sys"));
Request::new()
.body(serde_json::to_vec(&eth_newheads_sub).unwrap())
.target(our_eth)
.send();
}
fn subscribe_to_logs(eth_provider: &eth::Provider, filter: eth::Filter, sub_id: u64) { fn subscribe_to_logs(eth_provider: &eth::Provider, filter: eth::Filter, sub_id: u64) {
loop { loop {
match eth_provider.subscribe(sub_id, filter.clone()) { match eth_provider.subscribe(sub_id, filter.clone()) {