diff --git a/src/http/server.rs b/src/http/server.rs index 0854c11a..5d44e337 100644 --- a/src/http/server.rs +++ b/src/http/server.rs @@ -45,8 +45,9 @@ struct BoundPath { struct BoundWsPath { pub app: ProcessId, + pub secure_subdomain: Option, pub authenticated: bool, - pub local_only: bool, + pub encrypted: bool, // TODO use } /// HTTP server: a runtime module that handles HTTP requests at a given port. @@ -91,10 +92,8 @@ pub async fn http_server( bindings_map.add("/rpc:sys:uqbar/message", rpc_bound_path); let path_bindings: PathBindings = Arc::new(RwLock::new(bindings_map)); - // ws path bindings - let mut ws_bindings_map: Router = Router::new(); - let ws_path_bindings: WsPathBindings = Arc::new(RwLock::new(ws_bindings_map)); + let ws_path_bindings: WsPathBindings = Arc::new(RwLock::new(Router::new())); tokio::spawn(serve( our_name.clone(), @@ -152,6 +151,7 @@ async fn serve( let cloned_print_tx = print_tx.clone(); let ws_route = warp::ws() .and(warp::path::full()) + .and(warp::filters::host::optional()) .and(warp::filters::header::headers_cloned()) .and(warp::any().map(move || cloned_our.clone())) .and(warp::any().map(move || cloned_jwt_secret_bytes.clone())) @@ -159,7 +159,7 @@ async fn serve( .and(warp::any().map(move || ws_path_bindings.clone())) .and(warp::any().map(move || cloned_msg_tx.clone())) .and(warp::any().map(move || cloned_print_tx.clone())) - .map(ws_handler); + .and_then(ws_handler); // filter to receive and handle login requests let cloned_our = our.clone(); @@ -245,9 +245,10 @@ async fn login_handler( } } -async fn ws_handler ( +async fn ws_handler( ws_connection: Ws, path: warp::path::FullPath, + host: Option, headers: warp::http::HeaderMap, our: Arc, jwt_secret_bytes: Arc>, @@ -255,135 +256,67 @@ async fn ws_handler ( ws_path_bindings: WsPathBindings, send_to_loop: MessageSender, print_tx: PrintSender, -) { - +) -> Result { let original_path = normalize_path(path.as_str()); - let _ = print_tx.send(Printout{ - verbosity: 1, - content: format!("got ws request for {original_path}") + let _ = print_tx.send(Printout { + verbosity: 1, + content: format!("got ws request for {original_path}"), }); let serialized_headers = serialize_headers(&headers); let ws_path_bindings = ws_path_bindings.read().await; - if let Ok(route) = ws_path_bindings.recognize(&original_path) { - let bound_path = route.handler(); + let Ok(route) = ws_path_bindings.recognize(&original_path) else { + return Err(warp::reject::not_found()); + }; - if bound_path.authenticated { - match serialized_headers.get("cookie") { - Some(auth_token) => { - if let Ok(our_name) = verify_auth_token(&auth_token, &jwt_secret_bytes) { - if our_name != *our { - return; - } - } else { - return; - } - } - None => { - return; - } - } + let bound_path = route.handler(); + if let Some(ref subdomain) = bound_path.secure_subdomain { + let _ = print_tx + .send(Printout { + verbosity: 1, + content: format!( + "got request for path {original_path} bound by subdomain {subdomain}" + ), + }) + .await; + // assert that host matches what this app wants it to be + if host.is_none() { + return Err(warp::reject::not_found()); + } + let host = host.as_ref().unwrap(); + // parse out subdomain from host (there can only be one) + let request_subdomain = host.host().split('.').next().unwrap_or(""); + if request_subdomain != subdomain { + return Err(warp::reject::not_found()); } - - ws_connection.on_upgrade(move |ws: WebSocket| async move { - - let (mut write_stream, mut read_stream) = ws.split(); - - let _ = print_tx.send(Printout{ - verbosity: 1, - content: format!("got new client websocket connection") - }).await; - - let ws_channel_id: u32 = rand::random(); - let (ws_sender, mut ws_receiver) = tokio::sync::mpsc::channel(100); - - ws_senders.insert(ws_channel_id, (bound_path.app.clone(), ws_sender)); - - let _ = send_to_loop - .send(KernelMessage { - id: rand::random(), - source: Address { - node: our.to_string(), - process: HTTP_SERVER_PROCESS_ID.clone(), - }, - target: Address { - node: our.clone().to_string(), - process: bound_path.app.clone(), - }, - rsvp: None, - message: Message::Request(Request { - inherit: false, - expects_response: None, - ipc: serde_json::to_vec(&HttpServerRequest::WebSocketOpen(ws_channel_id)).unwrap(), - metadata: Some("ws".into()), - }), - payload: None, - signed_capabilities: None, - }).await; - - let _ = print_tx - .send(Printout { - verbosity: 1, - content: format!("websocket channel {ws_channel_id} opened"), - }); - - loop { - tokio::select! { - read = read_stream.next() => { - match read { - _ => { - websocket_close(ws_channel_id, bound_path.app.clone(), &ws_senders, &send_to_loop).await; - } - Some(Ok(msg)) => { - let _ = send_to_loop.send(KernelMessage { - id: rand::random(), - source: Address { - node: our.to_string(), - process: HTTP_SERVER_PROCESS_ID.clone(), - }, - target: Address { - node: our.to_string(), - process: bound_path.app.clone(), - }, - rsvp: None, - message: Message::Request(Request { - inherit: false, - expects_response: None, - ipc: serde_json::to_vec(&HttpServerRequest::WebSocketPush { - channel_id: ws_channel_id, - message_type: WsMessageType::Binary, - }).unwrap(), - metadata: Some("ws".into()), - }), - payload: Some(Payload { - mime: None, - bytes: msg.into_bytes(), - }), - signed_capabilities: None, - }); - } - } - } - Some(outgoing) = ws_receiver.recv() => { - match write_stream.send(outgoing).await { - Ok(()) => continue, - Err(_) => { - websocket_close(ws_channel_id, bound_path.app.clone(), &ws_senders, &send_to_loop).await; - } - } - } - } - } - let stream = write_stream.reunite(read_stream).unwrap(); - let _ = stream.close().await; - }); - - } else { - - return; - } + + if bound_path.authenticated { + let Some(auth_token) = serialized_headers.get("cookie") else { + return Err(warp::reject::not_found()); + }; + let Ok(our_name) = verify_auth_token(&auth_token, &jwt_secret_bytes) else { + return Err(warp::reject::not_found()); + }; + if our_name != *our { + return Err(warp::reject::not_found()); + } + } + + let app = bound_path.app.clone(); + Ok(ws_connection.on_upgrade(move |ws: WebSocket| async move { + maintain_websocket( + ws, + our.clone(), + app, + jwt_secret_bytes.clone(), + ws_senders.clone(), + send_to_loop.clone(), + print_tx.clone(), + ) + .await; + })) } async fn http_handler( @@ -658,17 +591,13 @@ async fn handle_rpc_message( async fn maintain_websocket( ws: WebSocket, our: Arc, - jwt_secret_bytes: Arc>, + app: ProcessId, + _jwt_secret_bytes: Arc>, // TODO use for encrypted channels ws_senders: WebSocketSenders, - path_bindings: PathBindings, send_to_loop: MessageSender, print_tx: PrintSender, ) { let (mut write_stream, mut read_stream) = ws.split(); - - // first, receive a message from client that contains the target process - // and the auth token - let _ = print_tx .send(Printout { verbosity: 1, @@ -676,72 +605,10 @@ async fn maintain_websocket( }) .await; - let Some(Ok(register_msg)) = read_stream.next().await else { - // stream closed, exit - let _ = print_tx - .send(Printout { - verbosity: 1, - content: format!("client failed to send registration message"), - }) - .await; - let stream = write_stream.reunite(read_stream).unwrap(); - let _ = stream.close().await; - return; - }; - - let Ok(ws_register) = serde_json::from_slice::(register_msg.as_bytes()) else { - // stream error, exit - let _ = print_tx - .send(Printout { - verbosity: 1, - content: format!("couldn't parse registration message from client"), - }) - .await; - let stream = write_stream.reunite(read_stream).unwrap(); - let _ = stream.close().await; - return; - }; - - let Ok(owner_process) = ProcessId::from_str(&ws_register.target_process) else { - // invalid process id, exit - let _ = print_tx - .send(Printout { - verbosity: 1, - content: format!("client sent malformed process ID"), - }) - .await; - let stream = write_stream.reunite(read_stream).unwrap(); - let _ = stream.close().await; - return; - }; - - let _ = print_tx - .send(Printout { - verbosity: 1, - content: format!("channel is intended for {owner_process}"), - }) - .await; - - let Ok(our_name) = verify_auth_token(&ws_register.auth_token, &jwt_secret_bytes) else { - // invalid auth token, exit - let stream = write_stream.reunite(read_stream).unwrap(); - let _ = stream.close().await; - return; - }; - - if our_name != *our { - // invalid auth token, exit - let stream = write_stream.reunite(read_stream).unwrap(); - let _ = stream.close().await; - return; - } - let ws_channel_id: u32 = rand::random(); let (ws_sender, mut ws_receiver) = tokio::sync::mpsc::channel(100); - ws_senders.insert(ws_channel_id, (owner_process.clone(), ws_sender)); + ws_senders.insert(ws_channel_id, (app.clone(), ws_sender)); - // send a message to the process associated with this channel - // notifying them that the channel is now open let _ = send_to_loop .send(KernelMessage { id: rand::random(), @@ -750,8 +617,8 @@ async fn maintain_websocket( process: HTTP_SERVER_PROCESS_ID.clone(), }, target: Address { - node: our.to_string(), - process: owner_process.clone(), + node: our.clone().to_string(), + process: app.clone(), }, rsvp: None, message: Message::Request(Request { @@ -765,83 +632,53 @@ async fn maintain_websocket( }) .await; - // respond to the client notifying them that the channel is now open - let Ok(()) = write_stream - .send(warp::ws::Message::text( - serde_json::to_string(&WsRegisterResponse { - channel_id: ws_channel_id, - }) - .unwrap(), - )) - .await - else { - // stream error, exit - let stream = write_stream.reunite(read_stream).unwrap(); - let _ = stream.close().await; - return; - }; - - let _ = print_tx - .send(Printout { - verbosity: 1, - content: format!("websocket channel {ws_channel_id} opened"), - }) - .await; - + let _ = print_tx.send(Printout { + verbosity: 1, + content: format!("websocket channel {ws_channel_id} opened"), + }); loop { tokio::select! { read = read_stream.next() => { match read { - None => { - // stream closed, remove and exit - websocket_close(ws_channel_id, owner_process, &ws_senders, &send_to_loop).await; - break; - } - Some(Err(_e)) => { - // stream error, remove and exit - websocket_close(ws_channel_id, owner_process, &ws_senders, &send_to_loop).await; - break; - } Some(Ok(msg)) => { - // forward message to process associated with this channel - let _ = send_to_loop - .send(KernelMessage { - id: rand::random(), - source: Address { - node: our.to_string(), - process: HTTP_SERVER_PROCESS_ID.clone(), - }, - target: Address { - node: our.to_string(), - process: owner_process.clone(), - }, - rsvp: None, - message: Message::Request(Request { - inherit: false, - expects_response: None, - ipc: serde_json::to_vec(&HttpServerRequest::WebSocketPush { - channel_id: ws_channel_id, - message_type: WsMessageType::Binary, - }).unwrap(), - metadata: Some("ws".into()), - }), - payload: Some(Payload { - mime: None, - bytes: msg.into_bytes(), - }), - signed_capabilities: None, - }) - .await; + let _ = send_to_loop.send(KernelMessage { + id: rand::random(), + source: Address { + node: our.to_string(), + process: HTTP_SERVER_PROCESS_ID.clone(), + }, + target: Address { + node: our.to_string(), + process: app.clone(), + }, + rsvp: None, + message: Message::Request(Request { + inherit: false, + expects_response: None, + ipc: serde_json::to_vec(&HttpServerRequest::WebSocketPush { + channel_id: ws_channel_id, + message_type: WsMessageType::Binary, + }).unwrap(), + metadata: Some("ws".into()), + }), + payload: Some(Payload { + mime: None, + bytes: msg.into_bytes(), + }), + signed_capabilities: None, + }); + } + _ => { + websocket_close(ws_channel_id, app.clone(), &ws_senders, &send_to_loop).await; + break; } } } Some(outgoing) = ws_receiver.recv() => { - // forward message to websocket match write_stream.send(outgoing).await { Ok(()) => continue, - Err(_e) => { - // stream error, remove and exit - websocket_close(ws_channel_id, owner_process, &ws_senders, &send_to_loop).await; + Err(_) => { + websocket_close(ws_channel_id, app.clone(), &ws_senders, &send_to_loop).await; break; } } @@ -962,21 +799,6 @@ async fn handle_app_message( return; }; match message { - HttpServerAction::BindWs { - mut path, - authenticated, - local_only, - } => { - let mut ws_path_bindings = ws_path_bindings.write().await; - ws_path_bindings.add( - &normalize_path(&path), - BoundWsPath { - app: km.source.process.clone(), - authenticated, - local_only, - } - ); - } HttpServerAction::Bind { mut path, authenticated, @@ -1074,7 +896,38 @@ async fn handle_app_message( } send_action_response(km.id, km.source, &send_to_loop, Ok(())).await; } - HttpServerAction::WebSocketOpen(_) => { + HttpServerAction::WebSocketBind { + path, + authenticated, + encrypted, + } => { + let mut ws_path_bindings = ws_path_bindings.write().await; + ws_path_bindings.add( + &normalize_path(&path), + BoundWsPath { + app: km.source.process.clone(), + secure_subdomain: None, + authenticated, + encrypted, + }, + ); + } + HttpServerAction::WebSocketSecureBind { path, encrypted } => { + let process_id_hash = + format!("{:x}", Sha256::digest(km.source.process.to_string())); + let subdomain = process_id_hash.split_at(32).0.to_owned(); + let mut ws_path_bindings = ws_path_bindings.write().await; + ws_path_bindings.add( + &normalize_path(&path), + BoundWsPath { + app: km.source.process.clone(), + secure_subdomain: Some(subdomain), + authenticated: true, + encrypted, + }, + ); + } + HttpServerAction::WebSocketOpen { .. } => { // we cannot receive these, only send them to processes send_action_response( km.id, diff --git a/src/http/types.rs b/src/http/types.rs index 4d11e977..5a331dc2 100644 --- a/src/http/types.rs +++ b/src/http/types.rs @@ -83,13 +83,6 @@ pub enum HttpClientError { /// with the shape Result<(), HttpServerActionError> serialized to JSON. #[derive(Debug, Serialize, Deserialize)] pub enum HttpServerAction { - // Binding an address for websockets. Doesn't need a cache since does not serve assets - // and might not need local_only. - BindWs { - path: String, - authenticated: bool, - local_only: bool, - }, /// Bind expects a payload if and only if `cache` is TRUE. The payload should /// be the static file to serve at this path. Bind { @@ -118,10 +111,30 @@ pub enum HttpServerAction { /// payload bytes and serve them as the response to any request to this path. cache: bool, }, + /// Bind a path to receive incoming WebSocket connections. + /// Doesn't need a cache since does not serve assets. + WebSocketBind { + path: String, + authenticated: bool, + encrypted: bool, + }, + /// SecureBind is the same as Bind, except that it forces new connections to be made + /// from the unique subdomain of the process that bound the path. These are *always* + /// authenticated. Since the subdomain is unique, it will require the user to be + /// logged in separately to the general domain authentication. + WebSocketSecureBind { + path: String, + encrypted: bool, + }, /// Processes will RECEIVE this kind of request when a client connects to them. /// If a process does not want this websocket open, they should issue a *request* /// containing a [`type@HttpServerAction::WebSocketClose`] message and this channel ID. - WebSocketOpen(u32), + WebSocketOpen { + path: String, + channel_id: u32, + authenticated: bool, + encrypted: bool, + }, /// When sent, expects a payload containing the WebSocket message bytes to send. WebSocketPush { channel_id: u32,