diff --git a/kinode/src/eth/mod.rs b/kinode/src/eth/mod.rs index b23742d2..cec64a51 100644 --- a/kinode/src/eth/mod.rs +++ b/kinode/src/eth/mod.rs @@ -322,6 +322,7 @@ async fn handle_message( ) .await; } + Ok(()) } Message::Request(req) => { let timeout = req.expects_response.unwrap_or(60); @@ -330,7 +331,7 @@ async fn handle_message( }; match req { IncomingReq::EthAction(eth_action) => { - return handle_eth_action(state, km, timeout, eth_action).await; + handle_eth_action(state, km, timeout, eth_action).await } IncomingReq::EthConfigAction(eth_config_action) => { kernel_message( @@ -344,10 +345,16 @@ async fn handle_message( &state.send_to_loop, ) .await; + Ok(()) } IncomingReq::EthSubResult(eth_sub_result) => { // forward this to rsvp, if we have the sub id in our active subs let Some(rsvp) = km.rsvp else { + verbose_print( + &state.print_tx, + "eth: got eth_sub_result with no rsvp, ignoring", + ) + .await; return Ok(()); // no rsvp, no need to forward }; let sub_id = match eth_sub_result { @@ -371,8 +378,14 @@ async fn handle_message( } // failed to send subscription update to process, // unsubscribe from provider and close + verbose_print( + &state.print_tx, + "eth: got eth_sub_result but provider node did not match or local sub was already closed", + ) + .await; sub.close(sub_id, state).await; sub_map.remove(&sub_id); + return Ok(()); } } } @@ -380,13 +393,16 @@ async fn handle_message( // so they can stop sending us updates verbose_print( &state.print_tx, - "eth: got eth_sub_result but no matching sub found, unsubscribing", + &format!( + "eth: got eth_sub_result but no matching sub {} found, unsubscribing", + sub_id + ), ) .await; kernel_message( - &state.our.clone(), + &state.our, km.id, - km.source.clone(), + km.source, None, true, None, @@ -394,6 +410,7 @@ async fn handle_message( &state.send_to_loop, ) .await; + Ok(()) } IncomingReq::SubKeepalive(sub_id) => { // source expects that we have a local sub for them with this id @@ -426,11 +443,11 @@ async fn handle_message( &state.send_to_loop, ) .await; + Ok(()) } } } } - Ok(()) } async fn handle_eth_action( @@ -488,7 +505,10 @@ async fn handle_eth_action( let Some(mut sub_map) = state.active_subscriptions.get_mut(&km.source) else { verbose_print( &state.print_tx, - "eth: got unsubscribe but no matching subscription found", + &format!( + "eth: got unsubscribe from {} but no subscription found", + km.source + ), ) .await; error_message( @@ -522,7 +542,10 @@ async fn handle_eth_action( } else { verbose_print( &state.print_tx, - "eth: got unsubscribe but no matching subscription found", + &format!( + "eth: got unsubscribe from {} but no subscription {} found", + km.source, sub_id + ), ) .await; error_message( diff --git a/kinode/src/eth/subscription.rs b/kinode/src/eth/subscription.rs index 875de503..21e78021 100644 --- a/kinode/src/eth/subscription.rs +++ b/kinode/src/eth/subscription.rs @@ -500,7 +500,7 @@ async fn maintain_remote_subscription( None, true, None, - EthAction::UnsubscribeLogs(sub_id), + EthAction::UnsubscribeLogs(remote_sub_id), send_to_loop, ) .await;