fix: refactor eth provider subs

This commit is contained in:
dr-frmr 2024-04-24 23:43:47 +09:00
parent d15ffa4a52
commit 35e1692de7
No known key found for this signature in database
2 changed files with 69 additions and 44 deletions

View File

@ -456,19 +456,15 @@ async fn handle_eth_action(
// before returning an error.
match eth_action {
EthAction::SubscribeLogs { sub_id, .. } => {
tokio::spawn(subscription::create_new_subscription(
state.our.to_string(),
subscription::create_new_subscription(
state,
km.id,
km.source.clone(),
km.rsvp,
state.send_to_loop.clone(),
sub_id,
eth_action,
state.providers.clone(),
state.active_subscriptions.clone(),
state.response_channels.clone(),
state.print_tx.clone(),
));
)
.await;
}
EthAction::UnsubscribeLogs(sub_id) => {
let mut sub_map = state
@ -553,7 +549,7 @@ async fn fulfill_request(
};
// first, try any url providers we have for this chain,
// then if we have none or they all fail, go to node provider.
// then if we have none or they all fail, go to node providers.
// finally, if no provider works, return an error.
// bump the successful provider to the front of the list for future requests
@ -652,15 +648,13 @@ async fn forward_to_node_provider(
else {
return EthResponse::Err(EthError::RpcTimeout);
};
let Message::Response((resp, _context)) = response_km.message else {
// if we hit this, they spoofed a request with same id, ignore and possibly punish
return EthResponse::Err(EthError::RpcMalformedResponse);
};
let Ok(eth_response) = serde_json::from_slice::<EthResponse>(&resp.body) else {
// if we hit this, they sent a malformed response, ignore and possibly punish
return EthResponse::Err(EthError::RpcMalformedResponse);
};
eth_response
if let Message::Response((resp, _context)) = response_km.message {
if let Ok(eth_response) = serde_json::from_slice::<EthResponse>(&resp.body) {
return eth_response;
}
}
// if we hit this, they sent a malformed response, ignore and possibly punish
EthResponse::Err(EthError::RpcMalformedResponse)
}
async fn handle_eth_config_action(

View File

@ -4,50 +4,49 @@ use alloy_rpc_types::pubsub::SubscriptionResult;
/// cleans itself up when the subscription is closed or fails.
pub async fn create_new_subscription(
our: String,
state: &ModuleState,
km_id: u64,
target: Address,
rsvp: Option<Address>,
send_to_loop: MessageSender,
sub_id: u64,
eth_action: EthAction,
providers: Providers,
active_subscriptions: ActiveSubscriptions,
response_channels: ResponseChannels,
print_tx: PrintSender,
) {
verbose_print(&print_tx, "eth: creating new subscription").await;
verbose_print(&state.print_tx, "eth: creating new subscription").await;
match build_subscription(
&our,
&state.our,
km_id,
&target,
&send_to_loop,
&state.send_to_loop,
&eth_action,
&providers,
&response_channels,
&print_tx,
&state.providers,
&state.response_channels,
&state.print_tx,
)
.await
{
Ok(maybe_raw_sub) => {
// send a response to the target that the subscription was successful
kernel_message(
&our,
&state.our,
km_id,
target.clone(),
rsvp.clone(),
false,
None,
EthResponse::Ok,
&send_to_loop,
&state.send_to_loop,
)
.await;
let mut subs = active_subscriptions
let mut subs = state
.active_subscriptions
.entry(target.clone())
.or_insert(HashMap::new());
let active_subscriptions = active_subscriptions.clone();
let active_subscriptions = state.active_subscriptions.clone();
match maybe_raw_sub {
Ok(rx) => {
let our = state.our.clone();
let send_to_loop = state.send_to_loop.clone();
let print_tx = state.print_tx.clone();
subs.insert(
sub_id,
// this is a local sub, as in, we connect to the rpc endpoint
@ -92,7 +91,13 @@ pub async fn create_new_subscription(
let keepalive_km_id = rand::random();
let (keepalive_err_sender, keepalive_err_receiver) =
tokio::sync::mpsc::channel(1);
response_channels.insert(keepalive_km_id, keepalive_err_sender);
state
.response_channels
.insert(keepalive_km_id, keepalive_err_sender);
let our = state.our.clone();
let send_to_loop = state.send_to_loop.clone();
let print_tx = state.print_tx.clone();
let response_channels = state.response_channels.clone();
subs.insert(
remote_sub_id,
ActiveSub::Remote {
@ -140,7 +145,7 @@ pub async fn create_new_subscription(
}
}
Err(e) => {
error_message(&our, km_id, target.clone(), e, &send_to_loop).await;
error_message(&state.our, km_id, target.clone(), e, &state.send_to_loop).await;
}
}
}
@ -171,32 +176,58 @@ async fn build_subscription(
// first, try any url providers we have for this chain,
// then if we have none or they all fail, go to node providers.
// finally, if no provider works, return an error.
for url_provider in &mut aps.urls {
// bump the successful provider to the front of the list for future requests
for (index, url_provider) in aps.urls.iter_mut().enumerate() {
let pubsub = match &url_provider.pubsub {
Some(pubsub) => pubsub,
None => {
if let Ok(()) = activate_url_provider(url_provider).await {
verbose_print(print_tx, "eth: activated a url provider").await;
verbose_print(
print_tx,
&format!("eth: activated url provider {}", url_provider.url),
)
.await;
url_provider.pubsub.as_ref().unwrap()
} else {
verbose_print(
print_tx,
&format!("eth: could not activate url provider {}", url_provider.url),
)
.await;
continue;
}
}
};
let kind = serde_json::to_value(&kind).unwrap();
let params = serde_json::to_value(&params).unwrap();
if let Ok(id) = pubsub
match pubsub
.inner()
.prepare("eth_subscribe", [kind, params])
.await
{
let rx = pubsub.inner().get_raw_subscription(id).await;
return Ok(Ok(rx));
Ok(id) => {
let rx = pubsub.inner().get_raw_subscription(id).await;
let successful_provider = aps.urls.remove(index);
aps.urls.insert(0, successful_provider);
return Ok(Ok(rx));
}
Err(rpc_error) => {
verbose_print(
print_tx,
&format!(
"eth: got error from url provider {}: {}",
url_provider.url, rpc_error
),
)
.await;
// this provider failed and needs to be reset
url_provider.pubsub = None;
}
}
// this provider failed and needs to be reset
url_provider.pubsub = None;
}
// now we need a response channel
// now we need a response channel which will be open for the duration
// of the subscription
let (sender, mut response_receiver) = tokio::sync::mpsc::channel(1);
response_channels.insert(km_id, sender);
// we need to create our own unique sub id because in the remote provider node,