mirror of
https://github.com/uqbar-dao/nectar.git
synced 2024-12-03 03:36:30 +03:00
saving subscription id for future cancellation
This commit is contained in:
parent
5dddba9faa
commit
d87eba49d1
@ -57,8 +57,9 @@ pub async fn provider(
|
||||
match km.message {
|
||||
Message::Request(Request { ipc, .. }) => {
|
||||
let _ = handle_request(
|
||||
our.clone(),
|
||||
ipc,
|
||||
our.clone(),
|
||||
km.id,
|
||||
km.source,
|
||||
km.payload,
|
||||
ws_request_ids.clone(),
|
||||
@ -78,8 +79,9 @@ pub async fn provider(
|
||||
}
|
||||
|
||||
async fn handle_request(
|
||||
our: String,
|
||||
ipc: Vec<u8>,
|
||||
our: String,
|
||||
km_id: u64,
|
||||
source: Address,
|
||||
payload: Option<Payload>,
|
||||
ws_request_ids: WsRequestIds,
|
||||
@ -89,7 +91,7 @@ async fn handle_request(
|
||||
if let Ok(action) = serde_json::from_slice::<HttpServerRequest>(&ipc) {
|
||||
let _ = handle_http_server_request(action, payload, ws_request_ids, connections);
|
||||
} else if let Ok(action) = serde_json::from_slice::<EthRequest>(&ipc) {
|
||||
let _ = handle_eth_request(action, our.clone(), source, connections, send_to_loop).await;
|
||||
let _ = handle_eth_request(action, our.clone(), km_id, source, connections, send_to_loop).await;
|
||||
} else {
|
||||
println!("unknown request");
|
||||
}
|
||||
@ -154,6 +156,7 @@ async fn handle_http_server_request(
|
||||
async fn handle_eth_request(
|
||||
action: EthRequest,
|
||||
our: String,
|
||||
km_id: u64,
|
||||
source: Address,
|
||||
connections: Arc<Mutex<RpcConnections>>,
|
||||
send_to_loop: MessageSender,
|
||||
@ -161,9 +164,11 @@ async fn handle_eth_request(
|
||||
match action {
|
||||
EthRequest::SubscribeLogs(request) => {
|
||||
|
||||
tokio::spawn(async move {
|
||||
let connections_for_task = connections.clone();
|
||||
|
||||
let mut connections_guard = connections.lock().await;
|
||||
let handle = tokio::spawn(async move {
|
||||
|
||||
let mut connections_guard = connections_for_task.lock().await;
|
||||
let ws_provider = connections_guard.ws_provider.as_mut().unwrap();
|
||||
let mut stream = match ws_provider.subscribe_logs(&request.filter.clone()).await {
|
||||
Ok(s) => s,
|
||||
@ -204,6 +209,11 @@ async fn handle_eth_request(
|
||||
}
|
||||
|
||||
});
|
||||
|
||||
let mut connections_guard = connections.lock().await;
|
||||
|
||||
connections_guard.ws_provider_subs.insert(km_id, handle);
|
||||
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
|
@ -8,26 +8,49 @@ use serde::{Deserialize, Serialize};
|
||||
use tokio::net::TcpStream;
|
||||
use tokio_tungstenite::tungstenite::Message as TungsteniteMessage;
|
||||
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
|
||||
use std::collections::HashMap;
|
||||
|
||||
pub struct RpcConnections {
|
||||
pub ws_sender:
|
||||
Option<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, TungsteniteMessage>>,
|
||||
pub ws_provider: Option<Provider<Ws>>,
|
||||
pub ws_provider_subs: HashMap::<u64, tokio::task::JoinHandle<()>>,
|
||||
pub http_provider: Option<Provider<Http>>,
|
||||
pub uq_provider: Option<NodeId>,
|
||||
}
|
||||
|
||||
|
||||
impl Default for RpcConnections {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
ws_sender: None,
|
||||
ws_provider: None,
|
||||
ws_provider_subs:
|
||||
HashMap::<u64, tokio::task::JoinHandle<()>>::new(),
|
||||
http_provider: None,
|
||||
uq_provider: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub enum EthRpcError {
|
||||
NoRsvp,
|
||||
BadJson,
|
||||
NoJson,
|
||||
EventSubscriptionFailed,
|
||||
}
|
||||
impl EthRpcError {
|
||||
pub fn _kind(&self) -> &str {
|
||||
match *self {
|
||||
EthRpcError::NoRsvp { .. } => "NoRsvp",
|
||||
EthRpcError::BadJson { .. } => "BapJson",
|
||||
EthRpcError::NoJson { .. } => "NoJson",
|
||||
EthRpcError::EventSubscriptionFailed { .. } => "EventSubscriptionFailed",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct SubscribeLogs {
|
||||
pub filter: Filter,
|
||||
|
Loading…
Reference in New Issue
Block a user