diff --git a/Cargo.toml b/Cargo.toml index a97858d6..e349c1ea 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,7 +17,6 @@ zip = "0.6" [features] simulation-mode = [] -eth = [] [dependencies] aes-gcm = "0.10.2" diff --git a/modules/app_store/pkg/manifest.json b/modules/app_store/pkg/manifest.json index ed3636ec..a73e2fe6 100644 --- a/modules/app_store/pkg/manifest.json +++ b/modules/app_store/pkg/manifest.json @@ -12,7 +12,7 @@ "net:sys:uqbar", "vfs:sys:uqbar", "kernel:sys:uqbar", - "eth_rpc:sys:uqbar", + "eth:sys:uqbar", { "process": "vfs:sys:uqbar", "params": { diff --git a/modules/qns_indexer/pkg/manifest.json b/modules/qns_indexer/pkg/manifest.json index e482a7e2..76cb755c 100644 --- a/modules/qns_indexer/pkg/manifest.json +++ b/modules/qns_indexer/pkg/manifest.json @@ -7,7 +7,7 @@ "request_messaging": [ "net:sys:uqbar", "http_server:sys:uqbar", - "eth_rpc:sys:uqbar" + "eth:sys:uqbar" ], "public": true } diff --git a/modules/qns_indexer/qns_indexer/src/lib.rs b/modules/qns_indexer/qns_indexer/src/lib.rs index 523bd850..4b3eddac 100644 --- a/modules/qns_indexer/qns_indexer/src/lib.rs +++ b/modules/qns_indexer/qns_indexer/src/lib.rs @@ -155,7 +155,7 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> { continue; }; let Message::Request { source, ipc, .. } = message else { - // TODO we should store the subscription ID for eth_rpc + // TODO we should store the subscription ID for eth // incase we want to cancel/reset it continue; }; diff --git a/src/eth/provider.rs b/src/eth/provider.rs index 51302aec..6198e730 100644 --- a/src/eth/provider.rs +++ b/src/eth/provider.rs @@ -33,7 +33,7 @@ pub async fn provider( mut recv_in_client: MessageReceiver, print_tx: PrintSender, ) -> Result<()> { - println!("eth_rpc: starting"); + println!("eth: starting"); let open_ws = KernelMessage { id: rand::random(), diff --git a/src/eth_rpc.rs b/src/eth_rpc.rs deleted file mode 100644 index d8a2abfa..00000000 --- a/src/eth_rpc.rs +++ /dev/null @@ -1,301 +0,0 @@ -use crate::types::*; -use anyhow::Result; -use ethers::core::types::Filter; -use ethers::prelude::Provider; -use ethers::types::{ValueOrArray, U256, U64}; -use ethers_providers::{Middleware, StreamExt, Ws}; -use serde::{Deserialize, Serialize}; -use serde_json::json; -use std::collections::HashMap; - -#[derive(Debug, Serialize, Deserialize)] -enum EthRpcAction { - SubscribeEvents(EthEventSubscription), - Unsubscribe(u64), -} - -#[derive(Debug, Serialize, Deserialize)] -struct EthEventSubscription { - addresses: Option>, - from_block: Option, - to_block: Option, - events: Option>, // aka topic0s - topic1: Option, - topic2: Option, - topic3: Option, -} - -#[derive(Debug, Serialize, Deserialize)] -pub enum EthRpcError { - NoRsvp, - BadJson, - NoJson, - EventSubscriptionFailed, -} -impl EthRpcError { - pub fn _kind(&self) -> &str { - match *self { - EthRpcError::NoRsvp { .. } => "NoRsvp", - EthRpcError::BadJson { .. } => "BapJson", - EthRpcError::NoJson { .. } => "NoJson", - EthRpcError::EventSubscriptionFailed { .. } => "EventSubscriptionFailed", - } - } -} - -pub async fn eth_rpc( - our: String, - rpc_url: String, - send_to_loop: MessageSender, - mut recv_in_client: MessageReceiver, - print_tx: PrintSender, -) -> Result<()> { - let mut subscriptions = HashMap::>>::new(); - - while let Some(message) = recv_in_client.recv().await { - let our = our.clone(); - let send_to_loop = send_to_loop.clone(); - let print_tx = print_tx.clone(); - - let KernelMessage { - ref source, - ref rsvp, - message: - Message::Request(Request { - expects_response, - ipc: ref json_bytes, - .. - }), - .. - } = message - else { - panic!("eth_rpc: bad message"); - }; - - let target = if expects_response.is_some() { - Address { - node: our.clone(), - process: source.process.clone(), - } - } else { - let Some(rsvp) = rsvp else { - send_to_loop - .send(make_error_message( - our.clone(), - &message, - EthRpcError::NoRsvp, - )) - .await - .unwrap(); - continue; - }; - rsvp.clone() - }; - - // let call_data = content.payload.bytes.content.clone().unwrap_or(vec![]); - - let Ok(action) = serde_json::from_slice::(json_bytes) else { - send_to_loop - .send(make_error_message( - our.clone(), - &message, - EthRpcError::BadJson, - )) - .await - .unwrap(); - continue; - }; - - match action { - EthRpcAction::SubscribeEvents(sub) => { - send_to_loop - .send(KernelMessage { - id: message.id, - source: Address { - node: our.clone(), - process: ETH_RPC_PROCESS_ID.clone(), - }, - target: match &message.rsvp { - None => message.source.clone(), - Some(rsvp) => rsvp.clone(), - }, - rsvp: None, - message: Message::Response(( - Response { - inherit: false, - ipc: serde_json::to_vec::>( - &Ok(message.id), - ) - .unwrap(), - metadata: None, - }, - None, - )), - payload: None, - signed_capabilities: None, - }) - .await - .unwrap(); - - let mut filter = Filter::new(); - if let Some(addresses) = sub.addresses { - filter = filter.address(ValueOrArray::Array( - addresses.into_iter().map(|s| s.parse().unwrap()).collect(), - )); - } - - // TODO is there a cleaner way to do all of this? - if let Some(from_block) = sub.from_block { - filter = filter.from_block(from_block); - } - if let Some(to_block) = sub.to_block { - filter = filter.to_block(to_block); - } - if let Some(events) = sub.events { - filter = filter.events(&events); - } - if let Some(topic1) = sub.topic1 { - filter = filter.topic1(topic1); - } - if let Some(topic2) = sub.topic2 { - filter = filter.topic2(topic2); - } - if let Some(topic3) = sub.topic3 { - filter = filter.topic3(topic3); - } - - let rpc_url = rpc_url.clone(); - - let handle = tokio::task::spawn(async move { - // when connection dies you need to restart at the last block you saw - // otherwise you replay events unnecessarily - let mut from_block: U64 = - filter.clone().get_from_block().unwrap_or(U64::zero()); - loop { - // NOTE give main.rs uses rpc_url and panics if it can't connect, we do - // know that this should work in theory...can keep trying to reconnect - let Ok(ws_rpc) = Provider::::connect(rpc_url.clone()).await else { - // TODO grab and print error - let _ = print_tx - .send(Printout { - verbosity: 0, - content: "eth_rpc: connection failed, retrying in 5s" - .to_string(), - }) - .await; - tokio::time::sleep(std::time::Duration::from_secs(5)).await; - continue; - }; - - match ws_rpc - .subscribe_logs(&filter.clone().from_block(from_block)) - .await - { - Err(e) => { - let _ = print_tx - .send(Printout { - verbosity: 0, - content: format!("eth_rpc: subscription error: {:?}", e), - }) - .await; - continue; - } - Ok(mut stream) => { - let _ = print_tx - .send(Printout { - verbosity: 0, - content: "eth_rpc: connection established".to_string(), - }) - .await; - - while let Some(event) = stream.next().await { - send_to_loop.send( - KernelMessage { - id: rand::random(), - source: Address { - node: our.clone(), - process: ETH_RPC_PROCESS_ID.clone(), - }, - target: target.clone(), - rsvp: None, - message: Message::Request(Request { - inherit: false, - expects_response: None, - ipc: json!({ - "EventSubscription": serde_json::to_value(event.clone()).unwrap() - }).to_string().into_bytes(), - metadata: None, - }), - payload: None, - signed_capabilities: None, - } - ).await.unwrap(); - from_block = event.block_number.unwrap_or(from_block); - } - let _ = print_tx - .send(Printout { - verbosity: 0, - content: - "eth_rpc: subscription connection lost, reconnecting" - .to_string(), - }) - .await; - } - }; - } - }); - subscriptions.insert(message.id, handle); - } - EthRpcAction::Unsubscribe(sub_id) => { - let _ = print_tx - .send(Printout { - verbosity: 0, - content: format!("eth_rpc: unsubscribing from {}", sub_id), - }) - .await; - - if let Some(handle) = subscriptions.remove(&sub_id) { - handle.abort(); - } else { - let _ = print_tx - .send(Printout { - verbosity: 0, - content: format!("eth_rpc: no task found with id {}", sub_id), - }) - .await; - } - } - } - } - - Ok(()) -} - -// -// helpers -// - -fn make_error_message(our_name: String, km: &KernelMessage, error: EthRpcError) -> KernelMessage { - KernelMessage { - id: km.id, - source: Address { - node: our_name.clone(), - process: ETH_RPC_PROCESS_ID.clone(), - }, - target: match &km.rsvp { - None => km.source.clone(), - Some(rsvp) => rsvp.clone(), - }, - rsvp: None, - message: Message::Response(( - Response { - inherit: false, - ipc: serde_json::to_vec::>(&Err(error)).unwrap(), - metadata: None, - }, - None, - )), - payload: None, - signed_capabilities: None, - } -} diff --git a/src/main.rs b/src/main.rs index 93d092a6..9917b1e0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -12,7 +12,6 @@ use tokio::{fs, time::timeout}; use ring::{rand::SystemRandom, signature, signature::KeyPair}; mod eth; -mod eth_rpc; mod http; mod kernel; mod keygen; @@ -33,7 +32,6 @@ const WEBSOCKET_SENDER_CHANNEL_CAPACITY: usize = 32; const HTTP_CHANNEL_CAPACITY: usize = 32; const HTTP_CLIENT_CHANNEL_CAPACITY: usize = 32; const ETH_PROVIDER_CHANNEL_CAPACITY: usize = 32; -const ETH_RPC_CHANNEL_CAPACITY: usize = 32; const VFS_CHANNEL_CAPACITY: usize = 1_000; const CAP_CHANNEL_CAPACITY: usize = 1_000; const KV_CHANNEL_CAPACITY: usize = 1_000; @@ -173,8 +171,6 @@ async fn main() { mpsc::channel(HTTP_CHANNEL_CAPACITY); let (eth_provider_sender, eth_provider_receiver): (MessageSender, MessageReceiver) = mpsc::channel(ETH_PROVIDER_CHANNEL_CAPACITY); - let (eth_rpc_sender, eth_rpc_receiver): (MessageSender, MessageReceiver) = - mpsc::channel(ETH_RPC_CHANNEL_CAPACITY); // http client performs http requests on behalf of processes let (http_client_sender, http_client_receiver): (MessageSender, MessageReceiver) = mpsc::channel(HTTP_CLIENT_CHANNEL_CAPACITY); @@ -338,11 +334,6 @@ async fn main() { eth_provider_sender, true, ), - ( - ProcessId::new(Some("eth_rpc"), "sys", "uqbar"), - eth_rpc_sender, - true, - ), ( ProcessId::new(Some("vfs"), "sys", "uqbar"), vfs_message_sender, @@ -463,7 +454,7 @@ async fn main() { timer_service_receiver, print_sender.clone(), )); - #[cfg(feature = "eth")] + #[cfg(not(feature = "simulation-mode"))] tasks.spawn(eth::provider::provider( our.name.clone(), rpc_url.clone(), @@ -471,14 +462,6 @@ async fn main() { eth_provider_receiver, print_sender.clone(), )); - #[cfg(not(feature = "simulation-mode"))] - tasks.spawn(eth_rpc::eth_rpc( - our.name.clone(), - rpc_url.clone(), - kernel_message_sender.clone(), - eth_rpc_receiver, - print_sender.clone(), - )); tasks.spawn(vfs::vfs( our.name.clone(), kernel_message_sender.clone(), diff --git a/src/types.rs b/src/types.rs index 8c2ab7fd..feab7a86 100644 --- a/src/types.rs +++ b/src/types.rs @@ -6,7 +6,6 @@ use thiserror::Error; lazy_static::lazy_static! { pub static ref ETH_PROCESS_ID: ProcessId = ProcessId::new(Some("eth"), "sys", "uqbar"); - pub static ref ETH_RPC_PROCESS_ID: ProcessId = ProcessId::new(Some("eth_rpc"), "sys", "uqbar"); pub static ref HTTP_CLIENT_PROCESS_ID: ProcessId = ProcessId::new(Some("http_client"), "sys", "uqbar"); pub static ref HTTP_SERVER_PROCESS_ID: ProcessId = ProcessId::new(Some("http_server"), "sys", "uqbar"); pub static ref KERNEL_PROCESS_ID: ProcessId = ProcessId::new(Some("kernel"), "sys", "uqbar");