This commit is contained in:
dr-frmr 2024-02-21 14:02:39 -03:00
parent 96f47a77cb
commit 6206ad50bf
No known key found for this signature in database
3 changed files with 121 additions and 14 deletions

View File

@ -9,6 +9,16 @@
"allow": [],
"deny": []
},
{
"chain_id": 11155111,
"trusted": false,
"provider": {
"RpcUrl": "wss://eth-sepolia.g.alchemy.com/v2/a4bRKYnvC0uT2l1rzVDAvldH3OPKQnKm"
},
"public": false,
"allow": [],
"deny": []
},
{
"chain_id": 11155111,
"trusted": true,

View File

@ -14,7 +14,7 @@ use std::sync::Arc;
use tokio::task::JoinHandle;
use url::Url;
/// mapping of chain id to ordered(TODO) list of providers
/// mapping of chain id to ordered lists of providers
type Providers = Arc<DashMap<u64, ActiveProviders>>;
struct ActiveProviders {
@ -37,7 +37,7 @@ struct NodeProvider {
type ActiveSubscriptions = Arc<DashMap<ProcessId, HashMap<u64, ActiveSub>>>;
enum ActiveSub {
Local(JoinHandle<Result<(), EthError>>),
Local(JoinHandle<()>),
Remote(String), // name of node providing this subscription for us
}
@ -93,6 +93,7 @@ pub async fn provider(
caps_oracle: CapMessageSender,
print_tx: PrintSender,
) -> Result<()> {
println!("provider: on\r");
let our = Arc::new(our);
let mut access_settings = AccessSettings {
@ -156,6 +157,7 @@ async fn handle_message(
return handle_eth_action(
our,
access_settings,
send_to_loop,
km,
eth_action,
providers,
@ -205,6 +207,7 @@ async fn handle_passthrough_response(
async fn handle_eth_action(
our: &str,
access_settings: &mut AccessSettings,
send_to_loop: &MessageSender,
km: KernelMessage,
eth_action: EthAction,
providers: &mut Providers,
@ -231,27 +234,119 @@ async fn handle_eth_action(
EthAction::SubscribeLogs {
sub_id,
chain_id,
kind,
params,
ref kind,
ref params,
} => {
todo!()
let new_sub = ActiveSub::Local(tokio::spawn(create_new_subscription(
our.to_string(),
km.id,
km.source.clone(),
km.rsvp,
send_to_loop.clone(),
eth_action,
providers.clone(),
)));
let mut subs = active_subscriptions
.entry(km.source.process)
.or_insert(HashMap::new());
subs.insert(sub_id, new_sub);
}
EthAction::UnsubscribeLogs(sub_id) => {
active_subscriptions
.entry(km.source.process)
.and_modify(|sub_map| {
sub_map.remove(&sub_id);
if let Some(sub) = sub_map.get_mut(&sub_id) {
match sub {
ActiveSub::Local(handle) => {
handle.abort();
}
ActiveSub::Remote(node) => {
// TODO send to them asking to abort
}
}
}
});
Ok(())
}
EthAction::Request {
chain_id,
method,
params,
} => {
todo!()
//todo
}
}
Ok(())
}
/// spawned as a task
async fn create_new_subscription(
our: String,
km_id: u64,
target: Address,
rsvp: Option<Address>,
send_to_loop: MessageSender,
eth_action: EthAction,
providers: Providers,
) {
let EthAction::SubscribeLogs {
sub_id,
chain_id,
kind,
params,
} = eth_action
else {
return;
};
let Some(aps) = providers.get_mut(&chain_id) else {
send_to_loop
.send(make_error_message(
&our,
sub_id,
target,
EthError::NoRpcForChain,
))
.await
.expect("eth: kernel sender died!");
return;
};
// first, try any url providers we have for this chain,
// then if we have none or they all fail, go to node providers.
// finally, if no provider works, return an error.
for url_provider in &aps.urls {
if let Some(pubsub) = &url_provider.pubsub {
let kind = serde_json::to_value(&kind).unwrap();
let params = serde_json::to_value(&params).unwrap();
if let Ok(id) = pubsub
.inner()
.prepare("eth_subscribe", [kind, params])
.await
{
let rx = pubsub.inner().get_raw_subscription(id).await;
if let Err(e) =
handle_subscription_stream(&our, sub_id, rx, &target, &rsvp, &send_to_loop)
.await
{
send_to_loop
.send(make_error_message(&our, sub_id, target, e))
.await
.expect("eth: kernel sender died!");
}
return;
}
}
}
for node_provider in &aps.nodes {
// todo
}
send_to_loop
.send(make_error_message(
&our,
sub_id,
target,
EthError::NoRpcForChain,
))
.await
.expect("eth: kernel sender died!");
}
async fn handle_eth_config_action(
@ -517,12 +612,12 @@ async fn handle_eth_config_action(
/// This task is responsible for connecting to the ETH RPC provider and streaming logs
/// for a specific subscription made by a process.
async fn handle_subscription_stream(
our: String,
our: &str,
sub_id: u64,
mut rx: RawSubscription,
target: Address,
rsvp: Option<Address>,
send_to_loop: MessageSender,
target: &Address,
rsvp: &Option<Address>,
send_to_loop: &MessageSender,
) -> Result<(), EthError> {
loop {
match rx.recv().await {
@ -540,7 +635,7 @@ async fn handle_subscription_stream(
.send(KernelMessage {
id: rand::random(),
source: Address {
node: our.clone(),
node: our.to_string(),
process: ETH_PROCESS_ID.clone(),
},
target: target.clone(),
@ -559,7 +654,7 @@ async fn handle_subscription_stream(
lazy_load_blob: None,
})
.await
.unwrap();
.map_err(|_| EthError::RpcError("eth: sender died".to_string()))?;
}
}
}

View File

@ -58,6 +58,8 @@ pub enum EthResponse {
#[derive(Debug, Serialize, Deserialize)]
pub enum EthError {
/// No RPC provider for the chain
NoRpcForChain,
/// Underlying transport error
TransportError(String),
/// Subscription closed