moved types into types.rs for eth

This commit is contained in:
commercium-sys 2023-12-22 11:23:45 -08:00
parent 4611dc1eb1
commit 326e68d54a
3 changed files with 59 additions and 295 deletions

View File

@ -1 +1,2 @@
pub mod provider;
pub mod types;

View File

@ -9,42 +9,6 @@ use serde::{Deserialize, Serialize};
use serde_json::json;
use std::collections::HashMap;
#[derive(Debug, Serialize, Deserialize)]
enum EthRpcAction {
SubscribeEvents(EthEventSubscription),
Unsubscribe(u64),
}
#[derive(Debug, Serialize, Deserialize)]
struct EthEventSubscription {
addresses: Option<Vec<String>>,
from_block: Option<u64>,
to_block: Option<u64>,
events: Option<Vec<String>>, // aka topic0s
topic1: Option<U256>,
topic2: Option<U256>,
topic3: Option<U256>,
}
#[derive(Debug, Serialize, Deserialize)]
pub enum EthProviderError {
NoRsvp,
BadJson,
NoJson,
EventSubscriptionFailed,
}
impl EthProviderError {
pub fn _kind(&self) -> &str {
match *self {
EthProviderError::NoRsvp { .. } => "NoRsvp",
EthProviderError::BadJson { .. } => "BapJson",
EthProviderError::NoJson { .. } => "NoJson",
EthProviderError::EventSubscriptionFailed { .. } => "EventSubscriptionFailed",
}
}
}
pub async fn provider(
our: String,
rpc_url: String,
@ -84,12 +48,15 @@ pub async fn provider(
while let Some(km) = recv_in_client.recv().await {
match km.message {
Message::Request(request) => {
Message::Request(Request { ref ipc, .. }) => {
println!("eth request");
handle_request(ipc)?;
}
Message::Response(response) => {
Message::Response((Response { ref ipc, .. }, ..)) => {
println!("eth response");
handle_response(ipc)?;
}
Message::Response(_) => todo!(),
_ => {}
}
@ -99,264 +66,25 @@ pub async fn provider(
Ok(())
}
pub async fn eth_provider(
our: String,
rpc_url: String,
send_to_loop: MessageSender,
mut recv_in_client: MessageReceiver,
print_tx: PrintSender,
) -> Result<()> {
let mut subscriptions =
HashMap::<u64, tokio::task::JoinHandle<Result<(), EthProviderError>>>::new();
fn handle_request (ipc: &Vec<u8>) -> Result<()> {
while let Some(message) = recv_in_client.recv().await {
let our = our.clone();
let send_to_loop = send_to_loop.clone();
let print_tx = print_tx.clone();
let Ok(message) = serde_json::from_slice::<HttpServerAction>(ipc) else {
return Ok(());
};
let KernelMessage {
ref source,
ref rsvp,
message:
Message::Request(Request {
expects_response,
ipc: ref json_bytes,
..
}),
..
} = message
else {
panic!("eth_rpc: bad message");
};
let target = if expects_response.is_some() {
Address {
node: our.clone(),
process: source.process.clone(),
}
} else {
let Some(rsvp) = rsvp else {
send_to_loop
.send(make_error_message(
our.clone(),
&message,
EthProviderError::NoRsvp,
))
.await
.unwrap();
continue;
};
rsvp.clone()
};
// let call_data = content.payload.bytes.content.clone().unwrap_or(vec![]);
let Ok(action) = serde_json::from_slice::<EthRpcAction>(json_bytes) else {
send_to_loop
.send(make_error_message(
our.clone(),
&message,
EthProviderError::BadJson,
))
.await
.unwrap();
continue;
};
match action {
EthRpcAction::SubscribeEvents(sub) => {
send_to_loop
.send(KernelMessage {
id: message.id,
source: Address {
node: our.clone(),
process: ETH_PROCESS_ID.clone(),
},
target: match &message.rsvp {
None => message.source.clone(),
Some(rsvp) => rsvp.clone(),
},
rsvp: None,
message: Message::Response((
Response {
inherit: false,
ipc: serde_json::to_vec::<Result<u64, EthProviderError>>(&Ok(
message.id,
))
.unwrap(),
metadata: None,
},
None,
)),
payload: None,
signed_capabilities: None,
})
.await
.unwrap();
let mut filter = Filter::new();
if let Some(addresses) = sub.addresses {
filter = filter.address(ValueOrArray::Array(
addresses.into_iter().map(|s| s.parse().unwrap()).collect(),
));
}
// TODO is there a cleaner way to do all of this?
if let Some(from_block) = sub.from_block {
filter = filter.from_block(from_block);
}
if let Some(to_block) = sub.to_block {
filter = filter.to_block(to_block);
}
if let Some(events) = sub.events {
filter = filter.events(&events);
}
if let Some(topic1) = sub.topic1 {
filter = filter.topic1(topic1);
}
if let Some(topic2) = sub.topic2 {
filter = filter.topic2(topic2);
}
if let Some(topic3) = sub.topic3 {
filter = filter.topic3(topic3);
}
let rpc_url = rpc_url.clone();
let handle = tokio::task::spawn(async move {
// when connection dies you need to restart at the last block you saw
// otherwise you replay events unnecessarily
let mut from_block: U64 =
filter.clone().get_from_block().unwrap_or(U64::zero());
loop {
// NOTE give main.rs uses rpc_url and panics if it can't connect, we do
// know that this should work in theory...can keep trying to reconnect
let Ok(ws_rpc) = Provider::<Ws>::connect(rpc_url.clone()).await else {
// TODO grab and print error
let _ = print_tx
.send(Printout {
verbosity: 0,
content: "eth_rpc: connection failed, retrying in 5s"
.to_string(),
})
.await;
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
continue;
};
match ws_rpc
.subscribe_logs(&filter.clone().from_block(from_block))
.await
{
Err(e) => {
let _ = print_tx
.send(Printout {
verbosity: 0,
content: format!("eth_rpc: subscription error: {:?}", e),
})
.await;
continue;
}
Ok(mut stream) => {
let _ = print_tx
.send(Printout {
verbosity: 0,
content: "eth_rpc: connection established".to_string(),
})
.await;
while let Some(event) = stream.next().await {
send_to_loop.send(
KernelMessage {
id: rand::random(),
source: Address {
node: our.clone(),
process: ETH_PROCESS_ID.clone(),
},
target: target.clone(),
rsvp: None,
message: Message::Request(Request {
inherit: false,
expects_response: None,
ipc: json!({
"EventSubscription": serde_json::to_value(event.clone()).unwrap()
}).to_string().into_bytes(),
metadata: None,
}),
payload: None,
signed_capabilities: None,
}
).await.unwrap();
from_block = event.block_number.unwrap_or(from_block);
}
let _ = print_tx
.send(Printout {
verbosity: 0,
content:
"eth_rpc: subscription connection lost, reconnecting"
.to_string(),
})
.await;
}
};
}
});
subscriptions.insert(message.id, handle);
}
EthRpcAction::Unsubscribe(sub_id) => {
let _ = print_tx
.send(Printout {
verbosity: 0,
content: format!("eth_rpc: unsubscribing from {}", sub_id),
})
.await;
if let Some(handle) = subscriptions.remove(&sub_id) {
handle.abort();
} else {
let _ = print_tx
.send(Printout {
verbosity: 0,
content: format!("eth_rpc: no task found with id {}", sub_id),
})
.await;
}
}
}
}
println!("request message {:?}", message);
Ok(())
}
//
// helpers
//
fn handle_response (ipc: &Vec<u8>) -> Result<()> {
fn make_error_message(
our_name: String,
km: &KernelMessage,
error: EthProviderError,
) -> KernelMessage {
KernelMessage {
id: km.id,
source: Address {
node: our_name.clone(),
process: ETH_PROCESS_ID.clone(),
},
target: match &km.rsvp {
None => km.source.clone(),
Some(rsvp) => rsvp.clone(),
},
rsvp: None,
message: Message::Response((
Response {
inherit: false,
ipc: serde_json::to_vec::<Result<u64, EthProviderError>>(&Err(error)).unwrap(),
metadata: None,
},
None,
)),
payload: None,
signed_capabilities: None,
}
let Ok(message) = serde_json::from_slice::<HttpServerAction>(ipc) else {
return Ok(());
};
println!("response message {:?}", message);
Ok(())
}

View File

@ -1,3 +1,7 @@
use crate::http::types::HttpServerAction;
use ethers::types::{ValueOrArray, U256, U64};
use serde::{Deserialize, Serialize};
#[derive(Debug, Serialize, Deserialize)]
struct EthEventSubscription {
addresses: Option<Vec<String>>,
@ -9,14 +13,22 @@ struct EthEventSubscription {
topic3: Option<U256>,
}
struct EthAccounts {
addresses: Option<Vec<String>>,
#[derive(Debug, Serialize, Deserialize)]
pub enum ProviderAction {
HttpServerAction(HttpServerAction),
EthRpcAction(EthRpcAction),
}
struct EthBlockNumber {
#[derive(Debug, Serialize, Deserialize)]
pub enum EthRpcAction {
Eth(EthMethod),
Debug(DebugMethod),
Net(NetMethod),
Trace(TraceMethod),
TxPool(TxPoolMethod),
}
#[derive(Debug, Serialize, Deserialize)]
enum DebugMethod {
GetRawBlock,
GetRawHeader,
@ -30,6 +42,7 @@ enum DebugMethod {
TraceTransaction,
}
#[derive(Debug, Serialize, Deserialize)]
enum EthMethod {
Accounts,
BlockNumber,
@ -75,12 +88,14 @@ enum EthMethod {
UninstallFilter,
}
#[derive(Debug, Serialize, Deserialize)]
enum NetMethod {
Listening,
PeerCount,
Version,
}
#[derive(Debug, Serialize, Deserialize)]
enum TraceMethod {
Call,
CallMany,
@ -91,8 +106,28 @@ enum TraceMethod {
Transaction,
}
#[derive(Debug, Serialize, Deserialize)]
enum TxPoolMethod {
Content,
Inspect,
Status,
}
#[derive(Debug, Serialize, Deserialize)]
pub enum EthProviderError {
NoRsvp,
BadJson,
NoJson,
EventSubscriptionFailed,
}
impl EthProviderError {
pub fn _kind(&self) -> &str {
match *self {
EthProviderError::NoRsvp { .. } => "NoRsvp",
EthProviderError::BadJson { .. } => "BapJson",
EthProviderError::NoJson { .. } => "NoJson",
EthProviderError::EventSubscriptionFailed { .. } => "EventSubscriptionFailed",
}
}
}