Merge pull request #212 from kinode-dao/dr/kns-indexer-add-namehash-resolver

kns_indexer: add namehash resolver
This commit is contained in:
doria 2024-01-27 17:36:52 -03:00 committed by GitHub
commit ea51fee4de
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 118 additions and 119 deletions

2
Cargo.lock generated
View File

@ -2703,7 +2703,7 @@ dependencies = [
[[package]]
name = "kinode"
version = "0.5.1"
version = "0.5.2"
dependencies = [
"aes-gcm 0.10.2",
"anyhow",

View File

@ -1,7 +1,7 @@
[package]
name = "kinode"
authors = ["UqbarDAO"]
version = "0.5.1"
version = "0.5.2"
edition = "2021"
description = "A general-purpose sovereign cloud computing platform"
homepage = "https://kinode.org"

View File

@ -2,8 +2,8 @@ use alloy_rpc_types::Log;
use alloy_sol_types::{sol, SolEvent};
use kinode_process_lib::eth::{EthAddress, EthSubEvent, SubscribeLogsRequest};
use kinode_process_lib::{
await_message, get_typed_state, http, print_to_terminal, println, set_state, Address,
LazyLoadBlob, Message, Request, Response,
await_message, get_typed_state, http, print_to_terminal, println, set_state, Address, Message,
Request, Response,
};
use serde::{Deserialize, Serialize};
use std::collections::hash_map::{Entry, HashMap};
@ -31,6 +31,16 @@ struct State {
block: u64,
}
#[derive(Debug, Serialize, Deserialize)]
pub enum IndexerRequests {
/// return the human readable name for a namehash
/// returns an Option<String>
NamehashToName(String),
/// return the most recent on-chain routing information for a node name.
/// returns an Option<KnsUpdate>
NodeInfo(String),
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum NetActions {
KnsUpdate(KnsUpdate),
@ -166,129 +176,118 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> {
continue;
};
if source.process == "http_server:distro:sys" {
if let Ok(body_json) = serde_json::from_slice::<serde_json::Value>(&body) {
if body_json["path"].as_str().unwrap_or_default() == "/node/:name" {
if let Some(name) = body_json["url_params"]["name"].as_str() {
if let Some(node) = state.nodes.get(name) {
Response::new()
.body(serde_json::to_vec(&http::HttpResponse {
status: 200,
headers: HashMap::from([(
"Content-Type".to_string(),
"application/json".to_string(),
)]),
})?)
.blob(LazyLoadBlob {
mime: Some("application/json".to_string()),
bytes: serde_json::to_string(&node)?.as_bytes().to_vec(),
})
.send()?;
continue;
}
}
if source.process == "eth:distro:sys" {
handle_eth_message(&our, &mut state, &body)?;
} else {
let Ok(request) = serde_json::from_slice::<IndexerRequests>(&body) else {
println!("kns_indexer: got invalid message");
continue;
};
match request {
IndexerRequests::NamehashToName(namehash) => {
let response = Response::new()
.body(serde_json::to_vec(&state.names.get(&namehash))?)
.send()?;
}
}
Response::new()
.body(serde_json::to_vec(&http::HttpResponse {
status: 404,
headers: HashMap::from([(
"Content-Type".to_string(),
"application/json".to_string(),
)]),
})?)
.send()?;
continue;
}
let Ok(msg) = serde_json::from_slice::<EthSubEvent>(&body) else {
println!("kns_indexer: got invalid message");
continue;
};
match msg {
EthSubEvent::Log(log) => {
state.block = log.block_number.expect("expect").to::<u64>();
let node_id: alloy_primitives::FixedBytes<32> = log.topics[1];
let name = match state.names.entry(node_id.to_string()) {
Entry::Occupied(o) => o.into_mut(),
Entry::Vacant(v) => v.insert(get_name(&log)),
};
let node = state
.nodes
.entry(name.to_string())
.or_insert_with(|| KnsUpdate::new(name, &node_id.to_string()));
let mut send = true;
match log.topics[0] {
KeyUpdate::SIGNATURE_HASH => {
node.public_key = KeyUpdate::abi_decode_data(&log.data, true)
.unwrap()
.0
.to_string();
}
IpUpdate::SIGNATURE_HASH => {
let ip = IpUpdate::abi_decode_data(&log.data, true).unwrap().0;
node.ip = format!(
"{}.{}.{}.{}",
(ip >> 24) & 0xFF,
(ip >> 16) & 0xFF,
(ip >> 8) & 0xFF,
ip & 0xFF
);
// when we get ip data, we should delete any router data,
// since the assignment of ip indicates an direct node
node.routers = vec![];
}
WsUpdate::SIGNATURE_HASH => {
node.port = WsUpdate::abi_decode_data(&log.data, true).unwrap().0;
// when we get port data, we should delete any router data,
// since the assignment of port indicates an direct node
node.routers = vec![];
}
RoutingUpdate::SIGNATURE_HASH => {
node.routers = RoutingUpdate::abi_decode_data(&log.data, true)
.unwrap()
.0
.iter()
.map(|r| r.to_string())
.collect::<Vec<String>>();
// when we get routing data, we should delete any ws/ip data,
// since the assignment of routers indicates an indirect node
node.ip = "".to_string();
node.port = 0;
}
_ => {
send = false;
}
}
if node.public_key != ""
&& ((node.ip != "" && node.port != 0) || node.routers.len() > 0)
&& send
{
print_to_terminal(
1,
&format!(
"kns_indexer: sending ID to net: {node:?} (blocknum {})",
state.block
),
);
Request::new()
.target((&our.node, "net", "distro", "sys"))
.try_body(NetActions::KnsUpdate(node.clone()))?
IndexerRequests::NodeInfo(name) => {
let response = Response::new()
.body(serde_json::to_vec(&state.nodes.get(&name))?)
.send()?;
}
}
}
set_state(&bincode::serialize(&state)?);
}
}
fn handle_eth_message(our: &Address, state: &mut State, body: &[u8]) -> anyhow::Result<()> {
let Ok(msg) = serde_json::from_slice::<EthSubEvent>(body) else {
return Err(anyhow::anyhow!("kns_indexer: got invalid message"));
};
match msg {
EthSubEvent::Log(log) => {
state.block = log.block_number.expect("expect").to::<u64>();
let node_id: alloy_primitives::FixedBytes<32> = log.topics[1];
let name = match state.names.entry(node_id.to_string()) {
Entry::Occupied(o) => o.into_mut(),
Entry::Vacant(v) => v.insert(get_name(&log)),
};
let node = state
.nodes
.entry(name.to_string())
.or_insert_with(|| KnsUpdate::new(name, &node_id.to_string()));
let mut send = true;
match log.topics[0] {
KeyUpdate::SIGNATURE_HASH => {
node.public_key = KeyUpdate::abi_decode_data(&log.data, true)
.unwrap()
.0
.to_string();
}
IpUpdate::SIGNATURE_HASH => {
let ip = IpUpdate::abi_decode_data(&log.data, true).unwrap().0;
node.ip = format!(
"{}.{}.{}.{}",
(ip >> 24) & 0xFF,
(ip >> 16) & 0xFF,
(ip >> 8) & 0xFF,
ip & 0xFF
);
// when we get ip data, we should delete any router data,
// since the assignment of ip indicates an direct node
node.routers = vec![];
}
WsUpdate::SIGNATURE_HASH => {
node.port = WsUpdate::abi_decode_data(&log.data, true).unwrap().0;
// when we get port data, we should delete any router data,
// since the assignment of port indicates an direct node
node.routers = vec![];
}
RoutingUpdate::SIGNATURE_HASH => {
node.routers = RoutingUpdate::abi_decode_data(&log.data, true)
.unwrap()
.0
.iter()
.map(|r| r.to_string())
.collect::<Vec<String>>();
// when we get routing data, we should delete any ws/ip data,
// since the assignment of routers indicates an indirect node
node.ip = "".to_string();
node.port = 0;
}
_ => {
send = false;
}
}
if node.public_key != ""
&& ((node.ip != "" && node.port != 0) || node.routers.len() > 0)
&& send
{
print_to_terminal(
1,
&format!(
"kns_indexer: sending ID to net: {node:?} (blocknum {})",
state.block
),
);
Request::new()
.target((&our.node, "net", "distro", "sys"))
.try_body(NetActions::KnsUpdate(node.clone()))?
.send()?;
}
}
}
set_state(&bincode::serialize(state)?);
Ok(())
}
fn get_name(log: &Log) -> String {
let decoded = NodeRegistered::abi_decode_data(&log.data, true).unwrap();
let name = match dnswire_decode(decoded.0.clone()) {