diff --git a/src/http/server.rs b/src/http/server.rs index 25152b52..3d87c453 100644 --- a/src/http/server.rs +++ b/src/http/server.rs @@ -25,7 +25,7 @@ type HttpSender = tokio::sync::oneshot::Sender<(HttpResponse, Vec)>; /// mapping from an open websocket connection to a channel that will ingest /// WebSocketPush messages from the app that handles the connection, and /// send them to the connection. -type WebSocketSenders = Arc>; +type WebSocketSenders = Arc>; type WebSocketSender = tokio::sync::mpsc::Sender; type PathBindings = Arc>>; @@ -127,20 +127,31 @@ async fn serve( let cloned_msg_tx = send_to_loop.clone(); let cloned_our = our.clone(); let cloned_jwt_secret_bytes = jwt_secret_bytes.clone(); + let cloned_print_tx = print_tx.clone(); let ws_route = warp::path::end() .and(warp::ws()) .and(warp::any().map(move || cloned_our.clone())) .and(warp::any().map(move || cloned_jwt_secret_bytes.clone())) .and(warp::any().map(move || ws_senders.clone())) .and(warp::any().map(move || cloned_msg_tx.clone())) + .and(warp::any().map(move || cloned_print_tx.clone())) .map( |ws_connection: Ws, our: Arc, jwt_secret_bytes: Arc>, ws_senders: WebSocketSenders, - send_to_loop: MessageSender| { + send_to_loop: MessageSender, + print_tx: PrintSender| { ws_connection.on_upgrade(move |ws: WebSocket| async move { - maintain_websocket(ws, our, jwt_secret_bytes, ws_senders, send_to_loop).await + maintain_websocket( + ws, + our, + jwt_secret_bytes, + ws_senders, + send_to_loop, + print_tx, + ) + .await }) }, ); @@ -379,6 +390,7 @@ async fn maintain_websocket( jwt_secret_bytes: Arc>, ws_senders: WebSocketSenders, send_to_loop: MessageSender, + _print_tx: PrintSender, ) { let (mut write_stream, mut read_stream) = ws.split(); @@ -420,7 +432,7 @@ async fn maintain_websocket( return; } - let ws_channel_id: u64 = rand::random(); + let ws_channel_id: u32 = rand::random(); let (ws_sender, mut ws_receiver) = tokio::sync::mpsc::channel(100); ws_senders.insert(ws_channel_id, (owner_process.clone(), ws_sender)); @@ -505,7 +517,7 @@ async fn maintain_websocket( } async fn websocket_close( - channel_id: u64, + channel_id: u32, process: ProcessId, ws_senders: &WebSocketSenders, send_to_loop: &MessageSender, diff --git a/src/http/types.rs b/src/http/types.rs index 5486bd36..31ce989a 100644 --- a/src/http/types.rs +++ b/src/http/types.rs @@ -74,17 +74,17 @@ pub enum HttpServerAction { /// Processes will RECEIVE this kind of request when a client connects to them. /// If a process does not want this websocket open, they can respond with an /// [`enum@HttpServerAction::WebSocketClose`] message. - WebSocketOpen(u64), + WebSocketOpen(u32), /// Processes can both SEND and RECEIVE this kind of request. /// When sent, expects a payload containing the WebSocket message bytes to send. WebSocketPush { - channel_id: u64, + channel_id: u32, message_type: WsMessageType, }, /// Processes can both SEND and RECEIVE this kind of request. Sending will /// close a socket the process controls. Receiving will indicate that the /// client closed the socket. - WebSocketClose(u64), + WebSocketClose(u32), } /// The possible message types for WebSocketPush. Ping and Pong are limited to 125 bytes @@ -129,7 +129,7 @@ pub struct WsRegister { /// Structure sent from this server to client websocket upon opening a new connection. #[derive(Clone, Debug, Serialize, Deserialize)] pub struct WsRegisterResponse { - pub channel_id: u64, + pub channel_id: u32, // TODO symmetric key exchange here }