diff --git a/src/http/server.rs b/src/http/server.rs index 3d87c453..25152b52 100644 --- a/src/http/server.rs +++ b/src/http/server.rs @@ -25,7 +25,7 @@ type HttpSender = tokio::sync::oneshot::Sender<(HttpResponse, Vec)>; /// mapping from an open websocket connection to a channel that will ingest /// WebSocketPush messages from the app that handles the connection, and /// send them to the connection. -type WebSocketSenders = Arc>; +type WebSocketSenders = Arc>; type WebSocketSender = tokio::sync::mpsc::Sender; type PathBindings = Arc>>; @@ -127,31 +127,20 @@ async fn serve( let cloned_msg_tx = send_to_loop.clone(); let cloned_our = our.clone(); let cloned_jwt_secret_bytes = jwt_secret_bytes.clone(); - let cloned_print_tx = print_tx.clone(); let ws_route = warp::path::end() .and(warp::ws()) .and(warp::any().map(move || cloned_our.clone())) .and(warp::any().map(move || cloned_jwt_secret_bytes.clone())) .and(warp::any().map(move || ws_senders.clone())) .and(warp::any().map(move || cloned_msg_tx.clone())) - .and(warp::any().map(move || cloned_print_tx.clone())) .map( |ws_connection: Ws, our: Arc, jwt_secret_bytes: Arc>, ws_senders: WebSocketSenders, - send_to_loop: MessageSender, - print_tx: PrintSender| { + send_to_loop: MessageSender| { ws_connection.on_upgrade(move |ws: WebSocket| async move { - maintain_websocket( - ws, - our, - jwt_secret_bytes, - ws_senders, - send_to_loop, - print_tx, - ) - .await + maintain_websocket(ws, our, jwt_secret_bytes, ws_senders, send_to_loop).await }) }, ); @@ -390,7 +379,6 @@ async fn maintain_websocket( jwt_secret_bytes: Arc>, ws_senders: WebSocketSenders, send_to_loop: MessageSender, - _print_tx: PrintSender, ) { let (mut write_stream, mut read_stream) = ws.split(); @@ -432,7 +420,7 @@ async fn maintain_websocket( return; } - let ws_channel_id: u32 = rand::random(); + let ws_channel_id: u64 = rand::random(); let (ws_sender, mut ws_receiver) = tokio::sync::mpsc::channel(100); ws_senders.insert(ws_channel_id, (owner_process.clone(), ws_sender)); @@ -517,7 +505,7 @@ async fn maintain_websocket( } async fn websocket_close( - channel_id: u32, + channel_id: u64, process: ProcessId, ws_senders: &WebSocketSenders, send_to_loop: &MessageSender, diff --git a/src/http/types.rs b/src/http/types.rs index 31ce989a..5486bd36 100644 --- a/src/http/types.rs +++ b/src/http/types.rs @@ -74,17 +74,17 @@ pub enum HttpServerAction { /// Processes will RECEIVE this kind of request when a client connects to them. /// If a process does not want this websocket open, they can respond with an /// [`enum@HttpServerAction::WebSocketClose`] message. - WebSocketOpen(u32), + WebSocketOpen(u64), /// Processes can both SEND and RECEIVE this kind of request. /// When sent, expects a payload containing the WebSocket message bytes to send. WebSocketPush { - channel_id: u32, + channel_id: u64, message_type: WsMessageType, }, /// Processes can both SEND and RECEIVE this kind of request. Sending will /// close a socket the process controls. Receiving will indicate that the /// client closed the socket. - WebSocketClose(u32), + WebSocketClose(u64), } /// The possible message types for WebSocketPush. Ping and Pong are limited to 125 bytes @@ -129,7 +129,7 @@ pub struct WsRegister { /// Structure sent from this server to client websocket upon opening a new connection. #[derive(Clone, Debug, Serialize, Deserialize)] pub struct WsRegisterResponse { - pub channel_id: u32, + pub channel_id: u64, // TODO symmetric key exchange here }