eth provider: try and get more diagnostics to provider term

This commit is contained in:
dr-frmr 2024-08-08 16:05:13 +03:00
parent e876891e1c
commit 81fa748d0a
No known key found for this signature in database
2 changed files with 39 additions and 17 deletions

View File

@ -354,19 +354,25 @@ async fn handle_message(
Ok(EthSub { id, .. }) => id,
Err(EthSubError { id, .. }) => id,
};
if let Some(sub_map) = state.active_subscriptions.get(&rsvp) {
if let Some(ActiveSub::Remote {
provider_node,
sender,
..
}) = sub_map.get(&sub_id)
{
if provider_node == &km.source.node {
if let Ok(()) = sender.send(eth_sub_result).await {
// successfully sent a subscription update from a
// remote provider to one of our processes
return Ok(());
if let Some(mut sub_map) = state.active_subscriptions.get_mut(&rsvp) {
if let Some(sub) = sub_map.get(&sub_id) {
if let ActiveSub::Remote {
provider_node,
sender,
..
} = sub
{
if provider_node == &km.source.node {
if let Ok(()) = sender.send(eth_sub_result).await {
// successfully sent a subscription update from a
// remote provider to one of our processes
return Ok(());
}
}
// failed to send subscription update to process,
// unsubscribe from provider and close
sub.close(sub_id, state).await;
sub_map.remove(&sub_id);
}
}
}
@ -479,12 +485,29 @@ async fn handle_eth_action(
.await;
}
EthAction::UnsubscribeLogs(sub_id) => {
let mut sub_map = state
.active_subscriptions
.entry(km.source.clone())
.or_insert(HashMap::new());
let Some(mut sub_map) = state.active_subscriptions.get_mut(&km.source) else {
verbose_print(
&state.print_tx,
"eth: got unsubscribe but no matching subscription found",
)
.await;
error_message(
&state.our,
km.id,
km.source,
EthError::MalformedRequest,
&state.send_to_loop,
)
.await;
return Ok(());
};
if let Some(sub) = sub_map.remove(&sub_id) {
sub.close(sub_id, state).await;
verbose_print(
&state.print_tx,
&format!("eth: closed subscription {} for {}", sub_id, km.source.node),
)
.await;
kernel_message(
&state.our,
km.id,

View File

@ -113,7 +113,6 @@ pub async fn create_new_subscription(
let (keepalive_err_sender, keepalive_err_receiver) =
tokio::sync::mpsc::channel(1);
response_channels.insert(keepalive_km_id, keepalive_err_sender);
let response_channels = response_channels.clone();
subs.insert(
remote_sub_id,
ActiveSub::Remote {