refactor kns_indexer for cleanliness

This commit is contained in:
dr-frmr 2024-07-09 13:58:10 +02:00
parent 352e1f1059
commit 7223569c71
No known key found for this signature in database
2 changed files with 51 additions and 69 deletions

View File

@ -6,7 +6,7 @@ use alloy_sol_types::{sol, SolCall, SolEvent};
use kinode_process_lib::{
await_message, call_init,
eth::{self, Provider, TransactionInput, TransactionRequest},
net, println, Address, Message, Request, Response,
net, print_to_terminal, println, Address, Message, Request, Response,
};
use serde::{Deserialize, Serialize};
use std::{
@ -81,11 +81,8 @@ fn init(our: Address) {
block: KIMAP_FIRST_BLOCK,
};
match main(our, state) {
Ok(_) => {}
Err(e) => {
println!("error: {e:?}");
}
if let Err(e) = main(our, state) {
println!("fatal error: {e}");
}
}
@ -117,10 +114,13 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> {
// if they do time out, we try them again
let eth_provider: eth::Provider = eth::Provider::new(state.chain_id, 60);
println!(
"subscribing, state.block: {}, chain_id: {}",
state.block - 1,
state.chain_id
print_to_terminal(
1,
&format!(
"subscribing, state.block: {}, chain_id: {}",
state.block - 1,
state.chain_id
),
);
subscribe_to_logs(&eth_provider, state.block - 1, filter.clone());
@ -130,11 +130,9 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> {
match eth_provider.get_logs(&filter) {
Ok(logs) => {
for log in logs {
match handle_log(&our, &mut state, &log, &eth_provider) {
Ok(()) => {}
Err(e) => {
println!("log-handling error! {e:?}");
}
if let Err(e) = handle_log(&our, &mut state, &log, &eth_provider) {
// print errors at verbosity=1
print_to_terminal(1, &format!("log-handling error! {e:?}"));
}
}
break;
@ -151,7 +149,6 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> {
loop {
let Ok(message) = await_message() else {
println!("got network error");
continue;
};
let Message::Request { source, body, .. } = message else {
@ -170,10 +167,7 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> {
&filter,
)?;
} else {
let Ok(request) = serde_json::from_slice(&body) else {
println!("got invalid message");
continue;
};
let request = serde_json::from_slice(&body)?;
match request {
// IndexerRequests, especially NamehashToName, relevant anymore? if they're mostly queried from the net runtime?
@ -234,11 +228,9 @@ fn handle_eth_message(
match eth_result {
Ok(eth::EthSub { result, .. }) => {
if let eth::SubscriptionResult::Log(log) = result {
match handle_log(our, state, &log, eth_provider) {
Ok(()) => {}
Err(e) => {
println!("log-handling error! {e:?}");
}
if let Err(e) = handle_log(our, state, &log, eth_provider) {
// print errors at verbosity=1
print_to_terminal(1, &format!("log-handling error! {e:?}"));
}
}
}
@ -354,59 +346,50 @@ fn handle_log(
// println!("got note, from name: {name}, note: {note}, note_hash: {node_hash}",);
match note.as_str() {
"~ws-port" => {
let ws = bytes_to_port(&decoded.data);
if let Ok(ws) = ws {
state.nodes.entry(name.clone()).and_modify(|node| {
node.ports.insert("ws".to_string(), ws);
// port defined, -> direct
node.routers = vec![];
});
node = Some(name.clone());
}
let ws = bytes_to_port(&decoded.data)?;
state.nodes.entry(name.clone()).and_modify(|node| {
node.ports.insert("ws".to_string(), ws);
// port defined, -> direct
node.routers = vec![];
});
node = Some(name.clone());
}
"~tcp-port" => {
let tcp = bytes_to_port(&decoded.data);
if let Ok(tcp) = tcp {
state.nodes.entry(name.clone()).and_modify(|node| {
node.ports.insert("tcp".to_string(), tcp);
// port defined, -> direct
node.routers = vec![];
});
node = Some(name.clone());
}
let tcp = bytes_to_port(&decoded.data)?;
state.nodes.entry(name.clone()).and_modify(|node| {
node.ports.insert("tcp".to_string(), tcp);
// port defined, -> direct
node.routers = vec![];
});
node = Some(name.clone());
}
"~net-key" => {
println!("decoded.data: {:?}\r", decoded.data);
// note silent errors here...
// print silently for debugging?
if decoded.data.len() != 32 {
return Err(anyhow::anyhow!("invalid net-key length"));
}
state.nodes.entry(name.clone()).and_modify(|node| {
node.public_key = decoded.data.to_string();
println!("node.public_key: {}", node.public_key);
});
node = Some(name);
}
"~routers" => {
let routers = decode_routers(&decoded.data)?;
state.nodes.entry(name.clone()).and_modify(|node| {
if let Ok(routers) = decode_routers(&decoded.data) {
node.routers = routers;
// -> indirect
node.ports = BTreeMap::new();
node.ips = vec![];
}
node.routers = routers;
// -> indirect
node.ports = BTreeMap::new();
node.ips = vec![];
});
node = Some(name.clone());
}
"~ip" => {
let ip = bytes_to_ip(&decoded.data);
if let Ok(ip) = ip {
state.nodes.entry(name.clone()).and_modify(|node| {
node.ips.push(ip.to_string());
// -> direct
node.routers = vec![];
});
node = Some(name.clone());
}
let ip = bytes_to_ip(&decoded.data)?;
state.nodes.entry(name.clone()).and_modify(|node| {
node.ips.push(ip.to_string());
// -> direct
node.routers = vec![];
});
node = Some(name.clone());
}
_ => {}
}
@ -420,7 +403,6 @@ fn handle_log(
&& ((!node_info.ips.is_empty() && !node_info.ports.is_empty())
|| node_info.routers.len() > 0)
{
println!("sending kns update for node: {}", node_info.node);
Request::new()
.target((&our.node, "net", "distro", "sys"))
.body(rmp_serde::to_vec(&net::NetAction::KnsUpdate(
@ -506,7 +488,7 @@ fn decode_routers(data: &[u8]) -> anyhow::Result<Vec<String>> {
Ok(routers)
}
pub fn bytes_to_ip(bytes: &[u8]) -> Result<IpAddr, String> {
pub fn bytes_to_ip(bytes: &[u8]) -> anyhow::Result<IpAddr> {
match bytes.len() {
16 => {
let ip_num = u128::from_be_bytes(bytes.try_into().unwrap());
@ -518,14 +500,14 @@ pub fn bytes_to_ip(bytes: &[u8]) -> Result<IpAddr, String> {
Ok(IpAddr::V6(Ipv6Addr::from(ip_num)))
}
}
_ => Err("Invalid byte length for IP address".to_string()),
_ => Err(anyhow::anyhow!("Invalid byte length for IP address")),
}
}
pub fn bytes_to_port(bytes: &[u8]) -> Result<u16, String> {
pub fn bytes_to_port(bytes: &[u8]) -> anyhow::Result<u16> {
match bytes.len() {
2 => Ok(u16::from_be_bytes([bytes[0], bytes[1]])),
_ => Err("Invalid byte length for port".to_string()),
_ => Err(anyhow::anyhow!("Invalid byte length for port")),
}
}

View File

@ -411,7 +411,7 @@ async fn handle_boot(
let boot = Boot {
username: our.name.clone(),
password_hash: password_hash,
password_hash,
timestamp: U256::from(info.timestamp),
direct: info.direct,
reset: info.reset,