removed eth_rpc from codebase and removed eth feature flag

This commit is contained in:
commercium-sys 2023-12-27 19:38:57 -05:00
parent 177c23f573
commit 6e98333126
8 changed files with 5 additions and 325 deletions

View File

@ -17,7 +17,6 @@ zip = "0.6"
[features]
simulation-mode = []
eth = []
[dependencies]
aes-gcm = "0.10.2"

View File

@ -12,7 +12,7 @@
"net:sys:uqbar",
"vfs:sys:uqbar",
"kernel:sys:uqbar",
"eth_rpc:sys:uqbar",
"eth:sys:uqbar",
{
"process": "vfs:sys:uqbar",
"params": {

View File

@ -7,7 +7,7 @@
"request_messaging": [
"net:sys:uqbar",
"http_server:sys:uqbar",
"eth_rpc:sys:uqbar"
"eth:sys:uqbar"
],
"public": true
}

View File

@ -155,7 +155,7 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> {
continue;
};
let Message::Request { source, ipc, .. } = message else {
// TODO we should store the subscription ID for eth_rpc
// TODO we should store the subscription ID for eth
// incase we want to cancel/reset it
continue;
};

View File

@ -33,7 +33,7 @@ pub async fn provider(
mut recv_in_client: MessageReceiver,
print_tx: PrintSender,
) -> Result<()> {
println!("eth_rpc: starting");
println!("eth: starting");
let open_ws = KernelMessage {
id: rand::random(),

View File

@ -1,301 +0,0 @@
use crate::types::*;
use anyhow::Result;
use ethers::core::types::Filter;
use ethers::prelude::Provider;
use ethers::types::{ValueOrArray, U256, U64};
use ethers_providers::{Middleware, StreamExt, Ws};
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::collections::HashMap;
#[derive(Debug, Serialize, Deserialize)]
enum EthRpcAction {
SubscribeEvents(EthEventSubscription),
Unsubscribe(u64),
}
#[derive(Debug, Serialize, Deserialize)]
struct EthEventSubscription {
addresses: Option<Vec<String>>,
from_block: Option<u64>,
to_block: Option<u64>,
events: Option<Vec<String>>, // aka topic0s
topic1: Option<U256>,
topic2: Option<U256>,
topic3: Option<U256>,
}
#[derive(Debug, Serialize, Deserialize)]
pub enum EthRpcError {
NoRsvp,
BadJson,
NoJson,
EventSubscriptionFailed,
}
impl EthRpcError {
pub fn _kind(&self) -> &str {
match *self {
EthRpcError::NoRsvp { .. } => "NoRsvp",
EthRpcError::BadJson { .. } => "BapJson",
EthRpcError::NoJson { .. } => "NoJson",
EthRpcError::EventSubscriptionFailed { .. } => "EventSubscriptionFailed",
}
}
}
pub async fn eth_rpc(
our: String,
rpc_url: String,
send_to_loop: MessageSender,
mut recv_in_client: MessageReceiver,
print_tx: PrintSender,
) -> Result<()> {
let mut subscriptions = HashMap::<u64, tokio::task::JoinHandle<Result<(), EthRpcError>>>::new();
while let Some(message) = recv_in_client.recv().await {
let our = our.clone();
let send_to_loop = send_to_loop.clone();
let print_tx = print_tx.clone();
let KernelMessage {
ref source,
ref rsvp,
message:
Message::Request(Request {
expects_response,
ipc: ref json_bytes,
..
}),
..
} = message
else {
panic!("eth_rpc: bad message");
};
let target = if expects_response.is_some() {
Address {
node: our.clone(),
process: source.process.clone(),
}
} else {
let Some(rsvp) = rsvp else {
send_to_loop
.send(make_error_message(
our.clone(),
&message,
EthRpcError::NoRsvp,
))
.await
.unwrap();
continue;
};
rsvp.clone()
};
// let call_data = content.payload.bytes.content.clone().unwrap_or(vec![]);
let Ok(action) = serde_json::from_slice::<EthRpcAction>(json_bytes) else {
send_to_loop
.send(make_error_message(
our.clone(),
&message,
EthRpcError::BadJson,
))
.await
.unwrap();
continue;
};
match action {
EthRpcAction::SubscribeEvents(sub) => {
send_to_loop
.send(KernelMessage {
id: message.id,
source: Address {
node: our.clone(),
process: ETH_RPC_PROCESS_ID.clone(),
},
target: match &message.rsvp {
None => message.source.clone(),
Some(rsvp) => rsvp.clone(),
},
rsvp: None,
message: Message::Response((
Response {
inherit: false,
ipc: serde_json::to_vec::<Result<u64, EthRpcError>>(
&Ok(message.id),
)
.unwrap(),
metadata: None,
},
None,
)),
payload: None,
signed_capabilities: None,
})
.await
.unwrap();
let mut filter = Filter::new();
if let Some(addresses) = sub.addresses {
filter = filter.address(ValueOrArray::Array(
addresses.into_iter().map(|s| s.parse().unwrap()).collect(),
));
}
// TODO is there a cleaner way to do all of this?
if let Some(from_block) = sub.from_block {
filter = filter.from_block(from_block);
}
if let Some(to_block) = sub.to_block {
filter = filter.to_block(to_block);
}
if let Some(events) = sub.events {
filter = filter.events(&events);
}
if let Some(topic1) = sub.topic1 {
filter = filter.topic1(topic1);
}
if let Some(topic2) = sub.topic2 {
filter = filter.topic2(topic2);
}
if let Some(topic3) = sub.topic3 {
filter = filter.topic3(topic3);
}
let rpc_url = rpc_url.clone();
let handle = tokio::task::spawn(async move {
// when connection dies you need to restart at the last block you saw
// otherwise you replay events unnecessarily
let mut from_block: U64 =
filter.clone().get_from_block().unwrap_or(U64::zero());
loop {
// NOTE give main.rs uses rpc_url and panics if it can't connect, we do
// know that this should work in theory...can keep trying to reconnect
let Ok(ws_rpc) = Provider::<Ws>::connect(rpc_url.clone()).await else {
// TODO grab and print error
let _ = print_tx
.send(Printout {
verbosity: 0,
content: "eth_rpc: connection failed, retrying in 5s"
.to_string(),
})
.await;
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
continue;
};
match ws_rpc
.subscribe_logs(&filter.clone().from_block(from_block))
.await
{
Err(e) => {
let _ = print_tx
.send(Printout {
verbosity: 0,
content: format!("eth_rpc: subscription error: {:?}", e),
})
.await;
continue;
}
Ok(mut stream) => {
let _ = print_tx
.send(Printout {
verbosity: 0,
content: "eth_rpc: connection established".to_string(),
})
.await;
while let Some(event) = stream.next().await {
send_to_loop.send(
KernelMessage {
id: rand::random(),
source: Address {
node: our.clone(),
process: ETH_RPC_PROCESS_ID.clone(),
},
target: target.clone(),
rsvp: None,
message: Message::Request(Request {
inherit: false,
expects_response: None,
ipc: json!({
"EventSubscription": serde_json::to_value(event.clone()).unwrap()
}).to_string().into_bytes(),
metadata: None,
}),
payload: None,
signed_capabilities: None,
}
).await.unwrap();
from_block = event.block_number.unwrap_or(from_block);
}
let _ = print_tx
.send(Printout {
verbosity: 0,
content:
"eth_rpc: subscription connection lost, reconnecting"
.to_string(),
})
.await;
}
};
}
});
subscriptions.insert(message.id, handle);
}
EthRpcAction::Unsubscribe(sub_id) => {
let _ = print_tx
.send(Printout {
verbosity: 0,
content: format!("eth_rpc: unsubscribing from {}", sub_id),
})
.await;
if let Some(handle) = subscriptions.remove(&sub_id) {
handle.abort();
} else {
let _ = print_tx
.send(Printout {
verbosity: 0,
content: format!("eth_rpc: no task found with id {}", sub_id),
})
.await;
}
}
}
}
Ok(())
}
//
// helpers
//
fn make_error_message(our_name: String, km: &KernelMessage, error: EthRpcError) -> KernelMessage {
KernelMessage {
id: km.id,
source: Address {
node: our_name.clone(),
process: ETH_RPC_PROCESS_ID.clone(),
},
target: match &km.rsvp {
None => km.source.clone(),
Some(rsvp) => rsvp.clone(),
},
rsvp: None,
message: Message::Response((
Response {
inherit: false,
ipc: serde_json::to_vec::<Result<u64, EthRpcError>>(&Err(error)).unwrap(),
metadata: None,
},
None,
)),
payload: None,
signed_capabilities: None,
}
}

View File

@ -12,7 +12,6 @@ use tokio::{fs, time::timeout};
use ring::{rand::SystemRandom, signature, signature::KeyPair};
mod eth;
mod eth_rpc;
mod http;
mod kernel;
mod keygen;
@ -33,7 +32,6 @@ const WEBSOCKET_SENDER_CHANNEL_CAPACITY: usize = 32;
const HTTP_CHANNEL_CAPACITY: usize = 32;
const HTTP_CLIENT_CHANNEL_CAPACITY: usize = 32;
const ETH_PROVIDER_CHANNEL_CAPACITY: usize = 32;
const ETH_RPC_CHANNEL_CAPACITY: usize = 32;
const VFS_CHANNEL_CAPACITY: usize = 1_000;
const CAP_CHANNEL_CAPACITY: usize = 1_000;
const KV_CHANNEL_CAPACITY: usize = 1_000;
@ -173,8 +171,6 @@ async fn main() {
mpsc::channel(HTTP_CHANNEL_CAPACITY);
let (eth_provider_sender, eth_provider_receiver): (MessageSender, MessageReceiver) =
mpsc::channel(ETH_PROVIDER_CHANNEL_CAPACITY);
let (eth_rpc_sender, eth_rpc_receiver): (MessageSender, MessageReceiver) =
mpsc::channel(ETH_RPC_CHANNEL_CAPACITY);
// http client performs http requests on behalf of processes
let (http_client_sender, http_client_receiver): (MessageSender, MessageReceiver) =
mpsc::channel(HTTP_CLIENT_CHANNEL_CAPACITY);
@ -338,11 +334,6 @@ async fn main() {
eth_provider_sender,
true,
),
(
ProcessId::new(Some("eth_rpc"), "sys", "uqbar"),
eth_rpc_sender,
true,
),
(
ProcessId::new(Some("vfs"), "sys", "uqbar"),
vfs_message_sender,
@ -463,7 +454,7 @@ async fn main() {
timer_service_receiver,
print_sender.clone(),
));
#[cfg(feature = "eth")]
#[cfg(not(feature = "simulation-mode"))]
tasks.spawn(eth::provider::provider(
our.name.clone(),
rpc_url.clone(),
@ -471,14 +462,6 @@ async fn main() {
eth_provider_receiver,
print_sender.clone(),
));
#[cfg(not(feature = "simulation-mode"))]
tasks.spawn(eth_rpc::eth_rpc(
our.name.clone(),
rpc_url.clone(),
kernel_message_sender.clone(),
eth_rpc_receiver,
print_sender.clone(),
));
tasks.spawn(vfs::vfs(
our.name.clone(),
kernel_message_sender.clone(),

View File

@ -6,7 +6,6 @@ use thiserror::Error;
lazy_static::lazy_static! {
pub static ref ETH_PROCESS_ID: ProcessId = ProcessId::new(Some("eth"), "sys", "uqbar");
pub static ref ETH_RPC_PROCESS_ID: ProcessId = ProcessId::new(Some("eth_rpc"), "sys", "uqbar");
pub static ref HTTP_CLIENT_PROCESS_ID: ProcessId = ProcessId::new(Some("http_client"), "sys", "uqbar");
pub static ref HTTP_SERVER_PROCESS_ID: ProcessId = ProcessId::new(Some("http_server"), "sys", "uqbar");
pub static ref KERNEL_PROCESS_ID: ProcessId = ProcessId::new(Some("kernel"), "sys", "uqbar");