diff --git a/kinode/src/eth/subscription.rs b/kinode/src/eth/subscription.rs index f9240b0a..21cd4a9e 100644 --- a/kinode/src/eth/subscription.rs +++ b/kinode/src/eth/subscription.rs @@ -257,8 +257,6 @@ async fn build_subscription( ) .await; } - let alloy_sub_id = rx.local_id(); - let alloy_sub_id: alloy::primitives::U256 = alloy_sub_id.clone().into(); return Ok(Ok((rx, chain_id))); } Err(rpc_error) => { @@ -389,7 +387,7 @@ async fn maintain_local_subscription( loop { tokio::select! { _ = close_receiver.recv() => { - unsubscribe(rx, &chain_id, providers); + unsubscribe(rx, &chain_id, providers).await; return Ok(()); }, value = rx.recv() => { @@ -424,14 +422,14 @@ async fn maintain_local_subscription( .and_modify(|sub_map| { sub_map.remove(&sub_id); }); - unsubscribe(rx, &chain_id, providers); + unsubscribe(rx, &chain_id, providers).await; Err(EthSubError { id: sub_id, error: format!("subscription ({target}) closed unexpectedly"), }) } -fn unsubscribe(rx: RawSubscription, chain_id: &u64, providers: &Providers) { +async fn unsubscribe(rx: RawSubscription, chain_id: &u64, providers: &Providers) { let alloy_sub_id = rx.local_id(); let alloy_sub_id = alloy_sub_id.clone().into(); let Some(chain_providers) = providers.get_mut(chain_id) else { @@ -441,7 +439,14 @@ fn unsubscribe(rx: RawSubscription, chain_id: &u64, providers: &Providers) { let Some(pubsub) = url.pubsub.as_ref() else { continue; }; - let x = pubsub.unsubscribe(alloy_sub_id); + if let Err(err) = pubsub.unsubscribe(alloy_sub_id) { + let _ = print_tx + .send(Printout { + verbosity: 0, + content: "unsubscribe from ETH RPC failed".to_string(), + }) + .await; + } } }