mirror of
https://github.com/uqbar-dao/nectar.git
synced 2025-01-04 06:37:36 +03:00
Merge pull request #639 from kinode-dao/bp/checkpointkns
kns: checkpoint save state
This commit is contained in:
commit
a68716c276
11
Cargo.lock
generated
11
Cargo.lock
generated
@ -6386,6 +6386,17 @@ version = "1.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3"
|
||||
|
||||
[[package]]
|
||||
name = "state"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"kinode_process_lib 0.10.0",
|
||||
"process_macros",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"wit-bindgen 0.36.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "static_assertions"
|
||||
version = "1.1.0"
|
||||
|
@ -20,7 +20,7 @@ members = [
|
||||
"kinode/packages/contacts/contacts",
|
||||
"kinode/packages/homepage/homepage",
|
||||
"kinode/packages/kns-indexer/kns-indexer", "kinode/packages/kns-indexer/get-block", "kinode/packages/settings/settings", "kinode/packages/kns-indexer/reset",
|
||||
"kinode/packages/kns-indexer/node-info",
|
||||
"kinode/packages/kns-indexer/node-info", "kinode/packages/kns-indexer/state",
|
||||
"kinode/packages/terminal/terminal",
|
||||
"kinode/packages/terminal/alias", "kinode/packages/terminal/cat", "kinode/packages/terminal/echo",
|
||||
"kinode/packages/terminal/help", "kinode/packages/terminal/hi", "kinode/packages/terminal/kfetch",
|
||||
|
11
kinode/packages/kns-indexer/Cargo.lock
generated
11
kinode/packages/kns-indexer/Cargo.lock
generated
@ -2798,6 +2798,17 @@ version = "1.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3"
|
||||
|
||||
[[package]]
|
||||
name = "state"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"kinode_process_lib",
|
||||
"process_macros",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"wit-bindgen",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "static_assertions"
|
||||
version = "1.1.0"
|
||||
|
@ -5,6 +5,7 @@ members = [
|
||||
"kns-indexer",
|
||||
"reset",
|
||||
"node-info",
|
||||
"state",
|
||||
]
|
||||
|
||||
[profile.release]
|
||||
|
@ -14,6 +14,9 @@ interface kns-indexer {
|
||||
/// returns an Option<KnsUpdate>
|
||||
/// set block to 0 if you just want to get the current state of the indexer
|
||||
node-info(node-info-request),
|
||||
/// return the entire state of the indexer at the given block
|
||||
/// set block to 0 if you just want to get the current state of the indexer
|
||||
get-state(get-state-request),
|
||||
/// resets and re-indexes the chain, requires root cap,
|
||||
/// returns a response varaint reset
|
||||
reset,
|
||||
@ -22,6 +25,7 @@ interface kns-indexer {
|
||||
variant indexer-response {
|
||||
name(option<string>),
|
||||
node-info(option<wit-kns-update>),
|
||||
get-state(wit-state),
|
||||
reset(reset-result),
|
||||
}
|
||||
|
||||
@ -43,6 +47,18 @@ interface kns-indexer {
|
||||
routers: list<string>,
|
||||
}
|
||||
|
||||
record get-state-request {
|
||||
block: u64,
|
||||
}
|
||||
|
||||
record wit-state {
|
||||
chain-id: u64,
|
||||
contract-address: list<u8>, // 20-byte ETH address
|
||||
names: list<tuple<string, string>>, // map, but wit doesn't support maps
|
||||
nodes: list<tuple<string, wit-kns-update>>, // map, but wit doesn't support maps
|
||||
last-block: u64,
|
||||
}
|
||||
|
||||
variant reset-result {
|
||||
success,
|
||||
err(reset-error),
|
||||
|
@ -1,16 +1,16 @@
|
||||
use crate::kinode::process::kns_indexer::{
|
||||
IndexerRequest, IndexerResponse, NamehashToNameRequest, NodeInfoRequest, ResetError,
|
||||
ResetResult, WitKnsUpdate,
|
||||
ResetResult, WitKnsUpdate, WitState,
|
||||
};
|
||||
use alloy_primitives::keccak256;
|
||||
use alloy_sol_types::SolEvent;
|
||||
use kinode::process::standard::clear_state;
|
||||
use kinode_process_lib::{
|
||||
await_message, call_init, eth, kimap,
|
||||
kv::{self, Kv},
|
||||
net, print_to_terminal, println, timer, Address, Capability, Message, Request, Response,
|
||||
await_message, call_init, eth, get_state, kimap, net, print_to_terminal, println, set_state,
|
||||
timer, Address, Capability, Message, Request, Response,
|
||||
};
|
||||
use std::{
|
||||
collections::BTreeMap,
|
||||
collections::{BTreeMap, HashMap},
|
||||
net::{IpAddr, Ipv4Addr, Ipv6Addr},
|
||||
str::FromStr,
|
||||
};
|
||||
@ -42,169 +42,69 @@ const CURRENT_VERSION: u32 = 1;
|
||||
const MAX_PENDING_ATTEMPTS: u8 = 3;
|
||||
const SUBSCRIPTION_TIMEOUT: u64 = 60;
|
||||
const DELAY_MS: u64 = 1_000; // 1s
|
||||
const CHECKPOINT_MS: u64 = 300_000; // 5 minutes
|
||||
|
||||
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
|
||||
struct State {
|
||||
/// version of the state in kv
|
||||
version: u32,
|
||||
/// last block we have an update from
|
||||
last_block: u64,
|
||||
/// kv handle
|
||||
/// includes keys and values for:
|
||||
/// "meta:chain_id", "meta:version", "meta:last_block", "meta:contract_address",
|
||||
/// "names:{namehash}" -> "{name}", "nodes:{name}" -> "{node_info}"
|
||||
kv: Kv<String, Vec<u8>>,
|
||||
/// the chain id we are indexing
|
||||
chain_id: u64,
|
||||
/// what contract this state pertains to
|
||||
contract_address: eth::Address,
|
||||
/// namehash to human readable name
|
||||
names: HashMap<String, String>,
|
||||
/// human readable name to most recent on-chain routing information as json
|
||||
nodes: HashMap<String, net::KnsUpdate>,
|
||||
/// last saved checkpoint block
|
||||
last_checkpoint_block: u64,
|
||||
}
|
||||
|
||||
impl State {
|
||||
fn new(our: &Address) -> Self {
|
||||
let kv: Kv<String, Vec<u8>> = match kv::open(our.package_id(), "kns_indexer", Some(10)) {
|
||||
Ok(kv) => kv,
|
||||
Err(e) => panic!("fatal: error opening kns_indexer key_value database: {e:?}"),
|
||||
};
|
||||
Self {
|
||||
version: CURRENT_VERSION,
|
||||
last_block: KIMAP_FIRST_BLOCK,
|
||||
kv,
|
||||
fn new() -> Self {
|
||||
State {
|
||||
chain_id: CHAIN_ID,
|
||||
contract_address: eth::Address::from_str(KIMAP_ADDRESS).unwrap(),
|
||||
names: HashMap::new(),
|
||||
nodes: HashMap::new(),
|
||||
last_checkpoint_block: 0,
|
||||
}
|
||||
}
|
||||
|
||||
/// Loads the state from kv, and updates it with the current block number and version.
|
||||
/// The result of this function will be that the constants for chain ID and contract address
|
||||
/// are always matching the values in the kv.
|
||||
fn load(our: &Address) -> Self {
|
||||
let mut state = Self::new(our);
|
||||
|
||||
let desired_contract_address = eth::Address::from_str(KIMAP_ADDRESS).unwrap();
|
||||
|
||||
let version = state.get_version();
|
||||
let chain_id = state.get_chain_id();
|
||||
let contract_address = state.get_contract_address();
|
||||
let last_block = state.get_last_block();
|
||||
|
||||
if version != Some(CURRENT_VERSION)
|
||||
|| chain_id != Some(CHAIN_ID)
|
||||
|| contract_address != Some(desired_contract_address)
|
||||
{
|
||||
// if version/contract/chain_id are new, run migrations here.
|
||||
state.set_version(CURRENT_VERSION);
|
||||
state.set_chain_id(CHAIN_ID);
|
||||
state.set_contract_address(desired_contract_address);
|
||||
}
|
||||
|
||||
state.last_block = last_block.unwrap_or(state.last_block);
|
||||
|
||||
println!(
|
||||
"started\n 🐦⬛ KNS Indexer State\n\
|
||||
▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔\n\
|
||||
Version {}\n\
|
||||
Chain ID {}\n\
|
||||
Last Block {}\n\
|
||||
KIMAP {}\n\
|
||||
▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁\n",
|
||||
state.version, CHAIN_ID, state.last_block, desired_contract_address,
|
||||
);
|
||||
|
||||
state
|
||||
}
|
||||
|
||||
/// Reset by removing the database and reloading fresh state
|
||||
fn reset(&self, our: &Address) {
|
||||
// Remove the entire database
|
||||
if let Err(e) = kv::remove_db(our.package_id(), "kns_indexer", None) {
|
||||
println!("Warning: error removing kns_indexer database: {e:?}");
|
||||
fn load() -> Self {
|
||||
match get_state() {
|
||||
None => Self::new(),
|
||||
Some(state_bytes) => match rmp_serde::from_slice(&state_bytes) {
|
||||
Ok(state) => state,
|
||||
Err(e) => {
|
||||
println!("failed to deserialize saved state: {e:?}");
|
||||
Self::new()
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn meta_version_key() -> String {
|
||||
"meta:version".to_string()
|
||||
/// Reset by removing the checkpoint and reloading fresh state
|
||||
fn reset(&self) {
|
||||
clear_state();
|
||||
}
|
||||
|
||||
fn meta_last_block_key() -> String {
|
||||
"meta:last_block".to_string()
|
||||
/// Saves a checkpoint, serializes to the current block
|
||||
fn save(&mut self, block: u64) {
|
||||
self.last_checkpoint_block = block;
|
||||
match rmp_serde::to_vec(self) {
|
||||
Ok(state_bytes) => set_state(&state_bytes),
|
||||
Err(e) => println!("failed to serialize state: {e:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
fn meta_chain_id_key() -> String {
|
||||
"meta:chain_id".to_string()
|
||||
}
|
||||
|
||||
fn meta_contract_address_key() -> String {
|
||||
"meta:contract_address".to_string()
|
||||
}
|
||||
|
||||
fn name_key(namehash: &str) -> String {
|
||||
format!("name:{}", namehash)
|
||||
}
|
||||
|
||||
fn node_key(name: &str) -> String {
|
||||
format!("node:{}", name)
|
||||
}
|
||||
|
||||
fn get_last_block(&self) -> Option<u64> {
|
||||
self.kv.get_as::<u64>(&Self::meta_last_block_key()).ok()
|
||||
}
|
||||
|
||||
fn set_last_block(&mut self, block: u64) {
|
||||
self.kv
|
||||
.set_as::<u64>(&Self::meta_last_block_key(), &block, None)
|
||||
.unwrap();
|
||||
self.last_block = block;
|
||||
}
|
||||
|
||||
fn get_version(&self) -> Option<u32> {
|
||||
self.kv.get_as::<u32>(&Self::meta_version_key()).ok()
|
||||
}
|
||||
|
||||
fn set_version(&mut self, version: u32) {
|
||||
self.kv
|
||||
.set_as::<u32>(&Self::meta_version_key(), &version, None)
|
||||
.unwrap();
|
||||
self.version = version;
|
||||
}
|
||||
|
||||
fn get_name(&self, namehash: &str) -> Option<String> {
|
||||
self.kv
|
||||
.get(&Self::name_key(namehash))
|
||||
.ok()
|
||||
.and_then(|bytes| String::from_utf8(bytes).ok())
|
||||
}
|
||||
|
||||
fn set_name(&mut self, namehash: &str, name: &str) {
|
||||
self.kv
|
||||
.set(&Self::name_key(namehash), &name.as_bytes().to_vec(), None)
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
fn get_node(&self, name: &str) -> Option<net::KnsUpdate> {
|
||||
self.kv.get_as::<net::KnsUpdate>(&Self::node_key(name)).ok()
|
||||
}
|
||||
|
||||
fn set_node(&mut self, name: &str, node: &net::KnsUpdate) {
|
||||
self.kv
|
||||
.set_as::<net::KnsUpdate>(&Self::node_key(name), &node, None)
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
fn get_chain_id(&self) -> Option<u64> {
|
||||
self.kv.get_as::<u64>(&Self::meta_chain_id_key()).ok()
|
||||
}
|
||||
|
||||
fn set_chain_id(&mut self, chain_id: u64) {
|
||||
self.kv
|
||||
.set_as::<u64>(&Self::meta_chain_id_key(), &chain_id, None)
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
fn get_contract_address(&self) -> Option<eth::Address> {
|
||||
self.kv
|
||||
.get_as::<eth::Address>(&Self::meta_contract_address_key())
|
||||
.ok()
|
||||
}
|
||||
|
||||
fn set_contract_address(&mut self, contract_address: eth::Address) {
|
||||
self.kv
|
||||
.set_as::<eth::Address>(&Self::meta_contract_address_key(), &contract_address, None)
|
||||
.expect("Failed to set contract address");
|
||||
/// loops through saved nodes, and sends them to net
|
||||
/// called upon bootup
|
||||
fn send_nodes(&self) -> anyhow::Result<()> {
|
||||
for node in self.nodes.values() {
|
||||
Request::to(("our", "net", "distro", "sys"))
|
||||
.body(rmp_serde::to_vec(&net::NetAction::KnsUpdate(node.clone()))?)
|
||||
.send()?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@ -236,6 +136,27 @@ impl From<WitKnsUpdate> for net::KnsUpdate {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<State> for WitState {
|
||||
fn from(s: State) -> Self {
|
||||
let contract_address: [u8; 20] = s.contract_address.into();
|
||||
WitState {
|
||||
chain_id: s.chain_id.clone(),
|
||||
contract_address: contract_address.to_vec(),
|
||||
names: s
|
||||
.names
|
||||
.iter()
|
||||
.map(|(k, v)| (k.clone(), v.clone()))
|
||||
.collect::<Vec<_>>(),
|
||||
nodes: s
|
||||
.nodes
|
||||
.iter()
|
||||
.map(|(k, v)| (k.clone(), v.clone().into()))
|
||||
.collect::<Vec<_>>(),
|
||||
last_block: s.last_checkpoint_block.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
enum KnsError {
|
||||
#[error("Parent node for note not found")]
|
||||
@ -244,8 +165,8 @@ enum KnsError {
|
||||
|
||||
call_init!(init);
|
||||
fn init(our: Address) {
|
||||
// state is loaded from kv, and updated with the current block number and version.
|
||||
let state = State::load(&our);
|
||||
// state is checkpointed regularly (default every 5 minutes if new events are found)
|
||||
let state = State::load();
|
||||
|
||||
if let Err(e) = main(our, state) {
|
||||
println!("fatal error: {e}");
|
||||
@ -256,22 +177,29 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> {
|
||||
#[cfg(feature = "simulation-mode")]
|
||||
add_temp_hardcoded_tlzs(&mut state);
|
||||
|
||||
let chain_id = CHAIN_ID;
|
||||
let kimap_address = eth::Address::from_str(KIMAP_ADDRESS).unwrap();
|
||||
// loop through checkpointed values and send to net
|
||||
if let Err(e) = state.send_nodes() {
|
||||
// todo change verbosity
|
||||
println!("failed to send nodes to net: {e}");
|
||||
}
|
||||
|
||||
// current block is only saved to state upon checkpoints, we use this to keep track of last events
|
||||
// set to checkpoint-1
|
||||
let mut last_block = state.last_checkpoint_block.saturating_sub(1);
|
||||
|
||||
// sub_id: 1
|
||||
// listen to all mint events in kimap
|
||||
let mints_filter = eth::Filter::new()
|
||||
.address(kimap_address)
|
||||
.from_block(state.last_block)
|
||||
.address(state.contract_address)
|
||||
.from_block(last_block)
|
||||
.to_block(eth::BlockNumberOrTag::Latest)
|
||||
.event("Mint(bytes32,bytes32,bytes,bytes)");
|
||||
|
||||
// sub_id: 2
|
||||
// listen to all note events that are relevant to the KNS protocol within kimap
|
||||
let notes_filter = eth::Filter::new()
|
||||
.address(kimap_address)
|
||||
.from_block(state.last_block)
|
||||
.address(state.contract_address)
|
||||
.from_block(last_block)
|
||||
.to_block(eth::BlockNumberOrTag::Latest)
|
||||
.event("Note(bytes32,bytes32,bytes,bytes,bytes)")
|
||||
.topic3(vec![
|
||||
@ -284,7 +212,7 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> {
|
||||
|
||||
// 60s timeout -- these calls can take a long time
|
||||
// if they do time out, we try them again
|
||||
let eth_provider: eth::Provider = eth::Provider::new(chain_id, SUBSCRIPTION_TIMEOUT);
|
||||
let eth_provider: eth::Provider = eth::Provider::new(state.chain_id, SUBSCRIPTION_TIMEOUT);
|
||||
|
||||
// subscribe to logs first, so no logs are m issed
|
||||
eth_provider.subscribe_loop(1, mints_filter.clone(), 2, 0);
|
||||
@ -299,25 +227,28 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> {
|
||||
let mut pending_notes: BTreeMap<u64, Vec<(kimap::contract::Note, u8)>> = BTreeMap::new();
|
||||
|
||||
// if block in state is < current_block, get logs from that part.
|
||||
print_to_terminal(
|
||||
2,
|
||||
&format!("syncing old logs from block: {}", state.last_block),
|
||||
);
|
||||
print_to_terminal(2, &format!("syncing old logs from block: {}", last_block));
|
||||
fetch_and_process_logs(
|
||||
ð_provider,
|
||||
&mut state,
|
||||
mints_filter.clone(),
|
||||
&mut pending_notes,
|
||||
&mut last_block,
|
||||
);
|
||||
fetch_and_process_logs(
|
||||
ð_provider,
|
||||
&mut state,
|
||||
notes_filter.clone(),
|
||||
&mut pending_notes,
|
||||
&mut last_block,
|
||||
);
|
||||
|
||||
// set a timer tick so any pending logs will be processed
|
||||
timer::set_timer(DELAY_MS, None);
|
||||
|
||||
// set a timer tick for checkpointing
|
||||
timer::set_timer(CHECKPOINT_MS, Some(b"checkpoint".to_vec()));
|
||||
|
||||
print_to_terminal(2, "done syncing old logs.");
|
||||
|
||||
loop {
|
||||
@ -327,6 +258,10 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> {
|
||||
|
||||
// if true, time to go check current block number and handle pending notes.
|
||||
let tick = message.is_local(&our) && message.source().process == "timer:distro:sys";
|
||||
let checkpoint = message.is_local(&our)
|
||||
&& message.source().process == "timer:distro:sys"
|
||||
&& message.context() == Some(b"checkpoint");
|
||||
|
||||
let Message::Request {
|
||||
source,
|
||||
body,
|
||||
@ -339,10 +274,12 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> {
|
||||
&mut state,
|
||||
ð_provider,
|
||||
tick,
|
||||
checkpoint,
|
||||
&mut pending_notes,
|
||||
&[],
|
||||
&mints_filter,
|
||||
¬es_filter,
|
||||
&mut last_block,
|
||||
)?;
|
||||
}
|
||||
continue;
|
||||
@ -353,10 +290,12 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> {
|
||||
&mut state,
|
||||
ð_provider,
|
||||
tick,
|
||||
checkpoint,
|
||||
&mut pending_notes,
|
||||
&body,
|
||||
&mints_filter,
|
||||
¬es_filter,
|
||||
&mut last_block,
|
||||
)?;
|
||||
} else {
|
||||
match serde_json::from_slice(&body)? {
|
||||
@ -364,15 +303,13 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> {
|
||||
// TODO: make sure we've seen the whole block, while actually
|
||||
// sending a response to the proper place.
|
||||
Response::new()
|
||||
.body(IndexerResponse::Name(state.get_name(hash)))
|
||||
.body(IndexerResponse::Name(state.names.get(hash).cloned()))
|
||||
.send()?;
|
||||
}
|
||||
IndexerRequest::NodeInfo(NodeInfoRequest { ref name, .. }) => {
|
||||
Response::new()
|
||||
.body(&IndexerResponse::NodeInfo(
|
||||
state
|
||||
.get_node(name)
|
||||
.map(|update| WitKnsUpdate::from(update)),
|
||||
.body(IndexerResponse::NodeInfo(
|
||||
state.nodes.get(name).map(|n| n.clone().into()),
|
||||
))
|
||||
.send()?;
|
||||
}
|
||||
@ -393,12 +330,17 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> {
|
||||
}
|
||||
}
|
||||
// reload state fresh - this will create new db
|
||||
state.reset(&our);
|
||||
state.reset();
|
||||
Response::new()
|
||||
.body(IndexerResponse::Reset(ResetResult::Success))
|
||||
.send()?;
|
||||
panic!("resetting state, restarting!");
|
||||
}
|
||||
IndexerRequest::GetState(_) => {
|
||||
Response::new()
|
||||
.body(IndexerResponse::GetState(state.clone().into()))
|
||||
.send()?;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -408,17 +350,19 @@ fn handle_eth_message(
|
||||
state: &mut State,
|
||||
eth_provider: ð::Provider,
|
||||
tick: bool,
|
||||
checkpoint: bool,
|
||||
pending_notes: &mut BTreeMap<u64, Vec<(kimap::contract::Note, u8)>>,
|
||||
body: &[u8],
|
||||
mints_filter: ð::Filter,
|
||||
notes_filter: ð::Filter,
|
||||
last_block: &mut u64,
|
||||
) -> anyhow::Result<()> {
|
||||
match serde_json::from_slice::<eth::EthSubResult>(body) {
|
||||
Ok(Ok(eth::EthSub { result, .. })) => {
|
||||
if let Ok(eth::SubscriptionResult::Log(log)) =
|
||||
serde_json::from_value::<eth::SubscriptionResult>(result)
|
||||
{
|
||||
if let Err(e) = handle_log(state, pending_notes, &log) {
|
||||
if let Err(e) = handle_log(state, pending_notes, &log, last_block) {
|
||||
print_to_terminal(1, &format!("log-handling error! {e:?}"));
|
||||
}
|
||||
}
|
||||
@ -438,10 +382,13 @@ fn handle_eth_message(
|
||||
let block_number = eth_provider.get_block_number();
|
||||
if let Ok(block_number) = block_number {
|
||||
print_to_terminal(2, &format!("new block: {}", block_number));
|
||||
state.set_last_block(block_number);
|
||||
*last_block = block_number;
|
||||
if checkpoint {
|
||||
state.save(block_number);
|
||||
}
|
||||
}
|
||||
}
|
||||
handle_pending_notes(state, pending_notes)?;
|
||||
handle_pending_notes(state, pending_notes, last_block)?;
|
||||
|
||||
if !pending_notes.is_empty() {
|
||||
timer::set_timer(DELAY_MS, None);
|
||||
@ -453,6 +400,7 @@ fn handle_eth_message(
|
||||
fn handle_pending_notes(
|
||||
state: &mut State,
|
||||
pending_notes: &mut BTreeMap<u64, Vec<(kimap::contract::Note, u8)>>,
|
||||
last_block: &mut u64,
|
||||
) -> anyhow::Result<()> {
|
||||
if pending_notes.is_empty() {
|
||||
return Ok(());
|
||||
@ -460,7 +408,7 @@ fn handle_pending_notes(
|
||||
let mut blocks_to_remove = vec![];
|
||||
|
||||
for (block, notes) in pending_notes.iter_mut() {
|
||||
if *block < state.last_block {
|
||||
if block < last_block {
|
||||
let mut keep_notes = Vec::new();
|
||||
for (note, attempt) in notes.drain(..) {
|
||||
if attempt >= MAX_PENDING_ATTEMPTS {
|
||||
@ -505,53 +453,69 @@ fn handle_note(state: &mut State, note: &kimap::contract::Note) -> anyhow::Resul
|
||||
if !kimap::valid_note(¬e_label) {
|
||||
return Err(anyhow::anyhow!("skipping invalid note: {note_label}"));
|
||||
}
|
||||
let Some(node_name) = state.get_name(&node_hash) else {
|
||||
|
||||
let Some(node_name) = state.names.get(&node_hash) else {
|
||||
return Err(KnsError::NoParentError.into());
|
||||
};
|
||||
|
||||
if let Some(mut node) = state.get_node(&node_name) {
|
||||
match note_label.as_str() {
|
||||
"~ws-port" => {
|
||||
let ws = bytes_to_port(¬e.data)?;
|
||||
match note_label.as_str() {
|
||||
"~ws-port" => {
|
||||
let ws = bytes_to_port(¬e.data)?;
|
||||
if let Some(node) = state.nodes.get_mut(node_name) {
|
||||
node.ports.insert("ws".to_string(), ws);
|
||||
node.routers = vec![]; // port defined, -> direct
|
||||
}
|
||||
"~tcp-port" => {
|
||||
let tcp = bytes_to_port(¬e.data)?;
|
||||
node.ports.insert("tcp".to_string(), tcp);
|
||||
node.routers = vec![]; // port defined, -> direct
|
||||
}
|
||||
"~net-key" => {
|
||||
if note.data.len() != 32 {
|
||||
return Err(anyhow::anyhow!("invalid net-key length"));
|
||||
}
|
||||
node.public_key = hex::encode(¬e.data);
|
||||
}
|
||||
"~routers" => {
|
||||
let routers = decode_routers(¬e.data, state);
|
||||
node.routers = routers;
|
||||
node.ports = BTreeMap::new(); // -> indirect
|
||||
node.ips = vec![];
|
||||
}
|
||||
"~ip" => {
|
||||
let ip = bytes_to_ip(¬e.data)?;
|
||||
node.ips = vec![ip.to_string()];
|
||||
node.routers = vec![]; // -> direct
|
||||
}
|
||||
_other => {
|
||||
// Ignore unknown notes
|
||||
// port defined, -> direct
|
||||
node.routers = vec![];
|
||||
}
|
||||
}
|
||||
"~tcp-port" => {
|
||||
let tcp = bytes_to_port(¬e.data)?;
|
||||
if let Some(node) = state.nodes.get_mut(node_name) {
|
||||
node.ports.insert("tcp".to_string(), tcp);
|
||||
// port defined, -> direct
|
||||
node.routers = vec![];
|
||||
}
|
||||
}
|
||||
"~net-key" => {
|
||||
if note.data.len() != 32 {
|
||||
return Err(anyhow::anyhow!("invalid net-key length"));
|
||||
}
|
||||
if let Some(node) = state.nodes.get_mut(node_name) {
|
||||
node.public_key = hex::encode(¬e.data);
|
||||
}
|
||||
}
|
||||
"~routers" => {
|
||||
let routers = decode_routers(¬e.data, state);
|
||||
if let Some(node) = state.nodes.get_mut(node_name) {
|
||||
node.routers = routers;
|
||||
// -> indirect
|
||||
node.ports = BTreeMap::new();
|
||||
node.ips = vec![];
|
||||
}
|
||||
}
|
||||
"~ip" => {
|
||||
let ip = bytes_to_ip(¬e.data)?;
|
||||
if let Some(node) = state.nodes.get_mut(node_name) {
|
||||
node.ips = vec![ip.to_string()];
|
||||
// -> direct
|
||||
node.routers = vec![];
|
||||
}
|
||||
}
|
||||
_other => {
|
||||
// Ignore unknown notes
|
||||
}
|
||||
}
|
||||
|
||||
// Update the node in the state
|
||||
state.set_node(&node_name, &node);
|
||||
|
||||
// Only send an update if we have a *full* set of data for networking
|
||||
if !node.public_key.is_empty()
|
||||
&& ((!node.ips.is_empty() && !node.ports.is_empty()) || !node.routers.is_empty())
|
||||
// only send an update if we have a *full* set of data for networking:
|
||||
// a node name, plus either <routers> or <ip, port(s)>
|
||||
if let Some(node_info) = state.nodes.get(node_name) {
|
||||
if !node_info.public_key.is_empty()
|
||||
&& ((!node_info.ips.is_empty() && !node_info.ports.is_empty())
|
||||
|| node_info.routers.len() > 0)
|
||||
{
|
||||
Request::to(("our", "net", "distro", "sys"))
|
||||
.body(rmp_serde::to_vec(&net::NetAction::KnsUpdate(node))?)
|
||||
.body(rmp_serde::to_vec(&net::NetAction::KnsUpdate(
|
||||
node_info.clone(),
|
||||
))?)
|
||||
.send()?;
|
||||
}
|
||||
}
|
||||
@ -563,9 +527,10 @@ fn handle_log(
|
||||
state: &mut State,
|
||||
pending_notes: &mut BTreeMap<u64, Vec<(kimap::contract::Note, u8)>>,
|
||||
log: ð::Log,
|
||||
last_block: &mut u64,
|
||||
) -> anyhow::Result<()> {
|
||||
if let Some(block) = log.block_number {
|
||||
state.set_last_block(block);
|
||||
*last_block = block;
|
||||
}
|
||||
|
||||
match log.topics()[0] {
|
||||
@ -579,15 +544,15 @@ fn handle_log(
|
||||
return Err(anyhow::anyhow!("skipping invalid name: {name}"));
|
||||
}
|
||||
|
||||
let full_name = match state.get_name(&parent_hash) {
|
||||
let full_name = match state.names.get(&parent_hash) {
|
||||
Some(parent_name) => format!("{name}.{parent_name}"),
|
||||
None => name,
|
||||
};
|
||||
|
||||
state.set_name(&child_hash.clone(), &full_name.clone());
|
||||
state.set_node(
|
||||
&full_name.clone(),
|
||||
&net::KnsUpdate {
|
||||
state.names.insert(child_hash.clone(), full_name.clone());
|
||||
state.nodes.insert(
|
||||
full_name.clone(),
|
||||
net::KnsUpdate {
|
||||
name: full_name.clone(),
|
||||
public_key: String::new(),
|
||||
ips: Vec::new(),
|
||||
@ -633,13 +598,14 @@ fn fetch_and_process_logs(
|
||||
state: &mut State,
|
||||
filter: eth::Filter,
|
||||
pending_notes: &mut BTreeMap<u64, Vec<(kimap::contract::Note, u8)>>,
|
||||
last_block: &mut u64,
|
||||
) {
|
||||
loop {
|
||||
match eth_provider.get_logs(&filter) {
|
||||
Ok(logs) => {
|
||||
print_to_terminal(2, &format!("log len: {}", logs.len()));
|
||||
for log in logs {
|
||||
if let Err(e) = handle_log(state, pending_notes, &log) {
|
||||
if let Err(e) = handle_log(state, pending_notes, &log, last_block) {
|
||||
print_to_terminal(1, &format!("log-handling error! {e:?}"));
|
||||
}
|
||||
}
|
||||
@ -683,7 +649,7 @@ fn decode_routers(data: &[u8], state: &State) -> Vec<String> {
|
||||
for chunk in data.chunks(32) {
|
||||
let hash_str = format!("0x{}", hex::encode(chunk));
|
||||
|
||||
match state.get_name(&hash_str) {
|
||||
match state.names.get(&hash_str) {
|
||||
Some(full_name) => routers.push(full_name.clone()),
|
||||
None => print_to_terminal(
|
||||
1,
|
||||
|
@ -40,5 +40,17 @@
|
||||
"kns-indexer:kns-indexer:sys"
|
||||
],
|
||||
"wit_version": 1
|
||||
},
|
||||
"state.wasm": {
|
||||
"root": false,
|
||||
"public": false,
|
||||
"request_networking": false,
|
||||
"request_capabilities": [
|
||||
"kns-indexer:kns-indexer:sys"
|
||||
],
|
||||
"grant_capabilities": [
|
||||
"kns-indexer:kns-indexer:sys"
|
||||
],
|
||||
"wit_version": 1
|
||||
}
|
||||
}
|
||||
|
20
kinode/packages/kns-indexer/state/Cargo.toml
Normal file
20
kinode/packages/kns-indexer/state/Cargo.toml
Normal file
@ -0,0 +1,20 @@
|
||||
[package]
|
||||
name = "state"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[features]
|
||||
simulation-mode = []
|
||||
|
||||
[dependencies]
|
||||
kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", rev = "d97e012" }
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
serde_json = "1.0"
|
||||
process_macros = "0.1"
|
||||
wit-bindgen = "0.36.0"
|
||||
|
||||
[lib]
|
||||
crate-type = ["cdylib"]
|
||||
|
||||
[package.metadata.component]
|
||||
package = "kinode:process"
|
46
kinode/packages/kns-indexer/state/src/lib.rs
Normal file
46
kinode/packages/kns-indexer/state/src/lib.rs
Normal file
@ -0,0 +1,46 @@
|
||||
use crate::kinode::process::kns_indexer::{GetStateRequest, IndexerRequest, IndexerResponse};
|
||||
use kinode_process_lib::{eth, script, Address, Message, Request};
|
||||
|
||||
wit_bindgen::generate!({
|
||||
path: "target/wit",
|
||||
world: "kns-indexer-sys-v0",
|
||||
generate_unused_types: true,
|
||||
additional_derives: [serde::Deserialize, serde::Serialize, process_macros::SerdeJsonInto],
|
||||
});
|
||||
|
||||
script!(init);
|
||||
fn init(_our: Address, _args: String) -> String {
|
||||
// we don't take any args
|
||||
|
||||
let Ok(Message::Response { body, .. }) =
|
||||
Request::to(("our", "kns-indexer", "kns-indexer", "sys"))
|
||||
.body(IndexerRequest::GetState(GetStateRequest { block: 0 }))
|
||||
.send_and_await_response(10)
|
||||
.unwrap()
|
||||
else {
|
||||
return "failed to get state from kns-indexer".to_string();
|
||||
};
|
||||
let Ok(IndexerResponse::GetState(state)) = body.try_into() else {
|
||||
return "failed to deserialize state".to_string();
|
||||
};
|
||||
// can change later, but for now, just print every known node name
|
||||
let mut names = state
|
||||
.names
|
||||
.iter()
|
||||
.map(|(_k, v)| v.clone())
|
||||
.collect::<Vec<_>>();
|
||||
names.sort();
|
||||
let contract_address: [u8; 20] = state
|
||||
.contract_address
|
||||
.try_into()
|
||||
.expect("invalid contract addess: doesn't have 20 bytes");
|
||||
let contract_address: eth::Address = contract_address.into();
|
||||
format!(
|
||||
"\nrunning on chain id {}\nCA: {}\n{} known nodes as of block {}\n {}",
|
||||
state.chain_id,
|
||||
contract_address,
|
||||
names.len(),
|
||||
state.last_block,
|
||||
names.join("\n ")
|
||||
)
|
||||
}
|
Loading…
Reference in New Issue
Block a user