mirror of
https://github.com/uqbar-dao/nectar.git
synced 2024-12-20 07:01:40 +03:00
compiling closure that should be able to field requests for any eth action to websockets or other source in the future
This commit is contained in:
parent
5a57520ecf
commit
0dce0f81ed
@ -1,4 +1,5 @@
|
|||||||
use crate::http::types::HttpServerAction;
|
use crate::http::types::HttpServerAction;
|
||||||
|
use crate::eth::types::EthRpcAction;
|
||||||
use crate::types::*;
|
use crate::types::*;
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use ethers::core::types::Filter;
|
use ethers::core::types::Filter;
|
||||||
@ -8,6 +9,12 @@ use ethers_providers::{Middleware, StreamExt, Ws};
|
|||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use serde_json::json;
|
use serde_json::json;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
|
use tokio_tungstenite::WebSocketStream;
|
||||||
|
use tokio_tungstenite::connect_async;
|
||||||
|
use futures::SinkExt;
|
||||||
|
use std::future::Future;
|
||||||
|
use std::pin::Pin;
|
||||||
|
use url::Url;
|
||||||
|
|
||||||
pub async fn provider(
|
pub async fn provider(
|
||||||
our: String,
|
our: String,
|
||||||
@ -46,6 +53,8 @@ pub async fn provider(
|
|||||||
|
|
||||||
send_to_loop.send(open_ws).await;
|
send_to_loop.send(open_ws).await;
|
||||||
|
|
||||||
|
// let dispatch = get_dispatch(rpc_url, send_to_loop.clone()).await;
|
||||||
|
|
||||||
while let Some(km) = recv_in_client.recv().await {
|
while let Some(km) = recv_in_client.recv().await {
|
||||||
match km.message {
|
match km.message {
|
||||||
Message::Request(Request { ref ipc, .. }) => {
|
Message::Request(Request { ref ipc, .. }) => {
|
||||||
@ -66,6 +75,45 @@ pub async fn provider(
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn get_dispatch(rpc_url: String, send_to_loop: MessageSender) -> Box<dyn Fn(EthRpcAction) -> Pin<Box<dyn Future<Output = ()> + Send >>> {
|
||||||
|
|
||||||
|
let parsed = Url::parse(&rpc_url).unwrap();
|
||||||
|
|
||||||
|
match parsed.scheme() {
|
||||||
|
"http" | "https" => { unreachable!() }
|
||||||
|
"ws" | "wss" => { return ws_dispatch(rpc_url.clone(), send_to_loop).await }
|
||||||
|
_ => { unreachable!() }
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn ws_dispatch(rpc_url: String, send_to_loop: MessageSender) -> Box<dyn Fn(EthRpcAction) -> Pin<Box<dyn Future<Output = ()> + Send >>> {
|
||||||
|
|
||||||
|
let provider = Provider::<Ws>::connect(&rpc_url).await;
|
||||||
|
|
||||||
|
Box::new(move |action| {
|
||||||
|
let send_to_loop = send_to_loop.clone();
|
||||||
|
let rpc_url = rpc_url.clone();
|
||||||
|
Box::pin(async move {
|
||||||
|
match action {
|
||||||
|
EthRpcAction::JsonRpcRequest(json) => {
|
||||||
|
let (mut ws_stream, _) = connect_async(&rpc_url).await.expect("failed to connect");
|
||||||
|
ws_stream.send(tokio_tungstenite::tungstenite::Message::Text(json)).await.unwrap();
|
||||||
|
|
||||||
|
while let Some(msg) = ws_stream.next().await {
|
||||||
|
|
||||||
|
};
|
||||||
|
}
|
||||||
|
EthRpcAction::Eth(method) => {}
|
||||||
|
EthRpcAction::Debug(method) => {}
|
||||||
|
EthRpcAction::Net(method) => {}
|
||||||
|
EthRpcAction::Trace(method) => {}
|
||||||
|
EthRpcAction::TxPool(method) => {}
|
||||||
|
};
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
fn handle_request(ipc: &Vec<u8>) -> Result<()> {
|
fn handle_request(ipc: &Vec<u8>) -> Result<()> {
|
||||||
let Ok(message) = serde_json::from_slice::<HttpServerAction>(ipc) else {
|
let Ok(message) = serde_json::from_slice::<HttpServerAction>(ipc) else {
|
||||||
return Ok(());
|
return Ok(());
|
||||||
|
Loading…
Reference in New Issue
Block a user