simplest robust working

This commit is contained in:
dr-frmr 2024-01-12 18:46:04 -03:00
parent b6d28885e2
commit 88b826a1d4
No known key found for this signature in database
7 changed files with 160 additions and 244 deletions

1
Cargo.lock generated
View File

@ -3029,6 +3029,7 @@ dependencies = [
"static_dir",
"thiserror",
"tokio",
"tokio-stream",
"tokio-tungstenite 0.20.1",
"url",
"uuid 1.4.1",

View File

@ -66,6 +66,7 @@ snow = { version = "0.9.3", features = ["ring-resolver"] }
static_dir = "0.2.0"
thiserror = "1.0"
tokio = { version = "1.28", features = ["fs", "macros", "rt-multi-thread", "signal", "sync"] }
tokio-stream = "0.1.14"
tokio-tungstenite = "0.20.1"
url = "2.4.1"
uuid = { version = "1.1.2", features = ["serde", "v4"] }

View File

@ -1123,8 +1123,9 @@ dependencies = [
[[package]]
name = "nectar_process_lib"
version = "0.5.0"
source = "git+ssh://git@github.com/uqbar-dao/process_lib.git?tag=v0.5.0-alpha#235f4aae4d8078e86d3114753ed79ccedb947ebe"
source = "git+ssh://git@github.com/uqbar-dao/process_lib.git?rev=d19ff3d#d19ff3d15df2155e441939dd6432342f9ec1b340"
dependencies = [
"alloy-rpc-types",
"anyhow",
"bincode",
"ethers-core",

View File

@ -20,7 +20,7 @@ hex = "0.4.3"
rmp-serde = "1.1.2"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
nectar_process_lib = { git = "ssh://git@github.com/uqbar-dao/process_lib.git", tag = "v0.5.0-alpha", features = ["eth"] }
nectar_process_lib = { git = "ssh://git@github.com/uqbar-dao/process_lib.git", rev = "d19ff3d", features = ["eth"] }
wit-bindgen = { git = "https://github.com/bytecodealliance/wit-bindgen", rev = "efcc759" }
[lib]

View File

@ -1,6 +1,6 @@
use alloy_rpc_types::Log;
use alloy_sol_types::{sol, SolEvent};
use nectar_process_lib::eth::{EthAddress, SubscribeLogsRequest};
use nectar_process_lib::eth::{EthAddress, EthSubEvent, SubscribeLogsRequest};
use nectar_process_lib::{
await_message, get_typed_state, http, print_to_terminal, println, set_state, Address,
LazyLoadBlob, Message, Request, Response,
@ -31,11 +31,6 @@ struct State {
block: u64,
}
#[derive(Debug, Serialize, Deserialize)]
enum IndexerActions {
EventSubscription(Log),
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum NetActions {
NdnsUpdate(NdnsUpdate),
@ -146,7 +141,7 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> {
))?
.send()?;
SubscribeLogsRequest::new()
SubscribeLogsRequest::new(1) // subscription id 1
.address(EthAddress::from_str(contract_address.unwrap().as_str())?)
.from_block(state.block - 1)
.events(vec![
@ -206,20 +201,20 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> {
continue;
}
let Ok(msg) = serde_json::from_slice::<IndexerActions>(&body) else {
let Ok(msg) = serde_json::from_slice::<EthSubEvent>(&body) else {
println!("ndns_indexer: got invalid message");
continue;
};
match msg {
IndexerActions::EventSubscription(e) => {
state.block = e.clone().block_number.expect("expect").to::<u64>();
EthSubEvent::Log(log) => {
state.block = log.block_number.expect("expect").to::<u64>();
let node_id: alloy_primitives::FixedBytes<32> = e.topics[1];
let node_id: alloy_primitives::FixedBytes<32> = log.topics[1];
let name = match state.names.entry(node_id.clone().to_string()) {
let name = match state.names.entry(node_id.to_string()) {
Entry::Occupied(o) => o.into_mut(),
Entry::Vacant(v) => v.insert(get_name(&e)),
Entry::Vacant(v) => v.insert(get_name(&log)),
};
let node = state
@ -229,15 +224,15 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> {
let mut send = true;
match e.topics[0].clone() {
match log.topics[0] {
KeyUpdate::SIGNATURE_HASH => {
node.public_key = KeyUpdate::abi_decode_data(&e.data, true)
node.public_key = KeyUpdate::abi_decode_data(&log.data, true)
.unwrap()
.0
.to_string();
}
IpUpdate::SIGNATURE_HASH => {
let ip = IpUpdate::abi_decode_data(&e.data, true).unwrap().0;
let ip = IpUpdate::abi_decode_data(&log.data, true).unwrap().0;
node.ip = format!(
"{}.{}.{}.{}",
(ip >> 24) & 0xFF,
@ -247,10 +242,10 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> {
);
}
WsUpdate::SIGNATURE_HASH => {
node.port = WsUpdate::abi_decode_data(&e.data, true).unwrap().0;
node.port = WsUpdate::abi_decode_data(&log.data, true).unwrap().0;
}
RoutingUpdate::SIGNATURE_HASH => {
node.routers = RoutingUpdate::abi_decode_data(&e.data, true)
node.routers = RoutingUpdate::abi_decode_data(&log.data, true)
.unwrap()
.0
.iter()

View File

@ -1,20 +1,15 @@
use crate::eth::types::*;
use crate::http::server_types::{HttpServerAction, HttpServerRequest, WsMessageType};
use crate::types::*;
use anyhow::Result;
use ethers::prelude::Provider;
use ethers::types::Filter;
use ethers_providers::{Middleware, StreamExt, Ws};
use futures::stream::SplitStream;
use futures::SinkExt;
use serde_json::json;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::net::TcpStream;
use tokio::sync::Mutex;
use tokio_tungstenite::connect_async;
use tokio_tungstenite::tungstenite::Message as TungsteniteMessage;
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
use url::Url;
const WS_RECONNECTS: usize = 10_000; // TODO workshop this
/// The ETH provider runtime process is responsible for connecting to one or more ETH RPC providers
/// and using them to service indexing requests from other apps. This could also be done by a wasm
/// app, but in the future, this process will hopefully expand in scope to perform more complex
@ -26,18 +21,36 @@ pub async fn provider(
mut recv_in_client: MessageReceiver,
print_tx: PrintSender,
) -> Result<()> {
let our = Arc::new(our);
// for now, we can only handle WebSocket RPC URLs. In the future, we should
// be able to handle HTTP too, at least.
match Url::parse(&rpc_url)?.scheme() {
"http" | "https" => {
return Err(anyhow::anyhow!("eth: http provider not supported yet!"));
return Err(anyhow::anyhow!(
"eth: fatal: http provider not supported yet!"
));
}
"ws" | "wss" => {}
_ => {
return Err(anyhow::anyhow!("eth: provider must use http or ws!"));
return Err(anyhow::anyhow!("eth: fatal: provider must use http or ws!"));
}
}
let provider = match Provider::<Ws>::connect_with_reconnects(&rpc_url, WS_RECONNECTS).await {
Ok(provider) => provider,
Err(e) => {
return Err(anyhow::anyhow!(
"eth: fatal: given RPC URL could not connect! {e:?}"
));
}
};
println!("eth: provider made\r");
let mut connections = RpcConnections {
provider,
ws_provider_subscriptions: HashMap::new(),
};
while let Some(km) = recv_in_client.recv().await {
// this module only handles requests, ignores all responses
let Message::Request(req) = &km.message else {
@ -46,7 +59,16 @@ pub async fn provider(
let Ok(action) = serde_json::from_slice::<EthAction>(&req.body) else {
continue;
};
match handle_request(&our, action, &send_to_loop).await {
println!("eth: action received\r");
match handle_request(
our.clone(),
&km.rsvp.unwrap_or(km.source.clone()),
action,
&mut connections,
&send_to_loop,
)
.await
{
Ok(()) => {}
Err(e) => {
let _ = print_tx
@ -55,6 +77,32 @@ pub async fn provider(
content: format!("eth: error handling request: {:?}", e),
})
.await;
if req.expects_response.is_some() {
send_to_loop
.send(KernelMessage {
id: km.id,
source: Address {
node: our.to_string(),
process: ETH_PROCESS_ID.clone(),
},
target: Address {
node: our.to_string(),
process: km.source.process.clone(),
},
rsvp: None,
message: Message::Response((
Response {
inherit: false,
body: serde_json::to_vec::<Result<(), EthError>>(&Err(e))?,
metadata: None,
capabilities: vec![],
},
None,
)),
lazy_load_blob: None,
})
.await?;
}
}
}
}
@ -62,211 +110,79 @@ pub async fn provider(
}
async fn handle_request(
our: &str,
our: Arc<String>,
target: &Address,
action: EthAction,
connections: &mut RpcConnections,
send_to_loop: &MessageSender,
) -> Result<(), anyhow::Error> {
) -> Result<(), EthError> {
match action {
EthAction::SubscribeLogs(req) => {
todo!()
EthAction::SubscribeLogs { sub_id, filter } => {
if connections.ws_provider_subscriptions.contains_key(&sub_id) {
return Err(EthError::SubscriptionIdCollision);
}
let handle = tokio::spawn(handle_subscription_stream(
our.clone(),
connections.provider.clone(),
filter,
target.clone(),
send_to_loop.clone(),
));
connections.ws_provider_subscriptions.insert(sub_id, handle);
Ok(())
}
EthAction::UnsubscribeLogs(channel_id) => {
todo!()
EthAction::UnsubscribeLogs(sub_id) => {
let handle = connections
.ws_provider_subscriptions
.remove(&sub_id)
.ok_or(EthError::SubscriptionNotFound)?;
handle.abort();
Ok(())
}
}
Ok(())
}
async fn spawn_provider_read_stream(
our: String,
req: SubscribeLogs,
km: KernelMessage,
connections: Arc<Mutex<RpcConnections>>,
/// Executed as a long-lived task. The JoinHandle is stored in the `connections` map.
/// This task is responsible for connecting to the ETH RPC provider and streaming logs
/// for a specific subscription made by a process.
async fn handle_subscription_stream(
our: Arc<String>,
provider: Provider<Ws>,
filter: Filter,
target: Address,
send_to_loop: MessageSender,
) {
loop {
let mut connections_guard = connections.lock().await;
let Some(ref ws_rpc_url) = connections_guard.ws_rpc_url else {
todo!()
};
let ws_provider = match Provider::<Ws>::connect(&ws_rpc_url).await {
Ok(provider) => provider,
Err(e) => {
println!("error connecting to ws provider: {:?}", e);
return;
}
};
let mut stream = match ws_provider.subscribe_logs(&req.filter.clone()).await {
Ok(s) => s,
Err(e) => {
println!("error subscribing to logs: {:?}", e);
return;
}
};
let ws_provider_subscription = connections_guard
.ws_provider_subscriptions
.entry(km.id)
.or_insert(WsProviderSubscription::default());
ws_provider_subscription.provider = Some(ws_provider.clone());
ws_provider_subscription.subscription = Some(stream.id);
drop(connections_guard);
while let Some(event) = stream.next().await {
send_to_loop
.send(KernelMessage {
id: rand::random(),
source: Address {
node: our.clone(),
process: ETH_PROCESS_ID.clone(),
},
target: Address {
node: our.clone(),
process: km.source.process.clone(),
},
rsvp: None,
message: Message::Request(Request {
inherit: false,
expects_response: None,
body: json!({
"EventSubscription": serde_json::to_value(event.clone()).unwrap()
})
.to_string()
.into_bytes(),
metadata: None,
capabilities: vec![],
}),
lazy_load_blob: None,
})
.await
.unwrap();
}
}
}
async fn bind_websockets(our: &str, send_to_loop: &MessageSender) {
let _ = send_to_loop
.send(KernelMessage {
id: rand::random(),
source: Address {
node: our.to_string(),
process: ETH_PROCESS_ID.clone(),
},
target: Address {
node: our.to_string(),
process: HTTP_SERVER_PROCESS_ID.clone(),
},
rsvp: None,
message: Message::Request(Request {
inherit: false,
body: serde_json::to_vec(&HttpServerAction::WebSocketBind {
path: "/".to_string(),
authenticated: false,
encrypted: false,
})
.unwrap(),
metadata: None,
expects_response: None,
capabilities: vec![],
}),
lazy_load_blob: None,
})
.await;
}
async fn bootstrap_websocket_connections(
our: &str,
rpc_url: &str,
connections: Arc<Mutex<RpcConnections>>,
send_to_loop: &mut MessageSender,
) -> Result<()> {
let our = our.to_string();
let rpc_url = rpc_url.to_string();
let send_to_loop = send_to_loop.clone();
let connections = connections.clone();
tokio::spawn(async move {
loop {
let Ok((ws_stream, _response)) = connect_async(&rpc_url).await else {
println!(
"error! couldn't connect to eth_rpc provider: {:?}, trying again in 3s\r",
rpc_url
);
tokio::time::sleep(std::time::Duration::from_secs(3)).await;
continue;
};
let (ws_sender, mut ws_receiver) = ws_stream.split();
let mut connections_guard = connections.lock().await;
connections_guard.ws_sender = Some(ws_sender);
connections_guard.ws_provider = Some(Provider::<Ws>::connect(&rpc_url).await.unwrap());
drop(connections_guard);
handle_external_websocket_passthrough(
&our,
connections.clone(),
&mut ws_receiver,
&send_to_loop,
)
.await;
}
});
Ok(())
}
async fn handle_external_websocket_passthrough(
our: &str,
connections: Arc<Mutex<RpcConnections>>,
ws_receiver: &mut SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
send_to_loop: &MessageSender,
) {
while let Some(message) = ws_receiver.next().await {
match message {
Ok(msg) => {
if let Ok(text) = msg.into_text() {
let Ok(mut json) = serde_json::from_str::<serde_json::Value>(&text) else {
continue;
};
let id = json["id"].as_u64().unwrap() as u32;
let channel_id: u32 = *connections.lock().await.ws_sender_ids.get(&id).unwrap();
json["id"] = serde_json::Value::from(id - channel_id);
let _ = send_to_loop
.send(KernelMessage {
id: rand::random(),
source: Address {
node: our.to_string(),
process: ETH_PROCESS_ID.clone(),
},
target: Address {
node: our.to_string(),
process: HTTP_SERVER_PROCESS_ID.clone(),
},
rsvp: None,
message: Message::Request(Request {
inherit: false,
body: serde_json::to_vec(&HttpServerAction::WebSocketPush {
channel_id,
message_type: WsMessageType::Text,
})
.unwrap(),
metadata: None,
expects_response: None,
capabilities: vec![],
}),
lazy_load_blob: Some(LazyLoadBlob {
bytes: json.to_string().as_bytes().to_vec(),
mime: None,
}),
})
.await;
}
}
Err(_) => break,
) -> Result<(), EthError> {
println!("eth: handling subscription stream\r");
let mut stream = match provider.subscribe_logs(&filter).await {
Ok(s) => s,
Err(e) => {
return Err(EthError::ProviderError(e.to_string()));
}
};
while let Some(event) = stream.next().await {
send_to_loop
.send(KernelMessage {
id: rand::random(),
source: Address {
node: our.to_string(),
process: ETH_PROCESS_ID.clone(),
},
target: target.clone(),
rsvp: None,
message: Message::Request(Request {
inherit: false,
expects_response: None,
body: serde_json::to_vec(&EthSubEvent::Log(event)).unwrap(),
metadata: None,
capabilities: vec![],
}),
lazy_load_blob: None,
})
.await
.unwrap();
}
Err(EthError::SubscriptionClosed)
}

View File

@ -1,6 +1,6 @@
use ethers::prelude::Provider;
use ethers::types::{Filter, Log, U256};
use ethers_providers::{Middleware, Ws};
use ethers::types::{Filter, Log};
use ethers_providers::Ws;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use tokio::task::JoinHandle;
@ -17,6 +17,21 @@ pub enum EthAction {
UnsubscribeLogs(u64),
}
/// The Response type which a process will get from requesting with an [`EthAction`] will be
/// of the form `Result<(), EthError>`, serialized and deserialized using `serde_json::to_vec`
/// and `serde_json::from_slice`.
#[derive(Debug, Serialize, Deserialize)]
pub enum EthError {
/// The subscription ID already existed
SubscriptionIdCollision,
/// The ethers provider threw an error when trying to subscribe
/// (contains ProviderError serialized to debug string)
ProviderError(String),
SubscriptionClosed,
/// The subscription ID was not found, so we couldn't unsubscribe.
SubscriptionNotFound,
}
/// The Request type which a process will get from using SubscribeLogs to subscribe
/// to a log.
///
@ -32,19 +47,6 @@ pub enum EthSubEvent {
/// Primary state object of the `eth` module
pub struct RpcConnections {
pub ws_rpc_url: String,
pub ws_provider_subscriptions: HashMap<u64, WsProviderSubscription>,
}
pub struct WsProviderSubscription {
pub handle: JoinHandle<()>,
pub provider: Provider<Ws>,
pub subscription: U256,
}
impl WsProviderSubscription {
pub async fn kill(&self) {
let _ = self.provider.unsubscribe(self.subscription).await;
self.handle.abort();
}
pub ws_provider_subscriptions: HashMap<u64, JoinHandle<Result<(), EthError>>>,
}