diff --git a/src/http/server.rs b/src/http/server.rs index d43305f8..0854c11a 100644 --- a/src/http/server.rs +++ b/src/http/server.rs @@ -33,6 +33,7 @@ type WebSocketSenders = Arc>; type WebSocketSender = tokio::sync::mpsc::Sender; type PathBindings = Arc>>; +type WsPathBindings = Arc>>; struct BoundPath { pub app: ProcessId, @@ -42,6 +43,12 @@ struct BoundPath { pub static_content: Option, // TODO store in filesystem and cache } +struct BoundWsPath { + pub app: ProcessId, + pub authenticated: bool, + pub local_only: bool, +} + /// HTTP server: a runtime module that handles HTTP requests at a given port. /// The server accepts bindings-requests from apps. These can be used in two ways: /// @@ -84,11 +91,17 @@ pub async fn http_server( bindings_map.add("/rpc:sys:uqbar/message", rpc_bound_path); let path_bindings: PathBindings = Arc::new(RwLock::new(bindings_map)); + + // ws path bindings + let mut ws_bindings_map: Router = Router::new(); + let ws_path_bindings: WsPathBindings = Arc::new(RwLock::new(ws_bindings_map)); + tokio::spawn(serve( our_name.clone(), our_port, http_response_senders.clone(), path_bindings.clone(), + ws_path_bindings.clone(), ws_senders.clone(), encoded_keyfile.clone(), jwt_secret_bytes.clone(), @@ -102,6 +115,7 @@ pub async fn http_server( km, http_response_senders.clone(), path_bindings.clone(), + ws_path_bindings.clone(), ws_senders.clone(), send_to_loop.clone(), ) @@ -117,6 +131,7 @@ async fn serve( our_port: u16, http_response_senders: HttpResponseSenders, path_bindings: PathBindings, + ws_path_bindings: WsPathBindings, ws_senders: WebSocketSenders, encoded_keyfile: Arc>, jwt_secret_bytes: Arc>, @@ -135,33 +150,16 @@ async fn serve( let cloned_our = our.clone(); let cloned_jwt_secret_bytes = jwt_secret_bytes.clone(); let cloned_print_tx = print_tx.clone(); - let ws_route = warp::path::end() - .and(warp::ws()) + let ws_route = warp::ws() + .and(warp::path::full()) + .and(warp::filters::header::headers_cloned()) .and(warp::any().map(move || cloned_our.clone())) .and(warp::any().map(move || cloned_jwt_secret_bytes.clone())) .and(warp::any().map(move || ws_senders.clone())) + .and(warp::any().map(move || ws_path_bindings.clone())) .and(warp::any().map(move || cloned_msg_tx.clone())) .and(warp::any().map(move || cloned_print_tx.clone())) - .map( - |ws_connection: Ws, - our: Arc, - jwt_secret_bytes: Arc>, - ws_senders: WebSocketSenders, - send_to_loop: MessageSender, - print_tx: PrintSender| { - ws_connection.on_upgrade(move |ws: WebSocket| async move { - maintain_websocket( - ws, - our, - jwt_secret_bytes, - ws_senders, - send_to_loop, - print_tx, - ) - .await - }) - }, - ); + .map(ws_handler); // filter to receive and handle login requests let cloned_our = our.clone(); @@ -247,6 +245,147 @@ async fn login_handler( } } +async fn ws_handler ( + ws_connection: Ws, + path: warp::path::FullPath, + headers: warp::http::HeaderMap, + our: Arc, + jwt_secret_bytes: Arc>, + ws_senders: WebSocketSenders, + ws_path_bindings: WsPathBindings, + send_to_loop: MessageSender, + print_tx: PrintSender, +) { + + let original_path = normalize_path(path.as_str()); + let _ = print_tx.send(Printout{ + verbosity: 1, + content: format!("got ws request for {original_path}") + }); + + let serialized_headers = serialize_headers(&headers); + let ws_path_bindings = ws_path_bindings.read().await; + + if let Ok(route) = ws_path_bindings.recognize(&original_path) { + let bound_path = route.handler(); + + if bound_path.authenticated { + match serialized_headers.get("cookie") { + Some(auth_token) => { + if let Ok(our_name) = verify_auth_token(&auth_token, &jwt_secret_bytes) { + if our_name != *our { + return; + } + } else { + return; + } + } + None => { + return; + } + } + } + + ws_connection.on_upgrade(move |ws: WebSocket| async move { + + let (mut write_stream, mut read_stream) = ws.split(); + + let _ = print_tx.send(Printout{ + verbosity: 1, + content: format!("got new client websocket connection") + }).await; + + let ws_channel_id: u32 = rand::random(); + let (ws_sender, mut ws_receiver) = tokio::sync::mpsc::channel(100); + + ws_senders.insert(ws_channel_id, (bound_path.app.clone(), ws_sender)); + + let _ = send_to_loop + .send(KernelMessage { + id: rand::random(), + source: Address { + node: our.to_string(), + process: HTTP_SERVER_PROCESS_ID.clone(), + }, + target: Address { + node: our.clone().to_string(), + process: bound_path.app.clone(), + }, + rsvp: None, + message: Message::Request(Request { + inherit: false, + expects_response: None, + ipc: serde_json::to_vec(&HttpServerRequest::WebSocketOpen(ws_channel_id)).unwrap(), + metadata: Some("ws".into()), + }), + payload: None, + signed_capabilities: None, + }).await; + + let _ = print_tx + .send(Printout { + verbosity: 1, + content: format!("websocket channel {ws_channel_id} opened"), + }); + + loop { + tokio::select! { + read = read_stream.next() => { + match read { + _ => { + websocket_close(ws_channel_id, bound_path.app.clone(), &ws_senders, &send_to_loop).await; + } + Some(Ok(msg)) => { + let _ = send_to_loop.send(KernelMessage { + id: rand::random(), + source: Address { + node: our.to_string(), + process: HTTP_SERVER_PROCESS_ID.clone(), + }, + target: Address { + node: our.to_string(), + process: bound_path.app.clone(), + }, + rsvp: None, + message: Message::Request(Request { + inherit: false, + expects_response: None, + ipc: serde_json::to_vec(&HttpServerRequest::WebSocketPush { + channel_id: ws_channel_id, + message_type: WsMessageType::Binary, + }).unwrap(), + metadata: Some("ws".into()), + }), + payload: Some(Payload { + mime: None, + bytes: msg.into_bytes(), + }), + signed_capabilities: None, + }); + } + } + } + Some(outgoing) = ws_receiver.recv() => { + match write_stream.send(outgoing).await { + Ok(()) => continue, + Err(_) => { + websocket_close(ws_channel_id, bound_path.app.clone(), &ws_senders, &send_to_loop).await; + } + } + } + } + } + let stream = write_stream.reunite(read_stream).unwrap(); + let _ = stream.close().await; + }); + + } else { + + return; + + } +} + async fn http_handler( method: warp::http::Method, socket_addr: Option, @@ -521,6 +660,7 @@ async fn maintain_websocket( our: Arc, jwt_secret_bytes: Arc>, ws_senders: WebSocketSenders, + path_bindings: PathBindings, send_to_loop: MessageSender, print_tx: PrintSender, ) { @@ -754,6 +894,7 @@ async fn handle_app_message( km: KernelMessage, http_response_senders: HttpResponseSenders, path_bindings: PathBindings, + ws_path_bindings: WsPathBindings, ws_senders: WebSocketSenders, send_to_loop: MessageSender, ) { @@ -821,6 +962,21 @@ async fn handle_app_message( return; }; match message { + HttpServerAction::BindWs { + mut path, + authenticated, + local_only, + } => { + let mut ws_path_bindings = ws_path_bindings.write().await; + ws_path_bindings.add( + &normalize_path(&path), + BoundWsPath { + app: km.source.process.clone(), + authenticated, + local_only, + } + ); + } HttpServerAction::Bind { mut path, authenticated, diff --git a/src/http/types.rs b/src/http/types.rs index 23e2b118..4d11e977 100644 --- a/src/http/types.rs +++ b/src/http/types.rs @@ -83,6 +83,13 @@ pub enum HttpClientError { /// with the shape Result<(), HttpServerActionError> serialized to JSON. #[derive(Debug, Serialize, Deserialize)] pub enum HttpServerAction { + // Binding an address for websockets. Doesn't need a cache since does not serve assets + // and might not need local_only. + BindWs { + path: String, + authenticated: bool, + local_only: bool, + }, /// Bind expects a payload if and only if `cache` is TRUE. The payload should /// be the static file to serve at this path. Bind {