fix: refactor eth provider subs again

This commit is contained in:
dr-frmr 2024-05-13 15:11:32 -06:00
parent ac408e6c2f
commit 54e6f58bf5
No known key found for this signature in database

View File

@ -33,6 +33,23 @@ pub async fn create_new_subscription(
) )
.await .await
{ {
// if building the subscription fails, send an error message to the target
Ok(Err(e)) => {
error_message(&our, km_id, target.clone(), e, &send_to_loop).await;
}
// if building the subscription times out, send an error message to the target
Err(_) => {
error_message(
&our,
km_id,
target.clone(),
EthError::RpcTimeout,
&send_to_loop,
)
.await;
}
// if building the subscription is successful, start the subscription
// and in this task, maintain and clean up after it.
Ok(Ok(maybe_raw_sub)) => { Ok(Ok(maybe_raw_sub)) => {
// send a response to the target that the subscription was successful // send a response to the target that the subscription was successful
kernel_message( kernel_message(
@ -49,47 +66,43 @@ pub async fn create_new_subscription(
let mut subs = active_subscriptions let mut subs = active_subscriptions
.entry(target.clone()) .entry(target.clone())
.or_insert(HashMap::new()); .or_insert(HashMap::new());
let our = our.clone();
let send_to_loop = send_to_loop.clone();
let print_tx = print_tx.clone();
let active_subscriptions = active_subscriptions.clone(); let active_subscriptions = active_subscriptions.clone();
match maybe_raw_sub { match maybe_raw_sub {
Ok(rx) => { Ok(rx) => {
let our = our.clone();
let send_to_loop = send_to_loop.clone();
let print_tx = print_tx.clone();
subs.insert( subs.insert(
sub_id, sub_id,
// this is a local sub, as in, we connect to the rpc endpoint // this is a local sub, as in, we connect to the rpc endpoint
ActiveSub::Local(tokio::spawn(async move { ActiveSub::Local(tokio::spawn(async move {
// await the subscription error and kill it if so // await the subscription error and kill it if so
if let Err(e) = maintain_local_subscription( let e = maintain_local_subscription(
&our, &our,
sub_id, sub_id,
rx, rx,
&target, &target,
&rsvp, &rsvp,
&send_to_loop, &send_to_loop,
&active_subscriptions,
) )
.await .await;
{ verbose_print(
verbose_print( &print_tx,
&print_tx, &format!("eth: closed local subscription due to error {e:?}"),
"eth: closed local subscription due to error", )
) .await;
.await; kernel_message(
kernel_message( &our,
&our, rand::random(),
rand::random(), target.clone(),
target.clone(), rsvp,
rsvp, true,
true, None,
None, EthSubResult::Err(e),
EthSubResult::Err(e), &send_to_loop,
&send_to_loop, )
) .await;
.await;
active_subscriptions.entry(target).and_modify(|sub_map| {
sub_map.remove(&km_id);
});
}
})), })),
); );
} }
@ -100,16 +113,13 @@ pub async fn create_new_subscription(
let (keepalive_err_sender, keepalive_err_receiver) = let (keepalive_err_sender, keepalive_err_receiver) =
tokio::sync::mpsc::channel(1); tokio::sync::mpsc::channel(1);
response_channels.insert(keepalive_km_id, keepalive_err_sender); response_channels.insert(keepalive_km_id, keepalive_err_sender);
let our = our.clone();
let send_to_loop = send_to_loop.clone();
let print_tx = print_tx.clone();
let response_channels = response_channels.clone(); let response_channels = response_channels.clone();
subs.insert( subs.insert(
remote_sub_id, remote_sub_id,
ActiveSub::Remote { ActiveSub::Remote {
provider_node: provider_node.clone(), provider_node: provider_node.clone(),
handle: tokio::spawn(async move { handle: tokio::spawn(async move {
if let Err(e) = maintain_remote_subscription( let e = maintain_remote_subscription(
&our, &our,
&provider_node, &provider_node,
remote_sub_id, remote_sub_id,
@ -119,30 +129,26 @@ pub async fn create_new_subscription(
keepalive_err_receiver, keepalive_err_receiver,
&target, &target,
&send_to_loop, &send_to_loop,
) &active_subscriptions,
.await &response_channels,
{ )
verbose_print( .await;
&print_tx, verbose_print(
"eth: closed subscription with provider node due to error", &print_tx,
&format!("eth: closed subscription with provider node due to error {e:?}"),
)
.await;
kernel_message(
&our,
rand::random(),
target.clone(),
None,
true,
None,
EthSubResult::Err(e),
&send_to_loop,
) )
.await; .await;
kernel_message(
&our,
rand::random(),
target.clone(),
None,
true,
None,
EthSubResult::Err(e),
&send_to_loop,
)
.await;
active_subscriptions.entry(target).and_modify(|sub_map| {
sub_map.remove(&sub_id);
});
response_channels.remove(&keepalive_km_id);
}
}), }),
sender, sender,
}, },
@ -150,19 +156,6 @@ pub async fn create_new_subscription(
} }
} }
} }
Ok(Err(e)) => {
error_message(&our, km_id, target.clone(), e, &send_to_loop).await;
}
Err(_) => {
error_message(
&our,
km_id,
target.clone(),
EthError::RpcTimeout,
&send_to_loop,
)
.await;
}
} }
}); });
} }
@ -311,13 +304,18 @@ async fn maintain_local_subscription(
target: &Address, target: &Address,
rsvp: &Option<Address>, rsvp: &Option<Address>,
send_to_loop: &MessageSender, send_to_loop: &MessageSender,
) -> Result<(), EthSubError> { active_subscriptions: &ActiveSubscriptions,
) -> EthSubError {
while let Ok(value) = rx.recv().await { while let Ok(value) = rx.recv().await {
let result: SubscriptionResult = let result: SubscriptionResult = match serde_json::from_str(value.get()) {
serde_json::from_str(value.get()).map_err(|e| EthSubError { Ok(res) => res,
id: sub_id, Err(e) => {
error: e.to_string(), return EthSubError {
})?; id: sub_id,
error: e.to_string(),
}
}
};
kernel_message( kernel_message(
our, our,
rand::random(), rand::random(),
@ -330,10 +328,15 @@ async fn maintain_local_subscription(
) )
.await; .await;
} }
Err(EthSubError { active_subscriptions
.entry(target.clone())
.and_modify(|sub_map| {
sub_map.remove(&sub_id);
});
EthSubError {
id: sub_id, id: sub_id,
error: "subscription closed unexpectedly".to_string(), error: "subscription closed unexpectedly".to_string(),
}) }
} }
/// handle the subscription updates from a remote provider, /// handle the subscription updates from a remote provider,
@ -352,12 +355,14 @@ async fn maintain_remote_subscription(
mut net_error_rx: ProcessMessageReceiver, mut net_error_rx: ProcessMessageReceiver,
target: &Address, target: &Address,
send_to_loop: &MessageSender, send_to_loop: &MessageSender,
) -> Result<(), EthSubError> { active_subscriptions: &ActiveSubscriptions,
response_channels: &ResponseChannels,
) -> EthSubError {
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(30)); let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(30));
let mut last_received = tokio::time::Instant::now(); let mut last_received = tokio::time::Instant::now();
let two_hours = tokio::time::Duration::from_secs(2 * 3600); let two_hours = tokio::time::Duration::from_secs(2 * 3600);
loop { let e = loop {
tokio::select! { tokio::select! {
incoming = rx.recv() => { incoming = rx.recv() => {
match incoming { match incoming {
@ -380,16 +385,16 @@ async fn maintain_remote_subscription(
.await; .await;
} }
Some(EthSubResult::Err(e)) => { Some(EthSubResult::Err(e)) => {
return Err(EthSubError { break EthSubError {
id: sub_id, id: sub_id,
error: e.error, error: e.error,
}); };
} }
None => { None => {
return Err(EthSubError { break EthSubError {
id: sub_id, id: sub_id,
error: "subscription closed unexpectedly".to_string(), error: "subscription closed unexpectedly".to_string(),
}); };
} }
} }
} }
@ -408,18 +413,25 @@ async fn maintain_remote_subscription(
} }
incoming = net_error_rx.recv() => { incoming = net_error_rx.recv() => {
if let Some(Err(_net_error)) = incoming { if let Some(Err(_net_error)) = incoming {
return Err(EthSubError { break EthSubError {
id: sub_id, id: sub_id,
error: "subscription node-provider failed keepalive".to_string(), error: "subscription node-provider failed keepalive".to_string(),
}); };
} }
} }
_ = tokio::time::sleep_until(last_received + two_hours) => { _ = tokio::time::sleep_until(last_received + two_hours) => {
return Err(EthSubError { break EthSubError {
id: sub_id, id: sub_id,
error: "No updates received for 2 hours, subscription considered dead.".to_string(), error: "No updates received for 2 hours, subscription considered dead.".to_string(),
}); };
} }
} }
} };
active_subscriptions
.entry(target.clone())
.and_modify(|sub_map| {
sub_map.remove(&sub_id);
});
response_channels.remove(&keepalive_km_id);
e
} }