Merge pull request #311 from kinode-dao/hf/ws-client-close-fix

http-client: fix WS client closing
This commit is contained in:
nick.kino 2024-04-22 09:30:06 -07:00 committed by GitHub
commit 41e59fbb69
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -265,7 +265,7 @@ async fn listen_to_stream(
match message {
Ok(msg) => {
// Handle different types of incoming WebSocket messages
let (body, blob) = match msg {
let (body, blob, should_exit) = match msg {
TungsteniteMessage::Text(text) => (
HttpClientRequest::WebSocketPush {
channel_id,
@ -275,6 +275,7 @@ async fn listen_to_stream(
mime: Some("text/plain".into()),
bytes: text.into_bytes(),
}),
false,
),
TungsteniteMessage::Binary(bytes) => (
HttpClientRequest::WebSocketPush {
@ -285,12 +286,13 @@ async fn listen_to_stream(
mime: Some("application/octet-stream".into()),
bytes,
}),
false,
),
TungsteniteMessage::Close(_) => {
// remove the websocket from the map
ws_streams.remove(&(target.process.clone(), channel_id));
(HttpClientRequest::WebSocketClose { channel_id }, None)
(HttpClientRequest::WebSocketClose { channel_id }, None, true)
}
TungsteniteMessage::Ping(_) => (
HttpClientRequest::WebSocketPush {
@ -298,6 +300,7 @@ async fn listen_to_stream(
message_type: WsMessageType::Ping,
},
None,
false,
),
TungsteniteMessage::Pong(_) => (
HttpClientRequest::WebSocketPush {
@ -305,6 +308,7 @@ async fn listen_to_stream(
message_type: WsMessageType::Pong,
},
None,
false,
),
_ => {
// should never get a TungsteniteMessage::Frame, ignore if we do
@ -312,6 +316,7 @@ async fn listen_to_stream(
}
};
if ws_streams.contains_key(&(target.process.clone(), channel_id)) || should_exit {
handle_ws_message(
our.clone(),
id,
@ -322,6 +327,11 @@ async fn listen_to_stream(
)
.await;
}
if should_exit {
break;
}
}
Err(e) => {
println!("WebSocket Client Error ({}): {:?}", channel_id, e);
@ -633,7 +643,7 @@ async fn close_ws_connection(
ws_streams: WebSocketStreams,
_print_tx: PrintSender,
) -> Result<HttpClientResponse, HttpClientError> {
let Some(mut ws_sink) = ws_streams.get_mut(&(target.process.clone(), channel_id)) else {
let Some((_, mut ws_sink)) = ws_streams.remove(&(target.process.clone(), channel_id)) else {
return Err(HttpClientError::WsCloseFailed { channel_id });
};