attempt at making provider non blocking when receiving EthRequest in ipc

This commit is contained in:
commercium-sys 2023-12-27 21:46:27 -05:00
parent 1c5fe3fce1
commit b623f0f6db
2 changed files with 27 additions and 32 deletions

View File

@ -7,15 +7,15 @@ use ethers::prelude::Provider;
use ethers_providers::{Http, Middleware, StreamExt, Ws}; use ethers_providers::{Http, Middleware, StreamExt, Ws};
use futures::stream::SplitSink; use futures::stream::SplitSink;
use futures::SinkExt; use futures::SinkExt;
use std::sync::Arc;
use serde_json::json; use serde_json::json;
use std::sync::Arc;
use tokio::net::TcpStream; use tokio::net::TcpStream;
use tokio::sync::Mutex;
use tokio_tungstenite::connect_async; use tokio_tungstenite::connect_async;
use tokio_tungstenite::tungstenite::Message as TungsteniteMessage; use tokio_tungstenite::tungstenite::Message as TungsteniteMessage;
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream}; use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
use url::Url; use url::Url;
struct Connections { struct Connections {
ws_sender: Option<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, TungsteniteMessage>>, ws_sender: Option<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, TungsteniteMessage>>,
ws_provider: Option<Provider<Ws>>, ws_provider: Option<Provider<Ws>>,
@ -63,13 +63,15 @@ pub async fn provider(
let _ = send_to_loop.send(open_ws).await; let _ = send_to_loop.send(open_ws).await;
let mut connections = Connections { let connections = Connections {
ws_sender: None, ws_sender: None,
ws_provider: None, ws_provider: None,
http_provider: None, http_provider: None,
uq_provider: None, uq_provider: None,
}; };
let connections = Arc::new(Mutex::new(connections));
let ws_request_ids: WsRequestIds = Arc::new(DashMap::new()); let ws_request_ids: WsRequestIds = Arc::new(DashMap::new());
match Url::parse(&rpc_url).unwrap().scheme() { match Url::parse(&rpc_url).unwrap().scheme() {
@ -80,8 +82,10 @@ pub async fn provider(
let (_ws_stream, _) = connect_async(&rpc_url).await.expect("failed to connect"); let (_ws_stream, _) = connect_async(&rpc_url).await.expect("failed to connect");
let (_ws_sender, mut ws_receiver) = _ws_stream.split(); let (_ws_sender, mut ws_receiver) = _ws_stream.split();
connections.ws_sender = Some(_ws_sender); let mut connections_guard = connections.lock().await;
connections.ws_provider = Some(Provider::<Ws>::connect(rpc_url.clone()).await?);
connections_guard.ws_sender = Some(_ws_sender);
connections_guard.ws_provider = Some(Provider::<Ws>::connect(rpc_url.clone()).await?);
let send_to_loop = send_to_loop.clone(); let send_to_loop = send_to_loop.clone();
let ws_request_ids = ws_request_ids.clone(); let ws_request_ids = ws_request_ids.clone();
@ -155,18 +159,17 @@ pub async fn provider(
while let Some(km) = recv_in_client.recv().await { while let Some(km) = recv_in_client.recv().await {
match km.message { match km.message {
Message::Request(Request { ref ipc, .. }) => { Message::Request(Request { ipc, .. }) => {
println!("eth request"); println!("eth request");
let _ = handle_request( tokio::spawn(handle_request(
our.clone(), our.clone(),
ipc, ipc,
km.source, km.source,
km.payload, km.payload,
ws_request_ids.clone(), ws_request_ids.clone(),
&mut connections, connections.clone(),
send_to_loop.clone(), send_to_loop.clone(),
) ));
.await;
} }
Message::Response((Response { ref ipc, .. }, ..)) => { Message::Response((Response { ref ipc, .. }, ..)) => {
println!("eth response"); println!("eth response");
@ -184,14 +187,13 @@ pub async fn provider(
async fn handle_request( async fn handle_request(
our: String, our: String,
ipc: &Vec<u8>, ipc: Vec<u8>,
source: Address, source: Address,
payload: Option<Payload>, payload: Option<Payload>,
ws_request_ids: WsRequestIds, ws_request_ids: WsRequestIds,
connections: &mut Connections , connections: Arc<Mutex<Connections>>,
send_to_loop: MessageSender, send_to_loop: MessageSender,
) -> Result<()> { ) -> Result<()> {
println!("request"); println!("request");
let target = Address { let target = Address {
@ -199,7 +201,7 @@ async fn handle_request(
process: source.process.clone(), process: source.process.clone(),
}; };
if let Ok(action) = serde_json::from_slice::<HttpServerRequest>(ipc) { if let Ok(action) = serde_json::from_slice::<HttpServerRequest>(&ipc) {
match action { match action {
HttpServerRequest::WebSocketOpen { path, channel_id } => { HttpServerRequest::WebSocketOpen { path, channel_id } => {
println!("open {:?}, {:?}", path, channel_id); println!("open {:?}, {:?}", path, channel_id);
@ -224,12 +226,11 @@ async fn handle_request(
let _new_text = json.to_string(); let _new_text = json.to_string();
let _ = connections let mut connections_guard = connections.lock().await;
.ws_sender
.as_mut() if let Some(ws_sender) = &mut connections_guard.ws_sender {
.unwrap() let _ = ws_sender.send(TungsteniteMessage::Text(_new_text)).await;
.send(TungsteniteMessage::Text(_new_text)) }
.await;
} }
WsMessageType::Binary => { WsMessageType::Binary => {
println!("binary"); println!("binary");
@ -247,25 +248,20 @@ async fn handle_request(
HttpServerRequest::WebSocketClose(channel_id) => {} HttpServerRequest::WebSocketClose(channel_id) => {}
HttpServerRequest::Http(_) => todo!(), HttpServerRequest::Http(_) => todo!(),
} }
} else if let Ok(action) = serde_json::from_slice::<EthRequest>(ipc) { } else if let Ok(action) = serde_json::from_slice::<EthRequest>(&ipc) {
match action { match action {
EthRequest::SubscribeLogs(request) => { EthRequest::SubscribeLogs(request) => {
println!("subscribe logs {:?}", request); println!("subscribe logs {:?}", request);
let Ok(mut stream) = connections let mut connections_guard = connections.lock().await;
.ws_provider.as_mut().unwrap() let ws_provider = connections_guard.ws_provider.as_mut().unwrap();
.subscribe_logs(&request.filter.clone()) let mut stream = ws_provider.subscribe_logs(&request.filter.clone()).await?;
.await
else {
todo!();
};
while let Some(event) = stream.next().await { while let Some(event) = stream.next().await {
send_to_loop.send( send_to_loop.send(
KernelMessage { KernelMessage {
id: rand::random(), id: rand::random(),
source: Address { source: Address {
node: our.clone(), node: our.clone(),
process: ETH_PROCESS_ID.clone(), process: ETH_PROCESS_ID.clone(),
}, },
@ -284,7 +280,6 @@ async fn handle_request(
} }
).await.unwrap(); ).await.unwrap();
} }
} }
} }
} else { } else {

View File

@ -4,7 +4,7 @@ use serde::{Deserialize, Serialize};
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
pub struct SubscribeLogs { pub struct SubscribeLogs {
pub filter: Filter pub filter: Filter,
} }
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]