app_store: delay kns queries by 5s to allow kns time to process block

This commit is contained in:
hosted-fornet 2024-08-21 20:01:42 -07:00
parent f0a66d0564
commit d72462dfe3
3 changed files with 29 additions and 21 deletions

View File

@ -12,7 +12,7 @@ use alloy_sol_types::SolEvent;
use kinode::process::chain::ChainResponses;
use kinode_process_lib::{
await_message, call_init, eth, get_blob, get_state, http, kernel_types as kt, kimap,
print_to_terminal, println, Address, Message, PackageId, Request, Response,
print_to_terminal, println, timer, Address, Message, PackageId, Request, Response,
};
use std::{
collections::{HashMap, HashSet},
@ -45,6 +45,8 @@ const KIMAP_FIRST_BLOCK: u64 = kimap::KIMAP_FIRST_BLOCK;
#[cfg(feature = "simulation-mode")]
const KIMAP_FIRST_BLOCK: u64 = 1;
const DELAY_MS: u64 = 5_000;
#[derive(Debug, Serialize, Deserialize)]
pub struct State {
/// the kimap helper we are using
@ -106,7 +108,18 @@ fn init(our: Address) {
}
fn handle_message(our: &Address, state: &mut State, message: &Message) -> anyhow::Result<()> {
if message.is_request() {
if !message.is_request() {
if message.is_local(&our) && message.source().process == "timer:distro:sys" {
// handling of ETH RPC subscriptions delayed by DELAY_MS
// to allow kns to have a chance to process block: handle now
let Some(context) = message.context() else {
return Err(anyhow::anyhow!("foo"));
};
let log = serde_json::from_slice(context)?;
handle_eth_log(our, state, log)?;
return Ok(());
}
} else {
let req: Req = serde_json::from_slice(message.body())?;
match req {
Req::Eth(eth_result) => {
@ -118,8 +131,10 @@ fn handle_message(our: &Address, state: &mut State, message: &Message) -> anyhow
}
if let Ok(eth::EthSub { result, .. }) = eth_result {
if let eth::SubscriptionResult::Log(log) = result {
handle_eth_log(our, state, *log)?;
if let eth::SubscriptionResult::Log(ref log) = result {
// delay handling of ETH RPC subscriptions by DELAY_MS
// to allow kns to have a chance to process block
timer::set_timer(DELAY_MS, Some(serde_json::to_vec(log)?));
}
} else {
// attempt to resubscribe
@ -131,21 +146,15 @@ fn handle_message(our: &Address, state: &mut State, message: &Message) -> anyhow
}
}
Req::Request(chains) => {
handle_local_request(our, state, chains)?;
handle_local_request(state, chains)?;
}
}
} else {
return Err(anyhow::anyhow!("not a request"));
}
Ok(())
}
fn handle_local_request(
our: &Address,
state: &mut State,
req: ChainRequests,
) -> anyhow::Result<()> {
fn handle_local_request(state: &mut State, req: ChainRequests) -> anyhow::Result<()> {
match req {
ChainRequests::GetApp(package_id) => {
let onchain_app = state
@ -265,9 +274,7 @@ fn handle_eth_log(our: &Address, state: &mut State, log: eth::Log) -> anyhow::Re
// if ~metadata-uri is also empty, this is an unpublish action!
if metadata_uri.is_empty() {
state.published.remove(&package_id);
if is_our_package {
state.listings.remove(&package_id);
}
state.listings.remove(&package_id);
return Ok(());
}
return Err(anyhow::anyhow!("metadata hash not found"));

View File

@ -49,7 +49,8 @@
"kns_indexer:kns_indexer:sys",
"vfs:distro:sys",
"http_client:distro:sys",
"eth:distro:sys"
"eth:distro:sys",
"timer:distro:sys"
],
"public": false
},
@ -98,4 +99,4 @@
],
"public": false
}
]
]

View File

@ -401,6 +401,10 @@ fn handle_log(
pending_notes: &mut BTreeMap<u64, Vec<(kimap::contract::Note, u8)>>,
log: &eth::Log,
) -> anyhow::Result<()> {
if let Some(block) = log.block_number {
state.last_block = block;
}
match log.topics()[0] {
kimap::contract::Mint::SIGNATURE_HASH => {
let decoded = kimap::contract::Mint::decode_log_data(log.data(), true).unwrap();
@ -459,10 +463,6 @@ fn handle_log(
}
};
if let Some(block) = log.block_number {
state.last_block = block;
}
Ok(())
}