From bf856343af3b46b39f6b6e603f9c07ab6c49aa60 Mon Sep 17 00:00:00 2001 From: dr-frmr Date: Mon, 4 Mar 2024 18:59:53 -0300 Subject: [PATCH 1/8] add openssl and libclang notes to setup in readme --- README.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/README.md b/README.md index cb0260a9..c2e5eafb 100644 --- a/README.md +++ b/README.md @@ -15,6 +15,10 @@ If you have questions, join the [Kinode discord](https://discord.gg/TCgdca5Bjt) ### Building components +On certain operating systems, you may need to install these dependencies if they are not already present: +- openssl-sys: https://docs.rs/crate/openssl-sys/0.9.19 +- libclang 5.0: https://rust-lang.github.io/rust-bindgen/requirements.html + ```bash # Clone the repo. From 3a4ea0fd679434617eb468520f147dd1a6293d6b Mon Sep 17 00:00:00 2001 From: dr-frmr Date: Mon, 4 Mar 2024 19:12:56 -0300 Subject: [PATCH 2/8] fix terminal paste bug, some cleanup --- kinode/src/main.rs | 3 +-- kinode/src/terminal/mod.rs | 7 ++----- kinode/src/terminal/utils.rs | 4 ++-- 3 files changed, 5 insertions(+), 9 deletions(-) diff --git a/kinode/src/main.rs b/kinode/src/main.rs index ffd26d49..6967518c 100644 --- a/kinode/src/main.rs +++ b/kinode/src/main.rs @@ -632,7 +632,6 @@ async fn main() { // abort all remaining tasks tasks.shutdown().await; - //let _ = crossterm::terminal::disable_raw_mode().unwrap(); let stdout = std::io::stdout(); let mut stdout = stdout.lock(); let _ = crossterm::execute!( @@ -640,7 +639,7 @@ async fn main() { crossterm::event::DisableBracketedPaste, crossterm::terminal::SetTitle(""), crossterm::style::SetForegroundColor(crossterm::style::Color::Red), - crossterm::style::Print(format!("\r\n{quit_msg}")), + crossterm::style::Print(format!("\r\n{quit_msg}\r\n")), crossterm::style::ResetColor, ); } diff --git a/kinode/src/terminal/mod.rs b/kinode/src/terminal/mod.rs index 1dc186b1..c2f94330 100644 --- a/kinode/src/terminal/mod.rs +++ b/kinode/src/terminal/mod.rs @@ -29,10 +29,7 @@ impl RawMode { impl Drop for RawMode { fn drop(&mut self) { match disable_raw_mode() { - Ok(_) => { - // let is_enabled = crossterm::terminal::is_raw_mode_enabled(); - // println!("terminal: disabled raw mode successfully: {is_enabled:?}\r"); - } + Ok(_) => {} Err(e) => { println!("terminal: failed to disable raw mode: {e:?}\r"); } @@ -248,7 +245,7 @@ pub async fn terminal( // handle pasting of text from outside Event::Paste(pasted) => { current_line.insert_str(line_col, &pasted); - line_col = current_line.len(); + line_col = line_col + pasted.len(); cursor_col = std::cmp::min(line_col.try_into().unwrap_or(win_cols), win_cols); execute!( stdout, diff --git a/kinode/src/terminal/utils.rs b/kinode/src/terminal/utils.rs index 998971e9..59b4c252 100644 --- a/kinode/src/terminal/utils.rs +++ b/kinode/src/terminal/utils.rs @@ -29,8 +29,8 @@ impl CommandHistory { pub fn add(&mut self, line: String) { self.working_line = None; // only add line to history if it's not exactly the same - // as the previous line - if &line != self.lines.front().unwrap_or(&"".into()) { + // as the previous line and also not an empty line + if &line != self.lines.front().unwrap_or(&"".into()) && line != "" { let _ = writeln!(self.history_writer, "{}", &line); self.lines.push_front(line); } From 38c4495d770699fac3d6f4d7989158c9215bcb8c Mon Sep 17 00:00:00 2001 From: dr-frmr Date: Tue, 5 Mar 2024 13:07:27 -0300 Subject: [PATCH 3/8] bugfix: properly set RSVP and sub_id in eth subscription relay --- kinode/src/eth/mod.rs | 6 ++--- kinode/src/eth/subscription.rs | 40 +++++++++++++++++++--------------- 2 files changed, 25 insertions(+), 21 deletions(-) diff --git a/kinode/src/eth/mod.rs b/kinode/src/eth/mod.rs index 4e9a3296..f7ed67b9 100644 --- a/kinode/src/eth/mod.rs +++ b/kinode/src/eth/mod.rs @@ -530,6 +530,7 @@ async fn fulfill_request( let response = forward_to_node_provider( our, km_id, + None, node_provider, eth_action.clone(), send_to_loop, @@ -551,6 +552,7 @@ async fn fulfill_request( async fn forward_to_node_provider( our: &str, km_id: u64, + rsvp: Option
, node_provider: &NodeProvider, eth_action: EthAction, send_to_loop: &MessageSender, @@ -559,8 +561,6 @@ async fn forward_to_node_provider( if !node_provider.usable || node_provider.name == our { return EthResponse::Err(EthError::PermissionDenied); } - // in order, forward the request to each node provider - // until one sends back a satisfactory response kernel_message( our, km_id, @@ -568,7 +568,7 @@ async fn forward_to_node_provider( node: node_provider.name.clone(), process: ETH_PROCESS_ID.clone(), }, - None, + rsvp, true, Some(60), // TODO eth_action.clone(), diff --git a/kinode/src/eth/subscription.rs b/kinode/src/eth/subscription.rs index 66d22f0b..8529f996 100644 --- a/kinode/src/eth/subscription.rs +++ b/kinode/src/eth/subscription.rs @@ -48,10 +48,10 @@ pub async fn create_new_subscription( .entry(target.clone()) .or_insert(HashMap::new()); let active_subscriptions = active_subscriptions.clone(); - subs.insert( - sub_id, - match maybe_raw_sub { - Ok(rx) => { + match maybe_raw_sub { + Ok(rx) => { + subs.insert( + sub_id, // this is a local sub, as in, we connect to the rpc endpt ActiveSub::Local(tokio::spawn(async move { // await the subscription error and kill it if so @@ -74,7 +74,7 @@ pub async fn create_new_subscription( &our, rand::random(), target.clone(), - None, + rsvp, true, None, EthSubResult::Err(e), @@ -85,15 +85,18 @@ pub async fn create_new_subscription( sub_map.remove(&km_id); }); } - })) - } - Err((provider_node, remote_sub_id)) => { - // this is a remote sub, given by a relay node - let (sender, rx) = tokio::sync::mpsc::channel(10); - let keepalive_km_id = rand::random(); - let (keepalive_err_sender, keepalive_err_receiver) = - tokio::sync::mpsc::channel(1); - response_channels.insert(keepalive_km_id, keepalive_err_sender); + })), + ); + } + Err((provider_node, remote_sub_id)) => { + // this is a remote sub, given by a relay node + let (sender, rx) = tokio::sync::mpsc::channel(10); + let keepalive_km_id = rand::random(); + let (keepalive_err_sender, keepalive_err_receiver) = + tokio::sync::mpsc::channel(1); + response_channels.insert(keepalive_km_id, keepalive_err_sender); + subs.insert( + remote_sub_id, ActiveSub::Remote { provider_node: provider_node.clone(), handle: tokio::spawn(async move { @@ -133,10 +136,10 @@ pub async fn create_new_subscription( } }), sender, - } - } - }, - ); + }, + ); + } + } } Err(e) => { error_message(&our, km_id, target.clone(), e, &send_to_loop).await; @@ -205,6 +208,7 @@ async fn build_subscription( match forward_to_node_provider( &our, km_id, + Some(target.clone()), node_provider, EthAction::SubscribeLogs { sub_id: remote_sub_id, From 2b2c9a4ac15657583e2ae8c44f02846224433e95 Mon Sep 17 00:00:00 2001 From: dr-frmr Date: Tue, 5 Mar 2024 13:27:59 -0300 Subject: [PATCH 4/8] bugfix: properly delete relay subs after networking error --- kinode/src/eth/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kinode/src/eth/mod.rs b/kinode/src/eth/mod.rs index f7ed67b9..021c02ad 100644 --- a/kinode/src/eth/mod.rs +++ b/kinode/src/eth/mod.rs @@ -246,7 +246,7 @@ async fn handle_network_error( // if we hold active subscriptions for the remote node that this error refers to, // close them here -- they will need to resubscribe // TODO is this necessary? - if let Some(sub_map) = active_subscriptions.get(&wrapped_error.source) { + if let Some(sub_map) = active_subscriptions.get(&wrapped_error.error.target) { for (_sub_id, sub) in sub_map.iter() { if let ActiveSub::Local(handle) = sub { verbose_print( From ba3374e234038ef711f7b14a6c7f484a019828d9 Mon Sep 17 00:00:00 2001 From: dr-frmr Date: Tue, 5 Mar 2024 17:10:10 -0300 Subject: [PATCH 5/8] change net api to be response-based instead of prints, add scripts for easy usage from term --- Cargo.lock | 40 ++++ Cargo.toml | 1 + .../kns_indexer/kns_indexer/src/lib.rs | 10 +- .../terminal/namehash_to_name/Cargo.toml | 17 ++ .../terminal/namehash_to_name/src/lib.rs | 80 ++++++++ .../terminal/net_diagnostics/Cargo.toml | 17 ++ .../terminal/net_diagnostics/src/lib.rs | 67 +++++++ kinode/packages/terminal/peer/Cargo.toml | 17 ++ kinode/packages/terminal/peer/src/lib.rs | 83 ++++++++ kinode/packages/terminal/peers/Cargo.toml | 17 ++ kinode/packages/terminal/peers/src/lib.rs | 77 ++++++++ kinode/packages/terminal/pkg/scripts.json | 54 +++++- kinode/packages/terminal/terminal/src/lib.rs | 28 ++- kinode/src/eth/mod.rs | 70 ++++--- kinode/src/kernel/mod.rs | 2 +- kinode/src/net/mod.rs | 177 ++++++++---------- kinode/src/net/utils.rs | 2 +- lib/src/core.rs | 21 ++- lib/src/eth.rs | 9 +- 19 files changed, 647 insertions(+), 142 deletions(-) create mode 100644 kinode/packages/terminal/namehash_to_name/Cargo.toml create mode 100644 kinode/packages/terminal/namehash_to_name/src/lib.rs create mode 100644 kinode/packages/terminal/net_diagnostics/Cargo.toml create mode 100644 kinode/packages/terminal/net_diagnostics/src/lib.rs create mode 100644 kinode/packages/terminal/peer/Cargo.toml create mode 100644 kinode/packages/terminal/peer/src/lib.rs create mode 100644 kinode/packages/terminal/peers/Cargo.toml create mode 100644 kinode/packages/terminal/peers/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 872514bc..ba76a973 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3008,6 +3008,16 @@ dependencies = [ "version_check", ] +[[package]] +name = "namehash_to_name" +version = "0.1.0" +dependencies = [ + "kinode_process_lib 0.6.0 (git+https://github.com/kinode-dao/process_lib?tag=v0.6.0-alpha.2)", + "rmp-serde", + "serde", + "wit-bindgen", +] + [[package]] name = "native-tls" version = "0.2.11" @@ -3026,6 +3036,16 @@ dependencies = [ "tempfile", ] +[[package]] +name = "net_diagnostics" +version = "0.1.0" +dependencies = [ + "kinode_process_lib 0.6.0 (git+https://github.com/kinode-dao/process_lib?tag=v0.6.0-alpha.2)", + "rmp-serde", + "serde", + "wit-bindgen", +] + [[package]] name = "nibble_vec" version = "0.1.0" @@ -3283,6 +3303,26 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099" +[[package]] +name = "peer" +version = "0.1.0" +dependencies = [ + "kinode_process_lib 0.6.0 (git+https://github.com/kinode-dao/process_lib?tag=v0.6.0-alpha.2)", + "rmp-serde", + "serde", + "wit-bindgen", +] + +[[package]] +name = "peers" +version = "0.1.0" +dependencies = [ + "kinode_process_lib 0.6.0 (git+https://github.com/kinode-dao/process_lib?tag=v0.6.0-alpha.2)", + "rmp-serde", + "serde", + "wit-bindgen", +] + [[package]] name = "percent-encoding" version = "2.3.1" diff --git a/Cargo.toml b/Cargo.toml index cbd8822c..c3793117 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,6 +21,7 @@ members = [ "kinode/packages/kns_indexer/kns_indexer", "kinode/packages/kns_indexer/get_block", "kinode/packages/terminal/terminal", "kinode/packages/terminal/alias", "kinode/packages/terminal/cat", "kinode/packages/terminal/echo", "kinode/packages/terminal/hi", "kinode/packages/terminal/m", "kinode/packages/terminal/top", + "kinode/packages/terminal/namehash_to_name", "kinode/packages/terminal/net_diagnostics", "kinode/packages/terminal/peer", "kinode/packages/terminal/peers", "kinode/packages/tester/tester", "kinode/packages/tester/test_runner", ] default-members = ["lib"] diff --git a/kinode/packages/kns_indexer/kns_indexer/src/lib.rs b/kinode/packages/kns_indexer/kns_indexer/src/lib.rs index 16b889d1..3269949e 100644 --- a/kinode/packages/kns_indexer/kns_indexer/src/lib.rs +++ b/kinode/packages/kns_indexer/kns_indexer/src/lib.rs @@ -49,12 +49,12 @@ pub enum IndexerRequests { } #[derive(Clone, Debug, Serialize, Deserialize)] -pub enum NetActions { +pub enum NetAction { KnsUpdate(KnsUpdate), KnsBatchUpdate(Vec), } -impl TryInto> for NetActions { +impl TryInto> for NetAction { type Error = anyhow::Error; fn try_into(self) -> Result, Self::Error> { Ok(rmp_serde::to_vec(&self)?) @@ -172,7 +172,7 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> { // shove all state into net::net Request::new() .target((&our.node, "net", "distro", "sys")) - .try_body(NetActions::KnsBatchUpdate( + .try_body(NetAction::KnsBatchUpdate( state.nodes.values().cloned().collect::>(), ))? .send()?; @@ -214,7 +214,7 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> { // shove all state into net::net Request::new() .target((&our.node, "net", "distro", "sys")) - .try_body(NetActions::KnsBatchUpdate( + .try_body(NetAction::KnsBatchUpdate( state.nodes.values().cloned().collect::>(), ))? .send()?; @@ -403,7 +403,7 @@ fn handle_log(our: &Address, state: &mut State, log: ð::Log) -> anyhow::Resul { Request::new() .target((&our.node, "net", "distro", "sys")) - .try_body(NetActions::KnsUpdate(node.clone()))? + .try_body(NetAction::KnsUpdate(node.clone()))? .send()?; } diff --git a/kinode/packages/terminal/namehash_to_name/Cargo.toml b/kinode/packages/terminal/namehash_to_name/Cargo.toml new file mode 100644 index 00000000..b54bcc3e --- /dev/null +++ b/kinode/packages/terminal/namehash_to_name/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "namehash_to_name" +version = "0.1.0" +edition = "2021" + + +[dependencies] +kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", tag = "v0.6.0-alpha.2" } +rmp-serde = "1.1.2" +serde = { version = "1.0", features = ["derive"] } +wit-bindgen = { git = "https://github.com/bytecodealliance/wit-bindgen", rev = "21a46c7" } + +[lib] +crate-type = ["cdylib"] + +[package.metadata.component] +package = "kinode:process" diff --git a/kinode/packages/terminal/namehash_to_name/src/lib.rs b/kinode/packages/terminal/namehash_to_name/src/lib.rs new file mode 100644 index 00000000..90b041fe --- /dev/null +++ b/kinode/packages/terminal/namehash_to_name/src/lib.rs @@ -0,0 +1,80 @@ +use kinode_process_lib::{ + await_next_request_body, call_init, println, Address, Message, NodeId, Request, +}; +use serde::{Deserialize, Serialize}; + +wit_bindgen::generate!({ + path: "wit", + world: "process", + exports: { + world: Component, + }, +}); + +// types copied from runtime networking core + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct Identity { + pub name: NodeId, + pub networking_key: String, + pub ws_routing: Option<(String, u16)>, + pub allowed_routers: Vec, +} + +/// Must be parsed from message pack vector. +/// all Get actions must be sent from local process. used for debugging +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum NetAction { + /// get a list of peers we are connected to + GetPeers, + /// get the [`Identity`] struct for a single peer + GetPeer(String), + /// get the [`NodeId`] associated with a given namehash, if any + GetName(String), + /// get a user-readable diagnostics string containing networking inforamtion + GetDiagnostics, +} + +/// For now, only sent in response to a ConnectionRequest. +/// Must be parsed from message pack vector +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum NetResponse { + Accepted(NodeId), + Rejected(NodeId), + /// response to [`NetAction::GetPeers`] + Peers(Vec), + /// response to [`NetAction::GetPeer`] + Peer(Option), + /// response to [`NetAction::GetName`] + Name(Option), + /// response to [`NetAction::GetDiagnostics`]. A user-readable string. + Diagnostics(String), +} + +call_init!(init); + +fn init(_our: Address) { + let Ok(args) = await_next_request_body() else { + println!("failed to get args, aborting"); + return; + }; + let Ok(namehash) = String::from_utf8(args) else { + println!("argument must be a string"); + return; + }; + let Ok(Ok(Message::Response { body, .. })) = Request::to(("our", "net", "distro", "sys")) + .body(rmp_serde::to_vec(&NetAction::GetName(namehash.clone())).unwrap()) + .send_and_await_response(5) + else { + println!("failed to get name from networking module"); + return; + }; + let Ok(NetResponse::Name(maybe_name)) = rmp_serde::from_slice(&body) else { + println!("got malformed response from networking module"); + return; + }; + match maybe_name { + Some(name) => println!("{namehash}: {name}"), + None => println!("no name found for {namehash}"), + } +} diff --git a/kinode/packages/terminal/net_diagnostics/Cargo.toml b/kinode/packages/terminal/net_diagnostics/Cargo.toml new file mode 100644 index 00000000..72c8dfd1 --- /dev/null +++ b/kinode/packages/terminal/net_diagnostics/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "net_diagnostics" +version = "0.1.0" +edition = "2021" + + +[dependencies] +kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", tag = "v0.6.0-alpha.2" } +rmp-serde = "1.1.2" +serde = { version = "1.0", features = ["derive"] } +wit-bindgen = { git = "https://github.com/bytecodealliance/wit-bindgen", rev = "21a46c7" } + +[lib] +crate-type = ["cdylib"] + +[package.metadata.component] +package = "kinode:process" diff --git a/kinode/packages/terminal/net_diagnostics/src/lib.rs b/kinode/packages/terminal/net_diagnostics/src/lib.rs new file mode 100644 index 00000000..9079fd36 --- /dev/null +++ b/kinode/packages/terminal/net_diagnostics/src/lib.rs @@ -0,0 +1,67 @@ +use kinode_process_lib::{call_init, println, Address, Message, NodeId, Request}; +use serde::{Deserialize, Serialize}; + +wit_bindgen::generate!({ + path: "wit", + world: "process", + exports: { + world: Component, + }, +}); + +// types copied from runtime networking core + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct Identity { + pub name: NodeId, + pub networking_key: String, + pub ws_routing: Option<(String, u16)>, + pub allowed_routers: Vec, +} + +/// Must be parsed from message pack vector. +/// all Get actions must be sent from local process. used for debugging +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum NetAction { + /// get a list of peers we are connected to + GetPeers, + /// get the [`Identity`] struct for a single peer + GetPeer(String), + /// get the [`NodeId`] associated with a given namehash, if any + GetName(String), + /// get a user-readable diagnostics string containing networking inforamtion + GetDiagnostics, +} + +/// For now, only sent in response to a ConnectionRequest. +/// Must be parsed from message pack vector +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum NetResponse { + Accepted(NodeId), + Rejected(NodeId), + /// response to [`NetAction::GetPeers`] + Peers(Vec), + /// response to [`NetAction::GetPeer`] + Peer(Option), + /// response to [`NetAction::GetName`] + Name(Option), + /// response to [`NetAction::GetDiagnostics`]. A user-readable string. + Diagnostics(String), +} + +call_init!(init); + +fn init(_our: Address) { + let Ok(Ok(Message::Response { body, .. })) = Request::to(("our", "net", "distro", "sys")) + .body(rmp_serde::to_vec(&NetAction::GetDiagnostics).unwrap()) + .send_and_await_response(5) + else { + println!("failed to get diagnostics from networking module"); + return; + }; + let Ok(NetResponse::Diagnostics(printout)) = rmp_serde::from_slice(&body) else { + println!("got malformed response from networking module"); + return; + }; + println!("{printout}"); +} diff --git a/kinode/packages/terminal/peer/Cargo.toml b/kinode/packages/terminal/peer/Cargo.toml new file mode 100644 index 00000000..0446cc12 --- /dev/null +++ b/kinode/packages/terminal/peer/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "peer" +version = "0.1.0" +edition = "2021" + + +[dependencies] +kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", tag = "v0.6.0-alpha.2" } +rmp-serde = "1.1.2" +serde = { version = "1.0", features = ["derive"] } +wit-bindgen = { git = "https://github.com/bytecodealliance/wit-bindgen", rev = "21a46c7" } + +[lib] +crate-type = ["cdylib"] + +[package.metadata.component] +package = "kinode:process" diff --git a/kinode/packages/terminal/peer/src/lib.rs b/kinode/packages/terminal/peer/src/lib.rs new file mode 100644 index 00000000..9aa05f43 --- /dev/null +++ b/kinode/packages/terminal/peer/src/lib.rs @@ -0,0 +1,83 @@ +use kinode_process_lib::{ + await_next_request_body, call_init, println, Address, Message, NodeId, Request, +}; +use serde::{Deserialize, Serialize}; + +wit_bindgen::generate!({ + path: "wit", + world: "process", + exports: { + world: Component, + }, +}); + +// types copied from runtime networking core + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct Identity { + pub name: NodeId, + pub networking_key: String, + pub ws_routing: Option<(String, u16)>, + pub allowed_routers: Vec, +} + +/// Must be parsed from message pack vector. +/// all Get actions must be sent from local process. used for debugging +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum NetAction { + /// get a list of peers we are connected to + GetPeers, + /// get the [`Identity`] struct for a single peer + GetPeer(String), + /// get the [`NodeId`] associated with a given namehash, if any + GetName(String), + /// get a user-readable diagnostics string containing networking inforamtion + GetDiagnostics, +} + +/// For now, only sent in response to a ConnectionRequest. +/// Must be parsed from message pack vector +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum NetResponse { + Accepted(NodeId), + Rejected(NodeId), + /// response to [`NetAction::GetPeers`] + Peers(Vec), + /// response to [`NetAction::GetPeer`] + Peer(Option), + /// response to [`NetAction::GetName`] + Name(Option), + /// response to [`NetAction::GetDiagnostics`]. A user-readable string. + Diagnostics(String), +} + +call_init!(init); + +fn init(_our: Address) { + let Ok(args) = await_next_request_body() else { + println!("failed to get args, aborting"); + return; + }; + let Ok(name) = String::from_utf8(args) else { + println!("argument must be a string"); + return; + }; + let Ok(Ok(Message::Response { body, .. })) = Request::to(("our", "net", "distro", "sys")) + .body(rmp_serde::to_vec(&NetAction::GetPeer(name.clone())).unwrap()) + .send_and_await_response(5) + else { + println!("failed to get response from networking module"); + return; + }; + let Ok(NetResponse::Peer(maybe_peer_id)) = rmp_serde::from_slice(&body) else { + println!("got malformed response from networking module"); + return; + }; + match maybe_peer_id { + Some(peer_id) => println!( + "peer identity for {}:\n networking key: {}\n routing: {:?}\n routers: {:?}", + peer_id.name, peer_id.networking_key, peer_id.ws_routing, peer_id.allowed_routers + ), + None => println!("no PKI entry found with name {name}"), + } +} diff --git a/kinode/packages/terminal/peers/Cargo.toml b/kinode/packages/terminal/peers/Cargo.toml new file mode 100644 index 00000000..26901629 --- /dev/null +++ b/kinode/packages/terminal/peers/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "peers" +version = "0.1.0" +edition = "2021" + + +[dependencies] +kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", tag = "v0.6.0-alpha.2" } +rmp-serde = "1.1.2" +serde = { version = "1.0", features = ["derive"] } +wit-bindgen = { git = "https://github.com/bytecodealliance/wit-bindgen", rev = "21a46c7" } + +[lib] +crate-type = ["cdylib"] + +[package.metadata.component] +package = "kinode:process" diff --git a/kinode/packages/terminal/peers/src/lib.rs b/kinode/packages/terminal/peers/src/lib.rs new file mode 100644 index 00000000..bc324e22 --- /dev/null +++ b/kinode/packages/terminal/peers/src/lib.rs @@ -0,0 +1,77 @@ +use kinode_process_lib::{call_init, println, Address, Message, NodeId, Request}; +use serde::{Deserialize, Serialize}; + +wit_bindgen::generate!({ + path: "wit", + world: "process", + exports: { + world: Component, + }, +}); + +// types copied from runtime networking core + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct Identity { + pub name: NodeId, + pub networking_key: String, + pub ws_routing: Option<(String, u16)>, + pub allowed_routers: Vec, +} + +/// Must be parsed from message pack vector. +/// all Get actions must be sent from local process. used for debugging +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum NetAction { + /// get a list of peers we are connected to + GetPeers, + /// get the [`Identity`] struct for a single peer + GetPeer(String), + /// get the [`NodeId`] associated with a given namehash, if any + GetName(String), + /// get a user-readable diagnostics string containing networking inforamtion + GetDiagnostics, +} + +/// For now, only sent in response to a ConnectionRequest. +/// Must be parsed from message pack vector +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum NetResponse { + Accepted(NodeId), + Rejected(NodeId), + /// response to [`NetAction::GetPeers`] + Peers(Vec), + /// response to [`NetAction::GetPeer`] + Peer(Option), + /// response to [`NetAction::GetName`] + Name(Option), + /// response to [`NetAction::GetDiagnostics`]. A user-readable string. + Diagnostics(String), +} + +call_init!(init); + +fn init(_our: Address) { + let Ok(Ok(Message::Response { body, .. })) = Request::to(("our", "net", "distro", "sys")) + .body(rmp_serde::to_vec(&NetAction::GetPeers).unwrap()) + .send_and_await_response(5) + else { + println!("failed to get peers from networking module"); + return; + }; + let Ok(NetResponse::Peers(identities)) = rmp_serde::from_slice(&body) else { + println!("got malformed response from networking module"); + return; + }; + let identities = identities + .iter() + .map(|peer_id| { + format!( + "{}:\n networking key: {}\n routing: {:?}\n routers: {:?}", + peer_id.name, peer_id.networking_key, peer_id.ws_routing, peer_id.allowed_routers + ) + }) + .collect::>() + .join("\n"); + println!("identities of current connected peers:\n{identities}"); +} diff --git a/kinode/packages/terminal/pkg/scripts.json b/kinode/packages/terminal/pkg/scripts.json index d555c90c..6939c373 100644 --- a/kinode/packages/terminal/pkg/scripts.json +++ b/kinode/packages/terminal/pkg/scripts.json @@ -41,6 +41,55 @@ "net:distro:sys" ] }, + "m.wasm": { + "root": true, + "public": true, + "request_networking": true + }, + "namehash_to_name.wasm": { + "root": false, + "public": false, + "request_networking": false, + "request_capabilities": [ + "net:distro:sys" + ], + "grant_capabilities": [ + "net:distro:sys" + ] + }, + "net_diagnostics.wasm": { + "root": false, + "public": false, + "request_networking": false, + "request_capabilities": [ + "net:distro:sys" + ], + "grant_capabilities": [ + "net:distro:sys" + ] + }, + "peer.wasm": { + "root": false, + "public": false, + "request_networking": false, + "request_capabilities": [ + "net:distro:sys" + ], + "grant_capabilities": [ + "net:distro:sys" + ] + }, + "peers.wasm": { + "root": false, + "public": false, + "request_networking": false, + "request_capabilities": [ + "net:distro:sys" + ], + "grant_capabilities": [ + "net:distro:sys" + ] + }, "top.wasm": { "root": false, "public": false, @@ -49,10 +98,5 @@ "kernel:distro:sys" ], "grant_capabilities": [] - }, - "m.wasm": { - "root": true, - "public": true, - "request_networking": true } } \ No newline at end of file diff --git a/kinode/packages/terminal/terminal/src/lib.rs b/kinode/packages/terminal/terminal/src/lib.rs index cdb3053d..831eaee1 100644 --- a/kinode/packages/terminal/terminal/src/lib.rs +++ b/kinode/packages/terminal/terminal/src/lib.rs @@ -62,27 +62,43 @@ impl Guest for Component { aliases: HashMap::from([ ( "alias".to_string(), - "alias:terminal:sys".parse::().unwrap(), + ProcessId::new(Some("alias"), "terminal", "sys"), ), ( "cat".to_string(), - "cat:terminal:sys".parse::().unwrap(), + ProcessId::new(Some("cat"), "terminal", "sys"), ), ( "echo".to_string(), - "echo:terminal:sys".parse::().unwrap(), + ProcessId::new(Some("echo"), "terminal", "sys"), ), ( "hi".to_string(), - "hi:terminal:sys".parse::().unwrap(), + ProcessId::new(Some("hi"), "terminal", "sys"), ), ( "m".to_string(), - "m:terminal:sys".parse::().unwrap(), + ProcessId::new(Some("m"), "terminal", "sys"), + ), + ( + "namehash_to_name".to_string(), + ProcessId::new(Some("namehash_to_name"), "terminal", "sys"), + ), + ( + "net_diagnostics".to_string(), + ProcessId::new(Some("net_diagnostics"), "terminal", "sys"), + ), + ( + "peer".to_string(), + ProcessId::new(Some("peer"), "terminal", "sys"), + ), + ( + "peers".to_string(), + ProcessId::new(Some("peers"), "terminal", "sys"), ), ( "top".to_string(), - "top:terminal:sys".parse::().unwrap(), + ProcessId::new(Some("top"), "terminal", "sys"), ), ]), }, diff --git a/kinode/src/eth/mod.rs b/kinode/src/eth/mod.rs index 021c02ad..d7270f7e 100644 --- a/kinode/src/eth/mod.rs +++ b/kinode/src/eth/mod.rs @@ -142,27 +142,6 @@ struct ModuleState { print_tx: PrintSender, } -async fn activate_url_provider(provider: &mut UrlProvider) -> Result<()> { - match Url::parse(&provider.url)?.scheme() { - "ws" | "wss" => { - let connector = WsConnect { - url: provider.url.to_string(), - auth: None, - }; - let client = tokio::time::timeout( - std::time::Duration::from_secs(10), - ClientBuilder::default().ws(connector), - ) - .await??; - provider.pubsub = Some(Provider::new_with_client(client)); - Ok(()) - } - _ => Err(anyhow::anyhow!( - "Only `ws://` or `wss://` providers are supported." - )), - } -} - /// The ETH provider runtime process is responsible for connecting to one or more ETH RPC providers /// and using them to service indexing requests from other apps. This is the runtime entry point /// for the entire module. @@ -674,10 +653,59 @@ async fn handle_eth_config_action( EthConfigAction::GetAccessSettings => { return EthConfigResponse::AccessSettings(state.access_settings.clone()); } + EthConfigAction::GetState => { + return EthConfigResponse::State { + active_subscriptions: state + .active_subscriptions + .iter() + .map(|e| { + ( + e.key().clone(), + e.value() + .iter() + .map(|(id, sub)| { + ( + *id, + match sub { + ActiveSub::Local(_) => None, + ActiveSub::Remote { provider_node, .. } => { + Some(provider_node.clone()) + } + }, + ) + }) + .collect(), + ) + }) + .collect(), + outstanding_requests: state.response_channels.iter().map(|e| *e.key()).collect(), + }; + } } EthConfigResponse::Ok } +async fn activate_url_provider(provider: &mut UrlProvider) -> Result<()> { + match Url::parse(&provider.url)?.scheme() { + "ws" | "wss" => { + let connector = WsConnect { + url: provider.url.to_string(), + auth: None, + }; + let client = tokio::time::timeout( + std::time::Duration::from_secs(10), + ClientBuilder::default().ws(connector), + ) + .await??; + provider.pubsub = Some(Provider::new_with_client(client)); + Ok(()) + } + _ => Err(anyhow::anyhow!( + "Only `ws://` or `wss://` providers are supported." + )), + } +} + fn providers_to_saved_configs(providers: &Providers) -> SavedConfigs { providers .iter() diff --git a/kinode/src/kernel/mod.rs b/kinode/src/kernel/mod.rs index c2b6c0fb..f9813ec0 100644 --- a/kinode/src/kernel/mod.rs +++ b/kinode/src/kernel/mod.rs @@ -860,7 +860,7 @@ pub async fn kernel( message: t::Message::Request(t::Request { inherit: false, expects_response: None, - body: rmp_serde::to_vec(&t::NetActions::KnsBatchUpdate(default_pki_entries)) + body: rmp_serde::to_vec(&t::NetAction::KnsBatchUpdate(default_pki_entries)) .unwrap(), metadata: None, capabilities: vec![], diff --git a/kinode/src/net/mod.rs b/kinode/src/net/mod.rs index 88b46e38..b83b798e 100644 --- a/kinode/src/net/mod.rs +++ b/kinode/src/net/mod.rs @@ -889,19 +889,20 @@ async fn handle_local_message( Message::Response((response, _context)) => { // these are received as a router, when we send ConnectionRequests // to a node we do routing for. - match rmp_serde::from_slice::(&response.body) { - Ok(NetResponses::Accepted(_)) => { + match rmp_serde::from_slice::(&response.body) { + Ok(NetResponse::Accepted(_)) => { // TODO anything here? } - Ok(NetResponses::Rejected(to)) => { + Ok(NetResponse::Rejected(to)) => { // drop from our pending map // this will drop the socket, causing initiator to see it as failed pending_passthroughs .ok_or(anyhow!("got net response as non-router"))? .remove(&(to, km.source.node)); } - Err(_) => { - // this is usually the "delivered" response to a raw message + _ => { + // this is the "delivered" response to a raw message, + // or a response to a Get that was somehow given.. ignore } } return Ok(()); @@ -909,16 +910,16 @@ async fn handle_local_message( }; if km.source.node != our.name { - if let Ok(act) = rmp_serde::from_slice::(body) { + if let Ok(act) = rmp_serde::from_slice::(body) { match act { - NetActions::KnsBatchUpdate(_) | NetActions::KnsUpdate(_) => { + NetAction::KnsBatchUpdate(_) | NetAction::KnsUpdate(_) => { // for now, we don't get these from remote. } - NetActions::ConnectionRequest(from) => { + NetAction::ConnectionRequest(from) => { // someone wants to open a passthrough with us through a router! // if we are an indirect node, and source is one of our routers, // respond by attempting to init a matching passthrough. - let res: Result = if our.allowed_routers.contains(&km.source.node) + let res: Result = if our.allowed_routers.contains(&km.source.node) { let router_id = peers .get(&km.source.node) @@ -942,9 +943,9 @@ async fn handle_local_message( print_tx, ) .await; - Ok(NetResponses::Accepted(from.clone())) + Ok(NetResponse::Accepted(from.clone())) } else { - Ok(NetResponses::Rejected(from.clone())) + Ok(NetResponse::Rejected(from.clone())) }; kernel_message_tx .send(KernelMessage { @@ -959,7 +960,7 @@ async fn handle_local_message( Response { inherit: false, body: rmp_serde::to_vec( - &res.unwrap_or(NetResponses::Rejected(from)), + &res.unwrap_or(NetResponse::Rejected(from)), )?, metadata: None, capabilities: vec![], @@ -970,6 +971,9 @@ async fn handle_local_message( }) .await?; } + _ => { + // we don't accept any other actions from remote + } } return Ok(()); }; @@ -978,14 +982,12 @@ async fn handle_local_message( parse_hello_message(our, &km, body, kernel_message_tx, print_tx).await?; Ok(()) } else { - // available commands: "peers", "pki", "names", "diagnostics" - // first parse as raw string, then deserialize to NetActions object - let mut printout = String::new(); - match rmp_serde::from_slice::(body) { - Ok(NetActions::ConnectionRequest(_)) => { + let maybe_response = match rmp_serde::from_slice::(body)? { + NetAction::ConnectionRequest(_) => { // we shouldn't receive these from ourselves. + None } - Ok(NetActions::KnsUpdate(log)) => { + NetAction::KnsUpdate(log) => { pki.insert( log.name.clone(), Identity { @@ -1000,8 +1002,9 @@ async fn handle_local_message( }, ); names.insert(log.node, log.name); + None } - Ok(NetActions::KnsBatchUpdate(log_list)) => { + NetAction::KnsBatchUpdate(log_list) => { for log in log_list { pki.insert( log.name.clone(), @@ -1018,96 +1021,70 @@ async fn handle_local_message( ); names.insert(log.node, log.name); } + None } - _ => match std::str::from_utf8(body) { - Ok("peers") => { + NetAction::GetPeers => Some(NetResponse::Peers( + peers + .iter() + .map(|p| p.identity.clone()) + .collect::>(), + )), + NetAction::GetPeer(peer_name) => Some(NetResponse::Peer( + peers.get(&peer_name).map(|p| p.identity.clone()), + )), + NetAction::GetName(namehash) => { + Some(NetResponse::Name(names.get(&namehash).map(|n| n.clone()))) + } + NetAction::GetDiagnostics => { + let mut printout = String::new(); + printout.push_str(&format!( + "indexing from contract address {}\r\n", + contract_address + )); + printout.push_str(&format!("our Identity: {:#?}\r\n", our)); + printout.push_str("we have connections with peers:\r\n"); + for peer in peers.iter() { printout.push_str(&format!( - "{:#?}", - peers - .iter() - .map(|p| p.identity.name.clone()) - .collect::>() + " {}, routing_for={}\r\n", + peer.identity.name, peer.routing_for, )); } - Ok("pki") => { - printout.push_str(&format!("{:#?}", pki)); - } - Ok("names") => { - printout.push_str(&format!("{:#?}", names)); - } - Ok("diagnostics") => { + printout.push_str(&format!("we have {} entries in the PKI\r\n", pki.len())); + if pending_passthroughs.is_some() { printout.push_str(&format!( - "indexing from contract address {}\r\n", - contract_address + "we have {} pending passthrough connections\r\n", + pending_passthroughs.unwrap().len() )); - printout.push_str(&format!("our Identity: {:#?}\r\n", our)); - printout.push_str("we have connections with peers:\r\n"); - for peer in peers.iter() { - printout.push_str(&format!( - " {}, routing_for={}\r\n", - peer.identity.name, peer.routing_for, - )); - } - printout.push_str(&format!("we have {} entries in the PKI\r\n", pki.len())); - if pending_passthroughs.is_some() { - printout.push_str(&format!( - "we have {} pending passthrough connections\r\n", - pending_passthroughs.unwrap().len() - )); - } - if forwarding_connections.is_some() { - printout.push_str(&format!( - "we have {} open passthrough connections\r\n", - forwarding_connections.unwrap().len() - )); - } } - Ok(other) => { - // parse non-commands as a request to fetch networking data - // about a specific node name - printout.push_str(&format!("net: printing known identity for {}\r\n", other)); - match pki.get(other) { - Some(id) => { - printout.push_str(&format!("{:#?}", *id)); - } - None => { - printout.push_str("no such identity known!"); - } - } - } - _ => {} - }, - } - if !printout.is_empty() { - if let Message::Request(req) = km.message { - if req.expects_response.is_some() { - kernel_message_tx - .send(KernelMessage { - id: km.id, - source: Address { - node: our.name.clone(), - process: ProcessId::new(Some("net"), "distro", "sys"), - }, - target: km.rsvp.unwrap_or(km.source), - rsvp: None, - message: Message::Response(( - Response { - inherit: false, - body: printout.clone().into_bytes(), - metadata: None, - capabilities: vec![], - }, - None, - )), - lazy_load_blob: None, - }) - .await?; + if forwarding_connections.is_some() { + printout.push_str(&format!( + "we have {} open passthrough connections\r\n", + forwarding_connections.unwrap().len() + )); } + Some(NetResponse::Diagnostics(printout)) } - print_tx - .send(Printout { - verbosity: 0, - content: printout, + }; + if let Some(response_body) = maybe_response { + kernel_message_tx + .send(KernelMessage { + id: km.id, + source: Address { + node: our.name.clone(), + process: ProcessId::new(Some("net"), "distro", "sys"), + }, + target: km.rsvp.unwrap_or(km.source), + rsvp: None, + message: Message::Response(( + Response { + inherit: false, + body: rmp_serde::to_vec(&response_body)?, + metadata: None, + capabilities: vec![], + }, + None, + )), + lazy_load_blob: None, }) .await?; } diff --git a/kinode/src/net/utils.rs b/kinode/src/net/utils.rs index 69fbe3c8..5e1ef64b 100644 --- a/kinode/src/net/utils.rs +++ b/kinode/src/net/utils.rs @@ -226,7 +226,7 @@ pub async fn create_passthrough( message: Message::Request(Request { inherit: false, expects_response: Some(5), - body: rmp_serde::to_vec(&NetActions::ConnectionRequest(from_id.name.clone()))?, + body: rmp_serde::to_vec(&NetAction::ConnectionRequest(from_id.name.clone()))?, metadata: None, capabilities: vec![], }), diff --git a/lib/src/core.rs b/lib/src/core.rs index b05a0f73..5d2353c8 100644 --- a/lib/src/core.rs +++ b/lib/src/core.rs @@ -1532,8 +1532,9 @@ pub enum TimerAction { // /// Must be parsed from message pack vector. +/// all Get actions must be sent from local process. used for debugging #[derive(Clone, Debug, Serialize, Deserialize)] -pub enum NetActions { +pub enum NetAction { /// Received from a router of ours when they have a new pending passthrough for us. /// We should respond (if we desire) by using them to initialize a routed connection /// with the NodeId given. @@ -1542,14 +1543,30 @@ pub enum NetActions { /// in the future could get from remote provider KnsUpdate(KnsUpdate), KnsBatchUpdate(Vec), + /// get a list of peers we are connected to + GetPeers, + /// get the [`Identity`] struct for a single peer + GetPeer(String), + /// get the [`NodeId`] associated with a given namehash, if any + GetName(String), + /// get a user-readable diagnostics string containing networking inforamtion + GetDiagnostics, } /// For now, only sent in response to a ConnectionRequest. /// Must be parsed from message pack vector #[derive(Clone, Debug, Serialize, Deserialize)] -pub enum NetResponses { +pub enum NetResponse { Accepted(NodeId), Rejected(NodeId), + /// response to [`NetAction::GetPeers`] + Peers(Vec), + /// response to [`NetAction::GetPeer`] + Peer(Option), + /// response to [`NetAction::GetName`] + Name(Option), + /// response to [`NetAction::GetDiagnostics`]. A user-readable string. + Diagnostics(String), } #[derive(Clone, Debug, Serialize, Deserialize)] diff --git a/lib/src/eth.rs b/lib/src/eth.rs index a66900d8..e8026b5e 100644 --- a/lib/src/eth.rs +++ b/lib/src/eth.rs @@ -1,6 +1,6 @@ use alloy_rpc_types::pubsub::{Params, SubscriptionKind, SubscriptionResult}; use serde::{Deserialize, Serialize}; -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; /// The Action and Request type that can be made to eth:distro:sys. Any process with messaging /// capabilities can send this action to the eth provider. @@ -110,6 +110,8 @@ pub enum EthConfigAction { GetProviders, /// Get the current access settings. GetAccessSettings, + /// Get the state of calls and subscriptions. Used for debugging. + GetState, } /// Response type from an [`EthConfigAction`] request. @@ -124,6 +126,11 @@ pub enum EthConfigResponse { AccessSettings(AccessSettings), /// Permission denied due to missing capability PermissionDenied, + /// Response from a GetState request + State { + active_subscriptions: HashMap>>, // None if local, Some(node_provider_name) if remote + outstanding_requests: HashSet, + }, } /// Settings for our ETH provider From bbe5905e59a0b3bec668081ad5a57ca535f3227b Mon Sep 17 00:00:00 2001 From: dr-frmr Date: Tue, 5 Mar 2024 17:31:43 -0300 Subject: [PATCH 6/8] update readme --- README.md | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 3dbcaf3a..1c1e7923 100644 --- a/README.md +++ b/README.md @@ -103,7 +103,8 @@ The `sys` publisher is not a real node ID, but it's also not a special case valu - CTRL+R to search history, CTRL+R again to toggle through search results, CTRL+G to cancel search - `m
`: send an inter-process message.
is formatted as @. is formatted as ::. JSON containing spaces must be wrapped in single-quotes (`''`). - - Example: `m our@net:distro:sys diagnostics` + - Example: `m our@eth:distro:sys "SetPublic" -a 5` + - the '-a' flag is used to expect a response with a given timeout - `our` will always be interpolated by the system as your node's name - `hi `: send a text message to another node's command line. - Example: `hi ben.os hello world` @@ -114,6 +115,9 @@ The `sys` publisher is not a real node ID, but it's also not a special case valu - Example: `cat /terminal:sys/pkg/scripts.json` - `echo `: print `text` to the terminal - Example: `echo foo` +- `net_diagnostics`: print some useful networking diagnostic data +- `peers`: print the peers the node currently hold connections with +- `peer `: print the peer's PKI info, if it exists ### Terminal example usage From 274ae268f27803134e06e98fc96e2243ece25ddb Mon Sep 17 00:00:00 2001 From: dr-frmr Date: Wed, 6 Mar 2024 12:54:40 -0300 Subject: [PATCH 7/8] improve prints slightly --- kinode/packages/kns_indexer/get_block/src/lib.rs | 2 +- kinode/packages/terminal/alias/src/lib.rs | 2 +- kinode/packages/terminal/cat/src/lib.rs | 4 ++-- kinode/packages/terminal/echo/src/lib.rs | 2 +- kinode/packages/terminal/hi/src/lib.rs | 2 +- kinode/packages/terminal/m/src/lib.rs | 2 +- kinode/packages/terminal/namehash_to_name/src/lib.rs | 2 +- kinode/packages/terminal/peer/src/lib.rs | 2 +- kinode/packages/terminal/top/src/lib.rs | 2 +- 9 files changed, 10 insertions(+), 10 deletions(-) diff --git a/kinode/packages/kns_indexer/get_block/src/lib.rs b/kinode/packages/kns_indexer/get_block/src/lib.rs index 11c9c235..52c2eee0 100644 --- a/kinode/packages/kns_indexer/get_block/src/lib.rs +++ b/kinode/packages/kns_indexer/get_block/src/lib.rs @@ -12,7 +12,7 @@ call_init!(init); fn init(_our: Address) { let Ok(args) = await_next_request_body() else { - println!("get_block: failed to get args, aborting"); + println!("failed to get args"); return; }; diff --git a/kinode/packages/terminal/alias/src/lib.rs b/kinode/packages/terminal/alias/src/lib.rs index cdce18a8..b0d15097 100644 --- a/kinode/packages/terminal/alias/src/lib.rs +++ b/kinode/packages/terminal/alias/src/lib.rs @@ -23,7 +23,7 @@ call_init!(init); fn init(_our: Address) { let Ok(args) = await_next_request_body() else { - println!("failed to get args, aborting"); + println!("failed to get args"); return; }; diff --git a/kinode/packages/terminal/cat/src/lib.rs b/kinode/packages/terminal/cat/src/lib.rs index e05b24b3..cf529e67 100644 --- a/kinode/packages/terminal/cat/src/lib.rs +++ b/kinode/packages/terminal/cat/src/lib.rs @@ -14,12 +14,12 @@ call_init!(init); fn init(_our: Address) { let Ok(args) = await_next_request_body() else { - println!("failed to get args, aborting"); + println!("failed to get args"); return; }; let Ok(file_path) = String::from_utf8(args) else { - println!("bad args, aborting"); + println!("argument must be a single string"); return; }; diff --git a/kinode/packages/terminal/echo/src/lib.rs b/kinode/packages/terminal/echo/src/lib.rs index f5f94a92..92609f57 100644 --- a/kinode/packages/terminal/echo/src/lib.rs +++ b/kinode/packages/terminal/echo/src/lib.rs @@ -12,7 +12,7 @@ call_init!(init); fn init(_our: Address) { let Ok(args) = await_next_request_body() else { - println!("failed to get args, aborting"); + println!("failed to get args"); return; }; diff --git a/kinode/packages/terminal/hi/src/lib.rs b/kinode/packages/terminal/hi/src/lib.rs index 6116b242..9b53a133 100644 --- a/kinode/packages/terminal/hi/src/lib.rs +++ b/kinode/packages/terminal/hi/src/lib.rs @@ -14,7 +14,7 @@ call_init!(init); fn init(our: Address) { let Ok(args) = await_next_request_body() else { - println!("failed to get args, aborting"); + println!("failed to get args"); return; }; diff --git a/kinode/packages/terminal/m/src/lib.rs b/kinode/packages/terminal/m/src/lib.rs index c284a72d..c45cb661 100644 --- a/kinode/packages/terminal/m/src/lib.rs +++ b/kinode/packages/terminal/m/src/lib.rs @@ -16,7 +16,7 @@ call_init!(init); fn init(_our: Address) { let Ok(body) = await_next_request_body() else { - println!("failed to get args, aborting"); + println!("failed to get args"); return; }; let body_string = String::from_utf8(body).unwrap(); diff --git a/kinode/packages/terminal/namehash_to_name/src/lib.rs b/kinode/packages/terminal/namehash_to_name/src/lib.rs index 90b041fe..77e29bb8 100644 --- a/kinode/packages/terminal/namehash_to_name/src/lib.rs +++ b/kinode/packages/terminal/namehash_to_name/src/lib.rs @@ -55,7 +55,7 @@ call_init!(init); fn init(_our: Address) { let Ok(args) = await_next_request_body() else { - println!("failed to get args, aborting"); + println!("failed to get args"); return; }; let Ok(namehash) = String::from_utf8(args) else { diff --git a/kinode/packages/terminal/peer/src/lib.rs b/kinode/packages/terminal/peer/src/lib.rs index 9aa05f43..12cb7499 100644 --- a/kinode/packages/terminal/peer/src/lib.rs +++ b/kinode/packages/terminal/peer/src/lib.rs @@ -55,7 +55,7 @@ call_init!(init); fn init(_our: Address) { let Ok(args) = await_next_request_body() else { - println!("failed to get args, aborting"); + println!("failed to get args"); return; }; let Ok(name) = String::from_utf8(args) else { diff --git a/kinode/packages/terminal/top/src/lib.rs b/kinode/packages/terminal/top/src/lib.rs index e4b93b56..479d83e4 100644 --- a/kinode/packages/terminal/top/src/lib.rs +++ b/kinode/packages/terminal/top/src/lib.rs @@ -15,7 +15,7 @@ call_init!(init); fn init(_our: Address) { let Ok(args) = await_next_request_body() else { - println!("failed to get args, aborting"); + println!("failed to get args"); return; }; From 0b3fc47575895d34e3fc751b3b8b6eae43f98a5c Mon Sep 17 00:00:00 2001 From: dr-frmr Date: Wed, 6 Mar 2024 14:14:58 -0300 Subject: [PATCH 8/8] http_server feat: add unbind action --- kinode/src/http/server.rs | 70 +++++++++++++++++++++++++++++------- lib/src/http/server_types.rs | 4 +++ 2 files changed, 62 insertions(+), 12 deletions(-) diff --git a/kinode/src/http/server.rs b/kinode/src/http/server.rs index 5cd7415d..9a443ed1 100644 --- a/kinode/src/http/server.rs +++ b/kinode/src/http/server.rs @@ -40,7 +40,7 @@ type PathBindings = Arc>>; type WsPathBindings = Arc>>; struct BoundPath { - pub app: ProcessId, + pub app: Option, // if None, path has been unbound pub path: String, pub secure_subdomain: Option, pub authenticated: bool, @@ -49,7 +49,7 @@ struct BoundPath { } struct BoundWsPath { - pub app: ProcessId, + pub app: Option, // if None, path has been unbound pub secure_subdomain: Option, pub authenticated: bool, pub encrypted: bool, // TODO use @@ -191,7 +191,7 @@ pub async fn http_server( // add RPC path let mut bindings_map: Router = Router::new(); let rpc_bound_path = BoundPath { - app: ProcessId::new(Some("rpc"), "distro", "sys"), + app: Some(ProcessId::new(Some("rpc"), "distro", "sys")), path: path.clone(), secure_subdomain: None, // TODO maybe RPC should have subdomain? authenticated: false, @@ -397,6 +397,10 @@ async fn ws_handler( }; let bound_path = route.handler(); + let Some(app) = bound_path.app.clone() else { + return Err(warp::reject::not_found()); + }; + if let Some(ref subdomain) = bound_path.secure_subdomain { let _ = print_tx .send(Printout { @@ -435,7 +439,6 @@ async fn ws_handler( return Err(warp::reject::reject()); } - let app = bound_path.app.clone(); let extension = bound_path.extension; drop(ws_path_bindings); @@ -500,6 +503,10 @@ async fn http_handler( }; let bound_path = route.handler(); + let Some(app) = &bound_path.app else { + return Ok(warp::reply::with_status(vec![], StatusCode::NOT_FOUND).into_response()); + }; + if bound_path.authenticated && !auth_cookie_valid( &our, @@ -578,7 +585,7 @@ async fn http_handler( // RPC functionality: if path is /rpc:distro:sys/message, // we extract message from base64 encoded bytes in data // and send it to the correct app. - let (message, is_fire_and_forget) = if bound_path.app == "rpc:distro:sys" { + let (message, is_fire_and_forget) = if app == &"rpc:distro:sys" { match handle_rpc_message(our, id, body, print_tx).await { Ok((message, is_fire_and_forget)) => (message, is_fire_and_forget), Err(e) => { @@ -601,7 +608,7 @@ async fn http_handler( }, target: Address { node: our.to_string(), - process: bound_path.app.clone(), + process: app.clone(), }, rsvp: None, message: Message::Request(Request { @@ -1147,7 +1154,7 @@ async fn handle_app_message( path_bindings.add( &normalize_path(&path), BoundPath { - app: km.source.process.clone(), + app: Some(km.source.process.clone()), path: path.clone(), secure_subdomain: None, authenticated, @@ -1170,7 +1177,7 @@ async fn handle_app_message( path_bindings.add( &normalize_path(&path), BoundPath { - app: km.source.process.clone(), + app: Some(km.source.process.clone()), path: path.clone(), secure_subdomain: None, authenticated, @@ -1194,7 +1201,7 @@ async fn handle_app_message( path_bindings.add( &normalize_path(&path), BoundPath { - app: km.source.process.clone(), + app: Some(km.source.process.clone()), path: path.clone(), secure_subdomain: Some(subdomain), authenticated: true, @@ -1217,7 +1224,7 @@ async fn handle_app_message( path_bindings.add( &normalize_path(&path), BoundPath { - app: km.source.process.clone(), + app: Some(km.source.process.clone()), path: path.clone(), secure_subdomain: Some(subdomain), authenticated: true, @@ -1227,6 +1234,27 @@ async fn handle_app_message( ); } } + HttpServerAction::Unbind { mut path } => { + let mut path_bindings = path_bindings.write().await; + if km.source.process != "homepage:homepage:sys" { + path = if path.starts_with('/') { + format!("/{}{}", km.source.process, path) + } else { + format!("/{}/{}", km.source.process, path) + }; + } + path_bindings.add( + &normalize_path(&path), + BoundPath { + app: None, + path: path.clone(), + secure_subdomain: None, + authenticated: false, + local_only: false, + static_content: None, + }, + ); + } HttpServerAction::WebSocketBind { mut path, authenticated, @@ -1242,7 +1270,7 @@ async fn handle_app_message( ws_path_bindings.add( &normalize_path(&path), BoundWsPath { - app: km.source.process.clone(), + app: Some(km.source.process.clone()), secure_subdomain: None, authenticated, encrypted, @@ -1267,7 +1295,7 @@ async fn handle_app_message( ws_path_bindings.add( &normalize_path(&path), BoundWsPath { - app: km.source.process.clone(), + app: Some(km.source.process.clone()), secure_subdomain: Some(subdomain), authenticated: true, encrypted, @@ -1275,6 +1303,24 @@ async fn handle_app_message( }, ); } + HttpServerAction::WebSocketUnbind { mut path } => { + let mut ws_path_bindings = ws_path_bindings.write().await; + path = if path.starts_with('/') { + format!("/{}{}", km.source.process, path) + } else { + format!("/{}/{}", km.source.process, path) + }; + ws_path_bindings.add( + &normalize_path(&path), + BoundWsPath { + app: None, + secure_subdomain: None, + authenticated: false, + encrypted: false, + extension: false, + }, + ); + } HttpServerAction::WebSocketOpen { .. } => { // we cannot receive these, only send them to processes send_action_response( diff --git a/lib/src/http/server_types.rs b/lib/src/http/server_types.rs index 363b00e4..d35dbc7a 100644 --- a/lib/src/http/server_types.rs +++ b/lib/src/http/server_types.rs @@ -90,6 +90,8 @@ pub enum HttpServerAction { /// lazy_load_blob bytes and serve them as the response to any request to this path. cache: bool, }, + /// Unbind a previously-bound HTTP path + Unbind { path: String }, /// Bind a path to receive incoming WebSocket connections. /// Doesn't need a cache since does not serve assets. WebSocketBind { @@ -107,6 +109,8 @@ pub enum HttpServerAction { encrypted: bool, extension: bool, }, + /// Unbind a previously-bound WebSocket path + WebSocketUnbind { path: String }, /// Processes will RECEIVE this kind of request when a client connects to them. /// If a process does not want this websocket open, they should issue a *request* /// containing a [`type@HttpServerAction::WebSocketClose`] message and this channel ID.