From ee0b675d4dd82b9750178b2996e66f48a20efe79 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Tue, 26 Dec 2023 21:08:27 +0000 Subject: [PATCH] Format Rust code using rustfmt --- src/eth/provider.rs | 224 +++++++++++++++++++++++--------------------- 1 file changed, 117 insertions(+), 107 deletions(-) diff --git a/src/eth/provider.rs b/src/eth/provider.rs index 5fdb9090..9696e38f 100644 --- a/src/eth/provider.rs +++ b/src/eth/provider.rs @@ -1,34 +1,34 @@ use crate::eth::types::*; -use crate::http::types::{ HttpServerAction, HttpServerRequest, WsMessageType }; +use crate::http::types::{HttpServerAction, HttpServerRequest, WsMessageType}; use crate::types::*; use anyhow::Result; -use ethers::prelude::Provider; -use ethers_providers::{StreamExt, Ws, Http}; -use futures::SinkExt; -use tokio_tungstenite::connect_async; -use tokio_tungstenite::{MaybeTlsStream, WebSocketStream}; -use tokio_tungstenite::tungstenite::Message as TungsteniteMessage; -use futures::stream::SplitSink; -use tokio::net::TcpStream; use dashmap::DashMap; -use url::Url; +use ethers::prelude::Provider; +use ethers_providers::{Http, StreamExt, Ws}; +use futures::stream::SplitSink; +use futures::SinkExt; use std::sync::Arc; +use tokio::net::TcpStream; +use tokio_tungstenite::connect_async; +use tokio_tungstenite::tungstenite::Message as TungsteniteMessage; +use tokio_tungstenite::{MaybeTlsStream, WebSocketStream}; +use url::Url; struct Connections { - ws_sender: Option>,TungsteniteMessage>>, + ws_sender: Option>, TungsteniteMessage>>, ws_provider: Option>, http_provider: Option>, - uq_provider: Option + uq_provider: Option, } // I need a data structure that tracks incoming requests from a particular websocket channel -// and associates the response from the response to the outgoing websocket message with that +// and associates the response from the response to the outgoing websocket message with that // channel. It should then return the response to that channel. -// this should just map responses from the outgoing websocket request +// this should just map responses from the outgoing websocket request // to the requests that made them -// Channel IDs to Nonces used to make unique IDs +// Channel IDs to Nonces used to make unique IDs type WsRequestNonces = Arc>; // Request IDs to Channel IDs type WsRequestIds = Arc>; @@ -74,16 +74,17 @@ pub async fn provider( ws_sender: None, ws_provider: None, http_provider: None, - uq_provider: None + uq_provider: None, }; let ws_request_nonces: WsRequestNonces = Arc::new(DashMap::new()); let ws_request_ids: WsRequestIds = Arc::new(DashMap::new()); match Url::parse(&rpc_url).unwrap().scheme() { - "http" | "https" => { unreachable!() } + "http" | "https" => { + unreachable!() + } "ws" | "wss" => { - let (_ws_stream, _) = connect_async(&rpc_url).await.expect("failed to connect"); let (_ws_sender, mut ws_receiver) = _ws_stream.split(); @@ -97,46 +98,54 @@ pub async fn provider( match message { Ok(msg) => { if msg.is_text() { - - let Ok(text) = msg.into_text() else { todo!(); }; - let json_result: Result = serde_json::from_str(&text); - let Ok(mut _json) = json_result else { todo!(); }; + let Ok(text) = msg.into_text() else { + todo!(); + }; + let json_result: Result = + serde_json::from_str(&text); + let Ok(mut _json) = json_result else { + todo!(); + }; let id = _json["id"].as_u64().unwrap() as u32; let channel_id = ws_request_ids.get(&id).unwrap().clone(); _json["id"] = serde_json::Value::from(id - channel_id); - let _ = send_to_loop.send(KernelMessage { - id: rand::random(), - source: Address { - node: our.clone(), - process: ETH_PROCESS_ID.clone(), - }, - target: Address { - node: our.clone(), - process: HTTP_SERVER_PROCESS_ID.clone(), - }, - rsvp: None, - message: Message::Request(Request { - inherit: false, - ipc: serde_json::to_vec(&HttpServerAction::WebSocketPush { - channel_id: channel_id, - message_type: WsMessageType::Text, - }).unwrap(), - metadata: None, - expects_response: None - }), - payload: Some(Payload { - bytes: _json.to_string().as_bytes().to_vec(), - mime: None - }), - signed_capabilities: None - }).await; - + let _ = send_to_loop + .send(KernelMessage { + id: rand::random(), + source: Address { + node: our.clone(), + process: ETH_PROCESS_ID.clone(), + }, + target: Address { + node: our.clone(), + process: HTTP_SERVER_PROCESS_ID.clone(), + }, + rsvp: None, + message: Message::Request(Request { + inherit: false, + ipc: serde_json::to_vec( + &HttpServerAction::WebSocketPush { + channel_id: channel_id, + message_type: WsMessageType::Text, + }, + ) + .unwrap(), + metadata: None, + expects_response: None, + }), + payload: Some(Payload { + bytes: _json.to_string().as_bytes().to_vec(), + mime: None, + }), + signed_capabilities: None, + }) + .await; } else { println!("Received a binary message: {:?}", msg.into_data()); } - }, + } Err(e) => { println!("Error receiving a message: {:?}", e); } @@ -144,9 +153,10 @@ pub async fn provider( } Ok::<(), ()>(()) }); - } - _ => { unreachable!() } + _ => { + unreachable!() + } } while let Some(km) = recv_in_client.recv().await { @@ -154,13 +164,14 @@ pub async fn provider( Message::Request(Request { ref ipc, .. }) => { println!("eth request"); let _ = handle_request( - ipc, - km.source, - km.payload, + ipc, + km.source, + km.payload, ws_request_nonces.clone(), ws_request_ids.clone(), - &mut connections - ).await; + &mut connections, + ) + .await; } Message::Response((Response { ref ipc, .. }, ..)) => { println!("eth response"); @@ -177,85 +188,84 @@ pub async fn provider( } async fn handle_request( - ipc: &Vec, - source: Address, - payload: Option, + ipc: &Vec, + source: Address, + payload: Option, ws_request_nonces: WsRequestNonces, ws_request_ids: WsRequestIds, - connections: &mut Connections + connections: &mut Connections, ) -> Result<()> { - println!("request"); if let Ok(action) = serde_json::from_slice::(ipc) { match action { - HttpServerRequest::WebSocketOpen{path, channel_id} => { + HttpServerRequest::WebSocketOpen { path, channel_id } => { println!("open {:?}, {:?}", path, channel_id); } - HttpServerRequest::WebSocketPush{channel_id, message_type} => { - match message_type { - WsMessageType::Text => { - println!("text"); + HttpServerRequest::WebSocketPush { + channel_id, + message_type, + } => match message_type { + WsMessageType::Text => { + println!("text"); - let bytes = payload.unwrap().bytes; - let text = std::str::from_utf8(&bytes).unwrap(); - let mut json: serde_json::Value = serde_json::from_str(text)?; - let mut id = json["id"].as_u64().unwrap(); + let bytes = payload.unwrap().bytes; + let text = std::str::from_utf8(&bytes).unwrap(); + let mut json: serde_json::Value = serde_json::from_str(text)?; + let mut id = json["id"].as_u64().unwrap(); - let mut nonce = ws_request_nonces.entry(channel_id).or_insert(0); - - id += channel_id as u64; - id += *nonce as u64; - *nonce += 1; + let mut nonce = ws_request_nonces.entry(channel_id).or_insert(0); - ws_request_ids.insert(id as u32, channel_id); + id += channel_id as u64; + id += *nonce as u64; + *nonce += 1; - json["id"] = serde_json::Value::from(id); + ws_request_ids.insert(id as u32, channel_id); - let _new_text = json.to_string(); + json["id"] = serde_json::Value::from(id); - let _ = connections.ws_sender.as_mut().unwrap() - .send(TungsteniteMessage::Text(_new_text)) - .await; + let _new_text = json.to_string(); - }, - WsMessageType::Binary => { - println!("binary"); - }, - WsMessageType::Ping => { - println!("ping"); - }, - WsMessageType::Pong => { - println!("pong"); - }, - WsMessageType::Close => { - println!("close"); - }, + let _ = connections + .ws_sender + .as_mut() + .unwrap() + .send(TungsteniteMessage::Text(_new_text)) + .await; } - } - HttpServerRequest::WebSocketClose(channel_id) => { } - HttpServerRequest::Http(_) => todo!() + WsMessageType::Binary => { + println!("binary"); + } + WsMessageType::Ping => { + println!("ping"); + } + WsMessageType::Pong => { + println!("pong"); + } + WsMessageType::Close => { + println!("close"); + } + }, + HttpServerRequest::WebSocketClose(channel_id) => {} + HttpServerRequest::Http(_) => todo!(), } } else if let Ok(action) = serde_json::from_slice::(ipc) { match action { EthRpcAction::JsonRpcRequest(_) => unreachable!(), - EthRpcAction::Eth(method) => { } - EthRpcAction::Debug(method) => { } - EthRpcAction::Net(method) => { } - EthRpcAction::Trace(method) => { } - EthRpcAction::TxPool(method) => { } + EthRpcAction::Eth(method) => {} + EthRpcAction::Debug(method) => {} + EthRpcAction::Net(method) => {} + EthRpcAction::Trace(method) => {} + EthRpcAction::TxPool(method) => {} } } else { println!("unknown request"); } Ok(()) - } -fn handle_http () { - -} +fn handle_http() {} fn handle_response(ipc: &Vec) -> Result<()> { let Ok(message) = serde_json::from_slice::(ipc) else {