Merge pull request #519 from kinode-dao/hf/fix-eth-provider

eth: fix remote provider
This commit is contained in:
nick.kino 2024-08-30 16:36:55 -07:00 committed by GitHub
commit e1923d94fe
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 113 additions and 38 deletions

1
Cargo.lock generated
View File

@ -3547,6 +3547,7 @@ dependencies = [
"hex",
"hmac",
"http 1.1.0",
"indexmap",
"jwt",
"kit 0.6.10",
"lazy_static",

View File

@ -58,6 +58,7 @@ generic-array = "0.14.7"
hex = "0.4.3"
hmac = "0.12"
http = "1.1.0"
indexmap = "2.4"
jwt = "0.16"
lib = { path = "../lib" }
lazy_static = "1.4.0"

View File

@ -4,11 +4,14 @@ use alloy::rpc::client::WsConnect;
use alloy::rpc::json_rpc::RpcError;
use anyhow::Result;
use dashmap::DashMap;
use indexmap::IndexMap;
use lib::types::core::*;
use lib::types::eth::*;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use url::Url;
@ -158,8 +161,15 @@ struct ModuleState {
send_to_loop: MessageSender,
/// our sender for terminal prints
print_tx: PrintSender,
/// cache of ETH requests
request_cache: RequestCache,
}
type RequestCache = Arc<Mutex<IndexMap<Vec<u8>, (EthResponse, Instant)>>>;
const DELAY_MS: u64 = 1_000;
const MAX_REQUEST_CACHE_LEN: usize = 500;
/// TODO replace with alloy abstraction
fn valid_method(method: &str) -> Option<&'static str> {
match method {
@ -240,6 +250,7 @@ pub async fn provider(
response_channels: Arc::new(DashMap::new()),
send_to_loop,
print_tx,
request_cache: Arc::new(Mutex::new(IndexMap::new())),
};
// convert saved configs into data structure that we will use to route queries
@ -598,13 +609,14 @@ async fn handle_eth_action(
}
}
EthAction::Request { .. } => {
let (sender, receiver) = tokio::sync::mpsc::channel(1);
let (sender, mut receiver) = tokio::sync::mpsc::channel(1);
state.response_channels.insert(km.id, sender);
let our = state.our.to_string();
let send_to_loop = state.send_to_loop.clone();
let providers = state.providers.clone();
let response_channels = state.response_channels.clone();
let print_tx = state.print_tx.clone();
let mut request_cache = Arc::clone(&state.request_cache);
tokio::spawn(async move {
match tokio::time::timeout(
std::time::Duration::from_secs(timeout),
@ -612,26 +624,72 @@ async fn handle_eth_action(
&our,
km.id,
&send_to_loop,
eth_action,
providers,
receiver,
&eth_action,
&providers,
&mut receiver,
&print_tx,
&mut request_cache,
),
)
.await
{
Ok(response) => {
kernel_message(
&our,
km.id,
km.rsvp.unwrap_or(km.source),
None,
false,
None,
response,
&send_to_loop,
)
.await;
if let EthResponse::Err(EthError::RpcError(_)) = response {
// try one more time after 1s delay in case RPC is rate limiting
std::thread::sleep(std::time::Duration::from_millis(DELAY_MS));
match tokio::time::timeout(
std::time::Duration::from_secs(timeout),
fulfill_request(
&our,
km.id,
&send_to_loop,
&eth_action,
&providers,
&mut receiver,
&print_tx,
&mut request_cache,
),
)
.await
{
Ok(response) => {
kernel_message(
&our,
km.id,
km.rsvp.clone().unwrap_or(km.source.clone()),
None,
false,
None,
response,
&send_to_loop,
)
.await;
}
Err(_) => {
// task timeout
error_message(
&our,
km.id,
km.source.clone(),
EthError::RpcTimeout,
&send_to_loop,
)
.await;
}
}
} else {
kernel_message(
&our,
km.id,
km.rsvp.unwrap_or(km.source),
None,
false,
None,
response,
&send_to_loop,
)
.await;
}
}
Err(_) => {
// task timeout
@ -650,19 +708,31 @@ async fn fulfill_request(
our: &str,
km_id: u64,
send_to_loop: &MessageSender,
eth_action: EthAction,
providers: Providers,
mut remote_request_receiver: ProcessMessageReceiver,
eth_action: &EthAction,
providers: &Providers,
remote_request_receiver: &mut ProcessMessageReceiver,
print_tx: &PrintSender,
request_cache: &mut RequestCache,
) -> EthResponse {
let serialized_action = serde_json::to_vec(eth_action).unwrap();
let EthAction::Request {
chain_id,
ref chain_id,
ref method,
ref params,
} = eth_action
else {
return EthResponse::Err(EthError::PermissionDenied); // will never hit
};
{
let mut request_cache = request_cache.lock().await;
if let Some((cache_hit, time_of_hit)) = request_cache.shift_remove(&serialized_action) {
// refresh cache entry (it is most recently accessed) & return it
if time_of_hit.elapsed() < Duration::from_millis(DELAY_MS) {
request_cache.insert(serialized_action, (cache_hit.clone(), time_of_hit));
return cache_hit;
}
}
}
let Some(method) = valid_method(&method) else {
return EthResponse::Err(EthError::InvalidMethod(method.to_string()));
};
@ -703,7 +773,7 @@ async fn fulfill_request(
match pubsub.raw_request(method.into(), params.clone()).await {
Ok(value) => {
let mut is_replacement_successful = true;
providers.entry(chain_id).and_modify(|aps| {
providers.entry(chain_id.clone()).and_modify(|aps| {
let Some(index) = find_index(
&aps.urls.iter().map(|u| u.url.as_str()).collect(),
&url_provider.url,
@ -724,7 +794,14 @@ async fn fulfill_request(
)
.await;
}
return EthResponse::Response { value };
let response = EthResponse::Response { value };
let mut request_cache = request_cache.lock().await;
if request_cache.len() >= MAX_REQUEST_CACHE_LEN {
// drop 10% oldest cache entries
request_cache.drain(0..MAX_REQUEST_CACHE_LEN / 10);
}
request_cache.insert(serialized_action, (response.clone(), Instant::now()));
return response;
}
Err(rpc_error) => {
verbose_print(
@ -741,7 +818,7 @@ async fn fulfill_request(
}
// this provider failed and needs to be reset
let mut is_reset_successful = true;
providers.entry(chain_id).and_modify(|aps| {
providers.entry(chain_id.clone()).and_modify(|aps| {
let Some(index) = find_index(
&aps.urls.iter().map(|u| u.url.as_str()).collect(),
&url_provider.url,
@ -787,7 +864,7 @@ async fn fulfill_request(
node_provider,
eth_action.clone(),
send_to_loop,
&mut remote_request_receiver,
remote_request_receiver,
)
.await;
if let EthResponse::Err(e) = response {

View File

@ -386,24 +386,20 @@ async fn maintain_local_subscription(
mut close_receiver: tokio::sync::mpsc::Receiver<bool>,
print_tx: &PrintSender,
) -> Result<(), EthSubError> {
loop {
let e = loop {
tokio::select! {
_ = close_receiver.recv() => {
unsubscribe(rx, &chain_id, providers, print_tx).await;
//unsubscribe(rx, &chain_id, providers, print_tx).await;
return Ok(());
},
value = rx.recv() => {
let Ok(value) = value else {
break;
let value = match value {
Ok(v) => v,
Err(e) => break e.to_string(),
};
let result: SubscriptionResult = match serde_json::from_str(value.get()) {
Ok(res) => res,
Err(e) => {
return Err(EthSubError {
id: sub_id,
error: e.to_string(),
});
}
Err(e) => break e.to_string(),
};
kernel_message(
our,
@ -418,16 +414,16 @@ async fn maintain_local_subscription(
.await;
},
}
}
};
active_subscriptions
.entry(target.clone())
.and_modify(|sub_map| {
sub_map.remove(&sub_id);
});
unsubscribe(rx, &chain_id, providers, print_tx).await;
//unsubscribe(rx, &chain_id, providers, print_tx).await;
Err(EthSubError {
id: sub_id,
error: format!("subscription ({target}) closed unexpectedly"),
error: format!("subscription ({target}) closed unexpectedly {e}"),
})
}

View File

@ -53,14 +53,14 @@ pub struct EthSubError {
///
/// In the case of an [`EthAction::SubscribeLogs`] request, the response will indicate if
/// the subscription was successfully created or not.
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum EthResponse {
Ok,
Response { value: serde_json::Value },
Err(EthError),
}
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum EthError {
/// RPC provider returned an error
RpcError(ErrorPayload),