diff --git a/kinode/packages/app_store/app_store/Cargo.toml b/kinode/packages/app_store/app_store/Cargo.toml index d8e4d866..0fe63e6c 100644 --- a/kinode/packages/app_store/app_store/Cargo.toml +++ b/kinode/packages/app_store/app_store/Cargo.toml @@ -9,7 +9,7 @@ alloy-primitives = "0.6.2" alloy-sol-types = "0.6.2" anyhow = "1.0" bincode = "1.3.3" -kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", rev = "9231881" } +kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", rev = "ad17eaa" } rand = "0.8" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" diff --git a/kinode/packages/app_store/app_store/src/lib.rs b/kinode/packages/app_store/app_store/src/lib.rs index 287f8681..e773004c 100644 --- a/kinode/packages/app_store/app_store/src/lib.rs +++ b/kinode/packages/app_store/app_store/src/lib.rs @@ -1,5 +1,6 @@ use kinode_process_lib::eth::{ - get_logs, subscribe, unsubscribe, Address as EthAddress, EthSub, Filter, SubscriptionResult, + get_logs, subscribe, unsubscribe, Address as EthAddress, EthSub, EthSubResult, Filter, + SubscriptionResult, }; use kinode_process_lib::http::{bind_http_path, serve_ui, HttpServerRequest}; use kinode_process_lib::kernel_types as kt; @@ -58,7 +59,7 @@ pub enum Req { RemoteRequest(RemoteRequest), FTWorkerCommand(FTWorkerCommand), FTWorkerResult(FTWorkerResult), - Eth(EthSub), + Eth(EthSubResult), Http(HttpServerRequest), } @@ -184,11 +185,13 @@ fn handle_message( Req::FTWorkerResult(r) => { println!("app store: got weird ft_worker result: {r:?}"); } - Req::Eth(sub) => { + Req::Eth(eth_result) => { if source.node() != our.node() || source.process != "eth:distro:sys" { return Err(anyhow::anyhow!("eth sub event from weird addr: {source}")); } - handle_eth_sub_event(our, &mut state, sub.result)?; + if let Ok(EthSub { result, .. }) = eth_result { + handle_eth_sub_event(our, &mut state, result)?; + } } Req::Http(incoming) => { if source.node() != our.node() diff --git a/kinode/packages/kns_indexer/kns_indexer/Cargo.toml b/kinode/packages/kns_indexer/kns_indexer/Cargo.toml index 664b7df3..ee9f411b 100644 --- a/kinode/packages/kns_indexer/kns_indexer/Cargo.toml +++ b/kinode/packages/kns_indexer/kns_indexer/Cargo.toml @@ -10,7 +10,7 @@ alloy-primitives = "0.6.2" alloy-sol-types = "0.6.2" bincode = "1.3.3" hex = "0.4.3" -kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", rev = "9231881" } +kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", rev = "ad17eaa" } rmp-serde = "1.1.2" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" diff --git a/kinode/packages/kns_indexer/kns_indexer/src/lib.rs b/kinode/packages/kns_indexer/kns_indexer/src/lib.rs index 19898f86..ca98d3fa 100644 --- a/kinode/packages/kns_indexer/kns_indexer/src/lib.rs +++ b/kinode/packages/kns_indexer/kns_indexer/src/lib.rs @@ -4,7 +4,7 @@ use kinode_process_lib::{ await_message, eth::{ get_block_number, get_logs, subscribe, Address as EthAddress, BlockNumberOrTag, EthSub, - Filter, Log, SubscriptionResult, + EthSubResult, Filter, Log, SubscriptionResult, }, get_typed_state, print_to_terminal, println, set_state, Address, Message, Request, Response, }; @@ -253,12 +253,14 @@ fn handle_eth_message( pending_requests: &mut BTreeMap>, body: &[u8], ) -> anyhow::Result<()> { - let Ok(res) = serde_json::from_slice::(body) else { + let Ok(eth_result) = serde_json::from_slice::(body) else { return Err(anyhow::anyhow!("kns_indexer: got invalid message")); }; - if let SubscriptionResult::Log(log) = res.result { - handle_log(our, state, &log)?; + if let Ok(EthSub { result, .. }) = eth_result { + if let SubscriptionResult::Log(log) = result { + handle_log(our, state, &log)?; + } } // check the pending_requests btreemap to see if there are any requests that diff --git a/kinode/src/eth/provider.rs b/kinode/src/eth/provider.rs index eb58f6f6..4c56e761 100644 --- a/kinode/src/eth/provider.rs +++ b/kinode/src/eth/provider.rs @@ -243,28 +243,29 @@ async fn handle_local_request( EthResponse::Response { value: response } } }; - - let response = KernelMessage { - id: km.id, - source: Address { - node: our.to_string(), - process: ETH_PROCESS_ID.clone(), - }, - target: km.source.clone(), - rsvp: None, - message: Message::Response(( - Response { - inherit: false, - body: serde_json::to_vec(&return_body).unwrap(), - metadata: req.metadata.clone(), - capabilities: vec![], - }, - None, - )), - lazy_load_blob: None, - }; - - let _ = send_to_loop.send(response).await; + if let Some(_) = req.expects_response { + let _ = send_to_loop + .send(KernelMessage { + id: km.id, + source: Address { + node: our.to_string(), + process: ETH_PROCESS_ID.clone(), + }, + target: km.source.clone(), + rsvp: km.rsvp.clone(), + message: Message::Response(( + Response { + inherit: false, + body: serde_json::to_vec(&return_body).unwrap(), + metadata: req.metadata.clone(), + capabilities: vec![], + }, + None, + )), + lazy_load_blob: None, + }) + .await; + } Ok(()) } @@ -407,10 +408,33 @@ async fn handle_subscription_stream( ) -> Result<(), EthError> { match rx.recv().await { Err(e) => { - return Err(EthError::SubscriptionClosed(sub_id))?; + let error = Err(EthError::SubscriptionClosed(sub_id))?; + let _ = send_to_loop + .send(KernelMessage { + id: rand::random(), + source: Address { + node: our, + process: ETH_PROCESS_ID.clone(), + }, + target: target.clone(), + rsvp: rsvp.clone(), + message: Message::Request(Request { + inherit: false, + expects_response: None, + body: serde_json::to_vec(&EthSubResult::Err(EthSubError { + id: sub_id, + error: e.to_string(), + })) + .unwrap(), + metadata: None, + capabilities: vec![], + }), + lazy_load_blob: None, + }) + .await + .unwrap(); } Ok(value) => { - // this should not return in case of one failed event? let event: SubscriptionResult = serde_json::from_str(value.get()).map_err(|_| { EthError::RpcError("eth: failed to deserialize subscription result".to_string()) })?; @@ -426,10 +450,10 @@ async fn handle_subscription_stream( message: Message::Request(Request { inherit: false, expects_response: None, - body: serde_json::to_vec(&EthSub { + body: serde_json::to_vec(&EthSubResult::Ok(EthSub { id: sub_id, result: event, - }) + })) .unwrap(), metadata: None, capabilities: vec![], @@ -443,7 +467,6 @@ async fn handle_subscription_stream( Err(EthError::SubscriptionClosed(sub_id)) } -// todo, always send errors or no? general runtime question for other modules too. fn make_error_message(our_node: String, km: KernelMessage, error: EthError) -> KernelMessage { let source = km.rsvp.unwrap_or_else(|| Address { node: our_node.clone(), diff --git a/lib/src/eth.rs b/lib/src/eth.rs index a000637f..e36564b9 100644 --- a/lib/src/eth.rs +++ b/lib/src/eth.rs @@ -22,13 +22,24 @@ pub enum EthAction { }, } -/// Incoming Request for subscription updates that processes will receive. +/// Incoming Result type for subscription updates or errors that processes will receive. +pub type EthSubResult = Result; + +/// Incoming Request type for subscription updates. #[derive(Debug, Serialize, Deserialize)] pub struct EthSub { pub id: u64, pub result: SubscriptionResult, } +/// Incoming Request for subscription errors that processes will receive. +/// If your subscription is closed unexpectedly, you will receive this. +#[derive(Debug, Serialize, Deserialize)] +pub struct EthSubError { + pub id: u64, + pub error: String, +} + /// The Response type which a process will get from requesting with an [`EthAction`] will be /// of the form `Result<(), EthError>`, serialized and deserialized using `serde_json::to_vec` /// and `serde_json::from_slice`.