From cfedc1910292ec97c65aff8f885c03aa91a9d1ca Mon Sep 17 00:00:00 2001 From: dr-frmr Date: Thu, 25 Apr 2024 02:42:11 +0900 Subject: [PATCH] fix: more sub refactor --- kinode/src/eth/mod.rs | 11 +- kinode/src/eth/subscription.rs | 249 ++++++++++++++++++--------------- 2 files changed, 147 insertions(+), 113 deletions(-) diff --git a/kinode/src/eth/mod.rs b/kinode/src/eth/mod.rs index 8a1046c9..c1fb20d1 100644 --- a/kinode/src/eth/mod.rs +++ b/kinode/src/eth/mod.rs @@ -46,6 +46,7 @@ struct UrlProvider { #[derive(Debug)] struct NodeProvider { + /// NOT CURRENTLY USED pub trusted: bool, /// semi-temporary flag to mark if this provider is currently usable /// future updates will make this more dynamic @@ -365,7 +366,7 @@ async fn handle_message( // so they can stop sending us updates verbose_print( &state.print_tx, - "eth: got eth_sub_result but no matching sub found", + "eth: got eth_sub_result but no matching sub found, unsubscribing", ) .await; kernel_message( @@ -595,6 +596,14 @@ async fn fulfill_request( } } for node_provider in &mut aps.nodes { + verbose_print( + print_tx, + &format!( + "eth: attempting to fulfill via {}", + node_provider.kns_update.name + ), + ) + .await; let response = forward_to_node_provider( our, km_id, diff --git a/kinode/src/eth/subscription.rs b/kinode/src/eth/subscription.rs index ddbf5fea..79b92d79 100644 --- a/kinode/src/eth/subscription.rs +++ b/kinode/src/eth/subscription.rs @@ -11,120 +11,75 @@ pub async fn create_new_subscription( sub_id: u64, eth_action: EthAction, ) { - match build_subscription( - &state.our, - km_id, - &target, - &state.send_to_loop, - ð_action, - &state.providers, - &state.response_channels, - &state.print_tx, - ) - .await - { - Ok(maybe_raw_sub) => { - // send a response to the target that the subscription was successful - kernel_message( - &state.our, + let our = state.our.clone(); + let send_to_loop = state.send_to_loop.clone(); + let active_subscriptions = state.active_subscriptions.clone(); + let providers = state.providers.clone(); + let response_channels = state.response_channels.clone(); + let print_tx = state.print_tx.clone(); + tokio::spawn(async move { + match tokio::time::timeout( + std::time::Duration::from_secs(30), + build_subscription( + &our, km_id, - target.clone(), - rsvp.clone(), - false, - None, - EthResponse::Ok, - &state.send_to_loop, - ) - .await; - let mut subs = state - .active_subscriptions - .entry(target.clone()) - .or_insert(HashMap::new()); - let active_subscriptions = state.active_subscriptions.clone(); - match maybe_raw_sub { - Ok(rx) => { - let our = state.our.clone(); - let send_to_loop = state.send_to_loop.clone(); - let print_tx = state.print_tx.clone(); - subs.insert( - sub_id, - // this is a local sub, as in, we connect to the rpc endpoint - ActiveSub::Local(tokio::spawn(async move { - // await the subscription error and kill it if so - if let Err(e) = maintain_local_subscription( - &our, - sub_id, - rx, - &target, - &rsvp, - &send_to_loop, - ) - .await - { - verbose_print( - &print_tx, - "eth: closed local subscription due to error", - ) - .await; - kernel_message( + &target, + &send_to_loop, + ð_action, + &providers, + &response_channels, + &print_tx, + ), + ) + .await + { + Ok(Ok(maybe_raw_sub)) => { + // send a response to the target that the subscription was successful + kernel_message( + &our, + km_id, + target.clone(), + rsvp.clone(), + false, + None, + EthResponse::Ok, + &send_to_loop, + ) + .await; + let mut subs = active_subscriptions + .entry(target.clone()) + .or_insert(HashMap::new()); + let active_subscriptions = active_subscriptions.clone(); + match maybe_raw_sub { + Ok(rx) => { + let our = our.clone(); + let send_to_loop = send_to_loop.clone(); + let print_tx = print_tx.clone(); + subs.insert( + sub_id, + // this is a local sub, as in, we connect to the rpc endpoint + ActiveSub::Local(tokio::spawn(async move { + // await the subscription error and kill it if so + if let Err(e) = maintain_local_subscription( &our, - rand::random(), - target.clone(), - rsvp, - true, - None, - EthSubResult::Err(e), - &send_to_loop, - ) - .await; - active_subscriptions.entry(target).and_modify(|sub_map| { - sub_map.remove(&km_id); - }); - } - })), - ); - } - Err((provider_node, remote_sub_id)) => { - // this is a remote sub, given by a relay node - let (sender, rx) = tokio::sync::mpsc::channel(10); - let keepalive_km_id = rand::random(); - let (keepalive_err_sender, keepalive_err_receiver) = - tokio::sync::mpsc::channel(1); - state - .response_channels - .insert(keepalive_km_id, keepalive_err_sender); - let our = state.our.clone(); - let send_to_loop = state.send_to_loop.clone(); - let print_tx = state.print_tx.clone(); - let response_channels = state.response_channels.clone(); - subs.insert( - remote_sub_id, - ActiveSub::Remote { - provider_node: provider_node.clone(), - handle: tokio::spawn(async move { - if let Err(e) = maintain_remote_subscription( - &our, - &provider_node, - remote_sub_id, sub_id, - keepalive_km_id, rx, - keepalive_err_receiver, &target, + &rsvp, &send_to_loop, ) .await { verbose_print( &print_tx, - "eth: closed subscription with provider node due to error", + "eth: closed local subscription due to error", ) .await; kernel_message( &our, rand::random(), target.clone(), - None, + rsvp, true, None, EthSubResult::Err(e), @@ -132,21 +87,84 @@ pub async fn create_new_subscription( ) .await; active_subscriptions.entry(target).and_modify(|sub_map| { - sub_map.remove(&sub_id); + sub_map.remove(&km_id); }); - response_channels.remove(&keepalive_km_id); } - }), - sender, - }, - ); + })), + ); + } + Err((provider_node, remote_sub_id)) => { + // this is a remote sub, given by a relay node + let (sender, rx) = tokio::sync::mpsc::channel(10); + let keepalive_km_id = rand::random(); + let (keepalive_err_sender, keepalive_err_receiver) = + tokio::sync::mpsc::channel(1); + response_channels.insert(keepalive_km_id, keepalive_err_sender); + let our = our.clone(); + let send_to_loop = send_to_loop.clone(); + let print_tx = print_tx.clone(); + let response_channels = response_channels.clone(); + subs.insert( + remote_sub_id, + ActiveSub::Remote { + provider_node: provider_node.clone(), + handle: tokio::spawn(async move { + if let Err(e) = maintain_remote_subscription( + &our, + &provider_node, + remote_sub_id, + sub_id, + keepalive_km_id, + rx, + keepalive_err_receiver, + &target, + &send_to_loop, + ) + .await + { + verbose_print( + &print_tx, + "eth: closed subscription with provider node due to error", + ) + .await; + kernel_message( + &our, + rand::random(), + target.clone(), + None, + true, + None, + EthSubResult::Err(e), + &send_to_loop, + ) + .await; + active_subscriptions.entry(target).and_modify(|sub_map| { + sub_map.remove(&sub_id); + }); + response_channels.remove(&keepalive_km_id); + } + }), + sender, + }, + ); + } } } + Ok(Err(e)) => { + error_message(&our, km_id, target.clone(), e, &send_to_loop).await; + } + Err(_) => { + error_message( + &our, + km_id, + target.clone(), + EthError::RpcTimeout, + &send_to_loop, + ) + .await; + } } - Err(e) => { - error_message(&state.our, km_id, target.clone(), e, &state.send_to_loop).await; - } - } + }); } /// terrible abuse of result in return type, yes, sorry @@ -183,14 +201,14 @@ async fn build_subscription( None => { if let Ok(()) = activate_url_provider(url_provider).await { verbose_print( - print_tx, + &print_tx, &format!("eth: activated url provider {}", url_provider.url), ) .await; url_provider.pubsub.as_ref().unwrap() } else { verbose_print( - print_tx, + &print_tx, &format!("eth: could not activate url provider {}", url_provider.url), ) .await; @@ -213,7 +231,7 @@ async fn build_subscription( } Err(rpc_error) => { verbose_print( - print_tx, + &print_tx, &format!( "eth: got error from url provider {}: {}", url_provider.url, rpc_error @@ -225,14 +243,21 @@ async fn build_subscription( } } } - // now we need a response channel which will be open for the duration - // of the subscription + let (sender, mut response_receiver) = tokio::sync::mpsc::channel(1); response_channels.insert(km_id, sender); // we need to create our own unique sub id because in the remote provider node, // all subs will be identified under our process address. let remote_sub_id = rand::random(); for node_provider in &mut aps.nodes { + verbose_print( + &print_tx, + &format!( + "eth: attempting to fulfill via {}", + node_provider.kns_update.name + ), + ) + .await; match forward_to_node_provider( &our, km_id,