From 0f758d0ddacc51aa6aa1a82ae4fa9417dddf6340 Mon Sep 17 00:00:00 2001 From: dr-frmr Date: Thu, 11 Jan 2024 14:22:54 -0300 Subject: [PATCH] hotfix: keep eth rpc connection alive --- src/eth/provider.rs | 183 ++++++++++++++++++++++++-------------------- 1 file changed, 98 insertions(+), 85 deletions(-) diff --git a/src/eth/provider.rs b/src/eth/provider.rs index 1a92b19b..0f7f8288 100644 --- a/src/eth/provider.rs +++ b/src/eth/provider.rs @@ -24,13 +24,15 @@ pub async fn provider( ) -> Result<()> { bind_websockets(&our, &send_to_loop).await; let mut connections = RpcConnections::default(); + connections.ws_rpc_url = Some(rpc_url.to_string()); + let connections = Arc::new(Mutex::new(connections)); match Url::parse(&rpc_url).unwrap().scheme() { "http" | "https" => { return Err(anyhow::anyhow!("eth: http provider not supported yet!")); } "ws" | "wss" => { - bootstrap_websocket_connections(&our, &rpc_url, &mut connections, &mut send_to_loop) + bootstrap_websocket_connections(&our, &rpc_url, connections.clone(), &mut send_to_loop) .await .map_err(|e| { anyhow::anyhow!( @@ -45,8 +47,6 @@ pub async fn provider( } } - let connections = Arc::new(Mutex::new(connections)); - while let Some(km) = recv_in_client.recv().await { if let Message::Request(req) = &km.message { match handle_request(&our, &km, req, &connections, &send_to_loop).await { @@ -161,65 +161,67 @@ async fn spawn_provider_read_stream( connections: Arc>, send_to_loop: MessageSender, ) { - let mut connections_guard = connections.lock().await; + loop { + let mut connections_guard = connections.lock().await; - let Some(ref ws_rpc_url) = connections_guard.ws_rpc_url else { - todo!() - }; - let ws_provider = match Provider::::connect(&ws_rpc_url).await { - Ok(provider) => provider, - Err(e) => { - println!("error connecting to ws provider: {:?}", e); - return; + let Some(ref ws_rpc_url) = connections_guard.ws_rpc_url else { + todo!() + }; + let ws_provider = match Provider::::connect(&ws_rpc_url).await { + Ok(provider) => provider, + Err(e) => { + println!("error connecting to ws provider: {:?}", e); + return; + } + }; + + let mut stream = match ws_provider.subscribe_logs(&req.filter.clone()).await { + Ok(s) => s, + Err(e) => { + println!("error subscribing to logs: {:?}", e); + return; + } + }; + + let ws_provider_subscription = connections_guard + .ws_provider_subscriptions + .entry(km.id) + .or_insert(WsProviderSubscription::default()); + + ws_provider_subscription.provider = Some(ws_provider.clone()); + ws_provider_subscription.subscription = Some(stream.id); + + drop(connections_guard); + + while let Some(event) = stream.next().await { + send_to_loop + .send(KernelMessage { + id: rand::random(), + source: Address { + node: our.clone(), + process: ETH_PROCESS_ID.clone(), + }, + target: Address { + node: our.clone(), + process: km.source.process.clone(), + }, + rsvp: None, + message: Message::Request(Request { + inherit: false, + expects_response: None, + body: json!({ + "EventSubscription": serde_json::to_value(event.clone()).unwrap() + }) + .to_string() + .into_bytes(), + metadata: None, + capabilities: vec![], + }), + lazy_load_blob: None, + }) + .await + .unwrap(); } - }; - - let mut stream = match ws_provider.subscribe_logs(&req.filter.clone()).await { - Ok(s) => s, - Err(e) => { - println!("error subscribing to logs: {:?}", e); - return; - } - }; - - let ws_provider_subscription = connections_guard - .ws_provider_subscriptions - .entry(km.id) - .or_insert(WsProviderSubscription::default()); - - ws_provider_subscription.provider = Some(ws_provider.clone()); - ws_provider_subscription.subscription = Some(stream.id); - - drop(connections_guard); - - while let Some(event) = stream.next().await { - send_to_loop - .send(KernelMessage { - id: rand::random(), - source: Address { - node: our.clone(), - process: ETH_PROCESS_ID.clone(), - }, - target: Address { - node: our.clone(), - process: km.source.process.clone(), - }, - rsvp: None, - message: Message::Request(Request { - inherit: false, - expects_response: None, - body: json!({ - "EventSubscription": serde_json::to_value(event.clone()).unwrap() - }) - .to_string() - .into_bytes(), - metadata: None, - capabilities: vec![], - }), - lazy_load_blob: None, - }) - .await - .unwrap(); } } @@ -256,36 +258,47 @@ async fn bind_websockets(our: &str, send_to_loop: &MessageSender) { async fn bootstrap_websocket_connections( our: &str, rpc_url: &str, - connections: &mut RpcConnections, + connections: Arc>, send_to_loop: &mut MessageSender, ) -> Result<()> { - let (ws_stream, _response) = connect_async(rpc_url).await.map_err(|e| { - anyhow::anyhow!( - "eth: error connecting to websocket provider at {}: {:?}", - rpc_url, - e - ) - })?; - let (ws_sender, ws_receiver) = ws_stream.split(); + let our = our.to_string(); + let rpc_url = rpc_url.to_string(); + let send_to_loop = send_to_loop.clone(); + let connections = connections.clone(); + tokio::spawn(async move { + loop { + let Ok((ws_stream, _response)) = connect_async(&rpc_url).await else { + println!( + "error! couldn't connect to eth_rpc provider: {:?}, trying again in 3s\r", + rpc_url + ); + tokio::time::sleep(std::time::Duration::from_secs(3)).await; + continue; + }; + let (ws_sender, mut ws_receiver) = ws_stream.split(); - connections.ws_sender = Some(ws_sender); - connections.ws_rpc_url = Some(rpc_url.to_string()); - connections.ws_provider = Some(Provider::::connect(rpc_url).await?); + let mut connections_guard = connections.lock().await; + connections_guard.ws_sender = Some(ws_sender); + connections_guard.ws_provider = Some(Provider::::connect(&rpc_url).await.unwrap()); + drop(connections_guard); - tokio::spawn(handle_external_websocket_passthrough( - our.to_string(), - connections.ws_sender_ids.clone(), - ws_receiver, - send_to_loop.clone(), - )); + handle_external_websocket_passthrough( + &our, + connections.clone(), + &mut ws_receiver, + &send_to_loop, + ) + .await; + } + }); Ok(()) } async fn handle_external_websocket_passthrough( - our: String, - ws_request_ids: WsRequestIds, - mut ws_receiver: SplitStream>>, - send_to_loop: MessageSender, + our: &str, + connections: Arc>, + ws_receiver: &mut SplitStream>>, + send_to_loop: &MessageSender, ) { while let Some(message) = ws_receiver.next().await { match message { @@ -295,7 +308,7 @@ async fn handle_external_websocket_passthrough( continue; }; let id = json["id"].as_u64().unwrap() as u32; - let channel_id: u32 = *ws_request_ids.get(&id).unwrap(); + let channel_id: u32 = *connections.lock().await.ws_sender_ids.get(&id).unwrap(); json["id"] = serde_json::Value::from(id - channel_id); @@ -303,11 +316,11 @@ async fn handle_external_websocket_passthrough( .send(KernelMessage { id: rand::random(), source: Address { - node: our.clone(), + node: our.to_string(), process: ETH_PROCESS_ID.clone(), }, target: Address { - node: our.clone(), + node: our.to_string(), process: HTTP_SERVER_PROCESS_ID.clone(), }, rsvp: None,