eth & kns: reform subscribe action

This commit is contained in:
bitful-pannul 2024-02-01 18:58:10 -03:00
parent ec4cd9b39e
commit 07a3a857b8
3 changed files with 66 additions and 39 deletions

View File

@ -1,7 +1,10 @@
use alloy_primitives::Address as EthAddress;
use alloy_rpc_types::pubsub::SubscriptionResult;
use alloy_rpc_types::{Filter, Log};
use alloy_rpc_types::{
pubsub::{Params, SubscriptionKind, SubscriptionResult},
BlockNumberOrTag, Filter, Log,
};
use alloy_sol_types::{sol, SolEvent};
use kinode_process_lib::{
await_message, get_typed_state, print_to_terminal, println, set_state, Address, Message,
Request, Response,
@ -28,7 +31,11 @@ wit_bindgen::generate!({
#[derive(Debug, Serialize, Deserialize)]
pub enum EthAction {
/// Subscribe to logs with a custom filter. ID is to be used to unsubscribe.
SubscribeLogs { sub_id: u64, filter: Filter },
SubscribeLogs {
sub_id: u64,
kind: SubscriptionKind,
params: Params,
},
/// Kill a SubscribeLogs subscription of a given ID, to stop getting updates.
UnsubscribeLogs(u64),
}
@ -173,9 +180,10 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> {
))?
.send()?;
let mut filter = Filter::new()
let filter = Filter::new()
.address(contract_address.unwrap().parse::<EthAddress>().unwrap())
.from_block(0)
.to_block(BlockNumberOrTag::Latest)
.events(vec![
"NodeRegistered(bytes32,bytes)",
"KeyUpdate(bytes32,bytes32)",
@ -184,13 +192,15 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> {
"RoutingUpdate(bytes32,bytes32[])",
]);
filter = filter.from_block(0);
let params = Params::Logs(Box::new(filter));
let kind = SubscriptionKind::Logs;
Request::new()
.target((&our.node, "eth", "distro", "sys"))
.body(serde_json::to_vec(&EthAction::SubscribeLogs {
sub_id: 8,
filter,
kind,
params,
})?)
.send()?;

View File

@ -1,8 +1,10 @@
use crate::eth::types::*;
use crate::types::*;
use alloy_primitives::U256;
use alloy_primitives::{Bytes, U256};
use alloy_providers::provider::TempProvider;
use alloy_rpc_client::ClientBuilder;
use alloy_rpc_types::pubsub::SubscriptionResult;
use alloy_rpc_types::pubsub::{Params, SubscriptionKind, SubscriptionResult};
use alloy_rpc_types::{BlockNumberOrTag, Filter};
use alloy_transport_ws::WsConnect;
use anyhow::Result;
use std::collections::HashMap;
@ -43,13 +45,14 @@ pub async fn provider(
auth: None,
};
// http option here, although doesn't implement .get_watcher()... investigating
// note, reqwest::http is an option here, although doesn't implement .get_watcher()
// polling should be an option, investigating
// let client = ClientBuilder::default().reqwest_http(Url::from_str(&rpc_url)?);
let client = ClientBuilder::default().pubsub(connector).await?;
let provider = alloy_providers::provider::Provider::new_with_client(client);
let x = provider.inner();
let mut connections = RpcConnections {
provider,
ws_provider_subscriptions: HashMap::new(),
@ -120,27 +123,31 @@ async fn handle_request(
send_to_loop: &MessageSender,
) -> Result<(), EthError> {
match action {
EthAction::SubscribeLogs { sub_id, filter } => {
EthAction::SubscribeLogs {
sub_id,
kind,
params,
} => {
let sub_id = (target.process.clone(), sub_id);
// if this process has already used this subscription ID,
// this subscription will **overwrite** the existing one.
let kind = serde_json::to_value(&kind).unwrap();
let params = serde_json::to_value(&params).unwrap();
let id = connections
.provider
.inner()
.prepare::<_, U256>("eth_subscribe", filter)
.prepare("eth_subscribe", [kind, params])
.await
.unwrap();
let rx = connections.provider.inner().get_watcher(id).await;
let handle = tokio::spawn(handle_subscription_stream(
our.clone(),
rx,
target.clone(),
send_to_loop.clone(),
));
connections.ws_provider_subscriptions.insert(sub_id, handle);
Ok(())
}
@ -166,9 +173,14 @@ async fn handle_subscription_stream(
target: Address,
send_to_loop: MessageSender,
) -> Result<(), EthError> {
while let Ok(value) = rx.recv().await {
println!("got some sub!! {:?}", value);
let event: SubscriptionResult = serde_json::from_value(value.get().into()).unwrap();
match rx.recv().await {
Err(e) => {
println!("got an error from the subscription stream: {:?}", e);
// TODO should we stop the subscription here?
// return Err(EthError::ProviderError(format!("{:?}", e)));
}
Ok(value) => {
let event: SubscriptionResult = serde_json::from_str(value.get()).unwrap();
send_to_loop
.send(KernelMessage {
id: rand::random(),
@ -190,5 +202,6 @@ async fn handle_subscription_stream(
.await
.unwrap();
}
}
Err(EthError::SubscriptionClosed)
}

View File

@ -19,7 +19,11 @@ use tokio::task::JoinHandle;
#[derive(Debug, Serialize, Deserialize)]
pub enum EthAction {
/// Subscribe to logs with a custom filter. ID is to be used to unsubscribe.
SubscribeLogs { sub_id: u64, filter: Filter },
SubscribeLogs {
sub_id: u64,
kind: SubscriptionKind,
params: Params,
},
/// Kill a SubscribeLogs subscription of a given ID, to stop getting updates.
UnsubscribeLogs(u64),
}