From 77338b9c31e213d53f00173d9eb9e8083c711409 Mon Sep 17 00:00:00 2001 From: dr-frmr Date: Thu, 9 Nov 2023 15:47:08 -0500 Subject: [PATCH] quickfix: net dispose of old forwarding connection tasks, refactor !hi responses to DRY --- src/net/mod.rs | 62 +++++++++++++----------------------------------- src/net/utils.rs | 41 ++++++++++++++++++++++++++++++++ 2 files changed, 57 insertions(+), 46 deletions(-) diff --git a/src/net/mod.rs b/src/net/mod.rs index f122554b..7f6c1ea6 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -354,7 +354,15 @@ async fn direct_networking( } } } - // 3. receive incoming TCP connections + // 3. join any closed forwarding connection tasks and destroy them + // TODO can do more here if desired + Some(res) = forwarding_connections.join_next() => { + match res { + Ok(()) => continue, + Err(_e) => continue, + } + } + // 4. receive incoming TCP connections Ok((stream, _socket_addr)) = tcp.accept() => { // TODO we can perform some amount of validation here // to prevent some amount of potential DDoS attacks. @@ -822,7 +830,7 @@ async fn handle_local_message( ) -> Result<()> { print_debug(&print_tx, "net: handling local message").await; let ipc = match km.message { - Message::Request(request) => request.ipc, + Message::Request(ref request) => &request.ipc, Message::Response((response, _context)) => { // these are received as a router, when we send ConnectionRequests // to a node we do routing for. @@ -846,7 +854,7 @@ async fn handle_local_message( }; if km.source.node != our.name { - if let Ok(act) = rmp_serde::from_slice::(&ipc) { + if let Ok(act) = rmp_serde::from_slice::(ipc) { match act { NetActions::QnsBatchUpdate(_) | NetActions::QnsUpdate(_) => { // for now, we don't get these from remote. @@ -912,43 +920,13 @@ async fn handle_local_message( }; // if we can't parse this to a netaction, treat it as a hello and print it // respond to a text message with a simple "delivered" response - print_tx - .send(Printout { - verbosity: 0, - content: format!( - "\x1b[3;32m{}: {}\x1b[0m", - km.source.node, - std::str::from_utf8(&ipc).unwrap_or("!!message parse error!!") - ), - }) - .await?; - kernel_message_tx - .send(KernelMessage { - id: km.id, - source: Address { - node: our.name.clone(), - process: ProcessId::from_str("net:sys:uqbar").unwrap(), - }, - target: km.rsvp.unwrap_or(km.source), - rsvp: None, - message: Message::Response(( - Response { - inherit: false, - ipc: "delivered".as_bytes().to_vec(), - metadata: None, - }, - None, - )), - payload: None, - signed_capabilities: None, - }) - .await?; + parse_hello_message(&our, &km, ipc, &kernel_message_tx, &print_tx).await?; Ok(()) } else { // available commands: "peers", "pki", "names", "diagnostics" // first parse as raw string, then deserialize to NetActions object let mut printout = String::new(); - match std::str::from_utf8(&ipc) { + match std::str::from_utf8(ipc) { Ok("peers") => { printout.push_str(&format!( "{:#?}", @@ -988,7 +966,7 @@ async fn handle_local_message( } } _ => { - match rmp_serde::from_slice::(&ipc) { + match rmp_serde::from_slice::(ipc) { Ok(NetActions::ConnectionRequest(_)) => { // we shouldn't receive these from ourselves. } @@ -1033,16 +1011,8 @@ async fn handle_local_message( } } _ => { - print_tx - .send(Printout { - verbosity: 0, - content: format!( - "\x1b[3;32m{}: {}\x1b[0m", - km.source.node, - std::str::from_utf8(&ipc).unwrap_or("!!message parse error!!") - ), - }) - .await?; + parse_hello_message(&our, &km, ipc, &kernel_message_tx, &print_tx).await?; + return Ok(()) } } } diff --git a/src/net/utils.rs b/src/net/utils.rs index dd6c2894..c39ba6f0 100644 --- a/src/net/utils.rs +++ b/src/net/utils.rs @@ -461,6 +461,47 @@ fn strip_0x(s: &str) -> String { } } +pub async fn parse_hello_message( + our: &Identity, + km: &KernelMessage, + ipc: &[u8], + kernel_message_tx: &MessageSender, + print_tx: &PrintSender, +) -> Result<()> { + print_tx + .send(Printout { + verbosity: 0, + content: format!( + "\x1b[3;32m{}: {}\x1b[0m", + km.source.node, + std::str::from_utf8(&ipc).unwrap_or("!!message parse error!!") + ), + }) + .await?; + kernel_message_tx + .send(KernelMessage { + id: km.id, + source: Address { + node: our.name.clone(), + process: ProcessId::from_str("net:sys:uqbar").unwrap(), + }, + target: km.rsvp.as_ref().unwrap_or(&km.source).clone(), + rsvp: None, + message: Message::Response(( + Response { + inherit: false, + ipc: "delivered".as_bytes().to_vec(), + metadata: None, + }, + None, + )), + payload: None, + signed_capabilities: None, + }) + .await?; + Ok(()) +} + pub async fn print_debug(print_tx: &PrintSender, content: &str) { let _ = print_tx .send(Printout {