eth: spawn threads for requests, send provider

This commit is contained in:
bitful-pannul 2024-02-05 15:39:14 -03:00
parent 49ae108f3a
commit 49ec4509b5
2 changed files with 94 additions and 124 deletions

View File

@ -1,12 +1,13 @@
use crate::eth::types::*;
use crate::types::*;
use alloy_pubsub::RawSubscription;
use alloy_pubsub::{PubSubFrontend, RawSubscription};
use alloy_rpc_client::ClientBuilder;
use alloy_rpc_types::pubsub::SubscriptionResult;
use alloy_transport_ws::WsConnect;
use anyhow::Result;
use std::collections::HashMap;
use dashmap::DashMap;
use std::sync::Arc;
use tokio::task::JoinHandle;
use url::Url;
/// The ETH provider runtime process is responsible for connecting to one or more ETH RPC providers
@ -18,7 +19,7 @@ pub async fn provider(
rpc_url: String,
send_to_loop: MessageSender,
mut recv_in_client: MessageReceiver,
print_tx: PrintSender,
_print_tx: PrintSender,
) -> Result<()> {
let our = Arc::new(our);
// for now, we can only handle WebSocket RPC URLs. In the future, we should
@ -51,118 +52,93 @@ pub async fn provider(
let provider = alloy_providers::provider::Provider::new_with_client(client);
let mut connections = RpcConnections {
provider,
ws_provider_subscriptions: HashMap::new(),
};
// handles of longrunning subscriptions.
let connections: DashMap<(ProcessId, u64), JoinHandle<Result<(), EthError>>> = DashMap::new();
// turn into dashmap so we can share across threads
let connections = Arc::new(connections);
let provider = Arc::new(provider);
while let Some(km) = recv_in_client.recv().await {
// this module only handles requests, ignores all responses
let Message::Request(req) = &km.message else {
continue;
};
let Ok(action) = serde_json::from_slice::<EthAction>(&req.body) else {
continue;
};
match handle_request(
our.clone(),
&km.rsvp.unwrap_or(km.source.clone()),
km.id,
action,
&mut connections,
&send_to_loop,
)
.await
{
Ok(()) => {}
Err(e) => {
let _ = print_tx
.send(Printout {
verbosity: 0,
content: format!("eth: error handling request: {:?}", e),
})
.await;
if req.expects_response.is_some() {
send_to_loop
.send(KernelMessage {
id: km.id,
source: Address {
node: our.to_string(),
process: ETH_PROCESS_ID.clone(),
},
target: Address {
node: our.to_string(),
process: km.source.process.clone(),
},
rsvp: None,
message: Message::Response((
Response {
inherit: false,
body: serde_json::to_vec::<Result<(), EthError>>(&Err(e))?,
metadata: None,
capabilities: vec![],
},
None,
)),
lazy_load_blob: None,
})
.await?;
}
// clone Arcs
let our = our.clone();
let send_to_loop = send_to_loop.clone();
let provider = provider.clone();
let connections = connections.clone();
tokio::spawn(async move {
if let Err(e) = handle_request(
&our,
&km,
&send_to_loop,
provider.clone(),
connections.clone(),
)
.await
{
println!("got error: {:?}", e);
}
}
});
}
Err(anyhow::anyhow!("eth: fatal: message receiver closed!"))
}
async fn handle_request(
our: Arc<String>,
target: &Address,
id: u64,
action: EthAction,
connections: &mut RpcConnections,
our: &str,
km: &KernelMessage,
send_to_loop: &MessageSender,
provider: Arc<alloy_providers::provider::Provider<PubSubFrontend>>,
connections: Arc<DashMap<(ProcessId, u64), JoinHandle<Result<(), EthError>>>>,
) -> Result<(), EthError> {
match action {
let Message::Request(req) = &km.message else {
return Err(EthError::ProviderError(
"eth: only accepts requests".to_string(),
));
};
let action = serde_json::from_slice::<EthAction>(&req.body).map_err(|e| {
EthError::ProviderError(format!("eth: failed to deserialize request: {:?}", e))
})?;
// we might want some of these in payloads.. sub items?
let return_body: EthResponse = match action {
EthAction::SubscribeLogs {
sub_id,
kind,
params,
} => {
let sub_id = (target.process.clone(), sub_id);
let sub_id = (km.target.process.clone(), sub_id);
let kind = serde_json::to_value(&kind).unwrap();
let params = serde_json::to_value(&params).unwrap();
let id = connections
.provider
let id = provider
.inner()
.prepare("eth_subscribe", [kind, params])
.await
.unwrap();
let rx = connections.provider.inner().get_raw_subscription(id).await;
let target = km.source.clone(); // rsvp?
let rx = provider.inner().get_raw_subscription(id).await;
let handle = tokio::spawn(handle_subscription_stream(
our.clone(),
our.to_string(),
sub_id.1.clone(),
rx,
target.clone(),
target,
send_to_loop.clone(),
));
connections.ws_provider_subscriptions.insert(sub_id, handle);
Ok(())
connections.insert(sub_id, handle);
EthResponse::Ok
}
EthAction::UnsubscribeLogs(sub_id) => {
let sub_id = (target.process.clone(), sub_id);
let sub_id = (km.target.process.clone(), sub_id);
let handle = connections
.ws_provider_subscriptions
.remove(&sub_id)
.ok_or(EthError::SubscriptionNotFound)?;
handle.abort();
Ok(())
handle.1.abort();
EthResponse::Ok
}
EthAction::Request { method, params } => {
let method = to_static_str(&method).ok_or(EthError::ProviderError(format!(
@ -171,51 +147,51 @@ async fn handle_request(
)))?;
// throw transportErrorKinds straight back to process
let ass: serde_json::Value = connections
.provider
.inner()
.prepare(method, params)
.await
.unwrap();
// send response back to loop:
send_to_loop
.send(KernelMessage {
id,
source: Address {
node: our.to_string(),
process: ETH_PROCESS_ID.clone(),
},
target: target.clone(),
rsvp: None,
message: Message::Response((
Response {
inherit: false,
body: serde_json::to_vec(&EthResponse::Request(ass)).unwrap(),
metadata: None,
capabilities: vec![],
},
None,
)),
lazy_load_blob: None,
})
.await
.unwrap();
Ok(())
}
_ => {
println!("eth: unhandled action: {:?}", action);
// will be handled soon.
let response: serde_json::Value =
provider.inner().prepare(method, params).await.unwrap();
Ok(())
EthResponse::Request(response)
}
}
};
// todo: fix km.clone() and metadata.clone()
if let Some(target) = km.clone().rsvp.or_else(|| {
req.expects_response.map(|_| Address {
node: our.to_string(),
process: km.source.process.clone(),
})
}) {
let response = KernelMessage {
id: km.id,
source: Address {
node: our.to_string(),
process: VFS_PROCESS_ID.clone(),
},
target: target.clone(),
rsvp: None,
message: Message::Response((
Response {
inherit: false,
body: serde_json::to_vec(&return_body).unwrap(),
metadata: req.metadata.clone(),
capabilities: vec![],
},
None,
)),
lazy_load_blob: None,
};
// Send the response, handling potential errors appropriately
let _ = send_to_loop.send(response).await;
};
Ok(())
}
/// Executed as a long-lived task. The JoinHandle is stored in the `connections` map.
/// This task is responsible for connecting to the ETH RPC provider and streaming logs
/// for a specific subscription made by a process.
async fn handle_subscription_stream(
our: Arc<String>,
our: String,
sub_id: u64,
mut rx: RawSubscription,
target: Address,
@ -228,12 +204,13 @@ async fn handle_subscription_stream(
// return Err(EthError::ProviderError(format!("{:?}", e)));
}
Ok(value) => {
let event: SubscriptionResult = serde_json::from_str(value.get()).unwrap();
let event: SubscriptionResult = serde_json::from_str(value.get())
.map_err(|e| EthError::ProviderError(format!("{:?}", e)))?;
send_to_loop
.send(KernelMessage {
id: rand::random(),
source: Address {
node: our.to_string(),
node: our,
process: ETH_PROCESS_ID.clone(),
},
target: target.clone(),

View File

@ -85,10 +85,3 @@ pub fn to_static_str(method: &str) -> Option<&'static str> {
_ => None,
}
}
/// Primary state object of the `eth` module
pub struct RpcConnections {
// todo generics when they work properly: pub struct RpcConnections<T>, where T: Transport
pub provider: Provider<PubSubFrontend>,
pub ws_provider_subscriptions: HashMap<(ProcessId, u64), JoinHandle<Result<(), EthError>>>,
}