diff --git a/Cargo.lock b/Cargo.lock index d78bec65..dfc45e4f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -484,7 +484,7 @@ dependencies = [ "alloy-sol-types 0.5.4", "anyhow", "bincode", - "kinode_process_lib 0.5.9 (git+https://github.com/kinode-dao/process_lib?tag=v0.5.9-alpha)", + "kinode_process_lib 0.6.0 (git+https://github.com/kinode-dao/process_lib?rev=1e14a03)", "rand 0.8.5", "serde", "serde_json", @@ -3398,10 +3398,8 @@ name = "kinode_process_lib" version = "0.5.9" source = "git+https://github.com/kinode-dao/process_lib?tag=v0.5.9-alpha#5e705086bbd10fde89e11d3e3671f6a618a875a7" dependencies = [ - "alloy-rpc-types 0.1.0 (git+https://github.com/alloy-rs/alloy.git?rev=3b1c310)", "anyhow", "bincode", - "ethers-core", "http 1.0.0", "mime_guess", "rand 0.8.5", @@ -3449,7 +3447,7 @@ dependencies = [ [[package]] name = "kinode_process_lib" version = "0.6.0" -source = "git+https://github.com/kinode-dao/process_lib?rev=8d58cfb#8d58cfba0302681b6971cac26ea3f7e49d4602ec" +source = "git+https://github.com/kinode-dao/process_lib?rev=1e14a03#1e14a03e3e274f0bc7f16a08d81c7583589b12be" dependencies = [ "alloy-json-rpc", "alloy-primitives 0.6.2", @@ -3500,12 +3498,11 @@ name = "kns_indexer" version = "0.2.0" dependencies = [ "alloy-primitives 0.6.2", - "alloy-rpc-types 0.1.0 (git+https://github.com/alloy-rs/alloy?rev=098ad56)", "alloy-sol-types 0.6.2", "anyhow", "bincode", "hex", - "kinode_process_lib 0.6.0 (git+https://github.com/kinode-dao/process_lib?rev=8d58cfb)", + "kinode_process_lib 0.6.0 (git+https://github.com/kinode-dao/process_lib?rev=1e14a03)", "rmp-serde", "serde", "serde_json", diff --git a/kinode/packages/kns_indexer/kns_indexer/Cargo.toml b/kinode/packages/kns_indexer/kns_indexer/Cargo.toml index f7bfb61b..ccc552c5 100644 --- a/kinode/packages/kns_indexer/kns_indexer/Cargo.toml +++ b/kinode/packages/kns_indexer/kns_indexer/Cargo.toml @@ -7,11 +7,10 @@ edition = "2021" [dependencies] anyhow = "1.0" alloy-primitives = "0.6.2" -alloy-rpc-types = { git = "https://github.com/alloy-rs/alloy", rev = "098ad56"} alloy-sol-types = "0.6.2" bincode = "1.3.3" hex = "0.4.3" -kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", rev = "8d58cfb" } +kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", rev = "1e14a03" } rmp-serde = "1.1.2" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" diff --git a/kinode/packages/kns_indexer/kns_indexer/src/lib.rs b/kinode/packages/kns_indexer/kns_indexer/src/lib.rs index 9dbf882f..a8b37091 100644 --- a/kinode/packages/kns_indexer/kns_indexer/src/lib.rs +++ b/kinode/packages/kns_indexer/kns_indexer/src/lib.rs @@ -1,24 +1,19 @@ -use alloy_primitives::Address as EthAddress; -use alloy_rpc_types::{ - pubsub::{Params, SubscriptionKind, SubscriptionResult}, - BlockNumberOrTag, Filter, Log, -}; use alloy_sol_types::{sol, SolEvent}; use kinode_process_lib::{ await_message, - eth::{get_block_number, get_logs, EthResponse}, + eth::{ + get_block_number, get_logs, Address as EthAddress, BlockNumberOrTag, EthAction, EthMessage, + EthResponse, Filter, Log, Params, SubscriptionKind, SubscriptionResult, + }, get_typed_state, print_to_terminal, println, set_state, Address, Message, Request, Response, }; use serde::{Deserialize, Serialize}; -use std::string::FromUtf8Error; -use std::{ - collections::{ - hash_map::{Entry, HashMap}, - BTreeMap, - }, - str::FromStr, +use std::collections::{ + hash_map::{Entry, HashMap}, + BTreeMap, }; +use std::string::FromUtf8Error; wit_bindgen::generate!({ path: "wit", @@ -28,23 +23,6 @@ wit_bindgen::generate!({ }, }); -//TEMP -#[derive(Debug, Serialize, Deserialize)] -pub enum EthAction { - /// Subscribe to logs with a custom filter. ID is to be used to unsubscribe. - SubscribeLogs { - sub_id: u64, - kind: SubscriptionKind, - params: Params, - }, - /// Kill a SubscribeLogs subscription of a given ID, to stop getting updates. - UnsubscribeLogs(u64), - Request { - method: String, - params: serde_json::Value, - }, -} - #[derive(Clone, Debug, Serialize, Deserialize)] struct State { // what contract this state pertains to @@ -221,10 +199,9 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> { Request::new() .target((&our.node, "eth", "distro", "sys")) - .body(serde_json::to_vec(&EthAction::SubscribeLogs { - sub_id: 8, - kind, - params, + .body(serde_json::to_vec(&EthMessage { + id: 8, + action: EthAction::SubscribeLogs { kind, params }, })?) .send()?; @@ -285,17 +262,16 @@ fn handle_eth_message( pending_requests: &mut BTreeMap>, body: &[u8], ) -> anyhow::Result<()> { - let Ok(res) = serde_json::from_slice::(body) else { + let Ok(res) = serde_json::from_slice::(body) else { return Err(anyhow::anyhow!("kns_indexer: got invalid message")); }; - match res { - EthResponse::Sub { id, result } => match result { - SubscriptionResult::Log(log) => { + match res.action { + EthAction::Sub { result } => { + if let SubscriptionResult::Log(log) = result { handle_log(our, state, &log)?; } - _ => {} - }, + } _ => {} } diff --git a/kinode/src/eth/provider.rs b/kinode/src/eth/provider.rs index 6c6a8cb0..8f8639de 100644 --- a/kinode/src/eth/provider.rs +++ b/kinode/src/eth/provider.rs @@ -87,8 +87,10 @@ pub async fn provider( ) .await { - println!("got error: {:?}", e); - } + let _ = send_to_loop + .send(make_error_message(our.to_string(), km, e)) + .await; + }; }); } Err(anyhow::anyhow!("eth: fatal: message receiver closed!")) @@ -104,30 +106,24 @@ async fn handle_request( public: Arc, ) -> Result<(), EthError> { let Message::Request(req) = &km.message else { - return Err(EthError::ProviderError( + return Err(EthError::InvalidMethod( "eth: only accepts requests".to_string(), )); }; if let Some(provider) = provider.as_ref() { - let action = serde_json::from_slice::(&req.body).map_err(|e| { - EthError::ProviderError(format!("eth: failed to deserialize request: {:?}", e)) + let ethmsg = serde_json::from_slice::(&req.body).map_err(|e| { + EthError::InvalidMethod(format!("eth: failed to deserialize request: {:?}", e)) })?; if !*public && km.source.node != our { - return Err(EthError::ProviderError( - "eth: only accepts requests from apps".to_string(), - )); + return Err(EthError::PermissionDenied("not on the list.".to_string())); } // we might want some of these in payloads.. sub items? - let return_body: EthResponse = match action { - EthAction::SubscribeLogs { - sub_id, - kind, - params, - } => { - let sub_id = (km.target.process.clone(), sub_id); + let return_body: EthResponse = match ethmsg.action { + EthAction::SubscribeLogs { kind, params } => { + let sub_id = (km.target.process.clone(), ethmsg.id); let kind = serde_json::to_value(&kind).unwrap(); let params = serde_json::to_value(¶ms).unwrap(); @@ -136,7 +132,7 @@ async fn handle_request( .inner() .prepare("eth_subscribe", [kind, params]) .await - .unwrap(); + .map_err(|e| EthError::TransportError(e.to_string()))?; let target = km.rsvp.clone().unwrap_or_else(|| Address { node: our.to_string(), @@ -155,8 +151,8 @@ async fn handle_request( connections.insert(sub_id, handle); EthResponse::Ok } - EthAction::UnsubscribeLogs(sub_id) => { - let sub_id = (km.target.process.clone(), sub_id); + EthAction::UnsubscribeLogs => { + let sub_id = (km.target.process.clone(), ethmsg.id); let handle = connections .remove(&sub_id) .ok_or(EthError::SubscriptionNotFound)?; @@ -165,16 +161,20 @@ async fn handle_request( EthResponse::Ok } EthAction::Request { method, params } => { - let method = to_static_str(&method).ok_or(EthError::ProviderError(format!( - "eth: method not found: {}", - method - )))?; + let method = to_static_str(&method).ok_or(EthError::InvalidMethod(method))?; - // throw transportErrorKinds straight back to process - let response: serde_json::Value = - provider.inner().prepare(method, params).await.unwrap(); + let response: serde_json::Value = provider + .inner() + .prepare(method, params) + .await + .map_err(|e| EthError::TransportError(e.to_string()))?; - EthResponse::Request(response) + EthResponse::Response { value: response } + } + EthAction::Sub { .. } => { + return Err(EthError::InvalidMethod( + "eth: provider doesn't accept sub resultss".to_string(), + )) } }; @@ -247,11 +247,12 @@ async fn handle_subscription_stream( Err(e) => { println!("got an error from the subscription stream: {:?}", e); // TODO should we stop the subscription here? - // return Err(EthError::ProviderError(format!("{:?}", e))); + // return Err(EthError::TransportError??(format!("{:?}", e))); } Ok(value) => { - let event: SubscriptionResult = serde_json::from_str(value.get()) - .map_err(|e| EthError::ProviderError(format!("{:?}", e)))?; + let event: SubscriptionResult = serde_json::from_str(value.get()).map_err(|_| { + EthError::RpcError("eth: failed to deserialize subscription result".to_string()) + })?; send_to_loop .send(KernelMessage { id: rand::random(), @@ -264,9 +265,9 @@ async fn handle_subscription_stream( message: Message::Request(Request { inherit: false, expects_response: None, - body: serde_json::to_vec(&EthResponse::Sub { + body: serde_json::to_vec(&EthMessage { id: sub_id, - result: event, + action: EthAction::Sub { result: event }, }) .unwrap(), metadata: None, @@ -278,5 +279,32 @@ async fn handle_subscription_stream( .unwrap(); } } - Err(EthError::SubscriptionClosed) + Err(EthError::SubscriptionClosed(sub_id)) +} + +// todo, always send errors or no? general runtime question for other modules too. +fn make_error_message(our_node: String, km: KernelMessage, error: EthError) -> KernelMessage { + let source = km.rsvp.unwrap_or_else(|| Address { + node: our_node.clone(), + process: km.source.process.clone(), + }); + KernelMessage { + id: km.id, + source: Address { + node: our_node, + process: ETH_PROCESS_ID.clone(), + }, + target: source, + rsvp: None, + message: Message::Response(( + Response { + inherit: false, + body: serde_json::to_vec(&EthResponse::Err(error)).unwrap(), + metadata: None, + capabilities: vec![], + }, + None, + )), + lazy_load_blob: None, + } } diff --git a/lib/src/eth.rs b/lib/src/eth.rs index d70fa737..696af2e9 100644 --- a/lib/src/eth.rs +++ b/lib/src/eth.rs @@ -1,8 +1,17 @@ use alloy_rpc_types::pubsub::{Params, SubscriptionKind, SubscriptionResult}; use serde::{Deserialize, Serialize}; -/// The Request type that can be made to eth:distro:sys. Currently primitive, this -/// enum will expand to support more actions in the future. +/// The Message type that can be made to eth:distro:sys. The id is used to match the response, +/// if you're not doing send_and_await. +/// +/// Will be serialized and deserialized using `serde_json::to_vec` and `serde_json::from_slice`. +#[derive(Debug, Serialize, Deserialize)] +pub struct EthMessage { + pub id: u64, + pub action: EthAction, +} + +/// The Action and Request type that can be made to eth:distro:sys. /// /// Will be serialized and deserialized using `serde_json::to_vec` and `serde_json::from_slice`. #[derive(Debug, Serialize, Deserialize)] @@ -10,38 +19,44 @@ pub enum EthAction { /// Subscribe to logs with a custom filter. ID is to be used to unsubscribe. /// Logs come in as alloy_rpc_types::pubsub::SubscriptionResults SubscribeLogs { - sub_id: u64, kind: SubscriptionKind, params: Params, }, /// Kill a SubscribeLogs subscription of a given ID, to stop getting updates. - UnsubscribeLogs(u64), + UnsubscribeLogs, /// Raw request. Used by kinode_process_lib. Request { method: String, params: serde_json::Value, }, + /// Incoming subscription update. + Sub { result: SubscriptionResult }, } #[derive(Debug, Serialize, Deserialize)] pub enum EthResponse { Ok, - Request(serde_json::Value), + Response { value: serde_json::Value }, Err(EthError), - Sub { id: u64, result: SubscriptionResult }, } -/// The Response type which a process will get from requesting with an [`EthAction`] will be +/// The Response type which a process will get from requesting with an [`EthMessage`] will be /// of the form `Result<(), EthError>`, serialized and deserialized using `serde_json::to_vec` /// and `serde_json::from_slice`. #[derive(Debug, Serialize, Deserialize)] pub enum EthError { - /// The ethers provider threw an error when trying to subscribe - /// (contains ProviderError serialized to debug string) - ProviderError(String), - SubscriptionClosed, + /// Underlying transport error + TransportError(String), + /// Subscription closed + SubscriptionClosed(u64), /// The subscription ID was not found, so we couldn't unsubscribe. SubscriptionNotFound, + /// Invalid method + InvalidMethod(String), + /// Permission denied + PermissionDenied(String), + /// Internal RPC error + RpcError(String), } // @@ -49,7 +64,7 @@ pub enum EthError { // /// For static lifetimes of method strings. -/// Hopefully replaced asap by alloy-rs network abstraction. +/// Replaced soon by alloy-rs network abstraction. pub fn to_static_str(method: &str) -> Option<&'static str> { match method { "eth_getBalance" => Some("eth_getBalance"),