From 07a3a857b884cee657df2f6af0466e51688cbc1c Mon Sep 17 00:00:00 2001 From: bitful-pannul Date: Thu, 1 Feb 2024 18:58:10 -0300 Subject: [PATCH] eth & kns: reform subscribe action --- modules/kns_indexer/kns_indexer/src/lib.rs | 22 +++++-- src/eth/provider.rs | 77 +++++++++++++--------- src/eth/types.rs | 6 +- 3 files changed, 66 insertions(+), 39 deletions(-) diff --git a/modules/kns_indexer/kns_indexer/src/lib.rs b/modules/kns_indexer/kns_indexer/src/lib.rs index 5d96df8f..53b4a363 100644 --- a/modules/kns_indexer/kns_indexer/src/lib.rs +++ b/modules/kns_indexer/kns_indexer/src/lib.rs @@ -1,7 +1,10 @@ use alloy_primitives::Address as EthAddress; -use alloy_rpc_types::pubsub::SubscriptionResult; -use alloy_rpc_types::{Filter, Log}; +use alloy_rpc_types::{ + pubsub::{Params, SubscriptionKind, SubscriptionResult}, + BlockNumberOrTag, Filter, Log, +}; use alloy_sol_types::{sol, SolEvent}; + use kinode_process_lib::{ await_message, get_typed_state, print_to_terminal, println, set_state, Address, Message, Request, Response, @@ -28,7 +31,11 @@ wit_bindgen::generate!({ #[derive(Debug, Serialize, Deserialize)] pub enum EthAction { /// Subscribe to logs with a custom filter. ID is to be used to unsubscribe. - SubscribeLogs { sub_id: u64, filter: Filter }, + SubscribeLogs { + sub_id: u64, + kind: SubscriptionKind, + params: Params, + }, /// Kill a SubscribeLogs subscription of a given ID, to stop getting updates. UnsubscribeLogs(u64), } @@ -173,9 +180,10 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> { ))? .send()?; - let mut filter = Filter::new() + let filter = Filter::new() .address(contract_address.unwrap().parse::().unwrap()) .from_block(0) + .to_block(BlockNumberOrTag::Latest) .events(vec![ "NodeRegistered(bytes32,bytes)", "KeyUpdate(bytes32,bytes32)", @@ -184,13 +192,15 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> { "RoutingUpdate(bytes32,bytes32[])", ]); - filter = filter.from_block(0); + let params = Params::Logs(Box::new(filter)); + let kind = SubscriptionKind::Logs; Request::new() .target((&our.node, "eth", "distro", "sys")) .body(serde_json::to_vec(&EthAction::SubscribeLogs { sub_id: 8, - filter, + kind, + params, })?) .send()?; diff --git a/src/eth/provider.rs b/src/eth/provider.rs index 6f5a8ccd..bd16e956 100644 --- a/src/eth/provider.rs +++ b/src/eth/provider.rs @@ -1,8 +1,10 @@ use crate::eth::types::*; use crate::types::*; -use alloy_primitives::U256; +use alloy_primitives::{Bytes, U256}; +use alloy_providers::provider::TempProvider; use alloy_rpc_client::ClientBuilder; -use alloy_rpc_types::pubsub::SubscriptionResult; +use alloy_rpc_types::pubsub::{Params, SubscriptionKind, SubscriptionResult}; +use alloy_rpc_types::{BlockNumberOrTag, Filter}; use alloy_transport_ws::WsConnect; use anyhow::Result; use std::collections::HashMap; @@ -43,13 +45,14 @@ pub async fn provider( auth: None, }; - // http option here, although doesn't implement .get_watcher()... investigating + // note, reqwest::http is an option here, although doesn't implement .get_watcher() + // polling should be an option, investigating // let client = ClientBuilder::default().reqwest_http(Url::from_str(&rpc_url)?); let client = ClientBuilder::default().pubsub(connector).await?; let provider = alloy_providers::provider::Provider::new_with_client(client); - let x = provider.inner(); + let mut connections = RpcConnections { provider, ws_provider_subscriptions: HashMap::new(), @@ -120,27 +123,31 @@ async fn handle_request( send_to_loop: &MessageSender, ) -> Result<(), EthError> { match action { - EthAction::SubscribeLogs { sub_id, filter } => { + EthAction::SubscribeLogs { + sub_id, + kind, + params, + } => { let sub_id = (target.process.clone(), sub_id); - // if this process has already used this subscription ID, - // this subscription will **overwrite** the existing one. + let kind = serde_json::to_value(&kind).unwrap(); + let params = serde_json::to_value(¶ms).unwrap(); let id = connections .provider .inner() - .prepare::<_, U256>("eth_subscribe", filter) + .prepare("eth_subscribe", [kind, params]) .await .unwrap(); let rx = connections.provider.inner().get_watcher(id).await; - let handle = tokio::spawn(handle_subscription_stream( our.clone(), rx, target.clone(), send_to_loop.clone(), )); + connections.ws_provider_subscriptions.insert(sub_id, handle); Ok(()) } @@ -166,29 +173,35 @@ async fn handle_subscription_stream( target: Address, send_to_loop: MessageSender, ) -> Result<(), EthError> { - while let Ok(value) = rx.recv().await { - println!("got some sub!! {:?}", value); - let event: SubscriptionResult = serde_json::from_value(value.get().into()).unwrap(); - send_to_loop - .send(KernelMessage { - id: rand::random(), - source: Address { - node: our.to_string(), - process: ETH_PROCESS_ID.clone(), - }, - target: target.clone(), - rsvp: None, - message: Message::Request(Request { - inherit: false, - expects_response: None, - body: serde_json::to_vec(&event).unwrap(), - metadata: None, - capabilities: vec![], - }), - lazy_load_blob: None, - }) - .await - .unwrap(); + match rx.recv().await { + Err(e) => { + println!("got an error from the subscription stream: {:?}", e); + // TODO should we stop the subscription here? + // return Err(EthError::ProviderError(format!("{:?}", e))); + } + Ok(value) => { + let event: SubscriptionResult = serde_json::from_str(value.get()).unwrap(); + send_to_loop + .send(KernelMessage { + id: rand::random(), + source: Address { + node: our.to_string(), + process: ETH_PROCESS_ID.clone(), + }, + target: target.clone(), + rsvp: None, + message: Message::Request(Request { + inherit: false, + expects_response: None, + body: serde_json::to_vec(&event).unwrap(), + metadata: None, + capabilities: vec![], + }), + lazy_load_blob: None, + }) + .await + .unwrap(); + } } Err(EthError::SubscriptionClosed) } diff --git a/src/eth/types.rs b/src/eth/types.rs index a4b0f78a..efc9f524 100644 --- a/src/eth/types.rs +++ b/src/eth/types.rs @@ -19,7 +19,11 @@ use tokio::task::JoinHandle; #[derive(Debug, Serialize, Deserialize)] pub enum EthAction { /// Subscribe to logs with a custom filter. ID is to be used to unsubscribe. - SubscribeLogs { sub_id: u64, filter: Filter }, + SubscribeLogs { + sub_id: u64, + kind: SubscriptionKind, + params: Params, + }, /// Kill a SubscribeLogs subscription of a given ID, to stop getting updates. UnsubscribeLogs(u64), }