fix: more sub refactor

dr-frmr 2024-04-25 02:42:11 +09:00
parent d3a36b378e
commit cfedc19102
2 changed files with 147 additions and 113 deletions


@@ -46,6 +46,7 @@ struct UrlProvider {
#[derive(Debug)]
struct NodeProvider {
/// NOT CURRENTLY USED
pub trusted: bool,
/// semi-temporary flag to mark if this provider is currently usable
/// future updates will make this more dynamic
@@ -365,7 +366,7 @@ async fn handle_message(
// so they can stop sending us updates
verbose_print(
&state.print_tx,
"eth: got eth_sub_result but no matching sub found",
"eth: got eth_sub_result but no matching sub found, unsubscribing",
)
.await;
kernel_message(
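The hunk above only reworks the log line, but the behavior around it is worth spelling out: when an eth_sub_result arrives for a subscription this module no longer tracks, it logs that fact and replies to the sender so the updates stop. A minimal sketch of that check, assuming a simple per-sender set of tracked sub ids; `Reply` and `handle_sub_result` are hypothetical names, and the real ActiveSub bookkeeping in this crate is richer:

```rust
use std::collections::{HashMap, HashSet};

#[derive(Debug)]
enum Reply {
    // hypothetical reply asking the sender to drop this subscription
    Unsubscribe(u64),
}

fn handle_sub_result(
    // sender address -> sub ids we still track (stand-in for active_subscriptions)
    active: &HashMap<String, HashSet<u64>>,
    sender: &str,
    sub_id: u64,
) -> Option<Reply> {
    if active.get(sender).map_or(false, |subs| subs.contains(&sub_id)) {
        // known subscription: forward the update to the subscriber (omitted)
        None
    } else {
        // unknown subscription: log it and ask the sender to stop
        println!("eth: got eth_sub_result but no matching sub found, unsubscribing");
        Some(Reply::Unsubscribe(sub_id))
    }
}

fn main() {
    let active: HashMap<String, HashSet<u64>> = HashMap::new();
    // nothing is tracked, so this produces an Unsubscribe reply
    println!("{:?}", handle_sub_result(&active, "provider-node.os", 42));
}
```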
@@ -595,6 +596,14 @@ async fn fulfill_request(
}
}
for node_provider in &mut aps.nodes {
verbose_print(
print_tx,
&format!(
"eth: attempting to fulfill via {}",
node_provider.kns_update.name
),
)
.await;
let response = forward_to_node_provider(
our,
km_id,

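For context on the new verbose_print above: fulfill_request falls back across the configured node providers, logging each attempt before forwarding the request. A rough sketch of that fallback loop, using trimmed-down stand-ins (`Relay` is not the crate's real NodeProvider, and `try_provider` stands in for forward_to_node_provider):

```rust
// trimmed-down stand-in for the real NodeProvider
struct Relay {
    name: String,
    usable: bool,
}

// stand-in for forward_to_node_provider: returns Some(response) on success
async fn try_provider(relay: &Relay) -> Option<String> {
    if relay.usable {
        Some(format!("response from {}", relay.name))
    } else {
        None
    }
}

// try each provider in turn, logging every attempt, and stop at the first answer
async fn fulfill(relays: &[Relay]) -> Option<String> {
    for relay in relays {
        // the new verbose_print in the hunk above corresponds to this log
        println!("eth: attempting to fulfill via {}", relay.name);
        if let Some(resp) = try_provider(relay).await {
            return Some(resp);
        }
    }
    None
}

#[tokio::main]
async fn main() {
    let relays = vec![
        Relay { name: "relay-1.os".into(), usable: false },
        Relay { name: "relay-2.os".into(), usable: true },
    ];
    println!("{:?}", fulfill(&relays).await);
}
```

The per-attempt log makes it visible in verbose mode which relay actually served a given request.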

@@ -11,120 +11,75 @@ pub async fn create_new_subscription(
sub_id: u64,
eth_action: EthAction,
) {
match build_subscription(
&state.our,
km_id,
&target,
&state.send_to_loop,
&eth_action,
&state.providers,
&state.response_channels,
&state.print_tx,
)
.await
{
Ok(maybe_raw_sub) => {
// send a response to the target that the subscription was successful
kernel_message(
&state.our,
let our = state.our.clone();
let send_to_loop = state.send_to_loop.clone();
let active_subscriptions = state.active_subscriptions.clone();
let providers = state.providers.clone();
let response_channels = state.response_channels.clone();
let print_tx = state.print_tx.clone();
tokio::spawn(async move {
match tokio::time::timeout(
std::time::Duration::from_secs(30),
build_subscription(
&our,
km_id,
target.clone(),
rsvp.clone(),
false,
None,
EthResponse::Ok,
&state.send_to_loop,
)
.await;
let mut subs = state
.active_subscriptions
.entry(target.clone())
.or_insert(HashMap::new());
let active_subscriptions = state.active_subscriptions.clone();
match maybe_raw_sub {
Ok(rx) => {
let our = state.our.clone();
let send_to_loop = state.send_to_loop.clone();
let print_tx = state.print_tx.clone();
subs.insert(
sub_id,
// this is a local sub, as in, we connect to the rpc endpoint
ActiveSub::Local(tokio::spawn(async move {
// await the subscription error and kill it if so
if let Err(e) = maintain_local_subscription(
&our,
sub_id,
rx,
&target,
&rsvp,
&send_to_loop,
)
.await
{
verbose_print(
&print_tx,
"eth: closed local subscription due to error",
)
.await;
kernel_message(
&target,
&send_to_loop,
&eth_action,
&providers,
&response_channels,
&print_tx,
),
)
.await
{
Ok(Ok(maybe_raw_sub)) => {
// send a response to the target that the subscription was successful
kernel_message(
&our,
km_id,
target.clone(),
rsvp.clone(),
false,
None,
EthResponse::Ok,
&send_to_loop,
)
.await;
let mut subs = active_subscriptions
.entry(target.clone())
.or_insert(HashMap::new());
let active_subscriptions = active_subscriptions.clone();
match maybe_raw_sub {
Ok(rx) => {
let our = our.clone();
let send_to_loop = send_to_loop.clone();
let print_tx = print_tx.clone();
subs.insert(
sub_id,
// this is a local sub, as in, we connect to the rpc endpoint
ActiveSub::Local(tokio::spawn(async move {
// await the subscription error and kill it if so
if let Err(e) = maintain_local_subscription(
&our,
rand::random(),
target.clone(),
rsvp,
true,
None,
EthSubResult::Err(e),
&send_to_loop,
)
.await;
active_subscriptions.entry(target).and_modify(|sub_map| {
sub_map.remove(&km_id);
});
}
})),
);
}
Err((provider_node, remote_sub_id)) => {
// this is a remote sub, given by a relay node
let (sender, rx) = tokio::sync::mpsc::channel(10);
let keepalive_km_id = rand::random();
let (keepalive_err_sender, keepalive_err_receiver) =
tokio::sync::mpsc::channel(1);
state
.response_channels
.insert(keepalive_km_id, keepalive_err_sender);
let our = state.our.clone();
let send_to_loop = state.send_to_loop.clone();
let print_tx = state.print_tx.clone();
let response_channels = state.response_channels.clone();
subs.insert(
remote_sub_id,
ActiveSub::Remote {
provider_node: provider_node.clone(),
handle: tokio::spawn(async move {
if let Err(e) = maintain_remote_subscription(
&our,
&provider_node,
remote_sub_id,
sub_id,
keepalive_km_id,
rx,
keepalive_err_receiver,
&target,
&rsvp,
&send_to_loop,
)
.await
{
verbose_print(
&print_tx,
"eth: closed subscription with provider node due to error",
"eth: closed local subscription due to error",
)
.await;
kernel_message(
&our,
rand::random(),
target.clone(),
None,
rsvp,
true,
None,
EthSubResult::Err(e),
@@ -132,21 +87,84 @@ pub async fn create_new_subscription(
)
.await;
active_subscriptions.entry(target).and_modify(|sub_map| {
sub_map.remove(&sub_id);
sub_map.remove(&km_id);
});
response_channels.remove(&keepalive_km_id);
}
}),
sender,
},
);
})),
);
}
Err((provider_node, remote_sub_id)) => {
// this is a remote sub, given by a relay node
let (sender, rx) = tokio::sync::mpsc::channel(10);
let keepalive_km_id = rand::random();
let (keepalive_err_sender, keepalive_err_receiver) =
tokio::sync::mpsc::channel(1);
response_channels.insert(keepalive_km_id, keepalive_err_sender);
let our = our.clone();
let send_to_loop = send_to_loop.clone();
let print_tx = print_tx.clone();
let response_channels = response_channels.clone();
subs.insert(
remote_sub_id,
ActiveSub::Remote {
provider_node: provider_node.clone(),
handle: tokio::spawn(async move {
if let Err(e) = maintain_remote_subscription(
&our,
&provider_node,
remote_sub_id,
sub_id,
keepalive_km_id,
rx,
keepalive_err_receiver,
&target,
&send_to_loop,
)
.await
{
verbose_print(
&print_tx,
"eth: closed subscription with provider node due to error",
)
.await;
kernel_message(
&our,
rand::random(),
target.clone(),
None,
true,
None,
EthSubResult::Err(e),
&send_to_loop,
)
.await;
active_subscriptions.entry(target).and_modify(|sub_map| {
sub_map.remove(&sub_id);
});
response_channels.remove(&keepalive_km_id);
}
}),
sender,
},
);
}
}
}
Ok(Err(e)) => {
error_message(&our, km_id, target.clone(), e, &send_to_loop).await;
}
Err(_) => {
error_message(
&our,
km_id,
target.clone(),
EthError::RpcTimeout,
&send_to_loop,
)
.await;
}
}
Err(e) => {
error_message(&state.our, km_id, target.clone(), e, &state.send_to_loop).await;
}
}
});
}
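Reduced to its essentials, the refactor above moves the subscription build off the main path: create_new_subscription clones what it needs, spawns a task, and wraps build_subscription in a 30-second tokio::time::timeout, so a slow or unreachable provider surfaces as a timeout error instead of stalling the caller. A minimal sketch of that control flow, with `build_sub` and `SubError` as hypothetical stand-ins for build_subscription and EthError:

```rust
use std::time::Duration;

#[derive(Debug)]
#[allow(dead_code)]
enum SubError {
    // stand-ins for the error cases; Timeout plays the role of EthError::RpcTimeout
    Rpc(String),
    Timeout,
}

// stand-in for build_subscription: set the subscription up or fail
async fn build_sub(sub_id: u64) -> Result<u64, SubError> {
    Ok(sub_id)
}

#[tokio::main]
async fn main() {
    let sub_id: u64 = 7;
    let handle = tokio::spawn(async move {
        match tokio::time::timeout(Duration::from_secs(30), build_sub(sub_id)).await {
            // finished in time and succeeded: ack the subscriber and hand the
            // stream to a maintain_* task (omitted here)
            Ok(Ok(id)) => println!("subscription {id} established"),
            // finished in time but failed: report the build error
            Ok(Err(e)) => println!("subscription failed: {e:?}"),
            // thirty seconds elapsed first: report a timeout instead
            Err(_elapsed) => println!("subscription failed: {:?}", SubError::Timeout),
        }
    });
    handle.await.unwrap();
}
```

The three match arms line up with the diff: Ok(Ok(..)) acks the subscriber and registers the sub, Ok(Err(e)) goes through error_message with the build error, and the outer Err(_) arm reports EthError::RpcTimeout.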
/// terrible abuse of result in return type, yes, sorry
@@ -183,14 +201,14 @@ async fn build_subscription(
None => {
if let Ok(()) = activate_url_provider(url_provider).await {
verbose_print(
print_tx,
&print_tx,
&format!("eth: activated url provider {}", url_provider.url),
)
.await;
url_provider.pubsub.as_ref().unwrap()
} else {
verbose_print(
print_tx,
&print_tx,
&format!("eth: could not activate url provider {}", url_provider.url),
)
.await;
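For context on this hunk (which only adjusts how print_tx is passed): build_subscription walks the URL providers and, when one has no live pubsub connection yet, tries to activate it on the spot and logs either outcome. A rough sketch of that lazy-activation step, with `UrlProvider` and `activate` as simplified stand-ins (the real provider caches an actual pubsub client, not a String):

```rust
// trimmed-down stand-in for the real UrlProvider
struct UrlProvider {
    url: String,
    pubsub: Option<String>, // stand-in for the cached pubsub client handle
}

// stand-in for activate_url_provider: pretend the connection always succeeds
async fn activate(provider: &mut UrlProvider) -> Result<(), ()> {
    provider.pubsub = Some(format!("client for {}", provider.url));
    Ok(())
}

// return a usable client, activating the provider on demand
async fn client_for(provider: &mut UrlProvider) -> Option<&String> {
    if provider.pubsub.is_none() {
        match activate(provider).await {
            Ok(()) => println!("eth: activated url provider {}", provider.url),
            Err(()) => {
                println!("eth: could not activate url provider {}", provider.url);
                return None;
            }
        }
    }
    provider.pubsub.as_ref()
}

#[tokio::main]
async fn main() {
    let mut provider = UrlProvider {
        url: "wss://example-rpc.invalid".into(),
        pubsub: None,
    };
    println!("{:?}", client_for(&mut provider).await);
}
```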
@@ -213,7 +231,7 @@ async fn build_subscription(
}
Err(rpc_error) => {
verbose_print(
print_tx,
&print_tx,
&format!(
"eth: got error from url provider {}: {}",
url_provider.url, rpc_error
@@ -225,14 +243,21 @@ async fn build_subscription(
}
}
}
// now we need a response channel which will be open for the duration
// of the subscription
let (sender, mut response_receiver) = tokio::sync::mpsc::channel(1);
response_channels.insert(km_id, sender);
// we need to create our own unique sub id because in the remote provider node,
// all subs will be identified under our process address.
let remote_sub_id = rand::random();
for node_provider in &mut aps.nodes {
verbose_print(
&print_tx,
&format!(
"eth: attempting to fulfill via {}",
node_provider.kns_update.name
),
)
.await;
match forward_to_node_provider(
&our,
km_id,