From 326e68d54a0148862429fd26413e63d3c35c65cc Mon Sep 17 00:00:00 2001 From: commercium-sys Date: Fri, 22 Dec 2023 11:23:45 -0800 Subject: [PATCH] moved types into types.rs for eth --- src/eth/mod.rs | 1 + src/eth/provider.rs | 310 +++----------------------------------------- src/eth/types.rs | 43 +++++- 3 files changed, 59 insertions(+), 295 deletions(-) diff --git a/src/eth/mod.rs b/src/eth/mod.rs index 8336397f..2b68c41d 100644 --- a/src/eth/mod.rs +++ b/src/eth/mod.rs @@ -1 +1,2 @@ pub mod provider; +pub mod types; \ No newline at end of file diff --git a/src/eth/provider.rs b/src/eth/provider.rs index e61444a6..52dbc9e9 100644 --- a/src/eth/provider.rs +++ b/src/eth/provider.rs @@ -9,42 +9,6 @@ use serde::{Deserialize, Serialize}; use serde_json::json; use std::collections::HashMap; -#[derive(Debug, Serialize, Deserialize)] -enum EthRpcAction { - SubscribeEvents(EthEventSubscription), - Unsubscribe(u64), -} - -#[derive(Debug, Serialize, Deserialize)] -struct EthEventSubscription { - addresses: Option>, - from_block: Option, - to_block: Option, - events: Option>, // aka topic0s - topic1: Option, - topic2: Option, - topic3: Option, -} - -#[derive(Debug, Serialize, Deserialize)] -pub enum EthProviderError { - NoRsvp, - BadJson, - NoJson, - EventSubscriptionFailed, -} - -impl EthProviderError { - pub fn _kind(&self) -> &str { - match *self { - EthProviderError::NoRsvp { .. } => "NoRsvp", - EthProviderError::BadJson { .. } => "BapJson", - EthProviderError::NoJson { .. } => "NoJson", - EthProviderError::EventSubscriptionFailed { .. } => "EventSubscriptionFailed", - } - } -} - pub async fn provider( our: String, rpc_url: String, @@ -84,12 +48,15 @@ pub async fn provider( while let Some(km) = recv_in_client.recv().await { match km.message { - Message::Request(request) => { + Message::Request(Request { ref ipc, .. }) => { println!("eth request"); + handle_request(ipc)?; } - Message::Response(response) => { + Message::Response((Response { ref ipc, .. }, ..)) => { println!("eth response"); + handle_response(ipc)?; } + Message::Response(_) => todo!(), _ => {} } @@ -99,264 +66,25 @@ pub async fn provider( Ok(()) } -pub async fn eth_provider( - our: String, - rpc_url: String, - send_to_loop: MessageSender, - mut recv_in_client: MessageReceiver, - print_tx: PrintSender, -) -> Result<()> { - let mut subscriptions = - HashMap::>>::new(); +fn handle_request (ipc: &Vec) -> Result<()> { - while let Some(message) = recv_in_client.recv().await { - let our = our.clone(); - let send_to_loop = send_to_loop.clone(); - let print_tx = print_tx.clone(); + let Ok(message) = serde_json::from_slice::(ipc) else { + return Ok(()); + }; - let KernelMessage { - ref source, - ref rsvp, - message: - Message::Request(Request { - expects_response, - ipc: ref json_bytes, - .. - }), - .. - } = message - else { - panic!("eth_rpc: bad message"); - }; - - let target = if expects_response.is_some() { - Address { - node: our.clone(), - process: source.process.clone(), - } - } else { - let Some(rsvp) = rsvp else { - send_to_loop - .send(make_error_message( - our.clone(), - &message, - EthProviderError::NoRsvp, - )) - .await - .unwrap(); - continue; - }; - rsvp.clone() - }; - - // let call_data = content.payload.bytes.content.clone().unwrap_or(vec![]); - - let Ok(action) = serde_json::from_slice::(json_bytes) else { - send_to_loop - .send(make_error_message( - our.clone(), - &message, - EthProviderError::BadJson, - )) - .await - .unwrap(); - continue; - }; - - match action { - EthRpcAction::SubscribeEvents(sub) => { - send_to_loop - .send(KernelMessage { - id: message.id, - source: Address { - node: our.clone(), - process: ETH_PROCESS_ID.clone(), - }, - target: match &message.rsvp { - None => message.source.clone(), - Some(rsvp) => rsvp.clone(), - }, - rsvp: None, - message: Message::Response(( - Response { - inherit: false, - ipc: serde_json::to_vec::>(&Ok( - message.id, - )) - .unwrap(), - metadata: None, - }, - None, - )), - payload: None, - signed_capabilities: None, - }) - .await - .unwrap(); - - let mut filter = Filter::new(); - if let Some(addresses) = sub.addresses { - filter = filter.address(ValueOrArray::Array( - addresses.into_iter().map(|s| s.parse().unwrap()).collect(), - )); - } - - // TODO is there a cleaner way to do all of this? - if let Some(from_block) = sub.from_block { - filter = filter.from_block(from_block); - } - if let Some(to_block) = sub.to_block { - filter = filter.to_block(to_block); - } - if let Some(events) = sub.events { - filter = filter.events(&events); - } - if let Some(topic1) = sub.topic1 { - filter = filter.topic1(topic1); - } - if let Some(topic2) = sub.topic2 { - filter = filter.topic2(topic2); - } - if let Some(topic3) = sub.topic3 { - filter = filter.topic3(topic3); - } - - let rpc_url = rpc_url.clone(); - - let handle = tokio::task::spawn(async move { - // when connection dies you need to restart at the last block you saw - // otherwise you replay events unnecessarily - let mut from_block: U64 = - filter.clone().get_from_block().unwrap_or(U64::zero()); - loop { - // NOTE give main.rs uses rpc_url and panics if it can't connect, we do - // know that this should work in theory...can keep trying to reconnect - let Ok(ws_rpc) = Provider::::connect(rpc_url.clone()).await else { - // TODO grab and print error - let _ = print_tx - .send(Printout { - verbosity: 0, - content: "eth_rpc: connection failed, retrying in 5s" - .to_string(), - }) - .await; - tokio::time::sleep(std::time::Duration::from_secs(5)).await; - continue; - }; - - match ws_rpc - .subscribe_logs(&filter.clone().from_block(from_block)) - .await - { - Err(e) => { - let _ = print_tx - .send(Printout { - verbosity: 0, - content: format!("eth_rpc: subscription error: {:?}", e), - }) - .await; - continue; - } - Ok(mut stream) => { - let _ = print_tx - .send(Printout { - verbosity: 0, - content: "eth_rpc: connection established".to_string(), - }) - .await; - - while let Some(event) = stream.next().await { - send_to_loop.send( - KernelMessage { - id: rand::random(), - source: Address { - node: our.clone(), - process: ETH_PROCESS_ID.clone(), - }, - target: target.clone(), - rsvp: None, - message: Message::Request(Request { - inherit: false, - expects_response: None, - ipc: json!({ - "EventSubscription": serde_json::to_value(event.clone()).unwrap() - }).to_string().into_bytes(), - metadata: None, - }), - payload: None, - signed_capabilities: None, - } - ).await.unwrap(); - from_block = event.block_number.unwrap_or(from_block); - } - let _ = print_tx - .send(Printout { - verbosity: 0, - content: - "eth_rpc: subscription connection lost, reconnecting" - .to_string(), - }) - .await; - } - }; - } - }); - subscriptions.insert(message.id, handle); - } - EthRpcAction::Unsubscribe(sub_id) => { - let _ = print_tx - .send(Printout { - verbosity: 0, - content: format!("eth_rpc: unsubscribing from {}", sub_id), - }) - .await; - - if let Some(handle) = subscriptions.remove(&sub_id) { - handle.abort(); - } else { - let _ = print_tx - .send(Printout { - verbosity: 0, - content: format!("eth_rpc: no task found with id {}", sub_id), - }) - .await; - } - } - } - } + println!("request message {:?}", message); Ok(()) } -// -// helpers -// +fn handle_response (ipc: &Vec) -> Result<()> { -fn make_error_message( - our_name: String, - km: &KernelMessage, - error: EthProviderError, -) -> KernelMessage { - KernelMessage { - id: km.id, - source: Address { - node: our_name.clone(), - process: ETH_PROCESS_ID.clone(), - }, - target: match &km.rsvp { - None => km.source.clone(), - Some(rsvp) => rsvp.clone(), - }, - rsvp: None, - message: Message::Response(( - Response { - inherit: false, - ipc: serde_json::to_vec::>(&Err(error)).unwrap(), - metadata: None, - }, - None, - )), - payload: None, - signed_capabilities: None, - } + let Ok(message) = serde_json::from_slice::(ipc) else { + return Ok(()); + }; + + println!("response message {:?}", message); + + Ok(()) } + diff --git a/src/eth/types.rs b/src/eth/types.rs index 574799ac..f17a0a4f 100644 --- a/src/eth/types.rs +++ b/src/eth/types.rs @@ -1,3 +1,7 @@ +use crate::http::types::HttpServerAction; +use ethers::types::{ValueOrArray, U256, U64}; +use serde::{Deserialize, Serialize}; + #[derive(Debug, Serialize, Deserialize)] struct EthEventSubscription { addresses: Option>, @@ -9,14 +13,22 @@ struct EthEventSubscription { topic3: Option, } -struct EthAccounts { - addresses: Option>, +#[derive(Debug, Serialize, Deserialize)] +pub enum ProviderAction { + HttpServerAction(HttpServerAction), + EthRpcAction(EthRpcAction), } -struct EthBlockNumber { - +#[derive(Debug, Serialize, Deserialize)] +pub enum EthRpcAction { + Eth(EthMethod), + Debug(DebugMethod), + Net(NetMethod), + Trace(TraceMethod), + TxPool(TxPoolMethod), } +#[derive(Debug, Serialize, Deserialize)] enum DebugMethod { GetRawBlock, GetRawHeader, @@ -30,6 +42,7 @@ enum DebugMethod { TraceTransaction, } +#[derive(Debug, Serialize, Deserialize)] enum EthMethod { Accounts, BlockNumber, @@ -75,12 +88,14 @@ enum EthMethod { UninstallFilter, } +#[derive(Debug, Serialize, Deserialize)] enum NetMethod { Listening, PeerCount, Version, } +#[derive(Debug, Serialize, Deserialize)] enum TraceMethod { Call, CallMany, @@ -91,8 +106,28 @@ enum TraceMethod { Transaction, } +#[derive(Debug, Serialize, Deserialize)] enum TxPoolMethod { Content, Inspect, Status, +} + +#[derive(Debug, Serialize, Deserialize)] +pub enum EthProviderError { + NoRsvp, + BadJson, + NoJson, + EventSubscriptionFailed, +} + +impl EthProviderError { + pub fn _kind(&self) -> &str { + match *self { + EthProviderError::NoRsvp { .. } => "NoRsvp", + EthProviderError::BadJson { .. } => "BapJson", + EthProviderError::NoJson { .. } => "NoJson", + EthProviderError::EventSubscriptionFailed { .. } => "EventSubscriptionFailed", + } + } } \ No newline at end of file