update kimap first block, separate subscriptions in kns

This commit is contained in:
bitful-pannul 2024-07-15 13:53:51 +02:00
parent 5ab71c4688
commit 13a6cf5aaa
3 changed files with 269 additions and 246 deletions

420
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -56,7 +56,7 @@ const KIMAP_ADDRESS: &str = "0x7290Aa297818d0b9660B2871Bb87f85a3f9B4559"; // opt
const KIMAP_ADDRESS: &str = "0x0165878A594ca255338adfa4d48449f69242Eb8F"; // note temp kimap address!
#[cfg(not(feature = "simulation-mode"))]
const KIMAP_FIRST_BLOCK: u64 = 118_590_088;
const KIMAP_FIRST_BLOCK: u64 = 122_295_937;
#[cfg(feature = "simulation-mode")]
const KIMAP_FIRST_BLOCK: u64 = 1;

View File

@ -33,7 +33,7 @@ const CHAIN_ID: u64 = 10; // optimism
const CHAIN_ID: u64 = 31337; // local
#[cfg(not(feature = "simulation-mode"))]
const KIMAP_FIRST_BLOCK: u64 = 114_923_786; // optimism, adjust
const KIMAP_FIRST_BLOCK: u64 = 114_923_786; // optimism
#[cfg(feature = "simulation-mode")]
const KIMAP_FIRST_BLOCK: u64 = 1; // local
@ -90,7 +90,7 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> {
#[cfg(feature = "simulation-mode")]
add_temp_hardcoded_tlzs(&mut state);
let _notes = vec![
let notes = vec![
keccak256("~net-key"),
keccak256("~ws-port"),
keccak256("~routers"),
@ -98,17 +98,20 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> {
keccak256("~ip"),
];
let filter = eth::Filter::new()
// sub_id: 1
let mints_filter = eth::Filter::new()
.address(state.contract_address.parse::<eth::Address>().unwrap())
.from_block(state.block - 1)
.to_block(eth::BlockNumberOrTag::Latest)
.events(vec![
"Mint(bytes32,bytes32,bytes,bytes)",
"Note(bytes32,bytes32,bytes,bytes,bytes)",
]);
// .topic3(_notes);
// TODO: potentially remove labelhash from Mint event, then we can filter Notes while getting all Mint events?
// do this with 2 subscriptions, for now, get all Note events.
.event("Mint(bytes32,bytes32,bytes,bytes)");
// sub_id: 2
let notes_filter = eth::Filter::new()
.address(state.contract_address.parse::<eth::Address>().unwrap())
.from_block(state.block - 1)
.to_block(eth::BlockNumberOrTag::Latest)
.event("Note(bytes32,bytes32,bytes,bytes,bytes)")
.topic3(notes);
// 60s timeout -- these calls can take a long time
// if they do time out, we try them again
@ -123,27 +126,13 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> {
),
);
subscribe_to_logs(&eth_provider, state.block - 1, filter.clone());
subscribe_to_logs(&eth_provider, state.block - 1, mints_filter.clone(), 1);
subscribe_to_logs(&eth_provider, state.block - 1, notes_filter.clone(), 2);
println!("subscribed to logs successfully");
// if block in state is < current_block, get logs from that part.
loop {
match eth_provider.get_logs(&filter) {
Ok(logs) => {
for log in logs {
if let Err(e) = handle_log(&our, &mut state, &log, &eth_provider) {
// print errors at verbosity=1
print_to_terminal(1, &format!("log-handling error! {e:?}"));
}
}
break;
}
Err(e) => {
println!("got eth error while fetching logs: {e:?}, trying again in 5s...");
std::thread::sleep(std::time::Duration::from_secs(5));
continue;
}
}
}
fetch_and_process_logs(&eth_provider, &our, &mut state, &mints_filter);
fetch_and_process_logs(&eth_provider, &our, &mut state, &notes_filter);
let mut pending_requests: BTreeMap<u64, Vec<IndexerRequests>> = BTreeMap::new();
@ -164,7 +153,8 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> {
&eth_provider,
&mut pending_requests,
&body,
&filter,
&mints_filter,
&notes_filter,
)?;
} else {
let request = serde_json::from_slice(&body)?;
@ -219,7 +209,8 @@ fn handle_eth_message(
eth_provider: &eth::Provider,
pending_requests: &mut BTreeMap<u64, Vec<IndexerRequests>>,
body: &[u8],
filter: &eth::Filter,
mints_filter: &eth::Filter,
notes_filter: &eth::Filter,
) -> anyhow::Result<()> {
let Ok(eth_result) = serde_json::from_slice::<eth::EthSubResult>(body) else {
return Err(anyhow::anyhow!("got invalid message"));
@ -236,7 +227,11 @@ fn handle_eth_message(
}
Err(e) => {
println!("got eth subscription error ({e:?}), resubscribing");
subscribe_to_logs(&eth_provider, state.block - 1, filter.clone());
if e.id == 1 {
subscribe_to_logs(&eth_provider, state.block - 1, mints_filter.clone(), 1);
} else if e.id == 2 {
subscribe_to_logs(&eth_provider, state.block - 1, notes_filter.clone(), 2);
}
}
}
@ -418,6 +413,30 @@ fn handle_log(
// helpers
fn fetch_and_process_logs(
eth_provider: &eth::Provider,
our: &Address,
state: &mut State,
filter: &eth::Filter,
) {
loop {
match eth_provider.get_logs(filter) {
Ok(logs) => {
for log in logs {
if let Err(e) = handle_log(our, state, &log, eth_provider) {
print_to_terminal(1, &format!("log-handling error! {e:?}"));
}
}
return ();
}
Err(e) => {
println!("got eth error while fetching logs: {e:?}, trying again in 5s...");
std::thread::sleep(std::time::Duration::from_secs(5));
}
}
}
}
fn get_node_name(state: &mut State, parent_hash: &str) -> String {
let mut current_hash = parent_hash;
let mut components = Vec::new(); // Collect components in a vector
@ -511,9 +530,14 @@ pub fn bytes_to_port(bytes: &[u8]) -> anyhow::Result<u16> {
}
}
fn subscribe_to_logs(eth_provider: &eth::Provider, from_block: u64, filter: eth::Filter) {
fn subscribe_to_logs(
eth_provider: &eth::Provider,
from_block: u64,
filter: eth::Filter,
sub_id: u64,
) {
loop {
match eth_provider.subscribe(1, filter.clone().from_block(from_block)) {
match eth_provider.subscribe(sub_id, filter.clone().from_block(from_block)) {
Ok(()) => break,
Err(_) => {
println!("failed to subscribe to chain! trying again in 5s...");
@ -522,5 +546,4 @@ fn subscribe_to_logs(eth_provider: &eth::Provider, from_block: u64, filter: eth:
}
}
}
println!("subscribed to logs successfully");
}