From 3fdd605c181a008aed283319fff7fa0d89789f18 Mon Sep 17 00:00:00 2001 From: dr-frmr Date: Fri, 17 Nov 2023 19:28:51 -0500 Subject: [PATCH] WIP not compiling, httpserver rewrite --- src/http/server.rs | 807 +++++++++++++++++++-------------------------- src/http/types.rs | 129 +++++--- src/http/utils.rs | 30 +- src/types.rs | 230 +++++++++++-- 4 files changed, 632 insertions(+), 564 deletions(-) diff --git a/src/http/server.rs b/src/http/server.rs index e0cbafc5..9cb7f390 100644 --- a/src/http/server.rs +++ b/src/http/server.rs @@ -3,22 +3,33 @@ use crate::http::utils::*; use crate::register; use crate::types::*; use anyhow::Result; -use futures::SinkExt; -use futures::StreamExt; +use dashmap::DashMap; +use futures::{SinkExt, StreamExt}; use route_recognizer::Router; use std::collections::HashMap; use std::net::SocketAddr; use std::sync::Arc; -use tokio::sync::oneshot; -use tokio::sync::Mutex; -use tokio::sync::RwLock; +use tokio::sync::{Mutex, RwLock}; use warp::http::{header::HeaderValue, StatusCode}; use warp::ws::{WebSocket, Ws}; use warp::{Filter, Reply}; -// types and constants -type HttpSender = tokio::sync::oneshot::Sender; -type HttpResponseSenders = Arc>>; +const HTTP_SELF_IMPOSED_TIMEOUT: u64 = 15; + +/// mapping from a given HTTP request (assigned an ID) to the oneshot +/// channel that will get a response from the app that handles the request, +/// and a string which contains the path that the request was made to. +type HttpResponseSenders = Arc>; +type HttpSender = tokio::sync::oneshot::Sender<(HttpResponse, Vec)>; + +/// mapping from an open websocket connection to a channel that will ingest +/// WebSocketPush messages from the app that handles the connection, and +/// send them to the connection. +type WebSocketSenders = Arc>; +type WebSocketSender = tokio::sync::mpsc::Sender<(WsMessageType, Vec)>; + +type StaticAssets = Arc>>; + type PathBindings = Arc>>; /// HTTP server: a runtime module that handles HTTP requests at a given port. @@ -44,8 +55,10 @@ pub async fn http_server( send_to_loop: MessageSender, print_tx: PrintSender, ) -> Result<()> { - let http_response_senders = Arc::new(Mutex::new(HashMap::new())); - let websockets: WebSockets = Arc::new(Mutex::new(HashMap::new())); + let our_name = Arc::new(our_name); + let jwt_secret_bytes = Arc::new(jwt_secret_bytes); + let http_response_senders: HttpResponseSenders = Arc::new(DashMap::new()); + let ws_senders: WebSocketSenders = Arc::new(DashMap::new()); // Add RPC path let mut bindings_map: Router = Router::new(); @@ -59,85 +72,295 @@ pub async fn http_server( let path_bindings: PathBindings = Arc::new(RwLock::new(bindings_map)); - let _ = tokio::join!( - http_serve( - our_name.clone(), - our_port, + tokio::spawn(serve( + our_name.clone(), + our_port, + http_response_senders.clone(), + path_bindings.clone(), + ws_senders.clone(), + jwt_secret_bytes.clone(), + send_to_loop.clone(), + print_tx.clone(), + )); + + while let Some(km) = recv_in_server.recv().await { + // we *can* move this into a dedicated task, but it's not necessary + handle_app_message( + km, http_response_senders.clone(), path_bindings.clone(), - websockets.clone(), + ws_senders.clone(), jwt_secret_bytes.clone(), send_to_loop.clone(), - print_tx.clone() - ), - async move { - while let Some(kernel_message) = recv_in_server.recv().await { - let KernelMessage { - id, - source, - message, - payload, - .. - } = kernel_message; - - if let Err(e) = http_handle_messages( - our_name.clone(), - id, - source.clone(), - message, - payload, - http_response_senders.clone(), - path_bindings.clone(), - websockets.clone(), - jwt_secret_bytes.clone(), - send_to_loop.clone(), - print_tx.clone(), - ) - .await - { - send_to_loop - .send(make_error_message(our_name.clone(), id, source.clone(), e)) - .await - .unwrap(); - } - } - } - ); - Err(anyhow::anyhow!("http_server: exited")) + print_tx.clone(), + ) + .await; + } + return Err(anyhow::anyhow!("http_server: http_server loop exited")); } -async fn handle_websocket( - ws: WebSocket, - our: String, - jwt_secret_bytes: Vec, - websockets: WebSockets, +/// The 'server' part. Listens on a port assigned by runtime, and handles +/// all HTTP requests on it. Also allows incoming websocket connections. +async fn serve( + our: Arc, + our_port: u16, + http_response_senders: HttpResponseSenders, + path_bindings: PathBindings, + ws_senders: WebSocketSenders, + jwt_secret_bytes: Arc>, send_to_loop: MessageSender, print_tx: PrintSender, ) { - let (write_stream, mut read_stream) = ws.split(); - let write_stream = Arc::new(Mutex::new(write_stream)); + let _ = print_tx + .send(Printout { + verbosity: 0, + content: format!("http_server: running on port {}", our_port), + }) + .await; + + // Filter to receive websockets + let cloned_msg_tx = send_to_loop.clone(); + let cloned_our = our.clone(); + let cloned_jwt_secret_bytes = jwt_secret_bytes.clone(); + let ws_route = warp::path::end() + .and(warp::ws()) + .and(warp::any().map(move || cloned_our.clone())) + .and(warp::any().map(move || cloned_jwt_secret_bytes.clone())) + .and(warp::any().map(move || ws_senders.clone())) + .and(warp::any().map(move || cloned_msg_tx.clone())) + .map( + |ws_connection: Ws, + our: Arc, + jwt_secret_bytes: Arc>, + ws_senders: WebSocketSenders, + send_to_loop: MessageSender| { + ws_connection.on_upgrade(move |ws: WebSocket| async move { + maintain_websocket(ws, our, jwt_secret_bytes, ws_senders, send_to_loop).await + }) + }, + ); + // Filter to receive HTTP requests + let filter = warp::filters::method::method() + .and(warp::addr::remote()) + .and(warp::path::full()) + .and(warp::filters::header::headers_cloned()) + .and(warp::filters::body::bytes()) + .and(warp::any().map(move || our.clone())) + .and(warp::any().map(move || http_response_senders.clone())) + .and(warp::any().map(move || path_bindings.clone())) + .and(warp::any().map(move || jwt_secret_bytes.clone())) + .and(warp::any().map(move || send_to_loop.clone())) + .and_then(http_handler); + + let filter_with_ws = ws_route.or(filter); + warp::serve(filter_with_ws) + .run(([0, 0, 0, 0], our_port)) + .await; +} + +async fn http_handler( + method: warp::http::Method, + socket_addr: Option, + path: warp::path::FullPath, + headers: warp::http::HeaderMap, + body: warp::hyper::body::Bytes, + our: Arc, + http_response_senders: HttpResponseSenders, + path_bindings: PathBindings, + jwt_secret_bytes: Arc>, + send_to_loop: MessageSender, +) -> Result { + // TODO this is all so dirty. Figure out what actually matters. + + // trim trailing "/" + let original_path = normalize_path(path.as_str()); + let id: u64 = rand::random(); + let serialized_headers = serialize_headers(&headers); + let path_bindings = path_bindings.read().await; + + let Ok(route) = path_bindings.recognize(&original_path) else { + return Ok(warp::reply::with_status(vec![], StatusCode::NOT_FOUND).into_response()); + }; + let bound_path = route.handler(); + + if bound_path.authenticated { + let auth_token = serialized_headers + .get("cookie") + .cloned() + .unwrap_or_default(); + if !auth_cookie_valid(&our, &auth_token, &jwt_secret_bytes) { + return Ok(warp::reply::with_status(vec![], StatusCode::UNAUTHORIZED).into_response()); + } + } + + let is_local = socket_addr + .map(|addr| addr.ip().is_loopback()) + .unwrap_or(false); + + if bound_path.local_only && !is_local { + return Ok(warp::reply::with_status(vec![], StatusCode::FORBIDDEN).into_response()); + } + + // RPC functionality: if path is /rpc:sys:uqbar/message, + // we extract message from base64 encoded bytes in data + // and send it to the correct app. + let message = if bound_path.app == "rpc:sys:uqbar" { + match handle_rpc_message(our, id, body).await { + Ok(message) => message, + Err(e) => { + return Ok(warp::reply::with_status(vec![], e).into_response()); + } + } + } else { + // otherwise, make a message to the correct app + KernelMessage { + id, + source: Address { + node: our.to_string(), + process: HTTP_SERVER_PROCESS_ID.clone(), + }, + target: Address { + node: our.to_string(), + process: bound_path.app.clone(), + }, + rsvp: None, + message: Message::Request(Request { + inherit: false, + expects_response: Some(HTTP_SELF_IMPOSED_TIMEOUT), + ipc: serde_json::to_vec(&IncomingHttpRequest { + source_socket_addr: socket_addr.map(|addr| addr.to_string()), + method: method.to_string(), + raw_path: original_path.clone(), + headers: serialized_headers, + }) + .unwrap(), + metadata: None, + }), + payload: Some(Payload { + mime: None, + bytes: body.to_vec(), + }), + signed_capabilities: None, + } + }; + + let (response_sender, response_receiver) = tokio::sync::oneshot::channel(); + http_response_senders.insert(id, (original_path, response_sender)); + + match send_to_loop.send(message).await { + Ok(_) => {} + Err(_) => { + return Ok( + warp::reply::with_status(vec![], StatusCode::INTERNAL_SERVER_ERROR).into_response(), + ); + } + } + + let timeout_duration = tokio::time::Duration::from_secs(HTTP_SELF_IMPOSED_TIMEOUT); + let result = tokio::time::timeout(timeout_duration, response_receiver).await; + + let (http_response, body) = match result { + Ok(Ok(res)) => res, + Ok(Err(_)) => { + return Ok( + warp::reply::with_status(vec![], StatusCode::INTERNAL_SERVER_ERROR).into_response(), + ); + } + Err(_) => { + return Ok( + warp::reply::with_status(vec![], StatusCode::REQUEST_TIMEOUT).into_response(), + ); + } + }; + + let reply = warp::reply::with_status( + body, + StatusCode::from_u16(http_response.status).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR), + ); + let mut response = reply.into_response(); + + // Merge the deserialized headers into the existing headers + let existing_headers = response.headers_mut(); + for (header_name, header_value) in deserialize_headers(http_response.headers).iter() { + if header_name == "set-cookie" || header_name == "Set-Cookie" { + if let Ok(cookie) = header_value.to_str() { + let cookie_headers: Vec<&str> = cookie + .split("; ") + .filter(|&cookie| !cookie.is_empty()) + .collect(); + for cookie_header in cookie_headers { + if let Ok(valid_cookie) = HeaderValue::from_str(cookie_header) { + existing_headers.append(header_name, valid_cookie); + } + } + } + } else { + existing_headers.insert(header_name.to_owned(), header_value.to_owned()); + } + } + Ok(response) +} + +async fn handle_rpc_message( + our: Arc, + id: u64, + body: warp::hyper::body::Bytes, +) -> Result { + let Ok(rpc_message) = serde_json::from_slice::(&body) else { + return Err(StatusCode::BAD_REQUEST); + }; + + let Ok(target_process) = ProcessId::from_str(&rpc_message.process) else { + return Err(StatusCode::BAD_REQUEST); + }; + + Ok(KernelMessage { + id, + source: Address { + node: our.to_string(), + process: HTTP_SERVER_PROCESS_ID.clone(), + }, + target: Address { + node: rpc_message.node.unwrap_or(our.to_string()), + process: target_process, + }, + rsvp: None, + message: Message::Request(Request { + inherit: false, + expects_response: Some(15), // no effect on runtime + ipc: match rpc_message.ipc { + Some(ipc_string) => ipc_string.into_bytes(), + None => Vec::new(), + }, + metadata: rpc_message.metadata, + }), + payload: match base64::decode(rpc_message.data.unwrap_or("".to_string())) { + Ok(bytes) => Some(Payload { + mime: rpc_message.mime, + bytes, + }), + Err(_) => None, + }, + signed_capabilities: None, + }) +} + +async fn maintain_websocket( + ws: WebSocket, + our: Arc, + jwt_secret_bytes: Arc>, + ws_senders: WebSocketSenders, + send_to_loop: MessageSender, +) { + let (write_stream, mut read_stream) = ws.split(); - // How do we handle authentication? let ws_id: u64 = rand::random(); + let + while let Some(Ok(msg)) = read_stream.next().await { if msg.is_binary() { - let _ = print_tx - .send(Printout { - verbosity: 1, - content: "GOT WEBSOCKET BYTES".to_string(), - }) - .await; let bytes = msg.as_bytes(); - let _ = print_tx - .send(Printout { - verbosity: 1, - content: format!( - "WEBSOCKET MESSAGE (BYTES) {}", - String::from_utf8(bytes.to_vec()).unwrap_or_default() - ), - }) - .await; match serde_json::from_slice::(bytes) { Ok(parsed_msg) => { handle_incoming_ws( @@ -152,23 +375,10 @@ async fn handle_websocket( ) .await; } - Err(e) => { - let _ = print_tx - .send(Printout { - verbosity: 1, - content: format!("Failed to parse WebSocket message: {}", e), - }) - .await; - } + Err(e) => {} } } else if msg.is_text() { if let Ok(msg_str) = msg.to_str() { - let _ = print_tx - .send(Printout { - verbosity: 1, - content: format!("WEBSOCKET MESSAGE (TEXT): {}", msg_str), - }) - .await; if let Ok(parsed_msg) = serde_json::from_str(msg_str) { handle_incoming_ws( parsed_msg, @@ -185,6 +395,7 @@ async fn handle_websocket( } } else if msg.is_close() { // Delete the websocket from the map + ws_senders.remove(&ws_id); let mut ws_map = websockets.lock().await; for (node, node_map) in ws_map.iter_mut() { for (channel_id, id_map) in node_map.iter_mut() { @@ -205,53 +416,57 @@ async fn handle_websocket( } } -async fn http_handle_messages( - our: String, - id: u64, - source: Address, - message: Message, - payload: Option, +async fn handle_app_message( + km: KernelMessage, http_response_senders: HttpResponseSenders, path_bindings: PathBindings, - websockets: WebSockets, - ws_proxies: WebSocketProxies, - jwt_secret_bytes: Vec, + ws_senders: WebSocketSenders, + jwt_secret_bytes: Arc>, send_to_loop: MessageSender, print_tx: PrintSender, -) -> Result<(), HttpServerError> { - match message { - Message::Response((ref response, _)) => { +) { + // when we get a Response, try to match it to an outstanding HTTP + // request and send it there. + // when we get a Request, parse it into an HttpServerAction and perform it. + match km.message { + Message::Response((ref response, _context)) => { + let Some((_id, (path, sender))) = http_response_senders.remove(&km.id) else { + return + }; + // if path is /rpc/message, return accordingly with base64 encoded payload + if path == "/rpc:sys:uqbar/message" { + let payload = km.payload.map(|p| { + Payload { + mime: p.mime, + bytes: base64::encode(p.bytes).into_bytes(), + } + }); + + let mut default_headers = HashMap::new(); + default_headers.insert("Content-Type".to_string(), "text/html".to_string()); + + let _ = sender.send((HttpResponse { + status: 200, + headers: default_headers, + }, + serde_json::to_vec(&RpcResponseBody { + ipc: response.ipc, + payload, + }).unwrap(), + )); + } else { + let Ok(response) = serde_json::from_slice::(&response.ipc) else { + // the receiver will automatically trigger a 503 when sender is dropped. + return + }; + + } + let mut senders = http_response_senders.lock().await; match senders.remove(&id) { // if no corresponding entry, nowhere to send response None => {} Some((path, channel)) => { - // if path is /rpc/message, return accordingly with base64 encoded payload - if path == *"/rpc:sys:uqbar/message" { - let payload = payload.map(|p| { - let bytes = p.bytes; - let base64_bytes = base64::encode(bytes); - Payload { - mime: p.mime, - bytes: base64_bytes.into_bytes(), - } - }); - let body = serde_json::json!({ - "ipc": response.ipc, - "payload": payload - }) - .to_string() - .as_bytes() - .to_vec(); - let mut default_headers = HashMap::new(); - default_headers.insert("Content-Type".to_string(), "text/html".to_string()); - - let _ = channel.send(HttpResponse { - status: 200, - headers: default_headers, - body: Some(body), - }); - // error case here? } else { // else try deserializing ipc into a HttpResponse let json = serde_json::from_slice::(&response.ipc); @@ -506,343 +721,3 @@ async fn http_handle_messages( Ok(()) } - -// TODO: add a way to register a websocket connection (should be a Vector of websockets) -// Then forward websocket messages to the correct place -async fn http_serve( - our: String, - our_port: u16, - http_response_senders: HttpResponseSenders, - path_bindings: PathBindings, - websockets: WebSockets, - jwt_secret_bytes: Vec, - send_to_loop: MessageSender, - print_tx: PrintSender, -) { - let cloned_msg_tx = send_to_loop.clone(); - let cloned_print_tx = print_tx.clone(); - let cloned_our = our.clone(); - let cloned_jwt_secret_bytes = jwt_secret_bytes.clone(); - let ws_route = warp::path::end() - .and(warp::ws()) - .and(warp::any().map(move || cloned_our.clone())) - .and(warp::any().map(move || cloned_jwt_secret_bytes.clone())) - .and(warp::any().map(move || websockets.clone())) - .and(warp::any().map(move || cloned_msg_tx.clone())) - .and(warp::any().map(move || cloned_print_tx.clone())) - .map( - |ws_connection: Ws, - our: String, - jwt_secret_bytes: Vec, - websockets: WebSockets, - send_to_loop: MessageSender, - print_tx: PrintSender| { - ws_connection.on_upgrade(move |ws: WebSocket| async move { - handle_websocket( - ws, - our, - jwt_secret_bytes, - websockets, - send_to_loop, - print_tx, - ) - .await - }) - }, - ); - - let print_tx_move = print_tx.clone(); - let filter = warp::filters::method::method() - .and(warp::addr::remote()) - .and(warp::path::full()) - .and(warp::filters::header::headers_cloned()) - .and( - warp::filters::query::raw() - .or(warp::any().map(String::default)) - .unify() - .map(|query_string: String| { - if query_string.is_empty() { - HashMap::new() - } else { - match serde_urlencoded::from_str(&query_string) { - Ok(map) => map, - Err(_) => HashMap::new(), - } - } - }), - ) - .and(warp::filters::body::bytes()) - .and(warp::any().map(move || our.clone())) - .and(warp::any().map(move || http_response_senders.clone())) - .and(warp::any().map(move || path_bindings.clone())) - .and(warp::any().map(move || jwt_secret_bytes.clone())) - .and(warp::any().map(move || send_to_loop.clone())) - .and(warp::any().map(move || print_tx_move.clone())) - .and_then(handler); - - let filter_with_ws = ws_route.or(filter); - - let _ = print_tx - .send(Printout { - verbosity: 1, - content: format!("http_server: running on: {}", our_port), - }) - .await; - warp::serve(filter_with_ws) - .run(([0, 0, 0, 0], our_port)) - .await; -} - -async fn handler( - method: warp::http::Method, - address: Option, - path: warp::path::FullPath, - headers: warp::http::HeaderMap, - query_params: HashMap, - body: warp::hyper::body::Bytes, - our: String, - http_response_senders: HttpResponseSenders, - path_bindings: PathBindings, - jwt_secret_bytes: Vec, - send_to_loop: MessageSender, - _print_tx: PrintSender, -) -> Result { - let address = match address { - Some(a) => a.to_string(), - None => "".to_string(), - }; - // trim trailing "/" - let original_path = normalize_path(path.as_str()); - let id: u64 = rand::random(); - let real_headers = serialize_headers(&headers); - let path_bindings = path_bindings.read().await; - - let Ok(route) = path_bindings.recognize(&original_path) else { - return Ok(warp::reply::with_status(vec![], StatusCode::NOT_FOUND).into_response()); - }; - let bound_path = route.handler(); - - let app = bound_path.app.to_string(); - let url_params: HashMap<&str, &str> = route.params().into_iter().collect(); - let raw_path = remove_process_id(&original_path); - let path = remove_process_id(&bound_path.original_path); - - if bound_path.authenticated { - let auth_token = real_headers.get("cookie").cloned().unwrap_or_default(); - if !auth_cookie_valid(our.clone(), &auth_token, jwt_secret_bytes) { - // send 401 - return Ok(warp::reply::with_status(vec![], StatusCode::UNAUTHORIZED).into_response()); - } - } - - if bound_path.local_only && !address.starts_with("127.0.0.1:") { - // send 403 - return Ok(warp::reply::with_status(vec![], StatusCode::FORBIDDEN).into_response()); - } - - // RPC functionality: if path is /rpc:sys:uqbar/message, - // we extract message from base64 encoded bytes in data - // and send it to the correct app. - - let message = if app == *"rpc:sys:uqbar" { - let rpc_message: RpcMessage = match serde_json::from_slice(&body) { - // to_vec()? - Ok(v) => v, - Err(_) => { - return Ok( - warp::reply::with_status(vec![], StatusCode::BAD_REQUEST).into_response() - ); - } - }; - - let target_process = match ProcessId::from_str(&rpc_message.process) { - Ok(p) => p, - Err(_) => { - return Ok( - warp::reply::with_status(vec![], StatusCode::BAD_REQUEST).into_response() - ); - } - }; - - let payload = match base64::decode(rpc_message.data.unwrap_or("".to_string())) { - Ok(bytes) => Some(Payload { - mime: rpc_message.mime, - bytes, - }), - Err(_) => None, - }; - let node = match rpc_message.node { - Some(node_str) => node_str, - None => our.clone(), - }; - - let ipc_bytes: Vec = match rpc_message.ipc { - Some(ipc_string) => ipc_string.into_bytes(), - None => Vec::new(), - }; - - KernelMessage { - id, - source: Address { - node: our.clone(), - process: HTTP_SERVER_PROCESS_ID.clone(), - }, - target: Address { - node, - process: target_process, - }, - rsvp: Some(Address { - node: our.clone(), - process: HTTP_SERVER_PROCESS_ID.clone(), - }), - message: Message::Request(Request { - inherit: false, - expects_response: Some(15), // no effect on runtime - ipc: ipc_bytes, - metadata: rpc_message.metadata, - }), - payload, - signed_capabilities: None, - } - } else if app == *"encryptor:sys:uqbar" { - let body_json = match String::from_utf8(body.to_vec()) { - Ok(s) => s, - Err(_) => { - return Ok( - warp::reply::with_status(vec![], StatusCode::BAD_REQUEST).into_response() - ); - } - }; - - let body: serde_json::Value = match serde_json::from_str(&body_json) { - Ok(v) => v, - Err(_) => { - return Ok( - warp::reply::with_status(vec![], StatusCode::BAD_REQUEST).into_response() - ); - } - }; - - let channel_id = body["channel_id"].as_str().unwrap_or(""); - let public_key_hex = body["public_key_hex"].as_str().unwrap_or(""); - - KernelMessage { - id, - source: Address { - node: our.clone(), - process: HTTP_SERVER_PROCESS_ID.clone(), - }, - target: Address { - node: our.clone(), - process: ProcessId::from_str("encryptor:sys:uqbar").unwrap(), - }, - rsvp: None, //? - message: Message::Request(Request { - inherit: false, - expects_response: None, - ipc: serde_json::json!({ - "GetKeyAction": { - "channel_id": channel_id, - "public_key_hex": public_key_hex, - } - }) - .to_string() - .into_bytes(), - metadata: None, - }), - payload: None, - signed_capabilities: None, - } - } else { - // otherwise, make a message, to the correct app. - KernelMessage { - id, - source: Address { - node: our.clone(), - process: HTTP_SERVER_PROCESS_ID.clone(), - }, - target: Address { - node: our.clone(), - process: bound_path.app.clone(), - }, - rsvp: Some(Address { - node: our.clone(), - process: HTTP_SERVER_PROCESS_ID.clone(), - }), - message: Message::Request(Request { - inherit: false, - expects_response: Some(15), // no effect on runtime - ipc: serde_json::json!({ - "address": address, - "method": method.to_string(), - "raw_path": raw_path.clone(), - "path": path.clone(), - "headers": serialize_headers(&headers), - "query_params": query_params, - "url_params": url_params, - }) - .to_string() - .into_bytes(), - metadata: None, - }), - payload: Some(Payload { - mime: Some("application/octet-stream".to_string()), // TODO adjust MIME type as needed - bytes: body.to_vec(), - }), - signed_capabilities: None, - } - }; - let (response_sender, response_receiver) = oneshot::channel(); - http_response_senders - .lock() - .await - .insert(id, (original_path.clone(), response_sender)); - - send_to_loop.send(message).await.unwrap(); - let timeout_duration = tokio::time::Duration::from_secs(15); // adjust as needed - let result = tokio::time::timeout(timeout_duration, response_receiver).await; - - let from_channel = match result { - Ok(Ok(from_channel)) => from_channel, - Ok(Err(_)) => { - return Ok( - warp::reply::with_status(vec![], StatusCode::INTERNAL_SERVER_ERROR).into_response(), - ); - } - Err(_) => { - return Ok( - warp::reply::with_status(vec![], StatusCode::REQUEST_TIMEOUT).into_response(), - ); - } - }; - - let reply = warp::reply::with_status( - match from_channel.body { - Some(val) => val, - None => vec![], - }, - StatusCode::from_u16(from_channel.status).unwrap(), - ); - let mut response = reply.into_response(); - - // Merge the deserialized headers into the existing headers - let existing_headers = response.headers_mut(); - for (header_name, header_value) in deserialize_headers(from_channel.headers).iter() { - if header_name == "set-cookie" || header_name == "Set-Cookie" { - if let Ok(cookie) = header_value.to_str() { - let cookie_headers: Vec<&str> = cookie - .split("; ") - .filter(|&cookie| !cookie.is_empty()) - .collect(); - for cookie_header in cookie_headers { - if let Ok(valid_cookie) = HeaderValue::from_str(cookie_header) { - existing_headers.append(header_name, valid_cookie); - } - } - } - } else { - existing_headers.insert(header_name.clone(), header_value.clone()); - } - } - Ok(response) -} diff --git a/src/http/types.rs b/src/http/types.rs index 22df57b7..183e2915 100644 --- a/src/http/types.rs +++ b/src/http/types.rs @@ -1,11 +1,23 @@ -use crate::types::Address; +use crate::types::{Address, Payload}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use thiserror::Error; /// HTTP Request type that can be shared over WASM boundary to apps. +/// This is the one you receive from the http_server:sys:uqbar service. #[derive(Debug, Serialize, Deserialize)] -pub struct HttpRequest { +pub struct IncomingHttpRequest { + pub source_socket_addr: Option, // will parse to SocketAddr + pub method: String, // will parse to http::Method + pub raw_path: String, + pub headers: HashMap, + // BODY is stored in the payload, as bytes +} + +/// HTTP Request type that can be shared over WASM boundary to apps. +/// This is the one you send to the http_client:sys:uqbar service. +#[derive(Debug, Serialize, Deserialize)] +pub struct OutgoingHttpRequest { pub method: String, // must parse to http::Method pub version: Option, // must parse to http::Version pub url: String, // must parse to url::Url @@ -22,6 +34,12 @@ pub struct HttpResponse { // BODY is stored in the payload, as bytes } +#[derive(Debug, Serialize, Deserialize)] +pub struct RpcResponseBody { + pub ipc: Vec, + pub payload: Option, +} + #[derive(Error, Debug, Serialize, Deserialize)] pub enum HttpClientError { #[error("http_client: request could not be parsed to HttpRequest: {}.", req)] @@ -36,55 +54,67 @@ pub enum HttpClientError { RequestFailed { error: String }, } -#[derive(Error, Debug, Serialize, Deserialize)] -pub enum HttpServerError { - #[error("http_server: json is None")] - NoJson, - #[error("http_server: response not ok")] - ResponseError, - #[error("http_server: bytes are None")] - NoBytes, - #[error( - "http_server: JSON payload could not be parsed to HttpClientRequest: {error}. Got {:?}.", - json - )] - BadJson { json: String, error: String }, - #[error("http_server: path binding error: {:?}", error)] - PathBind { error: String }, -} - -#[derive(Debug, Serialize, Deserialize)] -pub struct JwtClaims { - pub username: String, - pub expiration: u64, -} - -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct WebSocketServerTarget { - pub node: String, - pub id: Option, -} - -#[derive(Debug, Serialize, Deserialize)] -pub struct WebSocketPush { - pub target: WebSocketServerTarget, - pub is_text: Option, -} - -#[derive(Debug, Serialize, Deserialize)] -pub struct ServerAction { - pub action: String, -} - +/// Request type sent to `http_server:sys:uqbar` in order to configure it. +/// You can also send [`WebSocketPush`], which allows you to push messages +/// across an existing open WebSocket connection. +/// +/// If a response is expected, all HttpServerActions will return a Response +/// with the shape Result<(), HttpServerActionError> serialized to JSON. #[derive(Debug, Serialize, Deserialize)] pub enum HttpServerAction { - BindPath { + /// Bind does not expect payload. + Bind { path: String, authenticated: bool, local_only: bool, }, - WebSocketPush(WebSocketPush), - ServerAction(ServerAction), + /// BindStatic expects a payload containing the static file to serve at this path. + BindStatic { + path: String, + authenticated: bool, + local_only: bool, + }, + /// Expects a payload containing the WebSocket message bytes to send. + WebSocketPush { + channel_id: String, + message_type: WsMessageType, + }, +} + +/// The possible message types for WebSocketPush. Ping and Pong are limited to 125 bytes +/// by the WebSockets protocol. Text will be sent as a Text frame, with the payload bytes +/// being the UTF-8 encoding of the string. Binary will be sent as a Binary frame containing +/// the unmodified payload bytes. +#[derive(Debug, Serialize, Deserialize)] +pub enum WsMessageType { + Text, + Binary, + Ping, + Pong, +} + +/// Part of the Response type issued by http_server +#[derive(Error, Debug, Serialize, Deserialize)] +pub enum HttpServerActionError { + #[error( + "http_server: request could not be parsed to HttpServerAction: {}.", + req + )] + BadRequest { req: String }, + #[error("http_server: action expected payload")] + NoPayload, + #[error("http_server: path binding error: {:?}", error)] + PathBindError { error: String }, + #[error("http_server: WebSocket error: {:?}", error)] + WebSocketPushError { error: String }, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum WebSocketClientMessage { + /// Must be the first message sent along a newly-opened WebSocket connection. + WsRegister(WsRegister), + WsMessage(WsMessage), + EncryptedWsMessage(EncryptedWsMessage), } #[derive(Clone, Debug, Serialize, Deserialize)] @@ -113,9 +143,8 @@ pub struct EncryptedWsMessage { pub nonce: String, // Hex of the 12-byte nonce } -#[derive(Clone, Debug, Serialize, Deserialize)] -pub enum WebSocketClientMessage { - WsRegister(WsRegister), - WsMessage(WsMessage), - EncryptedWsMessage(EncryptedWsMessage), +#[derive(Debug, Serialize, Deserialize)] +pub struct JwtClaims { + pub username: String, + pub expiration: u64, } diff --git a/src/http/utils.rs b/src/http/utils.rs index 164a50d2..1f5177d7 100644 --- a/src/http/utils.rs +++ b/src/http/utils.rs @@ -1,21 +1,16 @@ use crate::http::types::*; use crate::types::*; -use futures::stream::SplitSink; use hmac::{Hmac, Mac}; use jwt::{Error, VerifyWithKey}; use serde::{Deserialize, Serialize}; use sha2::Sha256; -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use std::sync::Arc; use tokio::net::TcpListener; use tokio::sync::Mutex; use warp::http::{header::HeaderName, header::HeaderValue, HeaderMap}; use warp::ws::WebSocket; -pub type SharedWriteStream = Arc>>; -pub type WebSockets = Arc>>>>; -pub type WebSocketProxies = Arc>>>; - pub struct BoundPath { pub app: ProcessId, pub authenticated: bool, @@ -52,7 +47,7 @@ pub fn parse_auth_token(auth_token: String, jwt_secret: Vec) -> Result) -> bool { +pub fn auth_cookie_valid(our_node: &str, cookie: &str, jwt_secret: &[u8]) -> bool { let cookie_parts: Vec<&str> = cookie.split("; ").collect(); let mut auth_token = None; @@ -84,19 +79,6 @@ pub fn auth_cookie_valid(our_node: String, cookie: &str, jwt_secret: Vec) -> } } -pub fn remove_process_id(path: &str) -> String { - // Split the string into parts separated by '/' - let mut parts = path.splitn(3, '/'); - // Skip the first two parts (before and after the first '/') - let remaining_path = parts.nth(2).unwrap_or(""); - // If the result is empty, return "/" - if remaining_path.is_empty() { - return "/".to_string(); - } - // Otherwise, return the result with a leading "/" - format!("/{}", remaining_path) -} - pub fn normalize_path(path: &str) -> String { let mut normalized = path.to_string(); if normalized != "/" && normalized.ends_with('/') { @@ -406,15 +388,15 @@ pub async fn send_ws_disconnect( } pub fn make_error_message( - our_name: String, + our_name: &str, id: u64, target: Address, - error: HttpServerError, + error: &HttpServerActionError, ) -> KernelMessage { KernelMessage { id, source: Address { - node: our_name.clone(), + node: our_name.to_string(), process: HTTP_SERVER_PROCESS_ID.clone(), }, target, @@ -422,7 +404,7 @@ pub fn make_error_message( message: Message::Response(( Response { inherit: false, - ipc: serde_json::to_vec(&error).unwrap(), + ipc: serde_json::to_vec(error).unwrap(), metadata: None, }, None, diff --git a/src/types.rs b/src/types.rs index ce8ff2f2..32300eb7 100644 --- a/src/types.rs +++ b/src/types.rs @@ -31,14 +31,63 @@ pub type NodeId = String; // QNS domain name /// the process name can be a random number, or a name chosen by the user. /// the formatting is as follows: /// `[process name]:[package name]:[node ID]` -#[derive(Clone, Debug, Eq, Hash, PartialEq, Serialize, Deserialize)] +#[derive(Clone, Debug, Eq, Hash, Serialize, Deserialize)] pub struct ProcessId { process_name: String, package_name: String, publisher_node: NodeId, } -#[allow(dead_code)] +/// PackageId is like a ProcessId, but for a package. Only contains the name +/// of the package and the name of the publisher. +#[derive(Hash, Eq, PartialEq, Debug, Clone, Serialize, Deserialize)] +pub struct PackageId { + package_name: String, + publisher_node: String, +} + +impl PackageId { + pub fn new(package_name: &str, publisher_node: &str) -> Self { + PackageId { + package_name: package_name.into(), + publisher_node: publisher_node.into(), + } + } + pub fn from_str(input: &str) -> Result { + // split string on colons into 2 segments + let mut segments = input.split(':'); + let package_name = segments + .next() + .ok_or(ProcessIdParseError::MissingField)? + .to_string(); + let publisher_node = segments + .next() + .ok_or(ProcessIdParseError::MissingField)? + .to_string(); + if segments.next().is_some() { + return Err(ProcessIdParseError::TooManyColons); + } + Ok(PackageId { + package_name, + publisher_node, + }) + } + pub fn package(&self) -> &str { + &self.package_name + } + pub fn publisher(&self) -> &str { + &self.publisher_node + } +} + +impl std::fmt::Display for PackageId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}:{}", self.package_name, self.publisher_node) + } +} + +/// ProcessId is defined in the wit bindings, but constructors and methods +/// are defined here. impl ProcessId { /// generates a random u64 number if process_name is not declared pub fn new(process_name: Option<&str>, package_name: &str, publisher_node: &str) -> Self { @@ -99,12 +148,70 @@ impl ProcessId { } } +impl From<(&str, &str, &str)> for ProcessId { + fn from(input: (&str, &str, &str)) -> Self { + ProcessId::new(Some(input.0), input.1, input.2) + } +} + +impl std::fmt::Display for ProcessId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{}:{}:{}", + self.process_name, self.package_name, self.publisher_node + ) + } +} + +impl PartialEq for ProcessId { + fn eq(&self, other: &Self) -> bool { + self.process_name == other.process_name + && self.package_name == other.package_name + && self.publisher_node == other.publisher_node + } +} + +impl PartialEq<&str> for ProcessId { + fn eq(&self, other: &&str) -> bool { + &self.to_string() == other + } +} + +impl PartialEq for &str { + fn eq(&self, other: &ProcessId) -> bool { + self == &other.to_string() + } +} + #[derive(Debug)] pub enum ProcessIdParseError { TooManyColons, MissingField, } +impl std::fmt::Display for ProcessIdParseError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{}", + match self { + ProcessIdParseError::TooManyColons => "Too many colons in ProcessId string", + ProcessIdParseError::MissingField => "Missing field in ProcessId string", + } + ) + } +} + +impl std::error::Error for ProcessIdParseError { + fn description(&self) -> &str { + match self { + ProcessIdParseError::TooManyColons => "Too many colons in ProcessId string", + ProcessIdParseError::MissingField => "Missing field in ProcessId string", + } + } +} + #[derive(Clone, Debug, Hash, Eq, PartialEq, Serialize, Deserialize)] pub struct Address { pub node: NodeId, @@ -112,6 +219,51 @@ pub struct Address { } impl Address { + pub fn new(node: &str, process: T) -> Address + where + T: Into, + { + Address { + node: node.to_string(), + process: process.into(), + } + } + pub fn from_str(input: &str) -> Result { + // split string on colons into 4 segments, + // first one with @, next 3 with : + let mut name_rest = input.split('@'); + let node = name_rest + .next() + .ok_or(AddressParseError::MissingField)? + .to_string(); + let mut segments = name_rest + .next() + .ok_or(AddressParseError::MissingNodeId)? + .split(':'); + let process_name = segments + .next() + .ok_or(AddressParseError::MissingField)? + .to_string(); + let package_name = segments + .next() + .ok_or(AddressParseError::MissingField)? + .to_string(); + let publisher_node = segments + .next() + .ok_or(AddressParseError::MissingField)? + .to_string(); + if segments.next().is_some() { + return Err(AddressParseError::TooManyColons); + } + Ok(Address { + node, + process: ProcessId { + process_name, + package_name, + publisher_node, + }, + }) + } pub fn en_wit(&self) -> wit::Address { wit::Address { node: self.node.clone(), @@ -130,6 +282,58 @@ impl Address { } } +impl From<(&str, &str, &str, &str)> for Address { + fn from(input: (&str, &str, &str, &str)) -> Self { + Address::new(input.0, (input.1, input.2, input.3)) + } +} + +impl From<(&str, T)> for Address +where + T: Into, +{ + fn from(input: (&str, T)) -> Self { + Address::new(input.0, input.1) + } +} + +impl std::fmt::Display for Address { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}@{}", self.node, self.process) + } +} + +#[derive(Debug)] +pub enum AddressParseError { + TooManyColons, + MissingNodeId, + MissingField, +} + +impl std::fmt::Display for AddressParseError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{}", + match self { + AddressParseError::TooManyColons => "Too many colons in ProcessId string", + AddressParseError::MissingNodeId => "Node ID missing", + AddressParseError::MissingField => "Missing field in ProcessId string", + } + ) + } +} + +impl std::error::Error for AddressParseError { + fn description(&self) -> &str { + match self { + AddressParseError::TooManyColons => "Too many colons in ProcessId string", + AddressParseError::MissingNodeId => "Node ID missing", + AddressParseError::MissingField => "Missing field in ProcessId string", + } + } +} + #[derive(Clone, Debug, Serialize, Deserialize)] pub struct Payload { pub mime: Option, // MIME type @@ -201,28 +405,6 @@ impl OnPanic { } } -// -// display impls -// - -impl std::fmt::Display for ProcessId { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!( - f, - "{}:{}:{}", - self.process(), - self.package(), - self.publisher() - ) - } -} - -impl std::fmt::Display for Address { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!(f, "{}@{}", self.node, self.process) - } -} - impl std::fmt::Display for Message { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { match self {