kns: change to checkpointing state save

This commit is contained in:
bitful-pannul 2024-12-20 17:15:11 +02:00
parent db9717d249
commit 2611d53e85
4 changed files with 291 additions and 217 deletions

43
Cargo.lock generated
View File

@ -1486,6 +1486,15 @@ dependencies = [
"rustc_version 0.4.1",
]
[[package]]
name = "atomic-polyfill"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8cf2bce30dfe09ef0bfaef228b9d414faaf7e563035494d7fe092dba54b300f4"
dependencies = [
"critical-section",
]
[[package]]
name = "atomic-waker"
version = "1.1.2"
@ -2317,6 +2326,12 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "critical-section"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "790eea4361631c5e7d22598ecd5723ff611904e3344ce8720784c93e3d83d40b"
[[package]]
name = "crossbeam-channel"
version = "0.5.14"
@ -3322,6 +3337,15 @@ dependencies = [
"tracing",
]
[[package]]
name = "hash32"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b0c35f58762feb77d74ebe43bdbc3210f09be9fe6742234d573bacc26ed92b67"
dependencies = [
"byteorder",
]
[[package]]
name = "hashbrown"
version = "0.13.2"
@ -3383,6 +3407,20 @@ dependencies = [
"http 0.2.12",
]
[[package]]
name = "heapless"
version = "0.7.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cdc6457c0eb62c71aac4bc17216026d8410337c4126773b9c5daba343f17964f"
dependencies = [
"atomic-polyfill",
"hash32",
"rustc_version 0.4.1",
"serde",
"spin",
"stable_deref_trait",
]
[[package]]
name = "heck"
version = "0.4.1"
@ -4257,6 +4295,7 @@ dependencies = [
"anyhow",
"hex",
"kinode_process_lib 0.10.0",
"postcard",
"process_macros",
"rmp-serde",
"serde",
@ -5169,6 +5208,7 @@ dependencies = [
"cobs",
"embedded-io 0.4.0",
"embedded-io 0.6.1",
"heapless",
"serde",
]
@ -6363,6 +6403,9 @@ name = "spin"
version = "0.9.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67"
dependencies = [
"lock_api",
]
[[package]]
name = "spki"

View File

@ -714,6 +714,15 @@ dependencies = [
"syn 2.0.90",
]
[[package]]
name = "atomic-polyfill"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8cf2bce30dfe09ef0bfaef228b9d414faaf7e563035494d7fe092dba54b300f4"
dependencies = [
"critical-section",
]
[[package]]
name = "auto_impl"
version = "1.2.0"
@ -884,6 +893,12 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "cobs"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "67ba02a97a2bd10f4b59b25c7973101c79642302776489e030cd13cdab09ed15"
[[package]]
name = "const-hex"
version = "1.14.0"
@ -928,6 +943,12 @@ dependencies = [
"libc",
]
[[package]]
name = "critical-section"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "790eea4361631c5e7d22598ecd5723ff611904e3344ce8720784c93e3d83d40b"
[[package]]
name = "crossbeam-utils"
version = "0.8.21"
@ -1353,6 +1374,15 @@ dependencies = [
"subtle",
]
[[package]]
name = "hash32"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b0c35f58762feb77d74ebe43bdbc3210f09be9fe6742234d573bacc26ed92b67"
dependencies = [
"byteorder",
]
[[package]]
name = "hashbrown"
version = "0.13.2"
@ -1380,6 +1410,20 @@ dependencies = [
"serde",
]
[[package]]
name = "heapless"
version = "0.7.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cdc6457c0eb62c71aac4bc17216026d8410337c4126773b9c5daba343f17964f"
dependencies = [
"atomic-polyfill",
"hash32",
"rustc_version 0.4.1",
"serde",
"spin",
"stable_deref_trait",
]
[[package]]
name = "heck"
version = "0.5.0"
@ -1789,6 +1833,7 @@ dependencies = [
"anyhow",
"hex",
"kinode_process_lib",
"postcard",
"process_macros",
"rmp-serde",
"serde",
@ -2179,6 +2224,17 @@ version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "953ec861398dccce10c670dfeaf3ec4911ca479e9c02154b3a215178c5f566f2"
[[package]]
name = "postcard"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "170a2601f67cc9dba8edd8c4870b15f71a6a2dc196daec8c83f72b59dff628a8"
dependencies = [
"cobs",
"heapless",
"serde",
]
[[package]]
name = "ppv-lite86"
version = "0.2.20"
@ -2782,6 +2838,15 @@ dependencies = [
"smallvec",
]
[[package]]
name = "spin"
version = "0.9.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67"
dependencies = [
"lock_api",
]
[[package]]
name = "spki"
version = "0.7.3"

View File

@ -18,6 +18,7 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
thiserror = "1.0"
wit-bindgen = "0.36.0"
postcard = "1.1.1"
[lib]
crate-type = ["cdylib"]

View File

@ -4,13 +4,13 @@ use crate::kinode::process::kns_indexer::{
};
use alloy_primitives::keccak256;
use alloy_sol_types::SolEvent;
use kinode::process::standard::clear_state;
use kinode_process_lib::{
await_message, call_init, eth, kimap,
kv::{self, Kv},
net, print_to_terminal, println, timer, Address, Capability, Message, Request, Response,
await_message, call_init, eth, get_state, kimap, net, print_to_terminal, println, set_state,
timer, Address, Capability, Message, Request, Response,
};
use std::{
collections::BTreeMap,
collections::{BTreeMap, HashMap},
net::{IpAddr, Ipv4Addr, Ipv6Addr},
str::FromStr,
};
@ -42,169 +42,73 @@ const CURRENT_VERSION: u32 = 1;
const MAX_PENDING_ATTEMPTS: u8 = 3;
const SUBSCRIPTION_TIMEOUT: u64 = 60;
const DELAY_MS: u64 = 1_000; // 1s
const CHECKPOINT_MS: u64 = 300_000; // 5 minutes
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
struct State {
/// version of the state in kv
version: u32,
/// last block we have an update from
last_block: u64,
/// kv handle
/// includes keys and values for:
/// "meta:chain_id", "meta:version", "meta:last_block", "meta:contract_address",
/// "names:{namehash}" -> "{name}", "nodes:{name}" -> "{node_info}"
kv: Kv<String, Vec<u8>>,
/// the chain id we are indexing
chain_id: u64,
/// what contract this state pertains to
contract_address: eth::Address,
/// namehash to human readable name
names: HashMap<String, String>,
/// human readable name to most recent on-chain routing information as json
nodes: HashMap<String, net::KnsUpdate>,
/// last saved checkpoint block
last_checkpoint_block: u64,
}
impl State {
fn new(our: &Address) -> Self {
let kv: Kv<String, Vec<u8>> = match kv::open(our.package_id(), "kns_indexer", Some(10)) {
Ok(kv) => kv,
Err(e) => panic!("fatal: error opening kns_indexer key_value database: {e:?}"),
};
Self {
version: CURRENT_VERSION,
last_block: KIMAP_FIRST_BLOCK,
kv,
fn new() -> Self {
State {
chain_id: CHAIN_ID,
contract_address: eth::Address::from_str(KIMAP_ADDRESS).unwrap(),
names: HashMap::new(),
nodes: HashMap::new(),
last_checkpoint_block: 0,
}
}
/// Loads the state from kv, and updates it with the current block number and version.
/// The result of this function will be that the constants for chain ID and contract address
/// are always matching the values in the kv.
fn load(our: &Address) -> Self {
let mut state = Self::new(our);
let desired_contract_address = eth::Address::from_str(KIMAP_ADDRESS).unwrap();
let version = state.get_version();
let chain_id = state.get_chain_id();
let contract_address = state.get_contract_address();
let last_block = state.get_last_block();
if version != Some(CURRENT_VERSION)
|| chain_id != Some(CHAIN_ID)
|| contract_address != Some(desired_contract_address)
{
// if version/contract/chain_id are new, run migrations here.
state.set_version(CURRENT_VERSION);
state.set_chain_id(CHAIN_ID);
state.set_contract_address(desired_contract_address);
}
state.last_block = last_block.unwrap_or(state.last_block);
println!(
"\n 🐦‍⬛ KNS Indexer State\n\
\n\
Version {}\n\
Chain ID {}\n\
Last Block {}\n\
KIMAP {}\n\
\n",
state.version, state.last_block, CHAIN_ID, desired_contract_address,
);
state
}
/// Reset by removing the database and reloading fresh state
fn reset(&self, our: &Address) {
// Remove the entire database
if let Err(e) = kv::remove_db(our.package_id(), "kns_indexer", None) {
println!("Warning: error removing kns_indexer database: {e:?}");
fn load() -> Self {
match get_state() {
None => Self::new(),
Some(state_bytes) => match postcard::from_bytes(&state_bytes) {
Ok(state) => state,
Err(e) => {
println!("failed to deserialize saved state: {e:?}");
Self::new()
}
},
}
}
fn meta_version_key() -> String {
"meta:version".to_string()
/// Reset by removing the checkpoint and reloading fresh state
fn reset(&self) {
clear_state();
}
fn meta_last_block_key() -> String {
"meta:last_block".to_string()
/// Saves a checkkpoint, serializes to the current block
fn save(&mut self, block: u64) {
self.last_checkpoint_block = block;
match postcard::to_allocvec(self) {
Ok(state_bytes) => {
set_state(&state_bytes);
}
Err(e) => {
println!("failed to serialize state: {e:?}");
}
}
}
fn meta_chain_id_key() -> String {
"meta:chain_id".to_string()
}
fn meta_contract_address_key() -> String {
"meta:contract_address".to_string()
}
fn name_key(namehash: &str) -> String {
format!("name:{}", namehash)
}
fn node_key(name: &str) -> String {
format!("node:{}", name)
}
fn get_last_block(&self) -> Option<u64> {
self.kv.get_as::<u64>(&Self::meta_last_block_key()).ok()
}
fn set_last_block(&mut self, block: u64) {
self.kv
.set_as::<u64>(&Self::meta_last_block_key(), &block, None)
.unwrap();
self.last_block = block;
}
fn get_version(&self) -> Option<u32> {
self.kv.get_as::<u32>(&Self::meta_version_key()).ok()
}
fn set_version(&mut self, version: u32) {
self.kv
.set_as::<u32>(&Self::meta_version_key(), &version, None)
.unwrap();
self.version = version;
}
fn get_name(&self, namehash: &str) -> Option<String> {
self.kv
.get(&Self::name_key(namehash))
.ok()
.and_then(|bytes| String::from_utf8(bytes).ok())
}
fn set_name(&mut self, namehash: &str, name: &str) {
self.kv
.set(&Self::name_key(namehash), &name.as_bytes().to_vec(), None)
.unwrap();
}
fn get_node(&self, name: &str) -> Option<net::KnsUpdate> {
self.kv.get_as::<net::KnsUpdate>(&Self::node_key(name)).ok()
}
fn set_node(&mut self, name: &str, node: &net::KnsUpdate) {
self.kv
.set_as::<net::KnsUpdate>(&Self::node_key(name), &node, None)
.unwrap();
}
fn get_chain_id(&self) -> Option<u64> {
self.kv.get_as::<u64>(&Self::meta_chain_id_key()).ok()
}
fn set_chain_id(&mut self, chain_id: u64) {
self.kv
.set_as::<u64>(&Self::meta_chain_id_key(), &chain_id, None)
.unwrap();
}
fn get_contract_address(&self) -> Option<eth::Address> {
self.kv
.get_as::<eth::Address>(&Self::meta_contract_address_key())
.ok()
}
fn set_contract_address(&mut self, contract_address: eth::Address) {
self.kv
.set_as::<eth::Address>(&Self::meta_contract_address_key(), &contract_address, None)
.expect("Failed to set contract address");
/// loops through saved nodes, and sends them to net
/// called upon bootup
fn send_nodes(&self) -> anyhow::Result<()> {
for node in self.nodes.values() {
Request::to(("our", "net", "distro", "sys"))
.body(rmp_serde::to_vec(&net::NetAction::KnsUpdate(node.clone()))?)
.send()?;
}
Ok(())
}
}
@ -236,6 +140,24 @@ impl From<WitKnsUpdate> for net::KnsUpdate {
}
}
// impl From<WitState> for State {
// fn from(s: WitState) -> Self {
// let contract_address: [u8; 20] = s
// .contract_address
// .try_into()
// .expect("invalid contract addess: doesn't have 20 bytes");
// State {
// chain_id: s.chain_id.clone(),
// contract_address: contract_address.into(),
// names: HashMap::from_iter(s.names),
// nodes: HashMap::from_iter(s.nodes.iter().map(|(k, v)| (k.clone(), v.clone().into()))),
// last_block: s.last_block.clone(),
// }
// }
// }
// add back state!
// and another todo, loop through entries at the start and give them to uhh net?
#[derive(Debug, thiserror::Error)]
enum KnsError {
#[error("Parent node for note not found")]
@ -244,8 +166,8 @@ enum KnsError {
call_init!(init);
fn init(our: Address) {
// state is loaded from kv, and updated with the current block number and version.
let state = State::load(&our);
// state is checkpointed regularly (default every 5 minutes if new events are found)
let state = State::load();
if let Err(e) = main(our, state) {
println!("fatal error: {e}");
@ -256,22 +178,29 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> {
#[cfg(feature = "simulation-mode")]
add_temp_hardcoded_tlzs(&mut state);
let chain_id = CHAIN_ID;
let kimap_address = eth::Address::from_str(KIMAP_ADDRESS).unwrap();
// loop through checkpointed values and send to net
if let Err(e) = state.send_nodes() {
// todo change verbosity
println!("failed to send nodes to net: {e}");
}
// current block is only saved to state upon checkpoints, we use this to keep track of last events
// set to checkpoint-1
let mut last_block = state.last_checkpoint_block.saturating_sub(1);
// sub_id: 1
// listen to all mint events in kimap
let mints_filter = eth::Filter::new()
.address(kimap_address)
.from_block(state.last_block)
.address(state.contract_address)
.from_block(last_block)
.to_block(eth::BlockNumberOrTag::Latest)
.event("Mint(bytes32,bytes32,bytes,bytes)");
// sub_id: 2
// listen to all note events that are relevant to the KNS protocol within kimap
let notes_filter = eth::Filter::new()
.address(kimap_address)
.from_block(state.last_block)
.address(state.contract_address)
.from_block(last_block)
.to_block(eth::BlockNumberOrTag::Latest)
.event("Note(bytes32,bytes32,bytes,bytes,bytes)")
.topic3(vec![
@ -284,7 +213,7 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> {
// 60s timeout -- these calls can take a long time
// if they do time out, we try them again
let eth_provider: eth::Provider = eth::Provider::new(chain_id, SUBSCRIPTION_TIMEOUT);
let eth_provider: eth::Provider = eth::Provider::new(state.chain_id, SUBSCRIPTION_TIMEOUT);
// subscribe to logs first, so no logs are missed
eth_provider.subscribe_loop(1, mints_filter.clone());
@ -299,22 +228,28 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> {
let mut pending_notes: BTreeMap<u64, Vec<(kimap::contract::Note, u8)>> = BTreeMap::new();
// if block in state is < current_block, get logs from that part.
println!("syncing old logs from block: {}", state.last_block);
println!("syncing old logs from block: {}", last_block);
fetch_and_process_logs(
&eth_provider,
&mut state,
mints_filter.clone(),
&mut pending_notes,
&mut last_block,
);
fetch_and_process_logs(
&eth_provider,
&mut state,
notes_filter.clone(),
&mut pending_notes,
&mut last_block,
);
// set a timer tick so any pending logs will be processed
timer::set_timer(DELAY_MS, None);
// set a timer tick for checkpointing
timer::set_timer(CHECKPOINT_MS, Some(b"checkpoint".to_vec()));
println!("done syncing old logs.");
loop {
@ -324,6 +259,10 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> {
// if true, time to go check current block number and handle pending notes.
let tick = message.is_local(&our) && message.source().process == "timer:distro:sys";
let checkpoint = message.is_local(&our)
&& message.source().process == "timer:distro:sys"
&& message.context() == Some(b"checkpoint");
let Message::Request {
source,
body,
@ -336,10 +275,12 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> {
&mut state,
&eth_provider,
tick,
checkpoint,
&mut pending_notes,
&[],
&mints_filter,
&notes_filter,
&mut last_block,
)?;
}
continue;
@ -350,10 +291,12 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> {
&mut state,
&eth_provider,
tick,
checkpoint,
&mut pending_notes,
&body,
&mints_filter,
&notes_filter,
&mut last_block,
)?;
} else {
match serde_json::from_slice(&body)? {
@ -361,15 +304,13 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> {
// TODO: make sure we've seen the whole block, while actually
// sending a response to the proper place.
Response::new()
.body(IndexerResponse::Name(state.get_name(hash)))
.body(IndexerResponse::Name(state.names.get(hash).cloned()))
.send()?;
}
IndexerRequest::NodeInfo(NodeInfoRequest { ref name, .. }) => {
Response::new()
.body(&IndexerResponse::NodeInfo(
state
.get_node(name)
.map(|update| WitKnsUpdate::from(update)),
.body(IndexerResponse::NodeInfo(
state.nodes.get(name).map(|n| n.clone().into()),
))
.send()?;
}
@ -390,7 +331,7 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> {
}
}
// reload state fresh - this will create new db
state.reset(&our);
state.reset();
Response::new()
.body(IndexerResponse::Reset(ResetResult::Success))
.send()?;
@ -405,17 +346,19 @@ fn handle_eth_message(
state: &mut State,
eth_provider: &eth::Provider,
tick: bool,
checkpoint: bool,
pending_notes: &mut BTreeMap<u64, Vec<(kimap::contract::Note, u8)>>,
body: &[u8],
mints_filter: &eth::Filter,
notes_filter: &eth::Filter,
last_block: &mut u64,
) -> anyhow::Result<()> {
match serde_json::from_slice::<eth::EthSubResult>(body) {
Ok(Ok(eth::EthSub { result, .. })) => {
if let Ok(eth::SubscriptionResult::Log(log)) =
serde_json::from_value::<eth::SubscriptionResult>(result)
{
if let Err(e) = handle_log(state, pending_notes, &log) {
if let Err(e) = handle_log(state, pending_notes, &log, last_block) {
print_to_terminal(1, &format!("log-handling error! {e:?}"));
}
}
@ -435,10 +378,13 @@ fn handle_eth_message(
let block_number = eth_provider.get_block_number();
if let Ok(block_number) = block_number {
print_to_terminal(2, &format!("new block: {}", block_number));
state.set_last_block(block_number);
*last_block = block_number;
if checkpoint {
state.save(block_number);
}
}
}
handle_pending_notes(state, pending_notes)?;
handle_pending_notes(state, pending_notes, last_block)?;
if !pending_notes.is_empty() {
timer::set_timer(DELAY_MS, None);
@ -450,6 +396,7 @@ fn handle_eth_message(
fn handle_pending_notes(
state: &mut State,
pending_notes: &mut BTreeMap<u64, Vec<(kimap::contract::Note, u8)>>,
last_block: &mut u64,
) -> anyhow::Result<()> {
if pending_notes.is_empty() {
return Ok(());
@ -457,7 +404,7 @@ fn handle_pending_notes(
let mut blocks_to_remove = vec![];
for (block, notes) in pending_notes.iter_mut() {
if *block < state.last_block {
if block < last_block {
let mut keep_notes = Vec::new();
for (note, attempt) in notes.drain(..) {
if attempt >= MAX_PENDING_ATTEMPTS {
@ -502,53 +449,69 @@ fn handle_note(state: &mut State, note: &kimap::contract::Note) -> anyhow::Resul
if !kimap::valid_note(&note_label) {
return Err(anyhow::anyhow!("skipping invalid note: {note_label}"));
}
let Some(node_name) = state.get_name(&node_hash) else {
let Some(node_name) = state.names.get(&node_hash) else {
return Err(KnsError::NoParentError.into());
};
if let Some(mut node) = state.get_node(&node_name) {
match note_label.as_str() {
"~ws-port" => {
let ws = bytes_to_port(&note.data)?;
match note_label.as_str() {
"~ws-port" => {
let ws = bytes_to_port(&note.data)?;
if let Some(node) = state.nodes.get_mut(node_name) {
node.ports.insert("ws".to_string(), ws);
node.routers = vec![]; // port defined, -> direct
}
"~tcp-port" => {
let tcp = bytes_to_port(&note.data)?;
node.ports.insert("tcp".to_string(), tcp);
node.routers = vec![]; // port defined, -> direct
}
"~net-key" => {
if note.data.len() != 32 {
return Err(anyhow::anyhow!("invalid net-key length"));
}
node.public_key = hex::encode(&note.data);
}
"~routers" => {
let routers = decode_routers(&note.data, state);
node.routers = routers;
node.ports = BTreeMap::new(); // -> indirect
node.ips = vec![];
}
"~ip" => {
let ip = bytes_to_ip(&note.data)?;
node.ips = vec![ip.to_string()];
node.routers = vec![]; // -> direct
}
_other => {
// Ignore unknown notes
// port defined, -> direct
node.routers = vec![];
}
}
"~tcp-port" => {
let tcp = bytes_to_port(&note.data)?;
if let Some(node) = state.nodes.get_mut(node_name) {
node.ports.insert("tcp".to_string(), tcp);
// port defined, -> direct
node.routers = vec![];
}
}
"~net-key" => {
if note.data.len() != 32 {
return Err(anyhow::anyhow!("invalid net-key length"));
}
if let Some(node) = state.nodes.get_mut(node_name) {
node.public_key = hex::encode(&note.data);
}
}
"~routers" => {
let routers = decode_routers(&note.data, state);
if let Some(node) = state.nodes.get_mut(node_name) {
node.routers = routers;
// -> indirect
node.ports = BTreeMap::new();
node.ips = vec![];
}
}
"~ip" => {
let ip = bytes_to_ip(&note.data)?;
if let Some(node) = state.nodes.get_mut(node_name) {
node.ips = vec![ip.to_string()];
// -> direct
node.routers = vec![];
}
}
_other => {
// Ignore unknown notes
}
}
// Update the node in the state
state.set_node(&node_name, &node);
// Only send an update if we have a *full* set of data for networking
if !node.public_key.is_empty()
&& ((!node.ips.is_empty() && !node.ports.is_empty()) || !node.routers.is_empty())
// only send an update if we have a *full* set of data for networking:
// a node name, plus either <routers> or <ip, port(s)>
if let Some(node_info) = state.nodes.get(node_name) {
if !node_info.public_key.is_empty()
&& ((!node_info.ips.is_empty() && !node_info.ports.is_empty())
|| node_info.routers.len() > 0)
{
Request::to(("our", "net", "distro", "sys"))
.body(rmp_serde::to_vec(&net::NetAction::KnsUpdate(node))?)
.body(rmp_serde::to_vec(&net::NetAction::KnsUpdate(
node_info.clone(),
))?)
.send()?;
}
}
@ -560,9 +523,10 @@ fn handle_log(
state: &mut State,
pending_notes: &mut BTreeMap<u64, Vec<(kimap::contract::Note, u8)>>,
log: &eth::Log,
last_block: &mut u64,
) -> anyhow::Result<()> {
if let Some(block) = log.block_number {
state.set_last_block(block);
*last_block = block;
}
match log.topics()[0] {
@ -576,15 +540,15 @@ fn handle_log(
return Err(anyhow::anyhow!("skipping invalid name: {name}"));
}
let full_name = match state.get_name(&parent_hash) {
let full_name = match state.names.get(&parent_hash) {
Some(parent_name) => format!("{name}.{parent_name}"),
None => name,
};
state.set_name(&child_hash.clone(), &full_name.clone());
state.set_node(
&full_name.clone(),
&net::KnsUpdate {
state.names.insert(child_hash.clone(), full_name.clone());
state.nodes.insert(
full_name.clone(),
net::KnsUpdate {
name: full_name.clone(),
public_key: String::new(),
ips: Vec::new(),
@ -630,13 +594,14 @@ fn fetch_and_process_logs(
state: &mut State,
filter: eth::Filter,
pending_notes: &mut BTreeMap<u64, Vec<(kimap::contract::Note, u8)>>,
last_block: &mut u64,
) {
loop {
match eth_provider.get_logs(&filter) {
Ok(logs) => {
println!("log len: {}", logs.len());
for log in logs {
if let Err(e) = handle_log(state, pending_notes, &log) {
if let Err(e) = handle_log(state, pending_notes, &log, last_block) {
print_to_terminal(1, &format!("log-handling error! {e:?}"));
}
}
@ -680,7 +645,7 @@ fn decode_routers(data: &[u8], state: &State) -> Vec<String> {
for chunk in data.chunks(32) {
let hash_str = format!("0x{}", hex::encode(chunk));
match state.get_name(&hash_str) {
match state.names.get(&hash_str) {
Some(full_name) => routers.push(full_name.clone()),
None => print_to_terminal(
1,