diff --git a/Cargo.lock b/Cargo.lock index 1a7b819f..b6c28a76 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3029,6 +3029,7 @@ dependencies = [ "static_dir", "thiserror", "tokio", + "tokio-stream", "tokio-tungstenite 0.20.1", "url", "uuid 1.4.1", diff --git a/Cargo.toml b/Cargo.toml index 85d281b5..69047f08 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -66,6 +66,7 @@ snow = { version = "0.9.3", features = ["ring-resolver"] } static_dir = "0.2.0" thiserror = "1.0" tokio = { version = "1.28", features = ["fs", "macros", "rt-multi-thread", "signal", "sync"] } +tokio-stream = "0.1.14" tokio-tungstenite = "0.20.1" url = "2.4.1" uuid = { version = "1.1.2", features = ["serde", "v4"] } diff --git a/modules/ndns_indexer/ndns_indexer/Cargo.lock b/modules/ndns_indexer/ndns_indexer/Cargo.lock index 5a58234d..41e73de6 100644 --- a/modules/ndns_indexer/ndns_indexer/Cargo.lock +++ b/modules/ndns_indexer/ndns_indexer/Cargo.lock @@ -1123,8 +1123,9 @@ dependencies = [ [[package]] name = "nectar_process_lib" version = "0.5.0" -source = "git+ssh://git@github.com/uqbar-dao/process_lib.git?tag=v0.5.0-alpha#235f4aae4d8078e86d3114753ed79ccedb947ebe" +source = "git+ssh://git@github.com/uqbar-dao/process_lib.git?rev=d19ff3d#d19ff3d15df2155e441939dd6432342f9ec1b340" dependencies = [ + "alloy-rpc-types", "anyhow", "bincode", "ethers-core", diff --git a/modules/ndns_indexer/ndns_indexer/Cargo.toml b/modules/ndns_indexer/ndns_indexer/Cargo.toml index 39de1b9f..99030cff 100644 --- a/modules/ndns_indexer/ndns_indexer/Cargo.toml +++ b/modules/ndns_indexer/ndns_indexer/Cargo.toml @@ -20,7 +20,7 @@ hex = "0.4.3" rmp-serde = "1.1.2" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" -nectar_process_lib = { git = "ssh://git@github.com/uqbar-dao/process_lib.git", tag = "v0.5.0-alpha", features = ["eth"] } +nectar_process_lib = { git = "ssh://git@github.com/uqbar-dao/process_lib.git", rev = "d19ff3d", features = ["eth"] } wit-bindgen = { git = "https://github.com/bytecodealliance/wit-bindgen", rev = "efcc759" } [lib] diff --git a/modules/ndns_indexer/ndns_indexer/src/lib.rs b/modules/ndns_indexer/ndns_indexer/src/lib.rs index 5b380fd0..1b3ee316 100644 --- a/modules/ndns_indexer/ndns_indexer/src/lib.rs +++ b/modules/ndns_indexer/ndns_indexer/src/lib.rs @@ -1,6 +1,6 @@ use alloy_rpc_types::Log; use alloy_sol_types::{sol, SolEvent}; -use nectar_process_lib::eth::{EthAddress, SubscribeLogsRequest}; +use nectar_process_lib::eth::{EthAddress, EthSubEvent, SubscribeLogsRequest}; use nectar_process_lib::{ await_message, get_typed_state, http, print_to_terminal, println, set_state, Address, LazyLoadBlob, Message, Request, Response, @@ -31,11 +31,6 @@ struct State { block: u64, } -#[derive(Debug, Serialize, Deserialize)] -enum IndexerActions { - EventSubscription(Log), -} - #[derive(Clone, Debug, Serialize, Deserialize)] pub enum NetActions { NdnsUpdate(NdnsUpdate), @@ -146,7 +141,7 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> { ))? .send()?; - SubscribeLogsRequest::new() + SubscribeLogsRequest::new(1) // subscription id 1 .address(EthAddress::from_str(contract_address.unwrap().as_str())?) .from_block(state.block - 1) .events(vec![ @@ -206,20 +201,20 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> { continue; } - let Ok(msg) = serde_json::from_slice::(&body) else { + let Ok(msg) = serde_json::from_slice::(&body) else { println!("ndns_indexer: got invalid message"); continue; }; match msg { - IndexerActions::EventSubscription(e) => { - state.block = e.clone().block_number.expect("expect").to::(); + EthSubEvent::Log(log) => { + state.block = log.block_number.expect("expect").to::(); - let node_id: alloy_primitives::FixedBytes<32> = e.topics[1]; + let node_id: alloy_primitives::FixedBytes<32> = log.topics[1]; - let name = match state.names.entry(node_id.clone().to_string()) { + let name = match state.names.entry(node_id.to_string()) { Entry::Occupied(o) => o.into_mut(), - Entry::Vacant(v) => v.insert(get_name(&e)), + Entry::Vacant(v) => v.insert(get_name(&log)), }; let node = state @@ -229,15 +224,15 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> { let mut send = true; - match e.topics[0].clone() { + match log.topics[0] { KeyUpdate::SIGNATURE_HASH => { - node.public_key = KeyUpdate::abi_decode_data(&e.data, true) + node.public_key = KeyUpdate::abi_decode_data(&log.data, true) .unwrap() .0 .to_string(); } IpUpdate::SIGNATURE_HASH => { - let ip = IpUpdate::abi_decode_data(&e.data, true).unwrap().0; + let ip = IpUpdate::abi_decode_data(&log.data, true).unwrap().0; node.ip = format!( "{}.{}.{}.{}", (ip >> 24) & 0xFF, @@ -247,10 +242,10 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> { ); } WsUpdate::SIGNATURE_HASH => { - node.port = WsUpdate::abi_decode_data(&e.data, true).unwrap().0; + node.port = WsUpdate::abi_decode_data(&log.data, true).unwrap().0; } RoutingUpdate::SIGNATURE_HASH => { - node.routers = RoutingUpdate::abi_decode_data(&e.data, true) + node.routers = RoutingUpdate::abi_decode_data(&log.data, true) .unwrap() .0 .iter() diff --git a/src/eth/provider.rs b/src/eth/provider.rs index 77395c6a..bc815d73 100644 --- a/src/eth/provider.rs +++ b/src/eth/provider.rs @@ -1,20 +1,15 @@ use crate::eth::types::*; -use crate::http::server_types::{HttpServerAction, HttpServerRequest, WsMessageType}; use crate::types::*; use anyhow::Result; use ethers::prelude::Provider; +use ethers::types::Filter; use ethers_providers::{Middleware, StreamExt, Ws}; -use futures::stream::SplitStream; -use futures::SinkExt; -use serde_json::json; +use std::collections::HashMap; use std::sync::Arc; -use tokio::net::TcpStream; -use tokio::sync::Mutex; -use tokio_tungstenite::connect_async; -use tokio_tungstenite::tungstenite::Message as TungsteniteMessage; -use tokio_tungstenite::{MaybeTlsStream, WebSocketStream}; use url::Url; +const WS_RECONNECTS: usize = 10_000; // TODO workshop this + /// The ETH provider runtime process is responsible for connecting to one or more ETH RPC providers /// and using them to service indexing requests from other apps. This could also be done by a wasm /// app, but in the future, this process will hopefully expand in scope to perform more complex @@ -26,18 +21,36 @@ pub async fn provider( mut recv_in_client: MessageReceiver, print_tx: PrintSender, ) -> Result<()> { + let our = Arc::new(our); // for now, we can only handle WebSocket RPC URLs. In the future, we should // be able to handle HTTP too, at least. match Url::parse(&rpc_url)?.scheme() { "http" | "https" => { - return Err(anyhow::anyhow!("eth: http provider not supported yet!")); + return Err(anyhow::anyhow!( + "eth: fatal: http provider not supported yet!" + )); } "ws" | "wss" => {} _ => { - return Err(anyhow::anyhow!("eth: provider must use http or ws!")); + return Err(anyhow::anyhow!("eth: fatal: provider must use http or ws!")); } } + let provider = match Provider::::connect_with_reconnects(&rpc_url, WS_RECONNECTS).await { + Ok(provider) => provider, + Err(e) => { + return Err(anyhow::anyhow!( + "eth: fatal: given RPC URL could not connect! {e:?}" + )); + } + }; + println!("eth: provider made\r"); + + let mut connections = RpcConnections { + provider, + ws_provider_subscriptions: HashMap::new(), + }; + while let Some(km) = recv_in_client.recv().await { // this module only handles requests, ignores all responses let Message::Request(req) = &km.message else { @@ -46,7 +59,16 @@ pub async fn provider( let Ok(action) = serde_json::from_slice::(&req.body) else { continue; }; - match handle_request(&our, action, &send_to_loop).await { + println!("eth: action received\r"); + match handle_request( + our.clone(), + &km.rsvp.unwrap_or(km.source.clone()), + action, + &mut connections, + &send_to_loop, + ) + .await + { Ok(()) => {} Err(e) => { let _ = print_tx @@ -55,6 +77,32 @@ pub async fn provider( content: format!("eth: error handling request: {:?}", e), }) .await; + if req.expects_response.is_some() { + send_to_loop + .send(KernelMessage { + id: km.id, + source: Address { + node: our.to_string(), + process: ETH_PROCESS_ID.clone(), + }, + target: Address { + node: our.to_string(), + process: km.source.process.clone(), + }, + rsvp: None, + message: Message::Response(( + Response { + inherit: false, + body: serde_json::to_vec::>(&Err(e))?, + metadata: None, + capabilities: vec![], + }, + None, + )), + lazy_load_blob: None, + }) + .await?; + } } } } @@ -62,211 +110,79 @@ pub async fn provider( } async fn handle_request( - our: &str, + our: Arc, + target: &Address, action: EthAction, + connections: &mut RpcConnections, send_to_loop: &MessageSender, -) -> Result<(), anyhow::Error> { +) -> Result<(), EthError> { match action { - EthAction::SubscribeLogs(req) => { - todo!() + EthAction::SubscribeLogs { sub_id, filter } => { + if connections.ws_provider_subscriptions.contains_key(&sub_id) { + return Err(EthError::SubscriptionIdCollision); + } + + let handle = tokio::spawn(handle_subscription_stream( + our.clone(), + connections.provider.clone(), + filter, + target.clone(), + send_to_loop.clone(), + )); + connections.ws_provider_subscriptions.insert(sub_id, handle); + Ok(()) } - EthAction::UnsubscribeLogs(channel_id) => { - todo!() + EthAction::UnsubscribeLogs(sub_id) => { + let handle = connections + .ws_provider_subscriptions + .remove(&sub_id) + .ok_or(EthError::SubscriptionNotFound)?; + + handle.abort(); + Ok(()) } } - Ok(()) } -async fn spawn_provider_read_stream( - our: String, - req: SubscribeLogs, - km: KernelMessage, - connections: Arc>, +/// Executed as a long-lived task. The JoinHandle is stored in the `connections` map. +/// This task is responsible for connecting to the ETH RPC provider and streaming logs +/// for a specific subscription made by a process. +async fn handle_subscription_stream( + our: Arc, + provider: Provider, + filter: Filter, + target: Address, send_to_loop: MessageSender, -) { - loop { - let mut connections_guard = connections.lock().await; - - let Some(ref ws_rpc_url) = connections_guard.ws_rpc_url else { - todo!() - }; - let ws_provider = match Provider::::connect(&ws_rpc_url).await { - Ok(provider) => provider, - Err(e) => { - println!("error connecting to ws provider: {:?}", e); - return; - } - }; - - let mut stream = match ws_provider.subscribe_logs(&req.filter.clone()).await { - Ok(s) => s, - Err(e) => { - println!("error subscribing to logs: {:?}", e); - return; - } - }; - - let ws_provider_subscription = connections_guard - .ws_provider_subscriptions - .entry(km.id) - .or_insert(WsProviderSubscription::default()); - - ws_provider_subscription.provider = Some(ws_provider.clone()); - ws_provider_subscription.subscription = Some(stream.id); - - drop(connections_guard); - - while let Some(event) = stream.next().await { - send_to_loop - .send(KernelMessage { - id: rand::random(), - source: Address { - node: our.clone(), - process: ETH_PROCESS_ID.clone(), - }, - target: Address { - node: our.clone(), - process: km.source.process.clone(), - }, - rsvp: None, - message: Message::Request(Request { - inherit: false, - expects_response: None, - body: json!({ - "EventSubscription": serde_json::to_value(event.clone()).unwrap() - }) - .to_string() - .into_bytes(), - metadata: None, - capabilities: vec![], - }), - lazy_load_blob: None, - }) - .await - .unwrap(); - } - } -} - -async fn bind_websockets(our: &str, send_to_loop: &MessageSender) { - let _ = send_to_loop - .send(KernelMessage { - id: rand::random(), - source: Address { - node: our.to_string(), - process: ETH_PROCESS_ID.clone(), - }, - target: Address { - node: our.to_string(), - process: HTTP_SERVER_PROCESS_ID.clone(), - }, - rsvp: None, - message: Message::Request(Request { - inherit: false, - body: serde_json::to_vec(&HttpServerAction::WebSocketBind { - path: "/".to_string(), - authenticated: false, - encrypted: false, - }) - .unwrap(), - metadata: None, - expects_response: None, - capabilities: vec![], - }), - lazy_load_blob: None, - }) - .await; -} - -async fn bootstrap_websocket_connections( - our: &str, - rpc_url: &str, - connections: Arc>, - send_to_loop: &mut MessageSender, -) -> Result<()> { - let our = our.to_string(); - let rpc_url = rpc_url.to_string(); - let send_to_loop = send_to_loop.clone(); - let connections = connections.clone(); - tokio::spawn(async move { - loop { - let Ok((ws_stream, _response)) = connect_async(&rpc_url).await else { - println!( - "error! couldn't connect to eth_rpc provider: {:?}, trying again in 3s\r", - rpc_url - ); - tokio::time::sleep(std::time::Duration::from_secs(3)).await; - continue; - }; - let (ws_sender, mut ws_receiver) = ws_stream.split(); - - let mut connections_guard = connections.lock().await; - connections_guard.ws_sender = Some(ws_sender); - connections_guard.ws_provider = Some(Provider::::connect(&rpc_url).await.unwrap()); - drop(connections_guard); - - handle_external_websocket_passthrough( - &our, - connections.clone(), - &mut ws_receiver, - &send_to_loop, - ) - .await; - } - }); - Ok(()) -} - -async fn handle_external_websocket_passthrough( - our: &str, - connections: Arc>, - ws_receiver: &mut SplitStream>>, - send_to_loop: &MessageSender, -) { - while let Some(message) = ws_receiver.next().await { - match message { - Ok(msg) => { - if let Ok(text) = msg.into_text() { - let Ok(mut json) = serde_json::from_str::(&text) else { - continue; - }; - let id = json["id"].as_u64().unwrap() as u32; - let channel_id: u32 = *connections.lock().await.ws_sender_ids.get(&id).unwrap(); - - json["id"] = serde_json::Value::from(id - channel_id); - - let _ = send_to_loop - .send(KernelMessage { - id: rand::random(), - source: Address { - node: our.to_string(), - process: ETH_PROCESS_ID.clone(), - }, - target: Address { - node: our.to_string(), - process: HTTP_SERVER_PROCESS_ID.clone(), - }, - rsvp: None, - message: Message::Request(Request { - inherit: false, - body: serde_json::to_vec(&HttpServerAction::WebSocketPush { - channel_id, - message_type: WsMessageType::Text, - }) - .unwrap(), - metadata: None, - expects_response: None, - capabilities: vec![], - }), - lazy_load_blob: Some(LazyLoadBlob { - bytes: json.to_string().as_bytes().to_vec(), - mime: None, - }), - }) - .await; - } - } - Err(_) => break, +) -> Result<(), EthError> { + println!("eth: handling subscription stream\r"); + let mut stream = match provider.subscribe_logs(&filter).await { + Ok(s) => s, + Err(e) => { + return Err(EthError::ProviderError(e.to_string())); } + }; + + while let Some(event) = stream.next().await { + send_to_loop + .send(KernelMessage { + id: rand::random(), + source: Address { + node: our.to_string(), + process: ETH_PROCESS_ID.clone(), + }, + target: target.clone(), + rsvp: None, + message: Message::Request(Request { + inherit: false, + expects_response: None, + body: serde_json::to_vec(&EthSubEvent::Log(event)).unwrap(), + metadata: None, + capabilities: vec![], + }), + lazy_load_blob: None, + }) + .await + .unwrap(); } + Err(EthError::SubscriptionClosed) } diff --git a/src/eth/types.rs b/src/eth/types.rs index 389d80cd..6c60c890 100644 --- a/src/eth/types.rs +++ b/src/eth/types.rs @@ -1,6 +1,6 @@ use ethers::prelude::Provider; -use ethers::types::{Filter, Log, U256}; -use ethers_providers::{Middleware, Ws}; +use ethers::types::{Filter, Log}; +use ethers_providers::Ws; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use tokio::task::JoinHandle; @@ -17,6 +17,21 @@ pub enum EthAction { UnsubscribeLogs(u64), } +/// The Response type which a process will get from requesting with an [`EthAction`] will be +/// of the form `Result<(), EthError>`, serialized and deserialized using `serde_json::to_vec` +/// and `serde_json::from_slice`. +#[derive(Debug, Serialize, Deserialize)] +pub enum EthError { + /// The subscription ID already existed + SubscriptionIdCollision, + /// The ethers provider threw an error when trying to subscribe + /// (contains ProviderError serialized to debug string) + ProviderError(String), + SubscriptionClosed, + /// The subscription ID was not found, so we couldn't unsubscribe. + SubscriptionNotFound, +} + /// The Request type which a process will get from using SubscribeLogs to subscribe /// to a log. /// @@ -32,19 +47,6 @@ pub enum EthSubEvent { /// Primary state object of the `eth` module pub struct RpcConnections { - pub ws_rpc_url: String, - pub ws_provider_subscriptions: HashMap, -} - -pub struct WsProviderSubscription { - pub handle: JoinHandle<()>, pub provider: Provider, - pub subscription: U256, -} - -impl WsProviderSubscription { - pub async fn kill(&self) { - let _ = self.provider.unsubscribe(self.subscription).await; - self.handle.abort(); - } + pub ws_provider_subscriptions: HashMap>>, }