WIP mvp eth interface

This commit is contained in:
dr-frmr 2024-01-12 00:47:43 -03:00
parent 47eaadafe7
commit 28fafb0db1
No known key found for this signature in database
2 changed files with 67 additions and 316 deletions

View File

@ -15,50 +15,46 @@ use tokio_tungstenite::tungstenite::Message as TungsteniteMessage;
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
use url::Url;
/// The ETH provider runtime process is responsible for connecting to one or more ETH RPC providers
/// and using them to service indexing requests from other apps. This could also be done by a wasm
/// app, but in the future, this process will hopefully expand in scope to perform more complex
/// indexing and ETH node responsibilities.
pub async fn provider(
our: String,
rpc_url: String,
mut send_to_loop: MessageSender,
send_to_loop: MessageSender,
mut recv_in_client: MessageReceiver,
print_tx: PrintSender,
) -> Result<()> {
bind_websockets(&our, &send_to_loop).await;
let mut connections = RpcConnections::default();
connections.ws_rpc_url = Some(rpc_url.to_string());
let connections = Arc::new(Mutex::new(connections));
match Url::parse(&rpc_url).unwrap().scheme() {
// for now, we can only handle WebSocket RPC URLs. In the future, we should
// be able to handle HTTP too, at least.
match Url::parse(&rpc_url)?.scheme() {
"http" | "https" => {
return Err(anyhow::anyhow!("eth: http provider not supported yet!"));
}
"ws" | "wss" => {
bootstrap_websocket_connections(&our, &rpc_url, connections.clone(), &mut send_to_loop)
.await
.map_err(|e| {
anyhow::anyhow!(
"eth: error bootstrapping websocket connections to {}: {:?}",
rpc_url,
e
)
})?;
}
"ws" | "wss" => {}
_ => {
return Err(anyhow::anyhow!("eth: provider must use http or ws!"));
}
}
while let Some(km) = recv_in_client.recv().await {
if let Message::Request(req) = &km.message {
match handle_request(&our, &km, req, &connections, &send_to_loop).await {
Ok(()) => {}
Err(e) => {
let _ = print_tx
.send(Printout {
verbosity: 1,
content: format!("eth: error handling request: {:?}", e),
})
.await;
}
// this module only handles requests, ignores all responses
let Message::Request(req) = &km.message else {
continue;
};
let Ok(action) = serde_json::from_slice::<EthAction>(&req.body) else {
continue;
};
match handle_request(&our, action, &send_to_loop).await {
Ok(()) => {}
Err(e) => {
let _ = print_tx
.send(Printout {
verbosity: 0,
content: format!("eth: error handling request: {:?}", e),
})
.await;
}
}
}
@ -67,88 +63,15 @@ pub async fn provider(
async fn handle_request(
our: &str,
km: &KernelMessage,
req: &Request,
connections: &Arc<Mutex<RpcConnections>>,
send_to_loop: &MessageSender,
) -> Result<()> {
if let Ok(action) = serde_json::from_slice::<HttpServerRequest>(&req.body) {
handle_http_server_request(action, km, connections).await
} else if let Ok(action) = serde_json::from_slice::<EthRequest>(&req.body) {
handle_eth_request(action, our, km, connections, send_to_loop).await
} else {
Err(anyhow::anyhow!("malformed request"))
}
}
async fn handle_http_server_request(
action: HttpServerRequest,
km: &KernelMessage,
connections: &Arc<Mutex<RpcConnections>>,
) -> Result<(), anyhow::Error> {
if let HttpServerRequest::WebSocketPush {
channel_id,
message_type,
} = action
{
if message_type == WsMessageType::Text {
let bytes = &km.lazy_load_blob.as_ref().unwrap().bytes;
let text = std::str::from_utf8(bytes).unwrap();
let mut json: serde_json::Value = serde_json::from_str(text)?;
let mut id = json["id"].as_u64().unwrap();
id += channel_id as u64;
json["id"] = serde_json::Value::from(id);
let new_text = json.to_string();
let mut connections_guard = connections.lock().await;
connections_guard
.ws_sender_ids
.insert(id as u32, channel_id);
if let Some(ws_sender) = &mut connections_guard.ws_sender {
let _ = ws_sender.send(TungsteniteMessage::Text(new_text)).await;
}
}
}
Ok(())
}
async fn handle_eth_request(
action: EthRequest,
our: &str,
km: &KernelMessage,
connections: &Arc<Mutex<RpcConnections>>,
action: EthAction,
send_to_loop: &MessageSender,
) -> Result<(), anyhow::Error> {
match action {
EthRequest::SubscribeLogs(req) => {
let handle = tokio::spawn(spawn_provider_read_stream(
our.to_string(),
req,
km.clone(),
connections.clone(),
send_to_loop.clone(),
));
let mut connections_guard = connections.lock().await;
let ws_provider_subscription = connections_guard
.ws_provider_subscriptions
.entry(km.id)
.or_insert(WsProviderSubscription::default());
ws_provider_subscription.handle = Some(handle);
drop(connections_guard);
EthAction::SubscribeLogs(req) => {
todo!()
}
EthRequest::UnsubscribeLogs(channel_id) => {
let mut connections_guard = connections.lock().await;
if let Some(ws_provider_subscription) = connections_guard
.ws_provider_subscriptions
.remove(&channel_id)
{
ws_provider_subscription.kill().await;
}
EthAction::UnsubscribeLogs(channel_id) => {
todo!()
}
}
Ok(())

View File

@ -1,222 +1,50 @@
use crate::types::*;
use dashmap::DashMap;
use ethers::prelude::Provider;
use ethers::types::{Filter, U256};
use ethers_providers::{Http, Middleware, Ws};
use futures::stream::SplitSink;
use ethers::types::{Filter, Log, U256};
use ethers_providers::{Middleware, Ws};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::net::TcpStream;
use tokio::task::JoinHandle;
use tokio_tungstenite::tungstenite::Message as TungsteniteMessage;
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
pub type WsRequestIds = Arc<DashMap<u32, u32>>;
/// The Request type that can be made to eth:sys:nectar. Currently primitive, this
/// enum will expand to support more actions in the future.
///
/// Will be serialized and deserialized using `serde_json::to_vec` and `serde_json::from_slice`.
#[derive(Debug, Serialize, Deserialize)]
pub enum EthAction {
/// Subscribe to logs with a custom filter. ID is to be used to unsubscribe.
SubscribeLogs { sub_id: u64, filter: Filter },
/// Kill a SubscribeLogs subscription of a given ID, to stop getting updates.
UnsubscribeLogs(u64),
}
/// The Request type which a process will get from using SubscribeLogs to subscribe
/// to a log.
///
/// Will be serialized and deserialized using `serde_json::to_vec` and `serde_json::from_slice`.
#[derive(Debug, Serialize, Deserialize)]
pub enum EthSubEvent {
Log(Log),
}
//
// Internal types
//
/// Primary state object of the `eth` module
pub struct RpcConnections {
pub ws_rpc_url: String,
pub ws_provider_subscriptions: HashMap<u64, WsProviderSubscription>,
}
#[derive(Default)]
pub struct WsProviderSubscription {
pub handle: Option<JoinHandle<()>>,
pub provider: Option<Provider<Ws>>,
pub subscription: Option<U256>,
pub handle: JoinHandle<()>,
pub provider: Provider<Ws>,
pub subscription: U256,
}
impl WsProviderSubscription {
pub async fn kill(&self) {
if let Some(provider) = &self.provider {
if let Some(subscription) = &self.subscription {
let _ = provider.unsubscribe(subscription).await;
}
}
if let Some(handle) = &self.handle {
handle.abort();
}
let _ = self.provider.unsubscribe(self.subscription).await;
self.handle.abort();
}
}
pub struct RpcConnections {
pub ws_sender:
Option<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, TungsteniteMessage>>,
pub ws_sender_ids: WsRequestIds,
pub ws_rpc_url: Option<String>,
pub ws_provider_subscriptions: HashMap<u64, WsProviderSubscription>,
pub http_rpc_url: Option<String>,
pub uq_provider: Option<NodeId>,
pub ws_provider: Option<Provider<Ws>>,
pub http_provider: Option<Provider<Http>>,
}
impl Default for RpcConnections {
fn default() -> Self {
Self {
ws_sender: None,
ws_sender_ids: Arc::new(DashMap::new()),
ws_provider: None,
ws_provider_subscriptions: HashMap::<u64, WsProviderSubscription>::new(),
http_provider: None,
uq_provider: None,
http_rpc_url: None,
ws_rpc_url: None,
}
}
}
// #[derive(Debug, Serialize, Deserialize)]
// pub enum EthRpcError {
// NoRsvp,
// BadJson,
// NoJson,
// EventSubscriptionFailed,
// }
// impl EthRpcError {
// pub fn _kind(&self) -> &str {
// match *self {
// EthRpcError::NoRsvp { .. } => "NoRsvp",
// EthRpcError::BadJson { .. } => "BapJson",
// EthRpcError::NoJson { .. } => "NoJson",
// EthRpcError::EventSubscriptionFailed { .. } => "EventSubscriptionFailed",
// }
// }
// }
#[derive(Debug, Serialize, Deserialize)]
pub struct SubscribeLogs {
pub filter: Filter,
}
#[derive(Debug, Serialize, Deserialize)]
pub enum EthRequest {
SubscribeLogs(SubscribeLogs),
UnsubscribeLogs(u64),
}
// #[derive(Debug, Serialize, Deserialize)]
// struct EthEventSubscription {
// addresses: Option<Vec<String>>,
// from_block: Option<u64>,
// to_block: Option<u64>,
// events: Option<Vec<String>>, // aka topic0s
// topic1: Option<U256>,
// topic2: Option<U256>,
// topic3: Option<U256>,
// }
// #[derive(Debug, Serialize, Deserialize)]
// pub enum ProviderAction {
// HttpServerRequest(HttpServerRequest),
// EthRpcAction(EthRpcAction),
// }
// #[derive(Debug, Serialize, Deserialize)]
// pub enum EthRpcAction {
// JsonRpcRequest(String),
// Eth(EthMethod),
// Debug(DebugMethod),
// Net(NetMethod),
// Trace(TraceMethod),
// TxPool(TxPoolMethod),
// }
// #[derive(Debug, Serialize, Deserialize)]
// pub enum DebugMethod {
// GetRawBlock,
// GetRawHeader,
// GetRawReceipts,
// GetRawTransaction,
// TraceBlock,
// TraceBlockByHash,
// TraceBlockByNumber,
// TraceCall,
// TraceCallMany,
// TraceTransaction,
// }
// #[derive(Debug, Serialize, Deserialize)]
// pub enum EthMethod {
// Accounts,
// BlockNumber,
// Call,
// ChainId,
// CreateAccessList,
// EstimateGas,
// FeeHistory,
// GasPrice,
// GetBalance,
// GetBlockByHash,
// GetBlockByNumber,
// GetBlockReceipts,
// GetBlockTransactionCountByHash,
// GetBlockTransactionCountByNumber,
// GetCode,
// GetFilterChanges,
// GetFilterLogs,
// GetLogs,
// GetStorageAt,
// GetTransactionByBlockHashAndIndex,
// GetTransactionByBlockNumberAndIndex,
// GetTransactionByHash,
// GetTransactionCount,
// GetTransactionReceipt,
// GetUncleByBlockHashAndIndex,
// GetUncleByBlockNumberAndIndex,
// GetUncleCountByBlockHash,
// GetUncleCountByBlockNumber,
// MaxPriorityFeePerGas,
// Mining,
// NewBlockFilter,
// NewFilter,
// NewPendingTransactionFilter,
// ProtocolVersion,
// SendRawTransaction,
// SendTransaction,
// Sign,
// SignTransaction,
// SignTypedData,
// Subscribe,
// Syncing,
// UninstallFilter,
// }
// #[derive(Debug, Serialize, Deserialize)]
// pub enum NetMethod {
// Listening,
// PeerCount,
// Version,
// }
// #[derive(Debug, Serialize, Deserialize)]
// pub enum TraceMethod {
// Call,
// CallMany,
// Get,
// RawTransaction,
// ReplayBlockTransactions,
// ReplayTransaction,
// Transaction,
// }
// #[derive(Debug, Serialize, Deserialize)]
// pub enum TxPoolMethod {
// Content,
// Inspect,
// Status,
// }
// #[derive(Debug, Serialize, Deserialize)]
// pub enum EthProviderError {
// NoRsvp,
// BadJson,
// NoJson,
// EventSubscriptionFailed,
// }
// impl EthProviderError {
// pub fn _kind(&self) -> &str {
// match *self {
// EthProviderError::NoRsvp { .. } => "NoRsvp",
// EthProviderError::BadJson { .. } => "BapJson",
// EthProviderError::NoJson { .. } => "NoJson",
// EthProviderError::EventSubscriptionFailed { .. } => "EventSubscriptionFailed",
// }
// }
// }