From f8a7e8fc5399ec2d201fbe28fabb8aefa5f9fcd3 Mon Sep 17 00:00:00 2001 From: hosted-fornet Date: Wed, 21 Aug 2024 08:14:36 -0700 Subject: [PATCH 1/8] kns: sub for notes --- kinode/packages/kns_indexer/kns_indexer/src/lib.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/kinode/packages/kns_indexer/kns_indexer/src/lib.rs b/kinode/packages/kns_indexer/kns_indexer/src/lib.rs index aef9788e..8b32e25c 100644 --- a/kinode/packages/kns_indexer/kns_indexer/src/lib.rs +++ b/kinode/packages/kns_indexer/kns_indexer/src/lib.rs @@ -531,6 +531,12 @@ fn handle_log( } }, } + + if !state.listening_newblocks && !pending_notes.is_empty() { + print_to_terminal(0, "subscribing to newHeads..."); + listen_to_new_blocks_loop(); // sub_id: 3 + state.listening_newblocks = true; + } } } _log => { From fe50f314d96d0c2e9f52fe6fda50d42b4eef0a07 Mon Sep 17 00:00:00 2001 From: hosted-fornet Date: Wed, 21 Aug 2024 09:48:49 -0700 Subject: [PATCH 2/8] add wip --- kinode/src/eth/mod.rs | 7 +- kinode/src/eth/subscription.rs | 149 +++++++++++++++++++++------------ 2 files changed, 98 insertions(+), 58 deletions(-) diff --git a/kinode/src/eth/mod.rs b/kinode/src/eth/mod.rs index 77812ea2..f6c67d85 100644 --- a/kinode/src/eth/mod.rs +++ b/kinode/src/eth/mod.rs @@ -100,7 +100,7 @@ type ResponseChannels = Arc>; #[derive(Debug)] enum ActiveSub { - Local(JoinHandle<()>), + Local((tokio::sync::mpsc::Sender, JoinHandle<()>)), Remote { provider_node: String, handle: JoinHandle<()>, @@ -111,8 +111,9 @@ enum ActiveSub { impl ActiveSub { async fn close(&self, sub_id: u64, state: &ModuleState) { match self { - ActiveSub::Local(handle) => { - handle.abort(); + ActiveSub::Local((close_sender, _handle)) => { + close_sender.send(true).await.unwrap(); + //handle.abort(); } ActiveSub::Remote { provider_node, diff --git a/kinode/src/eth/subscription.rs b/kinode/src/eth/subscription.rs index 21e78021..7a8ed7da 100644 --- a/kinode/src/eth/subscription.rs +++ b/kinode/src/eth/subscription.rs @@ -1,4 +1,5 @@ use crate::eth::*; +use alloy::primitives::{B256, U256}; use alloy::pubsub::RawSubscription; use alloy::rpc::types::eth::pubsub::SubscriptionResult; @@ -70,40 +71,51 @@ pub async fn create_new_subscription( let send_to_loop = send_to_loop.clone(); let print_tx = print_tx.clone(); let active_subscriptions = active_subscriptions.clone(); + let providers = providers.clone(); + let (close_sender, close_receiver) = tokio::sync::mpsc::channel(1); match maybe_raw_sub { - Ok(rx) => { + Ok((rx, chain_id)) => { subs.insert( sub_id, // this is a local sub, as in, we connect to the rpc endpoint - ActiveSub::Local(tokio::spawn(async move { - // await the subscription error and kill it if so - let e = maintain_local_subscription( - &our, - sub_id, - rx, - &target, - &rsvp, - &send_to_loop, - &active_subscriptions, - ) - .await; - verbose_print( - &print_tx, - &format!("eth: closed local subscription due to error {e:?}"), - ) - .await; - kernel_message( - &our, - rand::random(), - target.clone(), - rsvp, - true, - None, - EthSubResult::Err(e), - &send_to_loop, - ) - .await; - })), + ActiveSub::Local(( + close_sender, + tokio::spawn(async move { + // await the subscription error and kill it if so + let r = maintain_local_subscription( + &our, + sub_id, + rx, + &target, + &rsvp, + &send_to_loop, + &active_subscriptions, + chain_id, + &providers, + close_receiver, + ) + .await; + let Err(e) = r else { + return; + }; + verbose_print( + &print_tx, + &format!("eth: closed local subscription due to error {e:?}"), + ) + .await; + kernel_message( + &our, + rand::random(), + target.clone(), + rsvp, + true, + None, + EthSubResult::Err(e), + &send_to_loop, + ) + .await; + }), + )), ); } Err((provider_node, remote_sub_id)) => { @@ -169,7 +181,7 @@ async fn build_subscription( providers: &Providers, response_channels: &ResponseChannels, print_tx: &PrintSender, -) -> Result, EthError> { +) -> Result, EthError> { let EthAction::SubscribeLogs { chain_id, kind, @@ -244,7 +256,7 @@ async fn build_subscription( ) .await; } - return Ok(Ok(rx)); + return Ok(Ok((rx, chain_id))); } Err(rpc_error) => { verbose_print( @@ -367,38 +379,65 @@ async fn maintain_local_subscription( rsvp: &Option
, send_to_loop: &MessageSender, active_subscriptions: &ActiveSubscriptions, -) -> EthSubError { - while let Ok(value) = rx.recv().await { - let result: SubscriptionResult = match serde_json::from_str(value.get()) { - Ok(res) => res, - Err(e) => { - return EthSubError { - id: sub_id, - error: e.to_string(), + chain_id: u64, + providers: &Providers, + mut close_receiver: tokio::sync::mpsc::Receiver, +) -> Result<(), EthSubError> { + loop { + tokio::select! { + _ = close_receiver.recv() => { + let alloy_sub_id = rx.local_id(); + let alloy_sub_id = alloy_sub_id.into(); + //let alloy_sub_id = rx.local_id().; + //let alloy_sub_id = alloy::primitives::U256::from(alloy_sub_id.clone()); + //let alloy_sub_id: alloy::primitives::Uint<256, 4> = alloy::primitives::Uint::from(alloy_sub_id); + let Some(chain_providers) = providers.get_mut(&chain_id) else { + return Ok(()); //? + }; + for url in chain_providers.urls { + let Some(pubsub) = url.pubsub else { + continue; + }; + pubsub.unsubscribe(alloy_sub_id); } - } - }; - kernel_message( - our, - rand::random(), - target.clone(), - rsvp.clone(), - true, - None, - EthSubResult::Ok(EthSub { id: sub_id, result }), - &send_to_loop, - ) - .await; + return Ok(()); + }, + value = rx.recv() => { + let Ok(value) = value else { + break; + }; + let result: SubscriptionResult = match serde_json::from_str(value.get()) { + Ok(res) => res, + Err(e) => { + return Err(EthSubError { + id: sub_id, + error: e.to_string(), + }); + } + }; + kernel_message( + our, + rand::random(), + target.clone(), + rsvp.clone(), + true, + None, + EthSubResult::Ok(EthSub { id: sub_id, result }), + &send_to_loop, + ) + .await; + }, + } } active_subscriptions .entry(target.clone()) .and_modify(|sub_map| { sub_map.remove(&sub_id); }); - EthSubError { + Err(EthSubError { id: sub_id, error: "subscription closed unexpectedly".to_string(), - } + }) } /// handle the subscription updates from a remote provider, From 9c20cf5c7c1411fc2ae52006805b8a798249fe36 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Wed, 21 Aug 2024 16:49:14 +0000 Subject: [PATCH 3/8] Format Rust code using rustfmt --- kinode/src/eth/subscription.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/kinode/src/eth/subscription.rs b/kinode/src/eth/subscription.rs index 7a8ed7da..cdc0bd46 100644 --- a/kinode/src/eth/subscription.rs +++ b/kinode/src/eth/subscription.rs @@ -100,7 +100,9 @@ pub async fn create_new_subscription( }; verbose_print( &print_tx, - &format!("eth: closed local subscription due to error {e:?}"), + &format!( + "eth: closed local subscription due to error {e:?}" + ), ) .await; kernel_message( From e1106b40973a62f5aaf9463135a96f389f98b278 Mon Sep 17 00:00:00 2001 From: bitful-pannul Date: Wed, 21 Aug 2024 20:01:45 +0300 Subject: [PATCH 4/8] eth: borrow fixes unsubscribe --- kinode/src/eth/subscription.rs | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/kinode/src/eth/subscription.rs b/kinode/src/eth/subscription.rs index cdc0bd46..d78eac99 100644 --- a/kinode/src/eth/subscription.rs +++ b/kinode/src/eth/subscription.rs @@ -1,5 +1,4 @@ use crate::eth::*; -use alloy::primitives::{B256, U256}; use alloy::pubsub::RawSubscription; use alloy::rpc::types::eth::pubsub::SubscriptionResult; @@ -389,18 +388,16 @@ async fn maintain_local_subscription( tokio::select! { _ = close_receiver.recv() => { let alloy_sub_id = rx.local_id(); - let alloy_sub_id = alloy_sub_id.into(); - //let alloy_sub_id = rx.local_id().; - //let alloy_sub_id = alloy::primitives::U256::from(alloy_sub_id.clone()); - //let alloy_sub_id: alloy::primitives::Uint<256, 4> = alloy::primitives::Uint::from(alloy_sub_id); + let alloy_sub_id = alloy_sub_id.clone().into(); let Some(chain_providers) = providers.get_mut(&chain_id) else { return Ok(()); //? }; - for url in chain_providers.urls { - let Some(pubsub) = url.pubsub else { + for url in chain_providers.urls.iter() { + let Some(pubsub) = url.pubsub.as_ref() else { continue; }; - pubsub.unsubscribe(alloy_sub_id); + let x = pubsub.unsubscribe(alloy_sub_id); + println!("we just tried unsubscribing unsubscribed: {:?}", x); } return Ok(()); }, From db9eda4d2e0211485bc1b9bdac0c5b68ef9b58f2 Mon Sep 17 00:00:00 2001 From: hosted-fornet Date: Wed, 21 Aug 2024 15:28:14 -0700 Subject: [PATCH 5/8] add some prints (and unsub in another case) --- Cargo.lock | 342 +++++++++++++++--- kinode/Cargo.toml | 3 +- kinode/packages/app_store/chain/src/lib.rs | 2 + .../packages/app_store/ui/package-lock.json | 2 + .../kns_indexer/kns_indexer/src/lib.rs | 18 +- kinode/src/eth/mod.rs | 6 +- kinode/src/eth/subscription.rs | 35 +- lib/Cargo.toml | 3 +- 8 files changed, 335 insertions(+), 76 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bd147683..760f5844 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -97,21 +97,38 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ba1c79677c9ce51c8d45e20845b05e6fb070ea2c863fba03ad6af2c778474bd" dependencies = [ "alloy-consensus 0.1.4", - "alloy-contract", "alloy-core", "alloy-eips 0.1.4", "alloy-genesis 0.1.4", "alloy-json-rpc 0.1.4", - "alloy-network", - "alloy-provider", - "alloy-pubsub", - "alloy-rpc-client", + "alloy-provider 0.1.4", + "alloy-rpc-client 0.1.4", "alloy-rpc-types 0.1.4", "alloy-serde 0.1.4", - "alloy-signer", + "alloy-transport-http 0.1.4", +] + +[[package]] +name = "alloy" +version = "0.2.1" +source = "git+https://github.com/bitful-pannul/alloy.git?rev=c73e70d#c73e70dab6069246bbf20162fbe17c5b11d0c668" +dependencies = [ + "alloy-consensus 0.2.1", + "alloy-contract", + "alloy-core", + "alloy-eips 0.2.1", + "alloy-genesis 0.2.1", + "alloy-json-rpc 0.2.1", + "alloy-network 0.2.1", + "alloy-provider 0.2.1", + "alloy-pubsub", + "alloy-rpc-client 0.2.1", + "alloy-rpc-types 0.2.1", + "alloy-serde 0.2.1", + "alloy-signer 0.2.1", "alloy-signer-local", - "alloy-transport 0.1.4", - "alloy-transport-http", + "alloy-transport 0.2.1", + "alloy-transport-http 0.2.1", "alloy-transport-ws", ] @@ -153,21 +170,34 @@ dependencies = [ "serde", ] +[[package]] +name = "alloy-consensus" +version = "0.2.1" +source = "git+https://github.com/bitful-pannul/alloy.git?rev=c73e70d#c73e70dab6069246bbf20162fbe17c5b11d0c668" +dependencies = [ + "alloy-eips 0.2.1", + "alloy-primitives", + "alloy-rlp", + "alloy-serde 0.2.1", + "c-kzg", + "serde", +] + [[package]] name = "alloy-contract" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7dc6957ff706f9e5f6fd42f52a93e4bce476b726c92d077b348de28c4a76730c" +version = "0.2.1" +source = "git+https://github.com/bitful-pannul/alloy.git?rev=c73e70d#c73e70dab6069246bbf20162fbe17c5b11d0c668" dependencies = [ "alloy-dyn-abi", "alloy-json-abi", - "alloy-network", + "alloy-network 0.2.1", + "alloy-network-primitives", "alloy-primitives", - "alloy-provider", + "alloy-provider 0.2.1", "alloy-pubsub", - "alloy-rpc-types-eth", + "alloy-rpc-types-eth 0.2.1", "alloy-sol-types", - "alloy-transport 0.1.4", + "alloy-transport 0.2.1", "futures", "futures-util", "thiserror", @@ -230,6 +260,20 @@ dependencies = [ "sha2", ] +[[package]] +name = "alloy-eips" +version = "0.2.1" +source = "git+https://github.com/bitful-pannul/alloy.git?rev=c73e70d#c73e70dab6069246bbf20162fbe17c5b11d0c668" +dependencies = [ + "alloy-primitives", + "alloy-rlp", + "alloy-serde 0.2.1", + "c-kzg", + "once_cell", + "serde", + "sha2", +] + [[package]] name = "alloy-genesis" version = "0.1.0" @@ -251,6 +295,16 @@ dependencies = [ "serde", ] +[[package]] +name = "alloy-genesis" +version = "0.2.1" +source = "git+https://github.com/bitful-pannul/alloy.git?rev=c73e70d#c73e70dab6069246bbf20162fbe17c5b11d0c668" +dependencies = [ + "alloy-primitives", + "alloy-serde 0.2.1", + "serde", +] + [[package]] name = "alloy-json-abi" version = "0.7.7" @@ -288,6 +342,19 @@ dependencies = [ "tracing", ] +[[package]] +name = "alloy-json-rpc" +version = "0.2.1" +source = "git+https://github.com/bitful-pannul/alloy.git?rev=c73e70d#c73e70dab6069246bbf20162fbe17c5b11d0c668" +dependencies = [ + "alloy-primitives", + "alloy-sol-types", + "serde", + "serde_json", + "thiserror", + "tracing", +] + [[package]] name = "alloy-network" version = "0.1.4" @@ -298,9 +365,9 @@ dependencies = [ "alloy-eips 0.1.4", "alloy-json-rpc 0.1.4", "alloy-primitives", - "alloy-rpc-types-eth", + "alloy-rpc-types-eth 0.1.4", "alloy-serde 0.1.4", - "alloy-signer", + "alloy-signer 0.1.4", "alloy-sol-types", "async-trait", "auto_impl", @@ -308,6 +375,36 @@ dependencies = [ "thiserror", ] +[[package]] +name = "alloy-network" +version = "0.2.1" +source = "git+https://github.com/bitful-pannul/alloy.git?rev=c73e70d#c73e70dab6069246bbf20162fbe17c5b11d0c668" +dependencies = [ + "alloy-consensus 0.2.1", + "alloy-eips 0.2.1", + "alloy-json-rpc 0.2.1", + "alloy-network-primitives", + "alloy-primitives", + "alloy-rpc-types-eth 0.2.1", + "alloy-serde 0.2.1", + "alloy-signer 0.2.1", + "alloy-sol-types", + "async-trait", + "auto_impl", + "futures-utils-wasm", + "thiserror", +] + +[[package]] +name = "alloy-network-primitives" +version = "0.2.1" +source = "git+https://github.com/bitful-pannul/alloy.git?rev=c73e70d#c73e70dab6069246bbf20162fbe17c5b11d0c668" +dependencies = [ + "alloy-primitives", + "alloy-serde 0.2.1", + "serde", +] + [[package]] name = "alloy-primitives" version = "0.7.7" @@ -340,18 +437,16 @@ dependencies = [ "alloy-consensus 0.1.4", "alloy-eips 0.1.4", "alloy-json-rpc 0.1.4", - "alloy-network", + "alloy-network 0.1.4", "alloy-primitives", - "alloy-pubsub", - "alloy-rpc-client", - "alloy-rpc-types-eth", + "alloy-rpc-client 0.1.4", + "alloy-rpc-types-eth 0.1.4", "alloy-transport 0.1.4", - "alloy-transport-http", - "alloy-transport-ws", + "alloy-transport-http 0.1.4", "async-stream", "async-trait", "auto_impl", - "dashmap", + "dashmap 5.5.3", "futures", "futures-utils-wasm", "lru", @@ -365,14 +460,48 @@ dependencies = [ ] [[package]] -name = "alloy-pubsub" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0a7341322d9bc0e49f6e9fd9f2eb8e30f73806f2dd12cbb3d6bab2694c921f87" +name = "alloy-provider" +version = "0.2.1" +source = "git+https://github.com/bitful-pannul/alloy.git?rev=c73e70d#c73e70dab6069246bbf20162fbe17c5b11d0c668" dependencies = [ - "alloy-json-rpc 0.1.4", + "alloy-chains", + "alloy-consensus 0.2.1", + "alloy-eips 0.2.1", + "alloy-json-rpc 0.2.1", + "alloy-network 0.2.1", + "alloy-network-primitives", "alloy-primitives", - "alloy-transport 0.1.4", + "alloy-pubsub", + "alloy-rpc-client 0.2.1", + "alloy-rpc-types-eth 0.2.1", + "alloy-transport 0.2.1", + "alloy-transport-http 0.2.1", + "alloy-transport-ws", + "async-stream", + "async-trait", + "auto_impl", + "dashmap 6.0.1", + "futures", + "futures-utils-wasm", + "lru", + "pin-project", + "reqwest 0.12.5", + "serde", + "serde_json", + "thiserror", + "tokio", + "tracing", + "url", +] + +[[package]] +name = "alloy-pubsub" +version = "0.2.1" +source = "git+https://github.com/bitful-pannul/alloy.git?rev=c73e70d#c73e70dab6069246bbf20162fbe17c5b11d0c668" +dependencies = [ + "alloy-json-rpc 0.2.1", + "alloy-primitives", + "alloy-transport 0.2.1", "bimap", "futures", "serde", @@ -412,10 +541,30 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5ba31bae67773fd5a60020bea900231f8396202b7feca4d0c70c6b59308ab4a8" dependencies = [ "alloy-json-rpc 0.1.4", + "alloy-transport 0.1.4", + "alloy-transport-http 0.1.4", + "futures", + "pin-project", + "reqwest 0.12.5", + "serde", + "serde_json", + "tokio", + "tokio-stream", + "tower", + "tracing", + "url", +] + +[[package]] +name = "alloy-rpc-client" +version = "0.2.1" +source = "git+https://github.com/bitful-pannul/alloy.git?rev=c73e70d#c73e70dab6069246bbf20162fbe17c5b11d0c668" +dependencies = [ + "alloy-json-rpc 0.2.1", "alloy-primitives", "alloy-pubsub", - "alloy-transport 0.1.4", - "alloy-transport-http", + "alloy-transport 0.2.1", + "alloy-transport-http 0.2.1", "alloy-transport-ws", "futures", "pin-project", @@ -453,10 +602,20 @@ version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "184a7a42c7ba9141cc9e76368356168c282c3bc3d9e5d78f3556bdfe39343447" dependencies = [ - "alloy-rpc-types-eth", + "alloy-rpc-types-eth 0.1.4", "alloy-serde 0.1.4", ] +[[package]] +name = "alloy-rpc-types" +version = "0.2.1" +source = "git+https://github.com/bitful-pannul/alloy.git?rev=c73e70d#c73e70dab6069246bbf20162fbe17c5b11d0c668" +dependencies = [ + "alloy-rpc-types-eth 0.2.1", + "alloy-serde 0.2.1", + "serde", +] + [[package]] name = "alloy-rpc-types-eth" version = "0.1.4" @@ -475,6 +634,24 @@ dependencies = [ "thiserror", ] +[[package]] +name = "alloy-rpc-types-eth" +version = "0.2.1" +source = "git+https://github.com/bitful-pannul/alloy.git?rev=c73e70d#c73e70dab6069246bbf20162fbe17c5b11d0c668" +dependencies = [ + "alloy-consensus 0.2.1", + "alloy-eips 0.2.1", + "alloy-network-primitives", + "alloy-primitives", + "alloy-rlp", + "alloy-serde 0.2.1", + "alloy-sol-types", + "itertools 0.13.0", + "serde", + "serde_json", + "thiserror", +] + [[package]] name = "alloy-serde" version = "0.1.0" @@ -496,6 +673,16 @@ dependencies = [ "serde_json", ] +[[package]] +name = "alloy-serde" +version = "0.2.1" +source = "git+https://github.com/bitful-pannul/alloy.git?rev=c73e70d#c73e70dab6069246bbf20162fbe17c5b11d0c668" +dependencies = [ + "alloy-primitives", + "serde", + "serde_json", +] + [[package]] name = "alloy-signer" version = "0.1.4" @@ -511,15 +698,27 @@ dependencies = [ ] [[package]] -name = "alloy-signer-local" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6dfc9c26fe6c6f1bad818c9a976de9044dd12e1f75f1f156a801ee3e8148c1b6" +name = "alloy-signer" +version = "0.2.1" +source = "git+https://github.com/bitful-pannul/alloy.git?rev=c73e70d#c73e70dab6069246bbf20162fbe17c5b11d0c668" dependencies = [ - "alloy-consensus 0.1.4", - "alloy-network", "alloy-primitives", - "alloy-signer", + "async-trait", + "auto_impl", + "elliptic-curve", + "k256", + "thiserror", +] + +[[package]] +name = "alloy-signer-local" +version = "0.2.1" +source = "git+https://github.com/bitful-pannul/alloy.git?rev=c73e70d#c73e70dab6069246bbf20162fbe17c5b11d0c668" +dependencies = [ + "alloy-consensus 0.2.1", + "alloy-network 0.2.1", + "alloy-primitives", + "alloy-signer 0.2.1", "async-trait", "k256", "rand 0.8.5", @@ -636,6 +835,24 @@ dependencies = [ "url", ] +[[package]] +name = "alloy-transport" +version = "0.2.1" +source = "git+https://github.com/bitful-pannul/alloy.git?rev=c73e70d#c73e70dab6069246bbf20162fbe17c5b11d0c668" +dependencies = [ + "alloy-json-rpc 0.2.1", + "base64 0.22.1", + "futures-util", + "futures-utils-wasm", + "serde", + "serde_json", + "thiserror", + "tokio", + "tower", + "tracing", + "url", +] + [[package]] name = "alloy-transport-http" version = "0.1.4" @@ -651,14 +868,27 @@ dependencies = [ "url", ] +[[package]] +name = "alloy-transport-http" +version = "0.2.1" +source = "git+https://github.com/bitful-pannul/alloy.git?rev=c73e70d#c73e70dab6069246bbf20162fbe17c5b11d0c668" +dependencies = [ + "alloy-json-rpc 0.2.1", + "alloy-transport 0.2.1", + "reqwest 0.12.5", + "serde_json", + "tower", + "tracing", + "url", +] + [[package]] name = "alloy-transport-ws" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aec83fd052684556c78c54df111433493267234d82321c2236560c752f595f20" +version = "0.2.1" +source = "git+https://github.com/bitful-pannul/alloy.git?rev=c73e70d#c73e70dab6069246bbf20162fbe17c5b11d0c668" dependencies = [ "alloy-pubsub", - "alloy-transport 0.1.4", + "alloy-transport 0.2.1", "futures", "http 1.1.0", "rustls", @@ -1878,6 +2108,20 @@ dependencies = [ "parking_lot_core", ] +[[package]] +name = "dashmap" +version = "6.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "804c8821570c3f8b70230c2ba75ffa5c0f9a4189b9a432b6656c536712acae28" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "data-encoding" version = "2.6.0" @@ -3285,7 +3529,7 @@ name = "kinode" version = "0.9.0" dependencies = [ "aes-gcm", - "alloy", + "alloy 0.2.1", "alloy-primitives", "alloy-sol-macro", "alloy-sol-types", @@ -3296,7 +3540,7 @@ dependencies = [ "chrono", "clap", "crossterm", - "dashmap", + "dashmap 5.5.3", "flate2", "futures", "generic-array", @@ -3372,7 +3616,7 @@ name = "kinode_process_lib" version = "0.9.0" source = "git+https://github.com/kinode-dao/process_lib?tag=v0.9.0#284f202376b3cd3ce0c03aa660a006fc6187f236" dependencies = [ - "alloy", + "alloy 0.1.4", "alloy-primitives", "alloy-sol-macro", "alloy-sol-types", @@ -3394,7 +3638,7 @@ name = "kinode_process_lib" version = "0.9.0" source = "git+https://github.com/kinode-dao/process_lib?branch=develop#5c1d8ed36cf10688808c09357ef0e43225396097" dependencies = [ - "alloy", + "alloy 0.1.4", "alloy-primitives", "alloy-sol-macro", "alloy-sol-types", @@ -3513,7 +3757,7 @@ checksum = "884e2677b40cc8c339eaefcb701c32ef1fd2493d71118dc0ca4b6a736c93bd67" name = "lib" version = "0.9.0" dependencies = [ - "alloy", + "alloy 0.2.1", "kit 0.6.8", "lazy_static", "rand 0.8.5", diff --git a/kinode/Cargo.toml b/kinode/Cargo.toml index c0e15786..6f78be02 100644 --- a/kinode/Cargo.toml +++ b/kinode/Cargo.toml @@ -26,7 +26,8 @@ simulation-mode = [] [dependencies] aes-gcm = "0.10.3" -alloy = { version = "0.1.3", features = [ +#alloy = { version = "0.1.3", features = [ +alloy = { git = "https://github.com/bitful-pannul/alloy.git", rev = "c73e70d", features = [ "consensus", "contract", "json-rpc", diff --git a/kinode/packages/app_store/chain/src/lib.rs b/kinode/packages/app_store/chain/src/lib.rs index 61d0b1d4..ca7ae7d3 100644 --- a/kinode/packages/app_store/chain/src/lib.rs +++ b/kinode/packages/app_store/chain/src/lib.rs @@ -123,6 +123,7 @@ fn handle_message(our: &Address, state: &mut State, message: &Message) -> anyhow } } else { // attempt to resubscribe + println!("attempting resub"); state .kimap .provider @@ -341,6 +342,7 @@ pub fn fetch_and_subscribe_logs(our: &Address, state: &mut State) { let filter = app_store_filter(state); // get past logs, subscribe to new ones. // subscribe first so we don't miss any logs + println!("subscribing..."); state.kimap.provider.subscribe_loop(1, filter.clone()); for log in fetch_logs( &state.kimap.provider, diff --git a/kinode/packages/app_store/ui/package-lock.json b/kinode/packages/app_store/ui/package-lock.json index 54d87528..f65b58a3 100644 --- a/kinode/packages/app_store/ui/package-lock.json +++ b/kinode/packages/app_store/ui/package-lock.json @@ -3749,6 +3749,8 @@ }, "node_modules/@parcel/watcher-wasm/node_modules/napi-wasm": { "version": "1.1.0", + "resolved": "https://registry.npmjs.org/napi-wasm/-/napi-wasm-1.1.0.tgz", + "integrity": "sha512-lHwIAJbmLSjF9VDRm9GoVOy9AGp3aIvkjv+Kvz9h16QR3uSVYH78PNQUnT2U4X53mhlnV2M7wrhibQ3GHicDmg==", "inBundle": true, "license": "MIT" }, diff --git a/kinode/packages/kns_indexer/kns_indexer/src/lib.rs b/kinode/packages/kns_indexer/kns_indexer/src/lib.rs index 8b32e25c..ce61063a 100644 --- a/kinode/packages/kns_indexer/kns_indexer/src/lib.rs +++ b/kinode/packages/kns_indexer/kns_indexer/src/lib.rs @@ -223,10 +223,8 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> { } } - if !state.listening_newblocks - && (!pending_requests.is_empty() || !pending_notes.is_empty()) - { - print_to_terminal(0, "subscribing to newHeads..."); + if !state.listening_newblocks && !pending_requests.is_empty() { + print_to_terminal(0, "subscribing to newHeads for req..."); listen_to_new_blocks_loop(); // sub_id: 3 state.listening_newblocks = true; } @@ -269,6 +267,7 @@ fn handle_eth_message( } else if e.id == 2 { eth_provider.subscribe_loop(2, notes_filter.clone()); } else if e.id == 3 { + print_to_terminal(0, "subscribing to newHeads for retry..."); listen_to_new_blocks_loop(); } } @@ -527,16 +526,15 @@ fn handle_log( .entry(block_number) .or_default() .push((decoded, 0)); + if !state.listening_newblocks { + print_to_terminal(0, "subscribing to newHeads for note..."); + listen_to_new_blocks_loop(); // sub_id: 3 + state.listening_newblocks = true; + } } } }, } - - if !state.listening_newblocks && !pending_notes.is_empty() { - print_to_terminal(0, "subscribing to newHeads..."); - listen_to_new_blocks_loop(); // sub_id: 3 - state.listening_newblocks = true; - } } } _log => { diff --git a/kinode/src/eth/mod.rs b/kinode/src/eth/mod.rs index f6c67d85..d8b50ed3 100644 --- a/kinode/src/eth/mod.rs +++ b/kinode/src/eth/mod.rs @@ -507,13 +507,15 @@ async fn handle_eth_action( verbose_print( &state.print_tx, &format!( - "eth: handling {} from {}", + "eth: handling {} from {}; active_subs len: {:?}", + //"eth: handling {} from {}", match ð_action { EthAction::SubscribeLogs { .. } => "subscribe", EthAction::UnsubscribeLogs(_) => "unsubscribe", EthAction::Request { .. } => "request", }, - km.source + km.source, + state.active_subscriptions.iter().map(|v| v.len()).collect::>(), ), ) .await; diff --git a/kinode/src/eth/subscription.rs b/kinode/src/eth/subscription.rs index d78eac99..75334c23 100644 --- a/kinode/src/eth/subscription.rs +++ b/kinode/src/eth/subscription.rs @@ -257,6 +257,9 @@ async fn build_subscription( ) .await; } + let alloy_sub_id = rx.local_id(); + let alloy_sub_id: alloy::primitives::U256 = alloy_sub_id.clone().into(); + println!("{target} making sub {:?}", alloy_sub_id); return Ok(Ok((rx, chain_id))); } Err(rpc_error) => { @@ -387,22 +390,12 @@ async fn maintain_local_subscription( loop { tokio::select! { _ = close_receiver.recv() => { - let alloy_sub_id = rx.local_id(); - let alloy_sub_id = alloy_sub_id.clone().into(); - let Some(chain_providers) = providers.get_mut(&chain_id) else { - return Ok(()); //? - }; - for url in chain_providers.urls.iter() { - let Some(pubsub) = url.pubsub.as_ref() else { - continue; - }; - let x = pubsub.unsubscribe(alloy_sub_id); - println!("we just tried unsubscribing unsubscribed: {:?}", x); - } + unsubscribe(rx, &chain_id, providers); return Ok(()); }, value = rx.recv() => { let Ok(value) = value else { + println!("sub failed: {:?}\r", value.unwrap_err()); break; }; let result: SubscriptionResult = match serde_json::from_str(value.get()) { @@ -433,12 +426,28 @@ async fn maintain_local_subscription( .and_modify(|sub_map| { sub_map.remove(&sub_id); }); + unsubscribe(rx, &chain_id, providers); Err(EthSubError { id: sub_id, - error: "subscription closed unexpectedly".to_string(), + error: format!("subscription ({target}) closed unexpectedly"), }) } +fn unsubscribe(rx: RawSubscription, chain_id: &u64, providers: &Providers) { + let alloy_sub_id = rx.local_id(); + let alloy_sub_id = alloy_sub_id.clone().into(); + let Some(chain_providers) = providers.get_mut(chain_id) else { + return; //? + }; + for url in chain_providers.urls.iter() { + let Some(pubsub) = url.pubsub.as_ref() else { + continue; + }; + let x = pubsub.unsubscribe(alloy_sub_id); + println!("we just tried unsubscribing {:?} unsubscribed: {:?}\r", alloy_sub_id, x); + } +} + /// handle the subscription updates from a remote provider, /// and also perform keepalive checks on that provider. /// current keepalive is 30s, this can be adjusted as desired diff --git a/lib/Cargo.toml b/lib/Cargo.toml index c82aba34..a3add244 100644 --- a/lib/Cargo.toml +++ b/lib/Cargo.toml @@ -15,7 +15,8 @@ kit = { git = "https://github.com/kinode-dao/kit", tag = "v0.6.8" } tokio = "1.28" [dependencies] -alloy = { version = "0.1.3", features = [ +#alloy = { version = "0.1.3", features = [ +alloy = { git = "https://github.com/bitful-pannul/alloy.git", rev = "c73e70d", features = [ "json-rpc", "rpc-types", "rpc-types-eth", From 79c8abf3b689ca192257bf8824a3049edd1bd813 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Wed, 21 Aug 2024 22:28:43 +0000 Subject: [PATCH 6/8] Format Rust code using rustfmt --- kinode/src/eth/mod.rs | 6 +++++- kinode/src/eth/subscription.rs | 5 ++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/kinode/src/eth/mod.rs b/kinode/src/eth/mod.rs index d8b50ed3..bd6802c9 100644 --- a/kinode/src/eth/mod.rs +++ b/kinode/src/eth/mod.rs @@ -515,7 +515,11 @@ async fn handle_eth_action( EthAction::Request { .. } => "request", }, km.source, - state.active_subscriptions.iter().map(|v| v.len()).collect::>(), + state + .active_subscriptions + .iter() + .map(|v| v.len()) + .collect::>(), ), ) .await; diff --git a/kinode/src/eth/subscription.rs b/kinode/src/eth/subscription.rs index 75334c23..7d2d5ce1 100644 --- a/kinode/src/eth/subscription.rs +++ b/kinode/src/eth/subscription.rs @@ -444,7 +444,10 @@ fn unsubscribe(rx: RawSubscription, chain_id: &u64, providers: &Providers) { continue; }; let x = pubsub.unsubscribe(alloy_sub_id); - println!("we just tried unsubscribing {:?} unsubscribed: {:?}\r", alloy_sub_id, x); + println!( + "we just tried unsubscribing {:?} unsubscribed: {:?}\r", + alloy_sub_id, x + ); } } From f0a66d056410d08536758fccd4958ee571be7b65 Mon Sep 17 00:00:00 2001 From: bitful-pannul Date: Thu, 22 Aug 2024 04:19:00 +0300 Subject: [PATCH 7/8] kns_indexer: replace newHeads sub with ticker --- kinode/Cargo.toml | 1 - .../packages/app_store/ui/package-lock.json | 2 - .../kns_indexer/kns_indexer/src/lib.rs | 208 ++++-------------- kinode/packages/kns_indexer/pkg/manifest.json | 6 +- lib/Cargo.toml | 1 - 5 files changed, 48 insertions(+), 170 deletions(-) diff --git a/kinode/Cargo.toml b/kinode/Cargo.toml index 6f78be02..4f2c9be5 100644 --- a/kinode/Cargo.toml +++ b/kinode/Cargo.toml @@ -26,7 +26,6 @@ simulation-mode = [] [dependencies] aes-gcm = "0.10.3" -#alloy = { version = "0.1.3", features = [ alloy = { git = "https://github.com/bitful-pannul/alloy.git", rev = "c73e70d", features = [ "consensus", "contract", diff --git a/kinode/packages/app_store/ui/package-lock.json b/kinode/packages/app_store/ui/package-lock.json index f65b58a3..54d87528 100644 --- a/kinode/packages/app_store/ui/package-lock.json +++ b/kinode/packages/app_store/ui/package-lock.json @@ -3749,8 +3749,6 @@ }, "node_modules/@parcel/watcher-wasm/node_modules/napi-wasm": { "version": "1.1.0", - "resolved": "https://registry.npmjs.org/napi-wasm/-/napi-wasm-1.1.0.tgz", - "integrity": "sha512-lHwIAJbmLSjF9VDRm9GoVOy9AGp3aIvkjv+Kvz9h16QR3uSVYH78PNQUnT2U4X53mhlnV2M7wrhibQ3GHicDmg==", "inBundle": true, "license": "MIT" }, diff --git a/kinode/packages/kns_indexer/kns_indexer/src/lib.rs b/kinode/packages/kns_indexer/kns_indexer/src/lib.rs index ce61063a..cd5059fa 100644 --- a/kinode/packages/kns_indexer/kns_indexer/src/lib.rs +++ b/kinode/packages/kns_indexer/kns_indexer/src/lib.rs @@ -4,7 +4,7 @@ use crate::kinode::process::kns_indexer::{ use alloy_primitives::keccak256; use alloy_sol_types::SolEvent; use kinode_process_lib::{ - await_message, call_init, eth, kimap, net, print_to_terminal, println, Address, Message, + await_message, call_init, eth, kimap, net, print_to_terminal, println, timer, Address, Message, Request, Response, }; use serde::{Deserialize, Serialize}; @@ -37,6 +37,7 @@ const KIMAP_FIRST_BLOCK: u64 = 1; // local const MAX_PENDING_ATTEMPTS: u8 = 3; const SUBSCRIPTION_TIMEOUT: u64 = 60; +const NEW_BLOCK_TICK: u64 = 3000; // 3s #[derive(Clone, Debug, Serialize, Deserialize)] struct State { @@ -49,8 +50,6 @@ struct State { nodes: HashMap, // last block we have an update from last_block: u64, - // whether we are listening for new blocks - listening_newblocks: bool, } // note: not defined in wit api right now like IndexerRequests. @@ -82,7 +81,6 @@ fn init(our: Address) { nodes: HashMap::new(), names: HashMap::new(), last_block: KIMAP_FIRST_BLOCK, - listening_newblocks: false, }; if let Err(e) = main(our, state) { @@ -138,7 +136,10 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> { // if subscription results come back in the wrong order, we store them here // until the right block is reached. - let mut pending_requests: BTreeMap> = BTreeMap::new(); + + // pending_requests temporarily on timeout. + // very naughty. + // let mut pending_requests: BTreeMap> = BTreeMap::new(); let mut pending_notes: BTreeMap> = BTreeMap::new(); fetch_and_process_logs( @@ -159,7 +160,20 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> { let Ok(message) = await_message() else { continue; }; + // if true, time to go check current block number and handle pending notes. + let tick = message.is_local(&our) && message.source().process == "timer:distro:sys"; let Message::Request { source, body, .. } = message else { + if tick { + handle_eth_message( + &mut state, + ð_provider, + tick, + &mut pending_notes, + &[], + &mints_filter, + ¬es_filter, + )?; + } continue; }; @@ -167,7 +181,7 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> { handle_eth_message( &mut state, ð_provider, - &mut pending_requests, + tick, &mut pending_notes, &body, &mints_filter, @@ -181,53 +195,26 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> { ref hash, ref block, }) => { - // make sure we've seen the whole block - if *block < state.last_block { - Response::new() - .body(serde_json::to_vec(&IndexerResponses::Name( - state.names.get(hash).cloned(), - ))?) - .send()?; - } else { - pending_requests - .entry(*block) - .or_insert(vec![]) - .push(request); - } + // TODO: make sure we've seen the whole block, while actually + // sending a response to the proper place. + Response::new() + .body(serde_json::to_vec(&IndexerResponses::Name( + state.names.get(hash).cloned(), + ))?) + .send()?; } + IndexerRequests::NodeInfo(NodeInfoRequest { ref name, block }) => { - // make sure we've seen the whole block - if block < state.last_block { - Response::new() - .body(serde_json::to_vec(&IndexerResponses::NodeInfo( - state.nodes.get(name).cloned(), - ))?) - .send()?; - } else { - pending_requests - .entry(block) - .or_insert(vec![]) - .push(request); - } + Response::new() + .body(serde_json::to_vec(&IndexerResponses::NodeInfo( + state.nodes.get(name).cloned(), + ))?) + .send()?; } IndexerRequests::GetState(GetStateRequest { block }) => { - // make sure we've seen the whole block - if block < state.last_block { - Response::new().body(serde_json::to_vec(&state)?).send()?; - } else { - pending_requests - .entry(block) - .or_insert(vec![]) - .push(request); - } + Response::new().body(serde_json::to_vec(&state)?).send()?; } } - - if !state.listening_newblocks && !pending_requests.is_empty() { - print_to_terminal(0, "subscribing to newHeads for req..."); - listen_to_new_blocks_loop(); // sub_id: 3 - state.listening_newblocks = true; - } } } } @@ -235,7 +222,7 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> { fn handle_eth_message( state: &mut State, eth_provider: ð::Provider, - pending_requests: &mut BTreeMap>, + tick: bool, pending_notes: &mut BTreeMap>, body: &[u8], mints_filter: ð::Filter, @@ -247,14 +234,6 @@ fn handle_eth_message( if let Err(e) = handle_log(state, pending_notes, &log) { print_to_terminal(1, &format!("log-handling error! {e:?}")); } - } else if let eth::SubscriptionResult::Header(header) = result { - if let Some(block) = header.number { - // risque.. - // pending_requests/notes are kicked off with block numbers - // that are ahead of state.last_block. can be risky if event subscriptions and newHeads - // are completely out of sync. - state.last_block = block; - } } } Ok(Err(e)) => { @@ -266,78 +245,21 @@ fn handle_eth_message( eth_provider.subscribe_loop(1, mints_filter.clone()); } else if e.id == 2 { eth_provider.subscribe_loop(2, notes_filter.clone()); - } else if e.id == 3 { - print_to_terminal(0, "subscribing to newHeads for retry..."); - listen_to_new_blocks_loop(); } } - Err(e) => { - return Err(e.into()); + _ => {} + } + if tick { + let block_number = eth_provider.get_block_number(); + if let Ok(block_number) = block_number { + print_to_terminal(1, &format!("new block: {}", block_number)); + state.last_block = block_number; } } - - handle_pending_requests(state, pending_requests)?; handle_pending_notes(state, pending_notes)?; - // if both pending_requests and pending_notes are empty, we kill the newHeads subscription - if state.listening_newblocks && pending_requests.is_empty() && pending_notes.is_empty() { - if let Err(e) = eth_provider.unsubscribe(3) { - print_to_terminal(0, &format!("failed to unsubscribe from newHeads: {e:?}")); - } else { - state.listening_newblocks = false; - print_to_terminal(0, "unsubscribed from newHeads"); - } - } - - Ok(()) -} - -fn handle_pending_requests( - state: &mut State, - pending_requests: &mut BTreeMap>, -) -> anyhow::Result<()> { - // check the pending_requests btreemap to see if there are any requests that - // can be handled now that the state block has been updated - if pending_requests.is_empty() { - return Ok(()); - } - let mut blocks_to_remove = vec![]; - for (block, requests) in pending_requests.iter() { - // make sure we've seen the whole block - if *block < state.last_block { - for request in requests.iter() { - match request { - IndexerRequests::NamehashToName(NamehashToNameRequest { hash, .. }) => { - Response::new() - .body(serde_json::to_vec(&IndexerResponses::Name( - state.names.get(hash).cloned(), - ))?) - .send() - .unwrap(); - } - IndexerRequests::NodeInfo(NodeInfoRequest { name, .. }) => { - Response::new() - .body(serde_json::to_vec(&IndexerResponses::NodeInfo( - state.nodes.get(name).cloned(), - ))?) - .send() - .unwrap(); - } - IndexerRequests::GetState(GetStateRequest { .. }) => { - Response::new() - .body(serde_json::to_vec(&state)?) - .send() - .unwrap(); - } - } - } - blocks_to_remove.push(*block); - } else { - break; - } - } - for block in blocks_to_remove.iter() { - pending_requests.remove(block); + if !pending_notes.is_empty() { + timer::set_timer(NEW_BLOCK_TICK, None); } Ok(()) @@ -526,11 +448,6 @@ fn handle_log( .entry(block_number) .or_default() .push((decoded, 0)); - if !state.listening_newblocks { - print_to_terminal(0, "subscribing to newHeads for note..."); - listen_to_new_blocks_loop(); // sub_id: 3 - state.listening_newblocks = true; - } } } }, @@ -669,40 +586,3 @@ pub fn bytes_to_port(bytes: &[u8]) -> anyhow::Result { _ => Err(anyhow::anyhow!("Invalid byte length for port")), } } - -fn listen_to_new_blocks_loop() { - loop { - let eth_newheads_sub = eth::EthAction::SubscribeLogs { - sub_id: 3, - chain_id: CHAIN_ID, - kind: eth::SubscriptionKind::NewHeads, - params: eth::Params::Bool(false), - }; - - match Request::to(("our", "eth", "distro", "sys")) - .body(serde_json::to_vec(ð_newheads_sub).unwrap()) - .send_and_await_response(SUBSCRIPTION_TIMEOUT) - { - Ok(Ok(Message::Response { body, .. })) => { - match serde_json::from_slice::(&body) { - Ok(eth::EthResponse::Ok) => { - print_to_terminal(0, "successfully subscribed to newHeads"); - break; - } - Ok(eth::EthResponse::Err(e)) => { - print_to_terminal(0, &format!("failed to subscribe to new blocks: {e:?}")); - } - _ => { - print_to_terminal(0, "sailed to subscribe to new blocks, weird response."); - } - } - } - _ => { - print_to_terminal(0, "Failed to subscribe to new blocks, no response."); - } - } - - print_to_terminal(0, "retrying new block subscription in 5 seconds..."); - std::thread::sleep(std::time::Duration::from_secs(5)); - } -} diff --git a/kinode/packages/kns_indexer/pkg/manifest.json b/kinode/packages/kns_indexer/pkg/manifest.json index d29393b9..86365326 100644 --- a/kinode/packages/kns_indexer/pkg/manifest.json +++ b/kinode/packages/kns_indexer/pkg/manifest.json @@ -7,11 +7,13 @@ "request_capabilities": [ "eth:distro:sys", "http_server:distro:sys", - "net:distro:sys" + "net:distro:sys", + "timer:distro:sys" ], "grant_capabilities": [ "eth:distro:sys", - "http_server:distro:sys" + "http_server:distro:sys", + "timer:distro:sys" ], "public": false } diff --git a/lib/Cargo.toml b/lib/Cargo.toml index a3add244..aafefc67 100644 --- a/lib/Cargo.toml +++ b/lib/Cargo.toml @@ -15,7 +15,6 @@ kit = { git = "https://github.com/kinode-dao/kit", tag = "v0.6.8" } tokio = "1.28" [dependencies] -#alloy = { version = "0.1.3", features = [ alloy = { git = "https://github.com/bitful-pannul/alloy.git", rev = "c73e70d", features = [ "json-rpc", "rpc-types", From d72462dfe3311402d8be8879cbd0f915b8323cb4 Mon Sep 17 00:00:00 2001 From: hosted-fornet Date: Wed, 21 Aug 2024 20:01:42 -0700 Subject: [PATCH 8/8] app_store: delay kns queries by 5s to allow kns time to process block --- kinode/packages/app_store/chain/src/lib.rs | 37 +++++++++++-------- kinode/packages/app_store/pkg/manifest.json | 5 ++- .../kns_indexer/kns_indexer/src/lib.rs | 8 ++-- 3 files changed, 29 insertions(+), 21 deletions(-) diff --git a/kinode/packages/app_store/chain/src/lib.rs b/kinode/packages/app_store/chain/src/lib.rs index ca7ae7d3..e4d3ed5d 100644 --- a/kinode/packages/app_store/chain/src/lib.rs +++ b/kinode/packages/app_store/chain/src/lib.rs @@ -12,7 +12,7 @@ use alloy_sol_types::SolEvent; use kinode::process::chain::ChainResponses; use kinode_process_lib::{ await_message, call_init, eth, get_blob, get_state, http, kernel_types as kt, kimap, - print_to_terminal, println, Address, Message, PackageId, Request, Response, + print_to_terminal, println, timer, Address, Message, PackageId, Request, Response, }; use std::{ collections::{HashMap, HashSet}, @@ -45,6 +45,8 @@ const KIMAP_FIRST_BLOCK: u64 = kimap::KIMAP_FIRST_BLOCK; #[cfg(feature = "simulation-mode")] const KIMAP_FIRST_BLOCK: u64 = 1; +const DELAY_MS: u64 = 5_000; + #[derive(Debug, Serialize, Deserialize)] pub struct State { /// the kimap helper we are using @@ -106,7 +108,18 @@ fn init(our: Address) { } fn handle_message(our: &Address, state: &mut State, message: &Message) -> anyhow::Result<()> { - if message.is_request() { + if !message.is_request() { + if message.is_local(&our) && message.source().process == "timer:distro:sys" { + // handling of ETH RPC subscriptions delayed by DELAY_MS + // to allow kns to have a chance to process block: handle now + let Some(context) = message.context() else { + return Err(anyhow::anyhow!("foo")); + }; + let log = serde_json::from_slice(context)?; + handle_eth_log(our, state, log)?; + return Ok(()); + } + } else { let req: Req = serde_json::from_slice(message.body())?; match req { Req::Eth(eth_result) => { @@ -118,8 +131,10 @@ fn handle_message(our: &Address, state: &mut State, message: &Message) -> anyhow } if let Ok(eth::EthSub { result, .. }) = eth_result { - if let eth::SubscriptionResult::Log(log) = result { - handle_eth_log(our, state, *log)?; + if let eth::SubscriptionResult::Log(ref log) = result { + // delay handling of ETH RPC subscriptions by DELAY_MS + // to allow kns to have a chance to process block + timer::set_timer(DELAY_MS, Some(serde_json::to_vec(log)?)); } } else { // attempt to resubscribe @@ -131,21 +146,15 @@ fn handle_message(our: &Address, state: &mut State, message: &Message) -> anyhow } } Req::Request(chains) => { - handle_local_request(our, state, chains)?; + handle_local_request(state, chains)?; } } - } else { - return Err(anyhow::anyhow!("not a request")); } Ok(()) } -fn handle_local_request( - our: &Address, - state: &mut State, - req: ChainRequests, -) -> anyhow::Result<()> { +fn handle_local_request(state: &mut State, req: ChainRequests) -> anyhow::Result<()> { match req { ChainRequests::GetApp(package_id) => { let onchain_app = state @@ -265,9 +274,7 @@ fn handle_eth_log(our: &Address, state: &mut State, log: eth::Log) -> anyhow::Re // if ~metadata-uri is also empty, this is an unpublish action! if metadata_uri.is_empty() { state.published.remove(&package_id); - if is_our_package { - state.listings.remove(&package_id); - } + state.listings.remove(&package_id); return Ok(()); } return Err(anyhow::anyhow!("metadata hash not found")); diff --git a/kinode/packages/app_store/pkg/manifest.json b/kinode/packages/app_store/pkg/manifest.json index c1643020..315d9641 100644 --- a/kinode/packages/app_store/pkg/manifest.json +++ b/kinode/packages/app_store/pkg/manifest.json @@ -49,7 +49,8 @@ "kns_indexer:kns_indexer:sys", "vfs:distro:sys", "http_client:distro:sys", - "eth:distro:sys" + "eth:distro:sys", + "timer:distro:sys" ], "public": false }, @@ -98,4 +99,4 @@ ], "public": false } -] \ No newline at end of file +] diff --git a/kinode/packages/kns_indexer/kns_indexer/src/lib.rs b/kinode/packages/kns_indexer/kns_indexer/src/lib.rs index cd5059fa..f6275212 100644 --- a/kinode/packages/kns_indexer/kns_indexer/src/lib.rs +++ b/kinode/packages/kns_indexer/kns_indexer/src/lib.rs @@ -401,6 +401,10 @@ fn handle_log( pending_notes: &mut BTreeMap>, log: ð::Log, ) -> anyhow::Result<()> { + if let Some(block) = log.block_number { + state.last_block = block; + } + match log.topics()[0] { kimap::contract::Mint::SIGNATURE_HASH => { let decoded = kimap::contract::Mint::decode_log_data(log.data(), true).unwrap(); @@ -459,10 +463,6 @@ fn handle_log( } }; - if let Some(block) = log.block_number { - state.last_block = block; - } - Ok(()) }