From 6306e6acb53ecd0061362bde29f2975b978e9d0a Mon Sep 17 00:00:00 2001 From: hosted-fornet Date: Thu, 18 Apr 2024 16:08:53 -0700 Subject: [PATCH] http-client: fix WS client closing: no addl messages (aside from Close) and clean up task --- kinode/src/http/client.rs | 34 ++++++++++++++++++++++------------ 1 file changed, 22 insertions(+), 12 deletions(-) diff --git a/kinode/src/http/client.rs b/kinode/src/http/client.rs index 8baf8136..b2acb6c8 100644 --- a/kinode/src/http/client.rs +++ b/kinode/src/http/client.rs @@ -265,7 +265,7 @@ async fn listen_to_stream( match message { Ok(msg) => { // Handle different types of incoming WebSocket messages - let (body, blob) = match msg { + let (body, blob, should_exit) = match msg { TungsteniteMessage::Text(text) => ( HttpClientRequest::WebSocketPush { channel_id, @@ -275,6 +275,7 @@ async fn listen_to_stream( mime: Some("text/plain".into()), bytes: text.into_bytes(), }), + false, ), TungsteniteMessage::Binary(bytes) => ( HttpClientRequest::WebSocketPush { @@ -285,12 +286,13 @@ async fn listen_to_stream( mime: Some("application/octet-stream".into()), bytes, }), + false, ), TungsteniteMessage::Close(_) => { // remove the websocket from the map ws_streams.remove(&(target.process.clone(), channel_id)); - (HttpClientRequest::WebSocketClose { channel_id }, None) + (HttpClientRequest::WebSocketClose { channel_id }, None, true) } TungsteniteMessage::Ping(_) => ( HttpClientRequest::WebSocketPush { @@ -298,6 +300,7 @@ async fn listen_to_stream( message_type: WsMessageType::Ping, }, None, + false, ), TungsteniteMessage::Pong(_) => ( HttpClientRequest::WebSocketPush { @@ -305,6 +308,7 @@ async fn listen_to_stream( message_type: WsMessageType::Pong, }, None, + false, ), _ => { // should never get a TungsteniteMessage::Frame, ignore if we do @@ -312,15 +316,21 @@ async fn listen_to_stream( } }; - handle_ws_message( - our.clone(), - id, - target.clone(), - body, - blob, - send_to_loop.clone(), - ) - .await; + if ws_streams.contains_key(&(target.process.clone(), channel_id)) || should_exit { + handle_ws_message( + our.clone(), + id, + target.clone(), + body, + blob, + send_to_loop.clone(), + ) + .await; + } + + if should_exit { + break; + } } Err(e) => { println!("WebSocket Client Error ({}): {:?}", channel_id, e); @@ -633,7 +643,7 @@ async fn close_ws_connection( ws_streams: WebSocketStreams, _print_tx: PrintSender, ) -> Result { - let Some(mut ws_sink) = ws_streams.get_mut(&(target.process.clone(), channel_id)) else { + let Some((_, mut ws_sink)) = ws_streams.remove(&(target.process.clone(), channel_id)) else { return Err(HttpClientError::WsCloseFailed { channel_id }); };