doria's nits

This commit is contained in:
dr-frmr 2024-12-19 14:25:54 -05:00
parent 6da51a771d
commit e54665745e
No known key found for this signature in database

View File

@ -9,7 +9,6 @@ use kinode_process_lib::{
kv::{self, Kv},
net, print_to_terminal, println, timer, Address, Capability, Message, Request, Response,
};
use serde::{Deserialize, Serialize};
use std::{
collections::BTreeMap,
net::{IpAddr, Ipv4Addr, Ipv6Addr},
@ -44,64 +43,66 @@ const MAX_PENDING_ATTEMPTS: u8 = 3;
const SUBSCRIPTION_TIMEOUT: u64 = 60;
const DELAY_MS: u64 = 1_000; // 1s
#[derive(Clone, Debug, Serialize, Deserialize)]
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
struct State {
// version of the state in kv
/// version of the state in kv
version: u32,
// last block we have an update from
/// last block we have an update from
last_block: u64,
// kv handle
// includes keys and values for:
// "meta:chain_id", "meta:version", "meta:last_block", "meta:contract_address",
// "names:{namehash}" -> "{name}", "nodes:{name}" -> "{node_info}"
/// kv handle
/// includes keys and values for:
/// "meta:chain_id", "meta:version", "meta:last_block", "meta:contract_address",
/// "names:{namehash}" -> "{name}", "nodes:{name}" -> "{node_info}"
kv: Kv<String, Vec<u8>>,
}
impl State {
fn load(our: &Address) -> Self {
fn new(our: &Address) -> Self {
let kv: Kv<String, Vec<u8>> = match kv::open(our.package_id(), "kns_indexer", Some(10)) {
Ok(kv) => kv,
Err(e) => panic!("fatal: error opening kns_indexer key_value database: {e:?}"),
};
let mut state = Self {
Self {
version: CURRENT_VERSION,
last_block: KIMAP_FIRST_BLOCK,
kv,
version: 0,
last_block: 0,
};
}
}
/// Loads the state from kv, and updates it with the current block number and version.
/// The result of this function will be that the constants for chain ID and contract address
/// are always matching the values in the kv.
fn load(our: &Address) -> Self {
let mut state = Self::new(our);
let desired_contract_address = eth::Address::from_str(KIMAP_ADDRESS).unwrap();
let version = state.get_version();
let chain_id = state.get_chain_id();
let contract_address = state.get_contract_address();
let last_block = state.get_last_block();
if version != CURRENT_VERSION
|| chain_id != CHAIN_ID
|| contract_address != eth::Address::from_str(KIMAP_ADDRESS).unwrap()
if version != Some(CURRENT_VERSION)
|| chain_id != Some(CHAIN_ID)
|| contract_address != Some(desired_contract_address)
{
// if version/contract/chain_id are new, run migrations here.
state.set_version(CURRENT_VERSION);
state.set_chain_id(CHAIN_ID);
state.set_contract_address(desired_contract_address);
}
state.set_chain_id(chain_id);
state.set_contract_address(contract_address);
state.set_version(CURRENT_VERSION);
// update state struct with final values
state.version = version;
state.last_block = last_block;
state.last_block = last_block.unwrap_or(state.last_block);
println!(
"\n 🐦‍⬛ KNS Indexer State\n\
\n\
Version {}\n\
Last Block {}\n\
Chain ID {}\n\
KIMAP {}\n\
\n",
state.version,
state.last_block,
chain_id,
contract_address.to_string(),
"\n 🐦‍⬛ KNS Indexer State\n\
\n\
Version {}\n\
Chain ID {}\n\
Last Block {}\n\
KIMAP {}\n\
\n",
state.version, state.last_block, CHAIN_ID, desired_contract_address,
);
state
@ -139,11 +140,8 @@ impl State {
format!("node:{}", name)
}
fn get_last_block(&self) -> u64 {
self.kv
.get_as::<u64>(&Self::meta_last_block_key())
.ok()
.unwrap_or(KIMAP_FIRST_BLOCK)
fn get_last_block(&self) -> Option<u64> {
self.kv.get_as::<u64>(&Self::meta_last_block_key()).ok()
}
fn set_last_block(&mut self, block: u64) {
@ -153,11 +151,8 @@ impl State {
self.last_block = block;
}
fn get_version(&self) -> u32 {
self.kv
.get_as::<u32>(&Self::meta_version_key())
.ok()
.unwrap_or(CURRENT_VERSION)
fn get_version(&self) -> Option<u32> {
self.kv.get_as::<u32>(&Self::meta_version_key()).ok()
}
fn set_version(&mut self, version: u32) {
@ -190,11 +185,8 @@ impl State {
.unwrap();
}
fn get_chain_id(&self) -> u64 {
self.kv
.get_as::<u64>(&Self::meta_chain_id_key())
.ok()
.unwrap_or(CHAIN_ID)
fn get_chain_id(&self) -> Option<u64> {
self.kv.get_as::<u64>(&Self::meta_chain_id_key()).ok()
}
fn set_chain_id(&mut self, chain_id: u64) {
@ -203,15 +195,10 @@ impl State {
.unwrap();
}
fn get_contract_address(&self) -> eth::Address {
match self
.kv
fn get_contract_address(&self) -> Option<eth::Address> {
self.kv
.get_as::<eth::Address>(&Self::meta_contract_address_key())
{
Ok(addr) => addr,
Err(_) => eth::Address::from_str(KIMAP_ADDRESS)
.expect("Failed to parse KIMAP_ADDRESS constant"),
}
.ok()
}
fn set_contract_address(&mut self, contract_address: eth::Address) {
@ -269,42 +256,40 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> {
#[cfg(feature = "simulation-mode")]
add_temp_hardcoded_tlzs(&mut state);
let chain_id = state.get_chain_id();
let kimap_address = state.get_contract_address();
let last_block = state.get_last_block();
let chain_id = CHAIN_ID;
let kimap_address = eth::Address::from_str(KIMAP_ADDRESS).unwrap();
// sub_id: 1
// listen to all mint events in kimap
let mints_filter = eth::Filter::new()
.address(kimap_address)
.from_block(last_block)
.from_block(state.last_block)
.to_block(eth::BlockNumberOrTag::Latest)
.event("Mint(bytes32,bytes32,bytes,bytes)");
let notes = vec![
keccak256("~ws-port"),
keccak256("~tcp-port"),
keccak256("~net-key"),
keccak256("~routers"),
keccak256("~ip"),
];
// sub_id: 2
// listen to all note events that are relevant to the KNS protocol within kimap
let notes_filter = eth::Filter::new()
.address(kimap_address)
.from_block(last_block)
.from_block(state.last_block)
.to_block(eth::BlockNumberOrTag::Latest)
.event("Note(bytes32,bytes32,bytes,bytes,bytes)")
.topic3(notes);
.topic3(vec![
keccak256("~ws-port"),
keccak256("~tcp-port"),
keccak256("~net-key"),
keccak256("~routers"),
keccak256("~ip"),
]);
// 60s timeout -- these calls can take a long time
// if they do time out, we try them again
let eth_provider: eth::Provider = eth::Provider::new(chain_id, SUBSCRIPTION_TIMEOUT);
// let _kimap_helper = kimap::Kimap::new(eth_provider.clone(), kimap_address);
// subscribe to logs first, so no logs are missed
eth_provider.subscribe_loop(1, mints_filter.clone());
eth_provider.subscribe_loop(2, notes_filter.clone());
println!("done subscribing to new logs.");
// if subscription results come back in the wrong order, we store them here
// until the right block is reached.
@ -314,7 +299,7 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> {
let mut pending_notes: BTreeMap<u64, Vec<(kimap::contract::Note, u8)>> = BTreeMap::new();
// if block in state is < current_block, get logs from that part.
println!("syncing old logs from block: {}", last_block);
println!("syncing old logs from block: {}", state.last_block);
fetch_and_process_logs(
&eth_provider,
&mut state,
@ -327,6 +312,7 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> {
notes_filter.clone(),
&mut pending_notes,
);
// set a timer tick so any pending logs will be processed
timer::set_timer(DELAY_MS, None);
println!("done syncing old logs.");
@ -335,6 +321,7 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> {
let Ok(message) = await_message() else {
continue;
};
// if true, time to go check current block number and handle pending notes.
let tick = message.is_local(&our) && message.source().process == "timer:distro:sys";
let Message::Request {
@ -358,7 +345,7 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> {
continue;
};
if source.process == "eth:distro:sys" {
if source.node() == our.node() && source.process == "eth:distro:sys" {
handle_eth_message(
&mut state,
&eth_provider,
@ -369,9 +356,7 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> {
&notes_filter,
)?;
} else {
let request = serde_json::from_slice(&body)?;
match request {
match serde_json::from_slice(&body)? {
IndexerRequest::NamehashToName(NamehashToNameRequest { ref hash, .. }) => {
// TODO: make sure we've seen the whole block, while actually
// sending a response to the proper place.
@ -445,6 +430,7 @@ fn handle_eth_message(
}
_ => {}
}
if tick {
let block_number = eth_provider.get_block_number();
if let Ok(block_number) = block_number {
@ -638,8 +624,7 @@ fn handle_log(
Ok(())
}
// helpers
/// Get logs for a filter then process them while taking pending notes into account.
fn fetch_and_process_logs(
eth_provider: &eth::Provider,
state: &mut State,
@ -680,7 +665,8 @@ fn add_temp_hardcoded_tlzs(state: &mut State) {
);
}
/// Decodes bytes into an array of keccak256 hashes (32 bytes each) and returns their full names.
/// Decodes bytes under ~routers in kimap into an array of keccak256 hashes (32 bytes each)
/// and returns the associated node identities.
fn decode_routers(data: &[u8], state: &State) -> Vec<String> {
if data.len() % 32 != 0 {
print_to_terminal(
@ -706,6 +692,7 @@ fn decode_routers(data: &[u8], state: &State) -> Vec<String> {
routers
}
/// convert IP address stored at ~ip in kimap to IpAddr
pub fn bytes_to_ip(bytes: &[u8]) -> anyhow::Result<IpAddr> {
match bytes.len() {
4 => {
@ -722,6 +709,7 @@ pub fn bytes_to_ip(bytes: &[u8]) -> anyhow::Result<IpAddr> {
}
}
/// convert port stored at ~[protocol]-port in kimap to u16
pub fn bytes_to_port(bytes: &[u8]) -> anyhow::Result<u16> {
match bytes.len() {
2 => Ok(u16::from_be_bytes([bytes[0], bytes[1]])),