eth: better types and errors

This commit is contained in:
bitful-pannul 2024-02-12 16:11:10 -03:00
parent 5ccd2c2dfd
commit 6806d84d65
5 changed files with 107 additions and 92 deletions

9
Cargo.lock generated
View File

@ -484,7 +484,7 @@ dependencies = [
"alloy-sol-types 0.5.4", "alloy-sol-types 0.5.4",
"anyhow", "anyhow",
"bincode", "bincode",
"kinode_process_lib 0.5.9 (git+https://github.com/kinode-dao/process_lib?tag=v0.5.9-alpha)", "kinode_process_lib 0.6.0 (git+https://github.com/kinode-dao/process_lib?rev=1e14a03)",
"rand 0.8.5", "rand 0.8.5",
"serde", "serde",
"serde_json", "serde_json",
@ -3398,10 +3398,8 @@ name = "kinode_process_lib"
version = "0.5.9" version = "0.5.9"
source = "git+https://github.com/kinode-dao/process_lib?tag=v0.5.9-alpha#5e705086bbd10fde89e11d3e3671f6a618a875a7" source = "git+https://github.com/kinode-dao/process_lib?tag=v0.5.9-alpha#5e705086bbd10fde89e11d3e3671f6a618a875a7"
dependencies = [ dependencies = [
"alloy-rpc-types 0.1.0 (git+https://github.com/alloy-rs/alloy.git?rev=3b1c310)",
"anyhow", "anyhow",
"bincode", "bincode",
"ethers-core",
"http 1.0.0", "http 1.0.0",
"mime_guess", "mime_guess",
"rand 0.8.5", "rand 0.8.5",
@ -3449,7 +3447,7 @@ dependencies = [
[[package]] [[package]]
name = "kinode_process_lib" name = "kinode_process_lib"
version = "0.6.0" version = "0.6.0"
source = "git+https://github.com/kinode-dao/process_lib?rev=8d58cfb#8d58cfba0302681b6971cac26ea3f7e49d4602ec" source = "git+https://github.com/kinode-dao/process_lib?rev=1e14a03#1e14a03e3e274f0bc7f16a08d81c7583589b12be"
dependencies = [ dependencies = [
"alloy-json-rpc", "alloy-json-rpc",
"alloy-primitives 0.6.2", "alloy-primitives 0.6.2",
@ -3500,12 +3498,11 @@ name = "kns_indexer"
version = "0.2.0" version = "0.2.0"
dependencies = [ dependencies = [
"alloy-primitives 0.6.2", "alloy-primitives 0.6.2",
"alloy-rpc-types 0.1.0 (git+https://github.com/alloy-rs/alloy?rev=098ad56)",
"alloy-sol-types 0.6.2", "alloy-sol-types 0.6.2",
"anyhow", "anyhow",
"bincode", "bincode",
"hex", "hex",
"kinode_process_lib 0.6.0 (git+https://github.com/kinode-dao/process_lib?rev=8d58cfb)", "kinode_process_lib 0.6.0 (git+https://github.com/kinode-dao/process_lib?rev=1e14a03)",
"rmp-serde", "rmp-serde",
"serde", "serde",
"serde_json", "serde_json",

View File

@ -7,11 +7,10 @@ edition = "2021"
[dependencies] [dependencies]
anyhow = "1.0" anyhow = "1.0"
alloy-primitives = "0.6.2" alloy-primitives = "0.6.2"
alloy-rpc-types = { git = "https://github.com/alloy-rs/alloy", rev = "098ad56"}
alloy-sol-types = "0.6.2" alloy-sol-types = "0.6.2"
bincode = "1.3.3" bincode = "1.3.3"
hex = "0.4.3" hex = "0.4.3"
kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", rev = "8d58cfb" } kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", rev = "1e14a03" }
rmp-serde = "1.1.2" rmp-serde = "1.1.2"
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0" serde_json = "1.0"

View File

@ -1,24 +1,19 @@
use alloy_primitives::Address as EthAddress;
use alloy_rpc_types::{
pubsub::{Params, SubscriptionKind, SubscriptionResult},
BlockNumberOrTag, Filter, Log,
};
use alloy_sol_types::{sol, SolEvent}; use alloy_sol_types::{sol, SolEvent};
use kinode_process_lib::{ use kinode_process_lib::{
await_message, await_message,
eth::{get_block_number, get_logs, EthResponse}, eth::{
get_block_number, get_logs, Address as EthAddress, BlockNumberOrTag, EthAction, EthMessage,
EthResponse, Filter, Log, Params, SubscriptionKind, SubscriptionResult,
},
get_typed_state, print_to_terminal, println, set_state, Address, Message, Request, Response, get_typed_state, print_to_terminal, println, set_state, Address, Message, Request, Response,
}; };
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::string::FromUtf8Error; use std::collections::{
use std::{ hash_map::{Entry, HashMap},
collections::{ BTreeMap,
hash_map::{Entry, HashMap},
BTreeMap,
},
str::FromStr,
}; };
use std::string::FromUtf8Error;
wit_bindgen::generate!({ wit_bindgen::generate!({
path: "wit", path: "wit",
@ -28,23 +23,6 @@ wit_bindgen::generate!({
}, },
}); });
//TEMP
#[derive(Debug, Serialize, Deserialize)]
pub enum EthAction {
/// Subscribe to logs with a custom filter. ID is to be used to unsubscribe.
SubscribeLogs {
sub_id: u64,
kind: SubscriptionKind,
params: Params,
},
/// Kill a SubscribeLogs subscription of a given ID, to stop getting updates.
UnsubscribeLogs(u64),
Request {
method: String,
params: serde_json::Value,
},
}
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
struct State { struct State {
// what contract this state pertains to // what contract this state pertains to
@ -221,10 +199,9 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> {
Request::new() Request::new()
.target((&our.node, "eth", "distro", "sys")) .target((&our.node, "eth", "distro", "sys"))
.body(serde_json::to_vec(&EthAction::SubscribeLogs { .body(serde_json::to_vec(&EthMessage {
sub_id: 8, id: 8,
kind, action: EthAction::SubscribeLogs { kind, params },
params,
})?) })?)
.send()?; .send()?;
@ -285,17 +262,16 @@ fn handle_eth_message(
pending_requests: &mut BTreeMap<u64, Vec<IndexerRequests>>, pending_requests: &mut BTreeMap<u64, Vec<IndexerRequests>>,
body: &[u8], body: &[u8],
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let Ok(res) = serde_json::from_slice::<EthResponse>(body) else { let Ok(res) = serde_json::from_slice::<EthMessage>(body) else {
return Err(anyhow::anyhow!("kns_indexer: got invalid message")); return Err(anyhow::anyhow!("kns_indexer: got invalid message"));
}; };
match res { match res.action {
EthResponse::Sub { id, result } => match result { EthAction::Sub { result } => {
SubscriptionResult::Log(log) => { if let SubscriptionResult::Log(log) = result {
handle_log(our, state, &log)?; handle_log(our, state, &log)?;
} }
_ => {} }
},
_ => {} _ => {}
} }

View File

@ -87,8 +87,10 @@ pub async fn provider(
) )
.await .await
{ {
println!("got error: {:?}", e); let _ = send_to_loop
} .send(make_error_message(our.to_string(), km, e))
.await;
};
}); });
} }
Err(anyhow::anyhow!("eth: fatal: message receiver closed!")) Err(anyhow::anyhow!("eth: fatal: message receiver closed!"))
@ -104,30 +106,24 @@ async fn handle_request(
public: Arc<bool>, public: Arc<bool>,
) -> Result<(), EthError> { ) -> Result<(), EthError> {
let Message::Request(req) = &km.message else { let Message::Request(req) = &km.message else {
return Err(EthError::ProviderError( return Err(EthError::InvalidMethod(
"eth: only accepts requests".to_string(), "eth: only accepts requests".to_string(),
)); ));
}; };
if let Some(provider) = provider.as_ref() { if let Some(provider) = provider.as_ref() {
let action = serde_json::from_slice::<EthAction>(&req.body).map_err(|e| { let ethmsg = serde_json::from_slice::<EthMessage>(&req.body).map_err(|e| {
EthError::ProviderError(format!("eth: failed to deserialize request: {:?}", e)) EthError::InvalidMethod(format!("eth: failed to deserialize request: {:?}", e))
})?; })?;
if !*public && km.source.node != our { if !*public && km.source.node != our {
return Err(EthError::ProviderError( return Err(EthError::PermissionDenied("not on the list.".to_string()));
"eth: only accepts requests from apps".to_string(),
));
} }
// we might want some of these in payloads.. sub items? // we might want some of these in payloads.. sub items?
let return_body: EthResponse = match action { let return_body: EthResponse = match ethmsg.action {
EthAction::SubscribeLogs { EthAction::SubscribeLogs { kind, params } => {
sub_id, let sub_id = (km.target.process.clone(), ethmsg.id);
kind,
params,
} => {
let sub_id = (km.target.process.clone(), sub_id);
let kind = serde_json::to_value(&kind).unwrap(); let kind = serde_json::to_value(&kind).unwrap();
let params = serde_json::to_value(&params).unwrap(); let params = serde_json::to_value(&params).unwrap();
@ -136,7 +132,7 @@ async fn handle_request(
.inner() .inner()
.prepare("eth_subscribe", [kind, params]) .prepare("eth_subscribe", [kind, params])
.await .await
.unwrap(); .map_err(|e| EthError::TransportError(e.to_string()))?;
let target = km.rsvp.clone().unwrap_or_else(|| Address { let target = km.rsvp.clone().unwrap_or_else(|| Address {
node: our.to_string(), node: our.to_string(),
@ -155,8 +151,8 @@ async fn handle_request(
connections.insert(sub_id, handle); connections.insert(sub_id, handle);
EthResponse::Ok EthResponse::Ok
} }
EthAction::UnsubscribeLogs(sub_id) => { EthAction::UnsubscribeLogs => {
let sub_id = (km.target.process.clone(), sub_id); let sub_id = (km.target.process.clone(), ethmsg.id);
let handle = connections let handle = connections
.remove(&sub_id) .remove(&sub_id)
.ok_or(EthError::SubscriptionNotFound)?; .ok_or(EthError::SubscriptionNotFound)?;
@ -165,16 +161,20 @@ async fn handle_request(
EthResponse::Ok EthResponse::Ok
} }
EthAction::Request { method, params } => { EthAction::Request { method, params } => {
let method = to_static_str(&method).ok_or(EthError::ProviderError(format!( let method = to_static_str(&method).ok_or(EthError::InvalidMethod(method))?;
"eth: method not found: {}",
method
)))?;
// throw transportErrorKinds straight back to process let response: serde_json::Value = provider
let response: serde_json::Value = .inner()
provider.inner().prepare(method, params).await.unwrap(); .prepare(method, params)
.await
.map_err(|e| EthError::TransportError(e.to_string()))?;
EthResponse::Request(response) EthResponse::Response { value: response }
}
EthAction::Sub { .. } => {
return Err(EthError::InvalidMethod(
"eth: provider doesn't accept sub resultss".to_string(),
))
} }
}; };
@ -247,11 +247,12 @@ async fn handle_subscription_stream(
Err(e) => { Err(e) => {
println!("got an error from the subscription stream: {:?}", e); println!("got an error from the subscription stream: {:?}", e);
// TODO should we stop the subscription here? // TODO should we stop the subscription here?
// return Err(EthError::ProviderError(format!("{:?}", e))); // return Err(EthError::TransportError??(format!("{:?}", e)));
} }
Ok(value) => { Ok(value) => {
let event: SubscriptionResult = serde_json::from_str(value.get()) let event: SubscriptionResult = serde_json::from_str(value.get()).map_err(|_| {
.map_err(|e| EthError::ProviderError(format!("{:?}", e)))?; EthError::RpcError("eth: failed to deserialize subscription result".to_string())
})?;
send_to_loop send_to_loop
.send(KernelMessage { .send(KernelMessage {
id: rand::random(), id: rand::random(),
@ -264,9 +265,9 @@ async fn handle_subscription_stream(
message: Message::Request(Request { message: Message::Request(Request {
inherit: false, inherit: false,
expects_response: None, expects_response: None,
body: serde_json::to_vec(&EthResponse::Sub { body: serde_json::to_vec(&EthMessage {
id: sub_id, id: sub_id,
result: event, action: EthAction::Sub { result: event },
}) })
.unwrap(), .unwrap(),
metadata: None, metadata: None,
@ -278,5 +279,32 @@ async fn handle_subscription_stream(
.unwrap(); .unwrap();
} }
} }
Err(EthError::SubscriptionClosed) Err(EthError::SubscriptionClosed(sub_id))
}
// todo, always send errors or no? general runtime question for other modules too.
fn make_error_message(our_node: String, km: KernelMessage, error: EthError) -> KernelMessage {
let source = km.rsvp.unwrap_or_else(|| Address {
node: our_node.clone(),
process: km.source.process.clone(),
});
KernelMessage {
id: km.id,
source: Address {
node: our_node,
process: ETH_PROCESS_ID.clone(),
},
target: source,
rsvp: None,
message: Message::Response((
Response {
inherit: false,
body: serde_json::to_vec(&EthResponse::Err(error)).unwrap(),
metadata: None,
capabilities: vec![],
},
None,
)),
lazy_load_blob: None,
}
} }

View File

@ -1,8 +1,17 @@
use alloy_rpc_types::pubsub::{Params, SubscriptionKind, SubscriptionResult}; use alloy_rpc_types::pubsub::{Params, SubscriptionKind, SubscriptionResult};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
/// The Request type that can be made to eth:distro:sys. Currently primitive, this /// The Message type that can be made to eth:distro:sys. The id is used to match the response,
/// enum will expand to support more actions in the future. /// if you're not doing send_and_await.
///
/// Will be serialized and deserialized using `serde_json::to_vec` and `serde_json::from_slice`.
#[derive(Debug, Serialize, Deserialize)]
pub struct EthMessage {
pub id: u64,
pub action: EthAction,
}
/// The Action and Request type that can be made to eth:distro:sys.
/// ///
/// Will be serialized and deserialized using `serde_json::to_vec` and `serde_json::from_slice`. /// Will be serialized and deserialized using `serde_json::to_vec` and `serde_json::from_slice`.
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
@ -10,38 +19,44 @@ pub enum EthAction {
/// Subscribe to logs with a custom filter. ID is to be used to unsubscribe. /// Subscribe to logs with a custom filter. ID is to be used to unsubscribe.
/// Logs come in as alloy_rpc_types::pubsub::SubscriptionResults /// Logs come in as alloy_rpc_types::pubsub::SubscriptionResults
SubscribeLogs { SubscribeLogs {
sub_id: u64,
kind: SubscriptionKind, kind: SubscriptionKind,
params: Params, params: Params,
}, },
/// Kill a SubscribeLogs subscription of a given ID, to stop getting updates. /// Kill a SubscribeLogs subscription of a given ID, to stop getting updates.
UnsubscribeLogs(u64), UnsubscribeLogs,
/// Raw request. Used by kinode_process_lib. /// Raw request. Used by kinode_process_lib.
Request { Request {
method: String, method: String,
params: serde_json::Value, params: serde_json::Value,
}, },
/// Incoming subscription update.
Sub { result: SubscriptionResult },
} }
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
pub enum EthResponse { pub enum EthResponse {
Ok, Ok,
Request(serde_json::Value), Response { value: serde_json::Value },
Err(EthError), Err(EthError),
Sub { id: u64, result: SubscriptionResult },
} }
/// The Response type which a process will get from requesting with an [`EthAction`] will be /// The Response type which a process will get from requesting with an [`EthMessage`] will be
/// of the form `Result<(), EthError>`, serialized and deserialized using `serde_json::to_vec` /// of the form `Result<(), EthError>`, serialized and deserialized using `serde_json::to_vec`
/// and `serde_json::from_slice`. /// and `serde_json::from_slice`.
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
pub enum EthError { pub enum EthError {
/// The ethers provider threw an error when trying to subscribe /// Underlying transport error
/// (contains ProviderError serialized to debug string) TransportError(String),
ProviderError(String), /// Subscription closed
SubscriptionClosed, SubscriptionClosed(u64),
/// The subscription ID was not found, so we couldn't unsubscribe. /// The subscription ID was not found, so we couldn't unsubscribe.
SubscriptionNotFound, SubscriptionNotFound,
/// Invalid method
InvalidMethod(String),
/// Permission denied
PermissionDenied(String),
/// Internal RPC error
RpcError(String),
} }
// //
@ -49,7 +64,7 @@ pub enum EthError {
// //
/// For static lifetimes of method strings. /// For static lifetimes of method strings.
/// Hopefully replaced asap by alloy-rs network abstraction. /// Replaced soon by alloy-rs network abstraction.
pub fn to_static_str(method: &str) -> Option<&'static str> { pub fn to_static_str(method: &str) -> Option<&'static str> {
match method { match method {
"eth_getBalance" => Some("eth_getBalance"), "eth_getBalance" => Some("eth_getBalance"),