diff --git a/Cargo.lock b/Cargo.lock index affc5680..4a1a4d3e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -407,7 +407,7 @@ dependencies = [ "alloy-sol-types", "anyhow", "bincode", - "kinode_process_lib 0.6.0 (git+https://github.com/kinode-dao/process_lib?rev=3232423)", + "kinode_process_lib 0.6.0", "rand 0.8.5", "serde", "serde_json", @@ -2631,6 +2631,26 @@ dependencies = [ "lib", ] +[[package]] +name = "kinode_process_lib" +version = "0.6.0" +dependencies = [ + "alloy-json-rpc", + "alloy-primitives", + "alloy-rpc-types", + "alloy-transport", + "anyhow", + "bincode", + "http 1.0.0", + "mime_guess", + "rand 0.8.5", + "serde", + "serde_json", + "thiserror", + "url", + "wit-bindgen", +] + [[package]] name = "kinode_process_lib" version = "0.6.0" @@ -2724,7 +2744,7 @@ dependencies = [ "anyhow", "bincode", "hex", - "kinode_process_lib 0.6.0 (git+https://github.com/kinode-dao/process_lib?rev=3232423)", + "kinode_process_lib 0.6.0", "rmp-serde", "serde", "serde_json", diff --git a/kinode/packages/app_store/app_store/Cargo.toml b/kinode/packages/app_store/app_store/Cargo.toml index f34a470c..e6a5b871 100644 --- a/kinode/packages/app_store/app_store/Cargo.toml +++ b/kinode/packages/app_store/app_store/Cargo.toml @@ -9,7 +9,8 @@ alloy-primitives = "0.6.2" alloy-sol-types = "0.6.2" anyhow = "1.0" bincode = "1.3.3" -kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", rev = "3232423" } +# kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", rev = "3232423" } +kinode_process_lib = { path = "../../../../../process_lib" } rand = "0.8" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" diff --git a/kinode/packages/kns_indexer/kns_indexer/Cargo.toml b/kinode/packages/kns_indexer/kns_indexer/Cargo.toml index 285e7fd4..2c269326 100644 --- a/kinode/packages/kns_indexer/kns_indexer/Cargo.toml +++ b/kinode/packages/kns_indexer/kns_indexer/Cargo.toml @@ -10,7 +10,8 @@ alloy-primitives = "0.6.2" alloy-sol-types = "0.6.2" bincode = "1.3.3" hex = "0.4.3" -kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", rev = "3232423" } +# kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", rev = "3232423" } +kinode_process_lib = { path = "../../../../../process_lib" } rmp-serde = "1.1.2" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" diff --git a/kinode/packages/kns_indexer/kns_indexer/src/lib.rs b/kinode/packages/kns_indexer/kns_indexer/src/lib.rs index fdd67343..ccb8f311 100644 --- a/kinode/packages/kns_indexer/kns_indexer/src/lib.rs +++ b/kinode/packages/kns_indexer/kns_indexer/src/lib.rs @@ -130,7 +130,7 @@ impl Guest for Component { fn main(our: Address, mut state: State) -> anyhow::Result<()> { // first, await a message from the kernel which will contain the - // contract address for the KNS version we want to track. + // chain ID and contract address for the KNS version we want to track. let mut contract_address: Option = None; loop { let Ok(Message::Request { source, body, .. }) = await_message() else { diff --git a/kinode/src/eth/provider.rs b/kinode/src/eth/provider.rs index d1272052..55fe58b8 100644 --- a/kinode/src/eth/provider.rs +++ b/kinode/src/eth/provider.rs @@ -5,6 +5,7 @@ use alloy_rpc_types::pubsub::SubscriptionResult; use alloy_transport_ws::WsConnect; use anyhow::Result; use dashmap::DashMap; +use futures::Future; use lib::types::core::*; use lib::types::eth::*; use serde::{Deserialize, Serialize}; @@ -17,17 +18,20 @@ use url::Url; /// mapping of chain id to ordered lists of providers type Providers = Arc>; +#[derive(Debug)] struct ActiveProviders { pub urls: Vec, pub nodes: Vec, } +#[derive(Debug)] struct UrlProvider { pub trusted: bool, pub url: String, pub pubsub: Option>, } +#[derive(Debug)] struct NodeProvider { pub trusted: bool, pub name: String, @@ -36,6 +40,7 @@ struct NodeProvider { /// existing subscriptions held by local processes type ActiveSubscriptions = Arc>>; +#[derive(Debug)] enum ActiveSub { Local(JoinHandle<()>), Remote(String), // name of node providing this subscription for us @@ -112,6 +117,8 @@ pub async fn provider( ap.add_provider_config(entry); } + println!("providers: {providers:?}\r"); + // handles of longrunning subscriptions. let mut active_subscriptions: ActiveSubscriptions = Arc::new(DashMap::new()); @@ -149,6 +156,7 @@ async fn handle_message( providers: &mut Providers, active_subscriptions: &mut ActiveSubscriptions, ) -> Result<(), EthError> { + println!("provider: handle_message\r"); match &km.message { Message::Response(_) => handle_passthrough_response(our, send_to_loop, km).await, Message::Request(req) => { @@ -187,6 +195,7 @@ async fn handle_passthrough_response( send_to_loop: &MessageSender, km: KernelMessage, ) -> Result<(), EthError> { + println!("provider: handle_passthrough_response\r"); send_to_loop .send(KernelMessage { id: rand::random(), @@ -213,6 +222,7 @@ async fn handle_eth_action( providers: &mut Providers, active_subscriptions: &mut ActiveSubscriptions, ) -> Result<(), EthError> { + println!("provider: handle_eth_action\r"); // check our access settings if the request is from a remote node if km.source.node != our { if !access_settings.deny.contains(&km.source.node) { @@ -245,6 +255,7 @@ async fn handle_eth_action( send_to_loop.clone(), eth_action, providers.clone(), + active_subscriptions.clone(), ))); let mut subs = active_subscriptions .entry(km.source.process) @@ -279,6 +290,7 @@ async fn handle_eth_action( } /// spawned as a task +/// cleans itself up when the subscription is closed or fails. async fn create_new_subscription( our: String, km_id: u64, @@ -287,7 +299,76 @@ async fn create_new_subscription( send_to_loop: MessageSender, eth_action: EthAction, providers: Providers, + active_subscriptions: ActiveSubscriptions, ) { + println!("provider: create_new_subscription\r"); + match build_subscription( + our.clone(), + km_id, + target.clone(), + rsvp.clone(), + send_to_loop.clone(), + ð_action, + providers, + ) + .await + { + Ok(future) => { + // send a response to the target that the subscription was successful + send_to_loop + .send(KernelMessage { + id: km_id, + source: Address { + node: our.to_string(), + process: ETH_PROCESS_ID.clone(), + }, + target: target.clone(), + rsvp: rsvp.clone(), + message: Message::Response(( + Response { + inherit: false, + body: serde_json::to_vec(&EthResponse::Ok).unwrap(), + metadata: None, + capabilities: vec![], + }, + None, + )), + lazy_load_blob: None, + }) + .await + .expect("eth: sender died!"); + // await the subscription error and kill it if so + if let Err(e) = future.await { + send_to_loop + .send(make_error_message(&our, km_id, target.clone(), e)) + .await + .expect("eth: kernel sender died!"); + } + } + Err(e) => { + send_to_loop + .send(make_error_message(&our, km_id, target.clone(), e)) + .await + .expect("eth: kernel sender died!"); + } + } + active_subscriptions + .entry(target.process) + .and_modify(|sub_map| { + sub_map.remove(&km_id); + }); +} + +async fn build_subscription( + our: String, + km_id: u64, + target: Address, + rsvp: Option
, + send_to_loop: MessageSender, + eth_action: &EthAction, + providers: Providers, +) -> Result>, EthError> { + println!("provider: build_subscription\r"); let EthAction::SubscribeLogs { sub_id, chain_id, @@ -295,19 +376,12 @@ async fn create_new_subscription( params, } = eth_action else { - return; + return Err(EthError::InvalidMethod( + "eth: only accepts subscribe logs requests".to_string(), + )); }; let Some(aps) = providers.get_mut(&chain_id) else { - send_to_loop - .send(make_error_message( - &our, - sub_id, - target, - EthError::NoRpcForChain, - )) - .await - .expect("eth: kernel sender died!"); - return; + return Err(EthError::NoRpcForChain); }; // first, try any url providers we have for this chain, // then if we have none or they all fail, go to node providers. @@ -322,31 +396,21 @@ async fn create_new_subscription( .await { let rx = pubsub.inner().get_raw_subscription(id).await; - if let Err(e) = - handle_subscription_stream(&our, sub_id, rx, &target, &rsvp, &send_to_loop) - .await - { - send_to_loop - .send(make_error_message(&our, sub_id, target, e)) - .await - .expect("eth: kernel sender died!"); - } - return; + return Ok(maintain_subscription( + our, + *sub_id, + rx, + target, + rsvp, + send_to_loop, + )); } } } for node_provider in &aps.nodes { // todo } - send_to_loop - .send(make_error_message( - &our, - sub_id, - target, - EthError::NoRpcForChain, - )) - .await - .expect("eth: kernel sender died!"); + return Err(EthError::NoRpcForChain); } async fn handle_eth_config_action( @@ -357,6 +421,7 @@ async fn handle_eth_config_action( eth_config_action: EthConfigAction, providers: &mut Providers, ) -> Result<(), EthError> { + println!("provider: handle_eth_config_action\r"); if km.source.node != our { return Err(EthError::PermissionDenied); } @@ -611,14 +676,15 @@ async fn handle_eth_config_action( /// Executed as a long-lived task. The JoinHandle is stored in the `connections` map. /// This task is responsible for connecting to the ETH RPC provider and streaming logs /// for a specific subscription made by a process. -async fn handle_subscription_stream( - our: &str, +async fn maintain_subscription( + our: String, sub_id: u64, mut rx: RawSubscription, - target: &Address, - rsvp: &Option
, - send_to_loop: &MessageSender, + target: Address, + rsvp: Option
, + send_to_loop: MessageSender, ) -> Result<(), EthError> { + println!("provider: maintain_subscription\r"); loop { match rx.recv().await { Err(e) => { @@ -660,9 +726,10 @@ async fn handle_subscription_stream( } } -fn make_error_message(our: &str, id: u64, target: Address, error: EthError) -> KernelMessage { +fn make_error_message(our: &str, km_id: u64, target: Address, error: EthError) -> KernelMessage { + println!("provider: make_error_message\r"); KernelMessage { - id, + id: km_id, source: Address { node: our.to_string(), process: ETH_PROCESS_ID.clone(), diff --git a/kinode/src/main.rs b/kinode/src/main.rs index c4337d62..6d8c988c 100644 --- a/kinode/src/main.rs +++ b/kinode/src/main.rs @@ -548,7 +548,6 @@ async fn main() { timer_service_receiver, print_sender.clone(), )); - #[cfg(not(feature = "simulation-mode"))] tasks.spawn(eth::provider::provider( our.name.clone(), eth_provider_config, @@ -557,17 +556,6 @@ async fn main() { caps_oracle_sender.clone(), print_sender.clone(), )); - #[cfg(feature = "simulation-mode")] - if let Some(ref rpc_url) = rpc_url { - tasks.spawn(eth::provider::provider( - our.name.clone(), - eth_provider, - public, - kernel_message_sender.clone(), - eth_provider_receiver, - print_sender.clone(), - )); - } tasks.spawn(vfs::vfs( our.name.clone(), kernel_message_sender.clone(), diff --git a/kinode/src/state.rs b/kinode/src/state.rs index f7f1992a..95ecc1b6 100644 --- a/kinode/src/state.rs +++ b/kinode/src/state.rs @@ -391,11 +391,11 @@ async fn bootstrap( for (package_metadata, mut package) in packages.clone() { let package_name = package_metadata.properties.package_name.as_str(); - // special case tester: only load it in if in simulation mode - if package_name == "tester" { - #[cfg(not(feature = "simulation-mode"))] - continue; - } + // // special case tester: only load it in if in simulation mode + // if package_name == "tester" { + // #[cfg(not(feature = "simulation-mode"))] + // continue; + // } println!("fs: handling package {package_name}...\r"); let package_publisher = package_metadata.properties.publisher.as_str();