Merge pull request #214 from kinode-dao/dr/kns-indexer-add-pending-resolver

kns_indexer: add pending_requests map to only respond to discrete que…
This commit is contained in:
doria 2024-01-29 14:39:43 -03:00 committed by GitHub
commit 9a500bcf81
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -2,11 +2,14 @@ use alloy_rpc_types::Log;
use alloy_sol_types::{sol, SolEvent};
use kinode_process_lib::eth::{EthAddress, EthSubEvent, SubscribeLogsRequest};
use kinode_process_lib::{
await_message, get_typed_state, http, print_to_terminal, println, set_state, Address, Message,
await_message, get_typed_state, print_to_terminal, println, set_state, Address, Message,
Request, Response,
};
use serde::{Deserialize, Serialize};
use std::collections::hash_map::{Entry, HashMap};
use std::collections::{
hash_map::{Entry, HashMap},
BTreeMap,
};
use std::str::FromStr;
use std::string::FromUtf8Error;
@ -31,14 +34,21 @@ struct State {
block: u64,
}
/// IndexerRequests are used to query discrete information from the indexer
/// for example, if you want to know the human readable name for a namehash,
/// you would send a NamehashToName request.
/// If you want to know the most recent on-chain routing information for a
/// human readable name, you would send a NodeInfo request.
/// The block parameter specifies the recency of the data: the indexer will
/// not respond until it has processed events up to the specified block.
#[derive(Debug, Serialize, Deserialize)]
pub enum IndexerRequests {
/// return the human readable name for a namehash
/// returns an Option<String>
NamehashToName(String),
NamehashToName { hash: String, block: u64 },
/// return the most recent on-chain routing information for a node name.
/// returns an Option<KnsUpdate>
NodeInfo(String),
NodeInfo { name: String, block: u64 },
}
#[derive(Clone, Debug, Serialize, Deserialize)]
@ -163,7 +173,7 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> {
])
.send()?;
http::bind_http_path("/node/:name", false, false)?;
let mut pending_requests: BTreeMap<u64, Vec<IndexerRequests>> = BTreeMap::new();
loop {
let Ok(message) = await_message() else {
@ -171,13 +181,13 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> {
continue;
};
let Message::Request { source, body, .. } = message else {
// TODO we should store the subscription ID for eth
// incase we want to cancel/reset it
// TODO we could store the subscription ID for eth
// in case we want to cancel/reset it
continue;
};
if source.process == "eth:distro:sys" {
handle_eth_message(&our, &mut state, &body)?;
handle_eth_message(&our, &mut state, &mut pending_requests, &body)?;
} else {
let Ok(request) = serde_json::from_slice::<IndexerRequests>(&body) else {
println!("kns_indexer: got invalid message");
@ -185,22 +195,41 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> {
};
match request {
IndexerRequests::NamehashToName(namehash) => {
let response = Response::new()
.body(serde_json::to_vec(&state.names.get(&namehash))?)
IndexerRequests::NamehashToName { ref hash, block } => {
if block <= state.block {
Response::new()
.body(serde_json::to_vec(&state.names.get(hash))?)
.send()?;
} else {
pending_requests
.entry(block)
.or_insert(vec![])
.push(request);
}
IndexerRequests::NodeInfo(name) => {
let response = Response::new()
.body(serde_json::to_vec(&state.nodes.get(&name))?)
}
IndexerRequests::NodeInfo { ref name, block } => {
if block <= state.block {
Response::new()
.body(serde_json::to_vec(&state.nodes.get(name))?)
.send()?;
} else {
pending_requests
.entry(block)
.or_insert(vec![])
.push(request);
}
}
}
}
}
}
fn handle_eth_message(our: &Address, state: &mut State, body: &[u8]) -> anyhow::Result<()> {
fn handle_eth_message(
our: &Address,
state: &mut State,
pending_requests: &mut BTreeMap<u64, Vec<IndexerRequests>>,
body: &[u8],
) -> anyhow::Result<()> {
let Ok(msg) = serde_json::from_slice::<EthSubEvent>(body) else {
return Err(anyhow::anyhow!("kns_indexer: got invalid message"));
};
@ -284,6 +313,36 @@ fn handle_eth_message(our: &Address, state: &mut State, body: &[u8]) -> anyhow::
}
}
}
// check the pending_requests btreemap to see if there are any requests that
// can be handled now that the state block has been updated
let mut blocks_to_remove = vec![];
for (block, requests) in pending_requests.iter() {
if *block <= state.block {
for request in requests.iter() {
match request {
IndexerRequests::NamehashToName { hash, .. } => {
Response::new()
.body(serde_json::to_vec(&state.names.get(hash))?)
.send()
.unwrap();
}
IndexerRequests::NodeInfo { name, .. } => {
Response::new()
.body(serde_json::to_vec(&state.nodes.get(name))?)
.send()
.unwrap();
}
}
}
blocks_to_remove.push(*block);
} else {
break;
}
}
for block in blocks_to_remove.iter() {
pending_requests.remove(block);
}
set_state(&bincode::serialize(state)?);
Ok(())
}