more diagnostics, fixes

This commit is contained in:
dr-frmr 2024-08-08 19:12:13 +03:00
parent 19a750b5e5
commit a288fc1fd4
No known key found for this signature in database
2 changed files with 31 additions and 8 deletions

View File

@ -322,6 +322,7 @@ async fn handle_message(
)
.await;
}
Ok(())
}
Message::Request(req) => {
let timeout = req.expects_response.unwrap_or(60);
@ -330,7 +331,7 @@ async fn handle_message(
};
match req {
IncomingReq::EthAction(eth_action) => {
return handle_eth_action(state, km, timeout, eth_action).await;
handle_eth_action(state, km, timeout, eth_action).await
}
IncomingReq::EthConfigAction(eth_config_action) => {
kernel_message(
@ -344,10 +345,16 @@ async fn handle_message(
&state.send_to_loop,
)
.await;
Ok(())
}
IncomingReq::EthSubResult(eth_sub_result) => {
// forward this to rsvp, if we have the sub id in our active subs
let Some(rsvp) = km.rsvp else {
verbose_print(
&state.print_tx,
"eth: got eth_sub_result with no rsvp, ignoring",
)
.await;
return Ok(()); // no rsvp, no need to forward
};
let sub_id = match eth_sub_result {
@ -371,8 +378,14 @@ async fn handle_message(
}
// failed to send subscription update to process,
// unsubscribe from provider and close
verbose_print(
&state.print_tx,
"eth: got eth_sub_result but provider node did not match or local sub was already closed",
)
.await;
sub.close(sub_id, state).await;
sub_map.remove(&sub_id);
return Ok(());
}
}
}
@ -380,13 +393,16 @@ async fn handle_message(
// so they can stop sending us updates
verbose_print(
&state.print_tx,
"eth: got eth_sub_result but no matching sub found, unsubscribing",
&format!(
"eth: got eth_sub_result but no matching sub {} found, unsubscribing",
sub_id
),
)
.await;
kernel_message(
&state.our.clone(),
&state.our,
km.id,
km.source.clone(),
km.source,
None,
true,
None,
@ -394,6 +410,7 @@ async fn handle_message(
&state.send_to_loop,
)
.await;
Ok(())
}
IncomingReq::SubKeepalive(sub_id) => {
// source expects that we have a local sub for them with this id
@ -426,11 +443,11 @@ async fn handle_message(
&state.send_to_loop,
)
.await;
Ok(())
}
}
}
}
Ok(())
}
async fn handle_eth_action(
@ -488,7 +505,10 @@ async fn handle_eth_action(
let Some(mut sub_map) = state.active_subscriptions.get_mut(&km.source) else {
verbose_print(
&state.print_tx,
"eth: got unsubscribe but no matching subscription found",
&format!(
"eth: got unsubscribe from {} but no subscription found",
km.source
),
)
.await;
error_message(
@ -522,7 +542,10 @@ async fn handle_eth_action(
} else {
verbose_print(
&state.print_tx,
"eth: got unsubscribe but no matching subscription found",
&format!(
"eth: got unsubscribe from {} but no subscription {} found",
km.source, sub_id
),
)
.await;
error_message(

View File

@ -500,7 +500,7 @@ async fn maintain_remote_subscription(
None,
true,
None,
EthAction::UnsubscribeLogs(sub_id),
EthAction::UnsubscribeLogs(remote_sub_id),
send_to_loop,
)
.await;