From 862af9ecd766bb84a0cde454fbcbc0a5ac620ebe Mon Sep 17 00:00:00 2001 From: bitful-pannul Date: Wed, 13 Dec 2023 23:34:59 -0300 Subject: [PATCH] Revert "Merge pull request #84 from uqbar-dao/revert-81-da/ws-fix" This reverts commit fea8d61bd2c52eff1404634993d945507f220e5f, reversing changes made to dbcf5757d46e617356d076c8186625c72eb2a0cc. --- src/http/server.rs | 22 +++++++++++++++++----- src/http/types.rs | 8 ++++---- 2 files changed, 21 insertions(+), 9 deletions(-) diff --git a/src/http/server.rs b/src/http/server.rs index fe46d5b4..3608ea1a 100644 --- a/src/http/server.rs +++ b/src/http/server.rs @@ -25,7 +25,7 @@ type HttpSender = tokio::sync::oneshot::Sender<(HttpResponse, Vec)>; /// mapping from an open websocket connection to a channel that will ingest /// WebSocketPush messages from the app that handles the connection, and /// send them to the connection. -type WebSocketSenders = Arc>; +type WebSocketSenders = Arc>; type WebSocketSender = tokio::sync::mpsc::Sender; type PathBindings = Arc>>; @@ -127,20 +127,31 @@ async fn serve( let cloned_msg_tx = send_to_loop.clone(); let cloned_our = our.clone(); let cloned_jwt_secret_bytes = jwt_secret_bytes.clone(); + let cloned_print_tx = print_tx.clone(); let ws_route = warp::path::end() .and(warp::ws()) .and(warp::any().map(move || cloned_our.clone())) .and(warp::any().map(move || cloned_jwt_secret_bytes.clone())) .and(warp::any().map(move || ws_senders.clone())) .and(warp::any().map(move || cloned_msg_tx.clone())) + .and(warp::any().map(move || cloned_print_tx.clone())) .map( |ws_connection: Ws, our: Arc, jwt_secret_bytes: Arc>, ws_senders: WebSocketSenders, - send_to_loop: MessageSender| { + send_to_loop: MessageSender, + print_tx: PrintSender| { ws_connection.on_upgrade(move |ws: WebSocket| async move { - maintain_websocket(ws, our, jwt_secret_bytes, ws_senders, send_to_loop).await + maintain_websocket( + ws, + our, + jwt_secret_bytes, + ws_senders, + send_to_loop, + print_tx, + ) + .await }) }, ); @@ -379,6 +390,7 @@ async fn maintain_websocket( jwt_secret_bytes: Arc>, ws_senders: WebSocketSenders, send_to_loop: MessageSender, + _print_tx: PrintSender, ) { let (mut write_stream, mut read_stream) = ws.split(); @@ -420,7 +432,7 @@ async fn maintain_websocket( return; } - let ws_channel_id: u64 = rand::random(); + let ws_channel_id: u32 = rand::random(); let (ws_sender, mut ws_receiver) = tokio::sync::mpsc::channel(100); ws_senders.insert(ws_channel_id, (owner_process.clone(), ws_sender)); @@ -530,7 +542,7 @@ async fn maintain_websocket( } async fn websocket_close( - channel_id: u64, + channel_id: u32, process: ProcessId, ws_senders: &WebSocketSenders, send_to_loop: &MessageSender, diff --git a/src/http/types.rs b/src/http/types.rs index 5486bd36..31ce989a 100644 --- a/src/http/types.rs +++ b/src/http/types.rs @@ -74,17 +74,17 @@ pub enum HttpServerAction { /// Processes will RECEIVE this kind of request when a client connects to them. /// If a process does not want this websocket open, they can respond with an /// [`enum@HttpServerAction::WebSocketClose`] message. - WebSocketOpen(u64), + WebSocketOpen(u32), /// Processes can both SEND and RECEIVE this kind of request. /// When sent, expects a payload containing the WebSocket message bytes to send. WebSocketPush { - channel_id: u64, + channel_id: u32, message_type: WsMessageType, }, /// Processes can both SEND and RECEIVE this kind of request. Sending will /// close a socket the process controls. Receiving will indicate that the /// client closed the socket. - WebSocketClose(u64), + WebSocketClose(u32), } /// The possible message types for WebSocketPush. Ping and Pong are limited to 125 bytes @@ -129,7 +129,7 @@ pub struct WsRegister { /// Structure sent from this server to client websocket upon opening a new connection. #[derive(Clone, Debug, Serialize, Deserialize)] pub struct WsRegisterResponse { - pub channel_id: u64, + pub channel_id: u32, // TODO symmetric key exchange here }