WIP not compiling, httpserver rewrite

This commit is contained in:
dr-frmr 2023-11-17 19:28:51 -05:00
parent 0c3655f69d
commit 3fdd605c18
No known key found for this signature in database
4 changed files with 632 additions and 564 deletions

View File

@ -3,22 +3,33 @@ use crate::http::utils::*;
use crate::register;
use crate::types::*;
use anyhow::Result;
use futures::SinkExt;
use futures::StreamExt;
use dashmap::DashMap;
use futures::{SinkExt, StreamExt};
use route_recognizer::Router;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::sync::oneshot;
use tokio::sync::Mutex;
use tokio::sync::RwLock;
use tokio::sync::{Mutex, RwLock};
use warp::http::{header::HeaderValue, StatusCode};
use warp::ws::{WebSocket, Ws};
use warp::{Filter, Reply};
// types and constants
type HttpSender = tokio::sync::oneshot::Sender<HttpResponse>;
type HttpResponseSenders = Arc<Mutex<HashMap<u64, (String, HttpSender)>>>;
const HTTP_SELF_IMPOSED_TIMEOUT: u64 = 15;
/// mapping from a given HTTP request (assigned an ID) to the oneshot
/// channel that will get a response from the app that handles the request,
/// and a string which contains the path that the request was made to.
type HttpResponseSenders = Arc<DashMap<u64, (String, HttpSender)>>;
type HttpSender = tokio::sync::oneshot::Sender<(HttpResponse, Vec<u8>)>;
/// mapping from an open websocket connection to a channel that will ingest
/// WebSocketPush messages from the app that handles the connection, and
/// send them to the connection.
type WebSocketSenders = Arc<DashMap<u64, WebSocketSender>>;
type WebSocketSender = tokio::sync::mpsc::Sender<(WsMessageType, Vec<u8>)>;
type StaticAssets = Arc<DashMap<String, Vec<u8>>>;
type PathBindings = Arc<RwLock<Router<BoundPath>>>;
/// HTTP server: a runtime module that handles HTTP requests at a given port.
@ -44,8 +55,10 @@ pub async fn http_server(
send_to_loop: MessageSender,
print_tx: PrintSender,
) -> Result<()> {
let http_response_senders = Arc::new(Mutex::new(HashMap::new()));
let websockets: WebSockets = Arc::new(Mutex::new(HashMap::new()));
let our_name = Arc::new(our_name);
let jwt_secret_bytes = Arc::new(jwt_secret_bytes);
let http_response_senders: HttpResponseSenders = Arc::new(DashMap::new());
let ws_senders: WebSocketSenders = Arc::new(DashMap::new());
// Add RPC path
let mut bindings_map: Router<BoundPath> = Router::new();
@ -59,85 +72,295 @@ pub async fn http_server(
let path_bindings: PathBindings = Arc::new(RwLock::new(bindings_map));
let _ = tokio::join!(
http_serve(
our_name.clone(),
our_port,
tokio::spawn(serve(
our_name.clone(),
our_port,
http_response_senders.clone(),
path_bindings.clone(),
ws_senders.clone(),
jwt_secret_bytes.clone(),
send_to_loop.clone(),
print_tx.clone(),
));
while let Some(km) = recv_in_server.recv().await {
// we *can* move this into a dedicated task, but it's not necessary
handle_app_message(
km,
http_response_senders.clone(),
path_bindings.clone(),
websockets.clone(),
ws_senders.clone(),
jwt_secret_bytes.clone(),
send_to_loop.clone(),
print_tx.clone()
),
async move {
while let Some(kernel_message) = recv_in_server.recv().await {
let KernelMessage {
id,
source,
message,
payload,
..
} = kernel_message;
if let Err(e) = http_handle_messages(
our_name.clone(),
id,
source.clone(),
message,
payload,
http_response_senders.clone(),
path_bindings.clone(),
websockets.clone(),
jwt_secret_bytes.clone(),
send_to_loop.clone(),
print_tx.clone(),
)
.await
{
send_to_loop
.send(make_error_message(our_name.clone(), id, source.clone(), e))
.await
.unwrap();
}
}
}
);
Err(anyhow::anyhow!("http_server: exited"))
print_tx.clone(),
)
.await;
}
return Err(anyhow::anyhow!("http_server: http_server loop exited"));
}
async fn handle_websocket(
ws: WebSocket,
our: String,
jwt_secret_bytes: Vec<u8>,
websockets: WebSockets,
/// The 'server' part. Listens on a port assigned by runtime, and handles
/// all HTTP requests on it. Also allows incoming websocket connections.
async fn serve(
our: Arc<String>,
our_port: u16,
http_response_senders: HttpResponseSenders,
path_bindings: PathBindings,
ws_senders: WebSocketSenders,
jwt_secret_bytes: Arc<Vec<u8>>,
send_to_loop: MessageSender,
print_tx: PrintSender,
) {
let (write_stream, mut read_stream) = ws.split();
let write_stream = Arc::new(Mutex::new(write_stream));
let _ = print_tx
.send(Printout {
verbosity: 0,
content: format!("http_server: running on port {}", our_port),
})
.await;
// Filter to receive websockets
let cloned_msg_tx = send_to_loop.clone();
let cloned_our = our.clone();
let cloned_jwt_secret_bytes = jwt_secret_bytes.clone();
let ws_route = warp::path::end()
.and(warp::ws())
.and(warp::any().map(move || cloned_our.clone()))
.and(warp::any().map(move || cloned_jwt_secret_bytes.clone()))
.and(warp::any().map(move || ws_senders.clone()))
.and(warp::any().map(move || cloned_msg_tx.clone()))
.map(
|ws_connection: Ws,
our: Arc<String>,
jwt_secret_bytes: Arc<Vec<u8>>,
ws_senders: WebSocketSenders,
send_to_loop: MessageSender| {
ws_connection.on_upgrade(move |ws: WebSocket| async move {
maintain_websocket(ws, our, jwt_secret_bytes, ws_senders, send_to_loop).await
})
},
);
// Filter to receive HTTP requests
let filter = warp::filters::method::method()
.and(warp::addr::remote())
.and(warp::path::full())
.and(warp::filters::header::headers_cloned())
.and(warp::filters::body::bytes())
.and(warp::any().map(move || our.clone()))
.and(warp::any().map(move || http_response_senders.clone()))
.and(warp::any().map(move || path_bindings.clone()))
.and(warp::any().map(move || jwt_secret_bytes.clone()))
.and(warp::any().map(move || send_to_loop.clone()))
.and_then(http_handler);
let filter_with_ws = ws_route.or(filter);
warp::serve(filter_with_ws)
.run(([0, 0, 0, 0], our_port))
.await;
}
async fn http_handler(
method: warp::http::Method,
socket_addr: Option<SocketAddr>,
path: warp::path::FullPath,
headers: warp::http::HeaderMap,
body: warp::hyper::body::Bytes,
our: Arc<String>,
http_response_senders: HttpResponseSenders,
path_bindings: PathBindings,
jwt_secret_bytes: Arc<Vec<u8>>,
send_to_loop: MessageSender,
) -> Result<impl warp::Reply, warp::Rejection> {
// TODO this is all so dirty. Figure out what actually matters.
// trim trailing "/"
let original_path = normalize_path(path.as_str());
let id: u64 = rand::random();
let serialized_headers = serialize_headers(&headers);
let path_bindings = path_bindings.read().await;
let Ok(route) = path_bindings.recognize(&original_path) else {
return Ok(warp::reply::with_status(vec![], StatusCode::NOT_FOUND).into_response());
};
let bound_path = route.handler();
if bound_path.authenticated {
let auth_token = serialized_headers
.get("cookie")
.cloned()
.unwrap_or_default();
if !auth_cookie_valid(&our, &auth_token, &jwt_secret_bytes) {
return Ok(warp::reply::with_status(vec![], StatusCode::UNAUTHORIZED).into_response());
}
}
let is_local = socket_addr
.map(|addr| addr.ip().is_loopback())
.unwrap_or(false);
if bound_path.local_only && !is_local {
return Ok(warp::reply::with_status(vec![], StatusCode::FORBIDDEN).into_response());
}
// RPC functionality: if path is /rpc:sys:uqbar/message,
// we extract message from base64 encoded bytes in data
// and send it to the correct app.
let message = if bound_path.app == "rpc:sys:uqbar" {
match handle_rpc_message(our, id, body).await {
Ok(message) => message,
Err(e) => {
return Ok(warp::reply::with_status(vec![], e).into_response());
}
}
} else {
// otherwise, make a message to the correct app
KernelMessage {
id,
source: Address {
node: our.to_string(),
process: HTTP_SERVER_PROCESS_ID.clone(),
},
target: Address {
node: our.to_string(),
process: bound_path.app.clone(),
},
rsvp: None,
message: Message::Request(Request {
inherit: false,
expects_response: Some(HTTP_SELF_IMPOSED_TIMEOUT),
ipc: serde_json::to_vec(&IncomingHttpRequest {
source_socket_addr: socket_addr.map(|addr| addr.to_string()),
method: method.to_string(),
raw_path: original_path.clone(),
headers: serialized_headers,
})
.unwrap(),
metadata: None,
}),
payload: Some(Payload {
mime: None,
bytes: body.to_vec(),
}),
signed_capabilities: None,
}
};
let (response_sender, response_receiver) = tokio::sync::oneshot::channel();
http_response_senders.insert(id, (original_path, response_sender));
match send_to_loop.send(message).await {
Ok(_) => {}
Err(_) => {
return Ok(
warp::reply::with_status(vec![], StatusCode::INTERNAL_SERVER_ERROR).into_response(),
);
}
}
let timeout_duration = tokio::time::Duration::from_secs(HTTP_SELF_IMPOSED_TIMEOUT);
let result = tokio::time::timeout(timeout_duration, response_receiver).await;
let (http_response, body) = match result {
Ok(Ok(res)) => res,
Ok(Err(_)) => {
return Ok(
warp::reply::with_status(vec![], StatusCode::INTERNAL_SERVER_ERROR).into_response(),
);
}
Err(_) => {
return Ok(
warp::reply::with_status(vec![], StatusCode::REQUEST_TIMEOUT).into_response(),
);
}
};
let reply = warp::reply::with_status(
body,
StatusCode::from_u16(http_response.status).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR),
);
let mut response = reply.into_response();
// Merge the deserialized headers into the existing headers
let existing_headers = response.headers_mut();
for (header_name, header_value) in deserialize_headers(http_response.headers).iter() {
if header_name == "set-cookie" || header_name == "Set-Cookie" {
if let Ok(cookie) = header_value.to_str() {
let cookie_headers: Vec<&str> = cookie
.split("; ")
.filter(|&cookie| !cookie.is_empty())
.collect();
for cookie_header in cookie_headers {
if let Ok(valid_cookie) = HeaderValue::from_str(cookie_header) {
existing_headers.append(header_name, valid_cookie);
}
}
}
} else {
existing_headers.insert(header_name.to_owned(), header_value.to_owned());
}
}
Ok(response)
}
async fn handle_rpc_message(
our: Arc<String>,
id: u64,
body: warp::hyper::body::Bytes,
) -> Result<KernelMessage, StatusCode> {
let Ok(rpc_message) = serde_json::from_slice::<RpcMessage>(&body) else {
return Err(StatusCode::BAD_REQUEST);
};
let Ok(target_process) = ProcessId::from_str(&rpc_message.process) else {
return Err(StatusCode::BAD_REQUEST);
};
Ok(KernelMessage {
id,
source: Address {
node: our.to_string(),
process: HTTP_SERVER_PROCESS_ID.clone(),
},
target: Address {
node: rpc_message.node.unwrap_or(our.to_string()),
process: target_process,
},
rsvp: None,
message: Message::Request(Request {
inherit: false,
expects_response: Some(15), // no effect on runtime
ipc: match rpc_message.ipc {
Some(ipc_string) => ipc_string.into_bytes(),
None => Vec::new(),
},
metadata: rpc_message.metadata,
}),
payload: match base64::decode(rpc_message.data.unwrap_or("".to_string())) {
Ok(bytes) => Some(Payload {
mime: rpc_message.mime,
bytes,
}),
Err(_) => None,
},
signed_capabilities: None,
})
}
async fn maintain_websocket(
ws: WebSocket,
our: Arc<String>,
jwt_secret_bytes: Arc<Vec<u8>>,
ws_senders: WebSocketSenders,
send_to_loop: MessageSender,
) {
let (write_stream, mut read_stream) = ws.split();
// How do we handle authentication?
let ws_id: u64 = rand::random();
let
while let Some(Ok(msg)) = read_stream.next().await {
if msg.is_binary() {
let _ = print_tx
.send(Printout {
verbosity: 1,
content: "GOT WEBSOCKET BYTES".to_string(),
})
.await;
let bytes = msg.as_bytes();
let _ = print_tx
.send(Printout {
verbosity: 1,
content: format!(
"WEBSOCKET MESSAGE (BYTES) {}",
String::from_utf8(bytes.to_vec()).unwrap_or_default()
),
})
.await;
match serde_json::from_slice::<WebSocketClientMessage>(bytes) {
Ok(parsed_msg) => {
handle_incoming_ws(
@ -152,23 +375,10 @@ async fn handle_websocket(
)
.await;
}
Err(e) => {
let _ = print_tx
.send(Printout {
verbosity: 1,
content: format!("Failed to parse WebSocket message: {}", e),
})
.await;
}
Err(e) => {}
}
} else if msg.is_text() {
if let Ok(msg_str) = msg.to_str() {
let _ = print_tx
.send(Printout {
verbosity: 1,
content: format!("WEBSOCKET MESSAGE (TEXT): {}", msg_str),
})
.await;
if let Ok(parsed_msg) = serde_json::from_str(msg_str) {
handle_incoming_ws(
parsed_msg,
@ -185,6 +395,7 @@ async fn handle_websocket(
}
} else if msg.is_close() {
// Delete the websocket from the map
ws_senders.remove(&ws_id);
let mut ws_map = websockets.lock().await;
for (node, node_map) in ws_map.iter_mut() {
for (channel_id, id_map) in node_map.iter_mut() {
@ -205,53 +416,57 @@ async fn handle_websocket(
}
}
async fn http_handle_messages(
our: String,
id: u64,
source: Address,
message: Message,
payload: Option<Payload>,
async fn handle_app_message(
km: KernelMessage,
http_response_senders: HttpResponseSenders,
path_bindings: PathBindings,
websockets: WebSockets,
ws_proxies: WebSocketProxies,
jwt_secret_bytes: Vec<u8>,
ws_senders: WebSocketSenders,
jwt_secret_bytes: Arc<Vec<u8>>,
send_to_loop: MessageSender,
print_tx: PrintSender,
) -> Result<(), HttpServerError> {
match message {
Message::Response((ref response, _)) => {
) {
// when we get a Response, try to match it to an outstanding HTTP
// request and send it there.
// when we get a Request, parse it into an HttpServerAction and perform it.
match km.message {
Message::Response((ref response, _context)) => {
let Some((_id, (path, sender))) = http_response_senders.remove(&km.id) else {
return
};
// if path is /rpc/message, return accordingly with base64 encoded payload
if path == "/rpc:sys:uqbar/message" {
let payload = km.payload.map(|p| {
Payload {
mime: p.mime,
bytes: base64::encode(p.bytes).into_bytes(),
}
});
let mut default_headers = HashMap::new();
default_headers.insert("Content-Type".to_string(), "text/html".to_string());
let _ = sender.send((HttpResponse {
status: 200,
headers: default_headers,
},
serde_json::to_vec(&RpcResponseBody {
ipc: response.ipc,
payload,
}).unwrap(),
));
} else {
let Ok(response) = serde_json::from_slice::<HttpResponse>(&response.ipc) else {
// the receiver will automatically trigger a 503 when sender is dropped.
return
};
}
let mut senders = http_response_senders.lock().await;
match senders.remove(&id) {
// if no corresponding entry, nowhere to send response
None => {}
Some((path, channel)) => {
// if path is /rpc/message, return accordingly with base64 encoded payload
if path == *"/rpc:sys:uqbar/message" {
let payload = payload.map(|p| {
let bytes = p.bytes;
let base64_bytes = base64::encode(bytes);
Payload {
mime: p.mime,
bytes: base64_bytes.into_bytes(),
}
});
let body = serde_json::json!({
"ipc": response.ipc,
"payload": payload
})
.to_string()
.as_bytes()
.to_vec();
let mut default_headers = HashMap::new();
default_headers.insert("Content-Type".to_string(), "text/html".to_string());
let _ = channel.send(HttpResponse {
status: 200,
headers: default_headers,
body: Some(body),
});
// error case here?
} else {
// else try deserializing ipc into a HttpResponse
let json = serde_json::from_slice::<HttpResponse>(&response.ipc);
@ -506,343 +721,3 @@ async fn http_handle_messages(
Ok(())
}
// TODO: add a way to register a websocket connection (should be a Vector of websockets)
// Then forward websocket messages to the correct place
async fn http_serve(
our: String,
our_port: u16,
http_response_senders: HttpResponseSenders,
path_bindings: PathBindings,
websockets: WebSockets,
jwt_secret_bytes: Vec<u8>,
send_to_loop: MessageSender,
print_tx: PrintSender,
) {
let cloned_msg_tx = send_to_loop.clone();
let cloned_print_tx = print_tx.clone();
let cloned_our = our.clone();
let cloned_jwt_secret_bytes = jwt_secret_bytes.clone();
let ws_route = warp::path::end()
.and(warp::ws())
.and(warp::any().map(move || cloned_our.clone()))
.and(warp::any().map(move || cloned_jwt_secret_bytes.clone()))
.and(warp::any().map(move || websockets.clone()))
.and(warp::any().map(move || cloned_msg_tx.clone()))
.and(warp::any().map(move || cloned_print_tx.clone()))
.map(
|ws_connection: Ws,
our: String,
jwt_secret_bytes: Vec<u8>,
websockets: WebSockets,
send_to_loop: MessageSender,
print_tx: PrintSender| {
ws_connection.on_upgrade(move |ws: WebSocket| async move {
handle_websocket(
ws,
our,
jwt_secret_bytes,
websockets,
send_to_loop,
print_tx,
)
.await
})
},
);
let print_tx_move = print_tx.clone();
let filter = warp::filters::method::method()
.and(warp::addr::remote())
.and(warp::path::full())
.and(warp::filters::header::headers_cloned())
.and(
warp::filters::query::raw()
.or(warp::any().map(String::default))
.unify()
.map(|query_string: String| {
if query_string.is_empty() {
HashMap::new()
} else {
match serde_urlencoded::from_str(&query_string) {
Ok(map) => map,
Err(_) => HashMap::new(),
}
}
}),
)
.and(warp::filters::body::bytes())
.and(warp::any().map(move || our.clone()))
.and(warp::any().map(move || http_response_senders.clone()))
.and(warp::any().map(move || path_bindings.clone()))
.and(warp::any().map(move || jwt_secret_bytes.clone()))
.and(warp::any().map(move || send_to_loop.clone()))
.and(warp::any().map(move || print_tx_move.clone()))
.and_then(handler);
let filter_with_ws = ws_route.or(filter);
let _ = print_tx
.send(Printout {
verbosity: 1,
content: format!("http_server: running on: {}", our_port),
})
.await;
warp::serve(filter_with_ws)
.run(([0, 0, 0, 0], our_port))
.await;
}
async fn handler(
method: warp::http::Method,
address: Option<SocketAddr>,
path: warp::path::FullPath,
headers: warp::http::HeaderMap,
query_params: HashMap<String, String>,
body: warp::hyper::body::Bytes,
our: String,
http_response_senders: HttpResponseSenders,
path_bindings: PathBindings,
jwt_secret_bytes: Vec<u8>,
send_to_loop: MessageSender,
_print_tx: PrintSender,
) -> Result<impl warp::Reply, warp::Rejection> {
let address = match address {
Some(a) => a.to_string(),
None => "".to_string(),
};
// trim trailing "/"
let original_path = normalize_path(path.as_str());
let id: u64 = rand::random();
let real_headers = serialize_headers(&headers);
let path_bindings = path_bindings.read().await;
let Ok(route) = path_bindings.recognize(&original_path) else {
return Ok(warp::reply::with_status(vec![], StatusCode::NOT_FOUND).into_response());
};
let bound_path = route.handler();
let app = bound_path.app.to_string();
let url_params: HashMap<&str, &str> = route.params().into_iter().collect();
let raw_path = remove_process_id(&original_path);
let path = remove_process_id(&bound_path.original_path);
if bound_path.authenticated {
let auth_token = real_headers.get("cookie").cloned().unwrap_or_default();
if !auth_cookie_valid(our.clone(), &auth_token, jwt_secret_bytes) {
// send 401
return Ok(warp::reply::with_status(vec![], StatusCode::UNAUTHORIZED).into_response());
}
}
if bound_path.local_only && !address.starts_with("127.0.0.1:") {
// send 403
return Ok(warp::reply::with_status(vec![], StatusCode::FORBIDDEN).into_response());
}
// RPC functionality: if path is /rpc:sys:uqbar/message,
// we extract message from base64 encoded bytes in data
// and send it to the correct app.
let message = if app == *"rpc:sys:uqbar" {
let rpc_message: RpcMessage = match serde_json::from_slice(&body) {
// to_vec()?
Ok(v) => v,
Err(_) => {
return Ok(
warp::reply::with_status(vec![], StatusCode::BAD_REQUEST).into_response()
);
}
};
let target_process = match ProcessId::from_str(&rpc_message.process) {
Ok(p) => p,
Err(_) => {
return Ok(
warp::reply::with_status(vec![], StatusCode::BAD_REQUEST).into_response()
);
}
};
let payload = match base64::decode(rpc_message.data.unwrap_or("".to_string())) {
Ok(bytes) => Some(Payload {
mime: rpc_message.mime,
bytes,
}),
Err(_) => None,
};
let node = match rpc_message.node {
Some(node_str) => node_str,
None => our.clone(),
};
let ipc_bytes: Vec<u8> = match rpc_message.ipc {
Some(ipc_string) => ipc_string.into_bytes(),
None => Vec::new(),
};
KernelMessage {
id,
source: Address {
node: our.clone(),
process: HTTP_SERVER_PROCESS_ID.clone(),
},
target: Address {
node,
process: target_process,
},
rsvp: Some(Address {
node: our.clone(),
process: HTTP_SERVER_PROCESS_ID.clone(),
}),
message: Message::Request(Request {
inherit: false,
expects_response: Some(15), // no effect on runtime
ipc: ipc_bytes,
metadata: rpc_message.metadata,
}),
payload,
signed_capabilities: None,
}
} else if app == *"encryptor:sys:uqbar" {
let body_json = match String::from_utf8(body.to_vec()) {
Ok(s) => s,
Err(_) => {
return Ok(
warp::reply::with_status(vec![], StatusCode::BAD_REQUEST).into_response()
);
}
};
let body: serde_json::Value = match serde_json::from_str(&body_json) {
Ok(v) => v,
Err(_) => {
return Ok(
warp::reply::with_status(vec![], StatusCode::BAD_REQUEST).into_response()
);
}
};
let channel_id = body["channel_id"].as_str().unwrap_or("");
let public_key_hex = body["public_key_hex"].as_str().unwrap_or("");
KernelMessage {
id,
source: Address {
node: our.clone(),
process: HTTP_SERVER_PROCESS_ID.clone(),
},
target: Address {
node: our.clone(),
process: ProcessId::from_str("encryptor:sys:uqbar").unwrap(),
},
rsvp: None, //?
message: Message::Request(Request {
inherit: false,
expects_response: None,
ipc: serde_json::json!({
"GetKeyAction": {
"channel_id": channel_id,
"public_key_hex": public_key_hex,
}
})
.to_string()
.into_bytes(),
metadata: None,
}),
payload: None,
signed_capabilities: None,
}
} else {
// otherwise, make a message, to the correct app.
KernelMessage {
id,
source: Address {
node: our.clone(),
process: HTTP_SERVER_PROCESS_ID.clone(),
},
target: Address {
node: our.clone(),
process: bound_path.app.clone(),
},
rsvp: Some(Address {
node: our.clone(),
process: HTTP_SERVER_PROCESS_ID.clone(),
}),
message: Message::Request(Request {
inherit: false,
expects_response: Some(15), // no effect on runtime
ipc: serde_json::json!({
"address": address,
"method": method.to_string(),
"raw_path": raw_path.clone(),
"path": path.clone(),
"headers": serialize_headers(&headers),
"query_params": query_params,
"url_params": url_params,
})
.to_string()
.into_bytes(),
metadata: None,
}),
payload: Some(Payload {
mime: Some("application/octet-stream".to_string()), // TODO adjust MIME type as needed
bytes: body.to_vec(),
}),
signed_capabilities: None,
}
};
let (response_sender, response_receiver) = oneshot::channel();
http_response_senders
.lock()
.await
.insert(id, (original_path.clone(), response_sender));
send_to_loop.send(message).await.unwrap();
let timeout_duration = tokio::time::Duration::from_secs(15); // adjust as needed
let result = tokio::time::timeout(timeout_duration, response_receiver).await;
let from_channel = match result {
Ok(Ok(from_channel)) => from_channel,
Ok(Err(_)) => {
return Ok(
warp::reply::with_status(vec![], StatusCode::INTERNAL_SERVER_ERROR).into_response(),
);
}
Err(_) => {
return Ok(
warp::reply::with_status(vec![], StatusCode::REQUEST_TIMEOUT).into_response(),
);
}
};
let reply = warp::reply::with_status(
match from_channel.body {
Some(val) => val,
None => vec![],
},
StatusCode::from_u16(from_channel.status).unwrap(),
);
let mut response = reply.into_response();
// Merge the deserialized headers into the existing headers
let existing_headers = response.headers_mut();
for (header_name, header_value) in deserialize_headers(from_channel.headers).iter() {
if header_name == "set-cookie" || header_name == "Set-Cookie" {
if let Ok(cookie) = header_value.to_str() {
let cookie_headers: Vec<&str> = cookie
.split("; ")
.filter(|&cookie| !cookie.is_empty())
.collect();
for cookie_header in cookie_headers {
if let Ok(valid_cookie) = HeaderValue::from_str(cookie_header) {
existing_headers.append(header_name, valid_cookie);
}
}
}
} else {
existing_headers.insert(header_name.clone(), header_value.clone());
}
}
Ok(response)
}

View File

@ -1,11 +1,23 @@
use crate::types::Address;
use crate::types::{Address, Payload};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use thiserror::Error;
/// HTTP Request type that can be shared over WASM boundary to apps.
/// This is the one you receive from the http_server:sys:uqbar service.
#[derive(Debug, Serialize, Deserialize)]
pub struct HttpRequest {
pub struct IncomingHttpRequest {
pub source_socket_addr: Option<String>, // will parse to SocketAddr
pub method: String, // will parse to http::Method
pub raw_path: String,
pub headers: HashMap<String, String>,
// BODY is stored in the payload, as bytes
}
/// HTTP Request type that can be shared over WASM boundary to apps.
/// This is the one you send to the http_client:sys:uqbar service.
#[derive(Debug, Serialize, Deserialize)]
pub struct OutgoingHttpRequest {
pub method: String, // must parse to http::Method
pub version: Option<String>, // must parse to http::Version
pub url: String, // must parse to url::Url
@ -22,6 +34,12 @@ pub struct HttpResponse {
// BODY is stored in the payload, as bytes
}
#[derive(Debug, Serialize, Deserialize)]
pub struct RpcResponseBody {
pub ipc: Vec<u8>,
pub payload: Option<Payload>,
}
#[derive(Error, Debug, Serialize, Deserialize)]
pub enum HttpClientError {
#[error("http_client: request could not be parsed to HttpRequest: {}.", req)]
@ -36,55 +54,67 @@ pub enum HttpClientError {
RequestFailed { error: String },
}
#[derive(Error, Debug, Serialize, Deserialize)]
pub enum HttpServerError {
#[error("http_server: json is None")]
NoJson,
#[error("http_server: response not ok")]
ResponseError,
#[error("http_server: bytes are None")]
NoBytes,
#[error(
"http_server: JSON payload could not be parsed to HttpClientRequest: {error}. Got {:?}.",
json
)]
BadJson { json: String, error: String },
#[error("http_server: path binding error: {:?}", error)]
PathBind { error: String },
}
#[derive(Debug, Serialize, Deserialize)]
pub struct JwtClaims {
pub username: String,
pub expiration: u64,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct WebSocketServerTarget {
pub node: String,
pub id: Option<String>,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct WebSocketPush {
pub target: WebSocketServerTarget,
pub is_text: Option<bool>,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct ServerAction {
pub action: String,
}
/// Request type sent to `http_server:sys:uqbar` in order to configure it.
/// You can also send [`WebSocketPush`], which allows you to push messages
/// across an existing open WebSocket connection.
///
/// If a response is expected, all HttpServerActions will return a Response
/// with the shape Result<(), HttpServerActionError> serialized to JSON.
#[derive(Debug, Serialize, Deserialize)]
pub enum HttpServerAction {
BindPath {
/// Bind does not expect payload.
Bind {
path: String,
authenticated: bool,
local_only: bool,
},
WebSocketPush(WebSocketPush),
ServerAction(ServerAction),
/// BindStatic expects a payload containing the static file to serve at this path.
BindStatic {
path: String,
authenticated: bool,
local_only: bool,
},
/// Expects a payload containing the WebSocket message bytes to send.
WebSocketPush {
channel_id: String,
message_type: WsMessageType,
},
}
/// The possible message types for WebSocketPush. Ping and Pong are limited to 125 bytes
/// by the WebSockets protocol. Text will be sent as a Text frame, with the payload bytes
/// being the UTF-8 encoding of the string. Binary will be sent as a Binary frame containing
/// the unmodified payload bytes.
#[derive(Debug, Serialize, Deserialize)]
pub enum WsMessageType {
Text,
Binary,
Ping,
Pong,
}
/// Part of the Response type issued by http_server
#[derive(Error, Debug, Serialize, Deserialize)]
pub enum HttpServerActionError {
#[error(
"http_server: request could not be parsed to HttpServerAction: {}.",
req
)]
BadRequest { req: String },
#[error("http_server: action expected payload")]
NoPayload,
#[error("http_server: path binding error: {:?}", error)]
PathBindError { error: String },
#[error("http_server: WebSocket error: {:?}", error)]
WebSocketPushError { error: String },
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum WebSocketClientMessage {
/// Must be the first message sent along a newly-opened WebSocket connection.
WsRegister(WsRegister),
WsMessage(WsMessage),
EncryptedWsMessage(EncryptedWsMessage),
}
#[derive(Clone, Debug, Serialize, Deserialize)]
@ -113,9 +143,8 @@ pub struct EncryptedWsMessage {
pub nonce: String, // Hex of the 12-byte nonce
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum WebSocketClientMessage {
WsRegister(WsRegister),
WsMessage(WsMessage),
EncryptedWsMessage(EncryptedWsMessage),
#[derive(Debug, Serialize, Deserialize)]
pub struct JwtClaims {
pub username: String,
pub expiration: u64,
}

View File

@ -1,21 +1,16 @@
use crate::http::types::*;
use crate::types::*;
use futures::stream::SplitSink;
use hmac::{Hmac, Mac};
use jwt::{Error, VerifyWithKey};
use serde::{Deserialize, Serialize};
use sha2::Sha256;
use std::collections::{HashMap, HashSet};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::net::TcpListener;
use tokio::sync::Mutex;
use warp::http::{header::HeaderName, header::HeaderValue, HeaderMap};
use warp::ws::WebSocket;
pub type SharedWriteStream = Arc<Mutex<SplitSink<WebSocket, warp::ws::Message>>>;
pub type WebSockets = Arc<Mutex<HashMap<String, HashMap<String, HashMap<u64, SharedWriteStream>>>>>;
pub type WebSocketProxies = Arc<Mutex<HashMap<String, HashSet<String>>>>;
pub struct BoundPath {
pub app: ProcessId,
pub authenticated: bool,
@ -52,7 +47,7 @@ pub fn parse_auth_token(auth_token: String, jwt_secret: Vec<u8>) -> Result<Strin
}
}
pub fn auth_cookie_valid(our_node: String, cookie: &str, jwt_secret: Vec<u8>) -> bool {
pub fn auth_cookie_valid(our_node: &str, cookie: &str, jwt_secret: &[u8]) -> bool {
let cookie_parts: Vec<&str> = cookie.split("; ").collect();
let mut auth_token = None;
@ -84,19 +79,6 @@ pub fn auth_cookie_valid(our_node: String, cookie: &str, jwt_secret: Vec<u8>) ->
}
}
pub fn remove_process_id(path: &str) -> String {
// Split the string into parts separated by '/'
let mut parts = path.splitn(3, '/');
// Skip the first two parts (before and after the first '/')
let remaining_path = parts.nth(2).unwrap_or("");
// If the result is empty, return "/"
if remaining_path.is_empty() {
return "/".to_string();
}
// Otherwise, return the result with a leading "/"
format!("/{}", remaining_path)
}
pub fn normalize_path(path: &str) -> String {
let mut normalized = path.to_string();
if normalized != "/" && normalized.ends_with('/') {
@ -406,15 +388,15 @@ pub async fn send_ws_disconnect(
}
pub fn make_error_message(
our_name: String,
our_name: &str,
id: u64,
target: Address,
error: HttpServerError,
error: &HttpServerActionError,
) -> KernelMessage {
KernelMessage {
id,
source: Address {
node: our_name.clone(),
node: our_name.to_string(),
process: HTTP_SERVER_PROCESS_ID.clone(),
},
target,
@ -422,7 +404,7 @@ pub fn make_error_message(
message: Message::Response((
Response {
inherit: false,
ipc: serde_json::to_vec(&error).unwrap(),
ipc: serde_json::to_vec(error).unwrap(),
metadata: None,
},
None,

View File

@ -31,14 +31,63 @@ pub type NodeId = String; // QNS domain name
/// the process name can be a random number, or a name chosen by the user.
/// the formatting is as follows:
/// `[process name]:[package name]:[node ID]`
#[derive(Clone, Debug, Eq, Hash, PartialEq, Serialize, Deserialize)]
#[derive(Clone, Debug, Eq, Hash, Serialize, Deserialize)]
pub struct ProcessId {
process_name: String,
package_name: String,
publisher_node: NodeId,
}
#[allow(dead_code)]
/// PackageId is like a ProcessId, but for a package. Only contains the name
/// of the package and the name of the publisher.
#[derive(Hash, Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
pub struct PackageId {
package_name: String,
publisher_node: String,
}
impl PackageId {
pub fn new(package_name: &str, publisher_node: &str) -> Self {
PackageId {
package_name: package_name.into(),
publisher_node: publisher_node.into(),
}
}
pub fn from_str(input: &str) -> Result<Self, ProcessIdParseError> {
// split string on colons into 2 segments
let mut segments = input.split(':');
let package_name = segments
.next()
.ok_or(ProcessIdParseError::MissingField)?
.to_string();
let publisher_node = segments
.next()
.ok_or(ProcessIdParseError::MissingField)?
.to_string();
if segments.next().is_some() {
return Err(ProcessIdParseError::TooManyColons);
}
Ok(PackageId {
package_name,
publisher_node,
})
}
pub fn package(&self) -> &str {
&self.package_name
}
pub fn publisher(&self) -> &str {
&self.publisher_node
}
}
impl std::fmt::Display for PackageId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}:{}", self.package_name, self.publisher_node)
}
}
/// ProcessId is defined in the wit bindings, but constructors and methods
/// are defined here.
impl ProcessId {
/// generates a random u64 number if process_name is not declared
pub fn new(process_name: Option<&str>, package_name: &str, publisher_node: &str) -> Self {
@ -99,12 +148,70 @@ impl ProcessId {
}
}
impl From<(&str, &str, &str)> for ProcessId {
fn from(input: (&str, &str, &str)) -> Self {
ProcessId::new(Some(input.0), input.1, input.2)
}
}
impl std::fmt::Display for ProcessId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{}:{}:{}",
self.process_name, self.package_name, self.publisher_node
)
}
}
impl PartialEq for ProcessId {
fn eq(&self, other: &Self) -> bool {
self.process_name == other.process_name
&& self.package_name == other.package_name
&& self.publisher_node == other.publisher_node
}
}
impl PartialEq<&str> for ProcessId {
fn eq(&self, other: &&str) -> bool {
&self.to_string() == other
}
}
impl PartialEq<ProcessId> for &str {
fn eq(&self, other: &ProcessId) -> bool {
self == &other.to_string()
}
}
#[derive(Debug)]
pub enum ProcessIdParseError {
TooManyColons,
MissingField,
}
impl std::fmt::Display for ProcessIdParseError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{}",
match self {
ProcessIdParseError::TooManyColons => "Too many colons in ProcessId string",
ProcessIdParseError::MissingField => "Missing field in ProcessId string",
}
)
}
}
impl std::error::Error for ProcessIdParseError {
fn description(&self) -> &str {
match self {
ProcessIdParseError::TooManyColons => "Too many colons in ProcessId string",
ProcessIdParseError::MissingField => "Missing field in ProcessId string",
}
}
}
#[derive(Clone, Debug, Hash, Eq, PartialEq, Serialize, Deserialize)]
pub struct Address {
pub node: NodeId,
@ -112,6 +219,51 @@ pub struct Address {
}
impl Address {
pub fn new<T>(node: &str, process: T) -> Address
where
T: Into<ProcessId>,
{
Address {
node: node.to_string(),
process: process.into(),
}
}
pub fn from_str(input: &str) -> Result<Self, AddressParseError> {
// split string on colons into 4 segments,
// first one with @, next 3 with :
let mut name_rest = input.split('@');
let node = name_rest
.next()
.ok_or(AddressParseError::MissingField)?
.to_string();
let mut segments = name_rest
.next()
.ok_or(AddressParseError::MissingNodeId)?
.split(':');
let process_name = segments
.next()
.ok_or(AddressParseError::MissingField)?
.to_string();
let package_name = segments
.next()
.ok_or(AddressParseError::MissingField)?
.to_string();
let publisher_node = segments
.next()
.ok_or(AddressParseError::MissingField)?
.to_string();
if segments.next().is_some() {
return Err(AddressParseError::TooManyColons);
}
Ok(Address {
node,
process: ProcessId {
process_name,
package_name,
publisher_node,
},
})
}
pub fn en_wit(&self) -> wit::Address {
wit::Address {
node: self.node.clone(),
@ -130,6 +282,58 @@ impl Address {
}
}
impl From<(&str, &str, &str, &str)> for Address {
fn from(input: (&str, &str, &str, &str)) -> Self {
Address::new(input.0, (input.1, input.2, input.3))
}
}
impl<T> From<(&str, T)> for Address
where
T: Into<ProcessId>,
{
fn from(input: (&str, T)) -> Self {
Address::new(input.0, input.1)
}
}
impl std::fmt::Display for Address {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}@{}", self.node, self.process)
}
}
#[derive(Debug)]
pub enum AddressParseError {
TooManyColons,
MissingNodeId,
MissingField,
}
impl std::fmt::Display for AddressParseError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{}",
match self {
AddressParseError::TooManyColons => "Too many colons in ProcessId string",
AddressParseError::MissingNodeId => "Node ID missing",
AddressParseError::MissingField => "Missing field in ProcessId string",
}
)
}
}
impl std::error::Error for AddressParseError {
fn description(&self) -> &str {
match self {
AddressParseError::TooManyColons => "Too many colons in ProcessId string",
AddressParseError::MissingNodeId => "Node ID missing",
AddressParseError::MissingField => "Missing field in ProcessId string",
}
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Payload {
pub mime: Option<String>, // MIME type
@ -201,28 +405,6 @@ impl OnPanic {
}
}
//
// display impls
//
impl std::fmt::Display for ProcessId {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(
f,
"{}:{}:{}",
self.process(),
self.package(),
self.publisher()
)
}
}
impl std::fmt::Display for Address {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "{}@{}", self.node, self.process)
}
}
impl std::fmt::Display for Message {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {