diff --git a/kinode/default_providers_testnet.json b/kinode/default_providers_testnet.json index c502f959..3e46a815 100644 --- a/kinode/default_providers_testnet.json +++ b/kinode/default_providers_testnet.json @@ -9,6 +9,16 @@ "allow": [], "deny": [] }, + { + "chain_id": 11155111, + "trusted": false, + "provider": { + "RpcUrl": "wss://eth-sepolia.g.alchemy.com/v2/a4bRKYnvC0uT2l1rzVDAvldH3OPKQnKm" + }, + "public": false, + "allow": [], + "deny": [] + }, { "chain_id": 11155111, "trusted": true, diff --git a/kinode/src/eth/provider.rs b/kinode/src/eth/provider.rs index da89ba2f..d1272052 100644 --- a/kinode/src/eth/provider.rs +++ b/kinode/src/eth/provider.rs @@ -14,7 +14,7 @@ use std::sync::Arc; use tokio::task::JoinHandle; use url::Url; -/// mapping of chain id to ordered(TODO) list of providers +/// mapping of chain id to ordered lists of providers type Providers = Arc>; struct ActiveProviders { @@ -37,7 +37,7 @@ struct NodeProvider { type ActiveSubscriptions = Arc>>; enum ActiveSub { - Local(JoinHandle>), + Local(JoinHandle<()>), Remote(String), // name of node providing this subscription for us } @@ -93,6 +93,7 @@ pub async fn provider( caps_oracle: CapMessageSender, print_tx: PrintSender, ) -> Result<()> { + println!("provider: on\r"); let our = Arc::new(our); let mut access_settings = AccessSettings { @@ -156,6 +157,7 @@ async fn handle_message( return handle_eth_action( our, access_settings, + send_to_loop, km, eth_action, providers, @@ -205,6 +207,7 @@ async fn handle_passthrough_response( async fn handle_eth_action( our: &str, access_settings: &mut AccessSettings, + send_to_loop: &MessageSender, km: KernelMessage, eth_action: EthAction, providers: &mut Providers, @@ -231,27 +234,119 @@ async fn handle_eth_action( EthAction::SubscribeLogs { sub_id, chain_id, - kind, - params, + ref kind, + ref params, } => { - todo!() + let new_sub = ActiveSub::Local(tokio::spawn(create_new_subscription( + our.to_string(), + km.id, + km.source.clone(), + km.rsvp, + send_to_loop.clone(), + eth_action, + providers.clone(), + ))); + let mut subs = active_subscriptions + .entry(km.source.process) + .or_insert(HashMap::new()); + subs.insert(sub_id, new_sub); } EthAction::UnsubscribeLogs(sub_id) => { active_subscriptions .entry(km.source.process) .and_modify(|sub_map| { - sub_map.remove(&sub_id); + if let Some(sub) = sub_map.get_mut(&sub_id) { + match sub { + ActiveSub::Local(handle) => { + handle.abort(); + } + ActiveSub::Remote(node) => { + // TODO send to them asking to abort + } + } + } }); - Ok(()) } EthAction::Request { chain_id, method, params, } => { - todo!() + //todo } } + Ok(()) +} + +/// spawned as a task +async fn create_new_subscription( + our: String, + km_id: u64, + target: Address, + rsvp: Option
, + send_to_loop: MessageSender, + eth_action: EthAction, + providers: Providers, +) { + let EthAction::SubscribeLogs { + sub_id, + chain_id, + kind, + params, + } = eth_action + else { + return; + }; + let Some(aps) = providers.get_mut(&chain_id) else { + send_to_loop + .send(make_error_message( + &our, + sub_id, + target, + EthError::NoRpcForChain, + )) + .await + .expect("eth: kernel sender died!"); + return; + }; + // first, try any url providers we have for this chain, + // then if we have none or they all fail, go to node providers. + // finally, if no provider works, return an error. + for url_provider in &aps.urls { + if let Some(pubsub) = &url_provider.pubsub { + let kind = serde_json::to_value(&kind).unwrap(); + let params = serde_json::to_value(¶ms).unwrap(); + if let Ok(id) = pubsub + .inner() + .prepare("eth_subscribe", [kind, params]) + .await + { + let rx = pubsub.inner().get_raw_subscription(id).await; + if let Err(e) = + handle_subscription_stream(&our, sub_id, rx, &target, &rsvp, &send_to_loop) + .await + { + send_to_loop + .send(make_error_message(&our, sub_id, target, e)) + .await + .expect("eth: kernel sender died!"); + } + return; + } + } + } + for node_provider in &aps.nodes { + // todo + } + send_to_loop + .send(make_error_message( + &our, + sub_id, + target, + EthError::NoRpcForChain, + )) + .await + .expect("eth: kernel sender died!"); } async fn handle_eth_config_action( @@ -517,12 +612,12 @@ async fn handle_eth_config_action( /// This task is responsible for connecting to the ETH RPC provider and streaming logs /// for a specific subscription made by a process. async fn handle_subscription_stream( - our: String, + our: &str, sub_id: u64, mut rx: RawSubscription, - target: Address, - rsvp: Option
, - send_to_loop: MessageSender, + target: &Address, + rsvp: &Option
, + send_to_loop: &MessageSender, ) -> Result<(), EthError> { loop { match rx.recv().await { @@ -540,7 +635,7 @@ async fn handle_subscription_stream( .send(KernelMessage { id: rand::random(), source: Address { - node: our.clone(), + node: our.to_string(), process: ETH_PROCESS_ID.clone(), }, target: target.clone(), @@ -559,7 +654,7 @@ async fn handle_subscription_stream( lazy_load_blob: None, }) .await - .unwrap(); + .map_err(|_| EthError::RpcError("eth: sender died".to_string()))?; } } } diff --git a/lib/src/eth.rs b/lib/src/eth.rs index c4f996a6..04cd0d09 100644 --- a/lib/src/eth.rs +++ b/lib/src/eth.rs @@ -58,6 +58,8 @@ pub enum EthResponse { #[derive(Debug, Serialize, Deserialize)] pub enum EthError { + /// No RPC provider for the chain + NoRpcForChain, /// Underlying transport error TransportError(String), /// Subscription closed