Format Rust code using rustfmt

This commit is contained in:
github-actions[bot] 2023-12-26 21:08:27 +00:00 committed by GitHub
parent 90eab26187
commit ee0b675d4d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -2,23 +2,23 @@ use crate::eth::types::*;
use crate::http::types::{HttpServerAction, HttpServerRequest, WsMessageType}; use crate::http::types::{HttpServerAction, HttpServerRequest, WsMessageType};
use crate::types::*; use crate::types::*;
use anyhow::Result; use anyhow::Result;
use ethers::prelude::Provider;
use ethers_providers::{StreamExt, Ws, Http};
use futures::SinkExt;
use tokio_tungstenite::connect_async;
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
use tokio_tungstenite::tungstenite::Message as TungsteniteMessage;
use futures::stream::SplitSink;
use tokio::net::TcpStream;
use dashmap::DashMap; use dashmap::DashMap;
use url::Url; use ethers::prelude::Provider;
use ethers_providers::{Http, StreamExt, Ws};
use futures::stream::SplitSink;
use futures::SinkExt;
use std::sync::Arc; use std::sync::Arc;
use tokio::net::TcpStream;
use tokio_tungstenite::connect_async;
use tokio_tungstenite::tungstenite::Message as TungsteniteMessage;
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
use url::Url;
struct Connections { struct Connections {
ws_sender: Option<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, TungsteniteMessage>>, ws_sender: Option<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, TungsteniteMessage>>,
ws_provider: Option<Provider<Ws>>, ws_provider: Option<Provider<Ws>>,
http_provider: Option<Provider<Http>>, http_provider: Option<Provider<Http>>,
uq_provider: Option<NodeId> uq_provider: Option<NodeId>,
} }
// I need a data structure that tracks incoming requests from a particular websocket channel // I need a data structure that tracks incoming requests from a particular websocket channel
@ -74,16 +74,17 @@ pub async fn provider(
ws_sender: None, ws_sender: None,
ws_provider: None, ws_provider: None,
http_provider: None, http_provider: None,
uq_provider: None uq_provider: None,
}; };
let ws_request_nonces: WsRequestNonces = Arc::new(DashMap::new()); let ws_request_nonces: WsRequestNonces = Arc::new(DashMap::new());
let ws_request_ids: WsRequestIds = Arc::new(DashMap::new()); let ws_request_ids: WsRequestIds = Arc::new(DashMap::new());
match Url::parse(&rpc_url).unwrap().scheme() { match Url::parse(&rpc_url).unwrap().scheme() {
"http" | "https" => { unreachable!() } "http" | "https" => {
unreachable!()
}
"ws" | "wss" => { "ws" | "wss" => {
let (_ws_stream, _) = connect_async(&rpc_url).await.expect("failed to connect"); let (_ws_stream, _) = connect_async(&rpc_url).await.expect("failed to connect");
let (_ws_sender, mut ws_receiver) = _ws_stream.split(); let (_ws_sender, mut ws_receiver) = _ws_stream.split();
@ -97,16 +98,21 @@ pub async fn provider(
match message { match message {
Ok(msg) => { Ok(msg) => {
if msg.is_text() { if msg.is_text() {
let Ok(text) = msg.into_text() else {
let Ok(text) = msg.into_text() else { todo!(); }; todo!();
let json_result: Result<serde_json::Value, serde_json::Error> = serde_json::from_str(&text); };
let Ok(mut _json) = json_result else { todo!(); }; let json_result: Result<serde_json::Value, serde_json::Error> =
serde_json::from_str(&text);
let Ok(mut _json) = json_result else {
todo!();
};
let id = _json["id"].as_u64().unwrap() as u32; let id = _json["id"].as_u64().unwrap() as u32;
let channel_id = ws_request_ids.get(&id).unwrap().clone(); let channel_id = ws_request_ids.get(&id).unwrap().clone();
_json["id"] = serde_json::Value::from(id - channel_id); _json["id"] = serde_json::Value::from(id - channel_id);
let _ = send_to_loop.send(KernelMessage { let _ = send_to_loop
.send(KernelMessage {
id: rand::random(), id: rand::random(),
source: Address { source: Address {
node: our.clone(), node: our.clone(),
@ -119,24 +125,27 @@ pub async fn provider(
rsvp: None, rsvp: None,
message: Message::Request(Request { message: Message::Request(Request {
inherit: false, inherit: false,
ipc: serde_json::to_vec(&HttpServerAction::WebSocketPush { ipc: serde_json::to_vec(
&HttpServerAction::WebSocketPush {
channel_id: channel_id, channel_id: channel_id,
message_type: WsMessageType::Text, message_type: WsMessageType::Text,
}).unwrap(), },
)
.unwrap(),
metadata: None, metadata: None,
expects_response: None expects_response: None,
}), }),
payload: Some(Payload { payload: Some(Payload {
bytes: _json.to_string().as_bytes().to_vec(), bytes: _json.to_string().as_bytes().to_vec(),
mime: None mime: None,
}), }),
signed_capabilities: None signed_capabilities: None,
}).await; })
.await;
} else { } else {
println!("Received a binary message: {:?}", msg.into_data()); println!("Received a binary message: {:?}", msg.into_data());
} }
}, }
Err(e) => { Err(e) => {
println!("Error receiving a message: {:?}", e); println!("Error receiving a message: {:?}", e);
} }
@ -144,9 +153,10 @@ pub async fn provider(
} }
Ok::<(), ()>(()) Ok::<(), ()>(())
}); });
} }
_ => { unreachable!() } _ => {
unreachable!()
}
} }
while let Some(km) = recv_in_client.recv().await { while let Some(km) = recv_in_client.recv().await {
@ -159,8 +169,9 @@ pub async fn provider(
km.payload, km.payload,
ws_request_nonces.clone(), ws_request_nonces.clone(),
ws_request_ids.clone(), ws_request_ids.clone(),
&mut connections &mut connections,
).await; )
.await;
} }
Message::Response((Response { ref ipc, .. }, ..)) => { Message::Response((Response { ref ipc, .. }, ..)) => {
println!("eth response"); println!("eth response");
@ -182,9 +193,8 @@ async fn handle_request(
payload: Option<Payload>, payload: Option<Payload>,
ws_request_nonces: WsRequestNonces, ws_request_nonces: WsRequestNonces,
ws_request_ids: WsRequestIds, ws_request_ids: WsRequestIds,
connections: &mut Connections connections: &mut Connections,
) -> Result<()> { ) -> Result<()> {
println!("request"); println!("request");
if let Ok(action) = serde_json::from_slice::<HttpServerRequest>(ipc) { if let Ok(action) = serde_json::from_slice::<HttpServerRequest>(ipc) {
@ -192,8 +202,10 @@ async fn handle_request(
HttpServerRequest::WebSocketOpen { path, channel_id } => { HttpServerRequest::WebSocketOpen { path, channel_id } => {
println!("open {:?}, {:?}", path, channel_id); println!("open {:?}, {:?}", path, channel_id);
} }
HttpServerRequest::WebSocketPush{channel_id, message_type} => { HttpServerRequest::WebSocketPush {
match message_type { channel_id,
message_type,
} => match message_type {
WsMessageType::Text => { WsMessageType::Text => {
println!("text"); println!("text");
@ -214,27 +226,28 @@ async fn handle_request(
let _new_text = json.to_string(); let _new_text = json.to_string();
let _ = connections.ws_sender.as_mut().unwrap() let _ = connections
.ws_sender
.as_mut()
.unwrap()
.send(TungsteniteMessage::Text(_new_text)) .send(TungsteniteMessage::Text(_new_text))
.await; .await;
}
},
WsMessageType::Binary => { WsMessageType::Binary => {
println!("binary"); println!("binary");
}, }
WsMessageType::Ping => { WsMessageType::Ping => {
println!("ping"); println!("ping");
}, }
WsMessageType::Pong => { WsMessageType::Pong => {
println!("pong"); println!("pong");
}, }
WsMessageType::Close => { WsMessageType::Close => {
println!("close"); println!("close");
}
}, },
}
}
HttpServerRequest::WebSocketClose(channel_id) => {} HttpServerRequest::WebSocketClose(channel_id) => {}
HttpServerRequest::Http(_) => todo!() HttpServerRequest::Http(_) => todo!(),
} }
} else if let Ok(action) = serde_json::from_slice::<EthRpcAction>(ipc) { } else if let Ok(action) = serde_json::from_slice::<EthRpcAction>(ipc) {
match action { match action {
@ -250,12 +263,9 @@ async fn handle_request(
} }
Ok(()) Ok(())
} }
fn handle_http () { fn handle_http() {}
}
fn handle_response(ipc: &Vec<u8>) -> Result<()> { fn handle_response(ipc: &Vec<u8>) -> Result<()> {
let Ok(message) = serde_json::from_slice::<HttpServerAction>(ipc) else { let Ok(message) = serde_json::from_slice::<HttpServerAction>(ipc) else {