diff --git a/src/eth/provider.rs b/src/eth/provider.rs index 6198e730..1a5e7f91 100644 --- a/src/eth/provider.rs +++ b/src/eth/provider.rs @@ -7,15 +7,15 @@ use ethers::prelude::Provider; use ethers_providers::{Http, Middleware, StreamExt, Ws}; use futures::stream::SplitSink; use futures::SinkExt; -use std::sync::Arc; use serde_json::json; +use std::sync::Arc; use tokio::net::TcpStream; +use tokio::sync::Mutex; use tokio_tungstenite::connect_async; use tokio_tungstenite::tungstenite::Message as TungsteniteMessage; use tokio_tungstenite::{MaybeTlsStream, WebSocketStream}; use url::Url; - struct Connections { ws_sender: Option>, TungsteniteMessage>>, ws_provider: Option>, @@ -63,13 +63,15 @@ pub async fn provider( let _ = send_to_loop.send(open_ws).await; - let mut connections = Connections { + let connections = Connections { ws_sender: None, ws_provider: None, http_provider: None, uq_provider: None, }; + let connections = Arc::new(Mutex::new(connections)); + let ws_request_ids: WsRequestIds = Arc::new(DashMap::new()); match Url::parse(&rpc_url).unwrap().scheme() { @@ -80,8 +82,10 @@ pub async fn provider( let (_ws_stream, _) = connect_async(&rpc_url).await.expect("failed to connect"); let (_ws_sender, mut ws_receiver) = _ws_stream.split(); - connections.ws_sender = Some(_ws_sender); - connections.ws_provider = Some(Provider::::connect(rpc_url.clone()).await?); + let mut connections_guard = connections.lock().await; + + connections_guard.ws_sender = Some(_ws_sender); + connections_guard.ws_provider = Some(Provider::::connect(rpc_url.clone()).await?); let send_to_loop = send_to_loop.clone(); let ws_request_ids = ws_request_ids.clone(); @@ -155,18 +159,17 @@ pub async fn provider( while let Some(km) = recv_in_client.recv().await { match km.message { - Message::Request(Request { ref ipc, .. }) => { + Message::Request(Request { ipc, .. }) => { println!("eth request"); - let _ = handle_request( + tokio::spawn(handle_request( our.clone(), ipc, km.source, km.payload, ws_request_ids.clone(), - &mut connections, + connections.clone(), send_to_loop.clone(), - ) - .await; + )); } Message::Response((Response { ref ipc, .. }, ..)) => { println!("eth response"); @@ -184,14 +187,13 @@ pub async fn provider( async fn handle_request( our: String, - ipc: &Vec, + ipc: Vec, source: Address, payload: Option, ws_request_ids: WsRequestIds, - connections: &mut Connections , + connections: Arc>, send_to_loop: MessageSender, ) -> Result<()> { - println!("request"); let target = Address { @@ -199,7 +201,7 @@ async fn handle_request( process: source.process.clone(), }; - if let Ok(action) = serde_json::from_slice::(ipc) { + if let Ok(action) = serde_json::from_slice::(&ipc) { match action { HttpServerRequest::WebSocketOpen { path, channel_id } => { println!("open {:?}, {:?}", path, channel_id); @@ -224,12 +226,11 @@ async fn handle_request( let _new_text = json.to_string(); - let _ = connections - .ws_sender - .as_mut() - .unwrap() - .send(TungsteniteMessage::Text(_new_text)) - .await; + let mut connections_guard = connections.lock().await; + + if let Some(ws_sender) = &mut connections_guard.ws_sender { + let _ = ws_sender.send(TungsteniteMessage::Text(_new_text)).await; + } } WsMessageType::Binary => { println!("binary"); @@ -247,25 +248,20 @@ async fn handle_request( HttpServerRequest::WebSocketClose(channel_id) => {} HttpServerRequest::Http(_) => todo!(), } - } else if let Ok(action) = serde_json::from_slice::(ipc) { + } else if let Ok(action) = serde_json::from_slice::(&ipc) { match action { EthRequest::SubscribeLogs(request) => { println!("subscribe logs {:?}", request); - let Ok(mut stream) = connections - .ws_provider.as_mut().unwrap() - .subscribe_logs(&request.filter.clone()) - .await - else { - todo!(); - }; - + let mut connections_guard = connections.lock().await; + let ws_provider = connections_guard.ws_provider.as_mut().unwrap(); + let mut stream = ws_provider.subscribe_logs(&request.filter.clone()).await?; while let Some(event) = stream.next().await { send_to_loop.send( KernelMessage { id: rand::random(), - source: Address { + source: Address { node: our.clone(), process: ETH_PROCESS_ID.clone(), }, @@ -284,7 +280,6 @@ async fn handle_request( } ).await.unwrap(); } - } } } else { diff --git a/src/eth/types.rs b/src/eth/types.rs index e48660fa..8553af0a 100644 --- a/src/eth/types.rs +++ b/src/eth/types.rs @@ -4,7 +4,7 @@ use serde::{Deserialize, Serialize}; #[derive(Debug, Serialize, Deserialize)] pub struct SubscribeLogs { - pub filter: Filter + pub filter: Filter, } #[derive(Debug, Serialize, Deserialize)]