hotfix: keep eth rpc connection alive

This commit is contained in:
dr-frmr 2024-01-11 14:22:54 -03:00
parent cc0bac5757
commit 0f758d0dda
No known key found for this signature in database

View File

@ -24,13 +24,15 @@ pub async fn provider(
) -> Result<()> {
bind_websockets(&our, &send_to_loop).await;
let mut connections = RpcConnections::default();
connections.ws_rpc_url = Some(rpc_url.to_string());
let connections = Arc::new(Mutex::new(connections));
match Url::parse(&rpc_url).unwrap().scheme() {
"http" | "https" => {
return Err(anyhow::anyhow!("eth: http provider not supported yet!"));
}
"ws" | "wss" => {
bootstrap_websocket_connections(&our, &rpc_url, &mut connections, &mut send_to_loop)
bootstrap_websocket_connections(&our, &rpc_url, connections.clone(), &mut send_to_loop)
.await
.map_err(|e| {
anyhow::anyhow!(
@ -45,8 +47,6 @@ pub async fn provider(
}
}
let connections = Arc::new(Mutex::new(connections));
while let Some(km) = recv_in_client.recv().await {
if let Message::Request(req) = &km.message {
match handle_request(&our, &km, req, &connections, &send_to_loop).await {
@ -161,65 +161,67 @@ async fn spawn_provider_read_stream(
connections: Arc<Mutex<RpcConnections>>,
send_to_loop: MessageSender,
) {
let mut connections_guard = connections.lock().await;
loop {
let mut connections_guard = connections.lock().await;
let Some(ref ws_rpc_url) = connections_guard.ws_rpc_url else {
todo!()
};
let ws_provider = match Provider::<Ws>::connect(&ws_rpc_url).await {
Ok(provider) => provider,
Err(e) => {
println!("error connecting to ws provider: {:?}", e);
return;
let Some(ref ws_rpc_url) = connections_guard.ws_rpc_url else {
todo!()
};
let ws_provider = match Provider::<Ws>::connect(&ws_rpc_url).await {
Ok(provider) => provider,
Err(e) => {
println!("error connecting to ws provider: {:?}", e);
return;
}
};
let mut stream = match ws_provider.subscribe_logs(&req.filter.clone()).await {
Ok(s) => s,
Err(e) => {
println!("error subscribing to logs: {:?}", e);
return;
}
};
let ws_provider_subscription = connections_guard
.ws_provider_subscriptions
.entry(km.id)
.or_insert(WsProviderSubscription::default());
ws_provider_subscription.provider = Some(ws_provider.clone());
ws_provider_subscription.subscription = Some(stream.id);
drop(connections_guard);
while let Some(event) = stream.next().await {
send_to_loop
.send(KernelMessage {
id: rand::random(),
source: Address {
node: our.clone(),
process: ETH_PROCESS_ID.clone(),
},
target: Address {
node: our.clone(),
process: km.source.process.clone(),
},
rsvp: None,
message: Message::Request(Request {
inherit: false,
expects_response: None,
body: json!({
"EventSubscription": serde_json::to_value(event.clone()).unwrap()
})
.to_string()
.into_bytes(),
metadata: None,
capabilities: vec![],
}),
lazy_load_blob: None,
})
.await
.unwrap();
}
};
let mut stream = match ws_provider.subscribe_logs(&req.filter.clone()).await {
Ok(s) => s,
Err(e) => {
println!("error subscribing to logs: {:?}", e);
return;
}
};
let ws_provider_subscription = connections_guard
.ws_provider_subscriptions
.entry(km.id)
.or_insert(WsProviderSubscription::default());
ws_provider_subscription.provider = Some(ws_provider.clone());
ws_provider_subscription.subscription = Some(stream.id);
drop(connections_guard);
while let Some(event) = stream.next().await {
send_to_loop
.send(KernelMessage {
id: rand::random(),
source: Address {
node: our.clone(),
process: ETH_PROCESS_ID.clone(),
},
target: Address {
node: our.clone(),
process: km.source.process.clone(),
},
rsvp: None,
message: Message::Request(Request {
inherit: false,
expects_response: None,
body: json!({
"EventSubscription": serde_json::to_value(event.clone()).unwrap()
})
.to_string()
.into_bytes(),
metadata: None,
capabilities: vec![],
}),
lazy_load_blob: None,
})
.await
.unwrap();
}
}
@ -256,36 +258,47 @@ async fn bind_websockets(our: &str, send_to_loop: &MessageSender) {
async fn bootstrap_websocket_connections(
our: &str,
rpc_url: &str,
connections: &mut RpcConnections,
connections: Arc<Mutex<RpcConnections>>,
send_to_loop: &mut MessageSender,
) -> Result<()> {
let (ws_stream, _response) = connect_async(rpc_url).await.map_err(|e| {
anyhow::anyhow!(
"eth: error connecting to websocket provider at {}: {:?}",
rpc_url,
e
)
})?;
let (ws_sender, ws_receiver) = ws_stream.split();
let our = our.to_string();
let rpc_url = rpc_url.to_string();
let send_to_loop = send_to_loop.clone();
let connections = connections.clone();
tokio::spawn(async move {
loop {
let Ok((ws_stream, _response)) = connect_async(&rpc_url).await else {
println!(
"error! couldn't connect to eth_rpc provider: {:?}, trying again in 3s\r",
rpc_url
);
tokio::time::sleep(std::time::Duration::from_secs(3)).await;
continue;
};
let (ws_sender, mut ws_receiver) = ws_stream.split();
connections.ws_sender = Some(ws_sender);
connections.ws_rpc_url = Some(rpc_url.to_string());
connections.ws_provider = Some(Provider::<Ws>::connect(rpc_url).await?);
let mut connections_guard = connections.lock().await;
connections_guard.ws_sender = Some(ws_sender);
connections_guard.ws_provider = Some(Provider::<Ws>::connect(&rpc_url).await.unwrap());
drop(connections_guard);
tokio::spawn(handle_external_websocket_passthrough(
our.to_string(),
connections.ws_sender_ids.clone(),
ws_receiver,
send_to_loop.clone(),
));
handle_external_websocket_passthrough(
&our,
connections.clone(),
&mut ws_receiver,
&send_to_loop,
)
.await;
}
});
Ok(())
}
async fn handle_external_websocket_passthrough(
our: String,
ws_request_ids: WsRequestIds,
mut ws_receiver: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
send_to_loop: MessageSender,
our: &str,
connections: Arc<Mutex<RpcConnections>>,
ws_receiver: &mut SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
send_to_loop: &MessageSender,
) {
while let Some(message) = ws_receiver.next().await {
match message {
@ -295,7 +308,7 @@ async fn handle_external_websocket_passthrough(
continue;
};
let id = json["id"].as_u64().unwrap() as u32;
let channel_id: u32 = *ws_request_ids.get(&id).unwrap();
let channel_id: u32 = *connections.lock().await.ws_sender_ids.get(&id).unwrap();
json["id"] = serde_json::Value::from(id - channel_id);
@ -303,11 +316,11 @@ async fn handle_external_websocket_passthrough(
.send(KernelMessage {
id: rand::random(),
source: Address {
node: our.clone(),
node: our.to_string(),
process: ETH_PROCESS_ID.clone(),
},
target: Address {
node: our.clone(),
node: our.to_string(),
process: HTTP_SERVER_PROCESS_ID.clone(),
},
rsvp: None,