add WebSocketBind, WebSocketSecureBind

This commit is contained in:
dr-frmr 2023-12-18 19:25:31 -05:00
parent 7dbffd80a7
commit cdb051981c
No known key found for this signature in database
2 changed files with 153 additions and 287 deletions

View File

@ -45,8 +45,9 @@ struct BoundPath {
struct BoundWsPath { struct BoundWsPath {
pub app: ProcessId, pub app: ProcessId,
pub secure_subdomain: Option<String>,
pub authenticated: bool, pub authenticated: bool,
pub local_only: bool, pub encrypted: bool, // TODO use
} }
/// HTTP server: a runtime module that handles HTTP requests at a given port. /// HTTP server: a runtime module that handles HTTP requests at a given port.
@ -91,10 +92,8 @@ pub async fn http_server(
bindings_map.add("/rpc:sys:uqbar/message", rpc_bound_path); bindings_map.add("/rpc:sys:uqbar/message", rpc_bound_path);
let path_bindings: PathBindings = Arc::new(RwLock::new(bindings_map)); let path_bindings: PathBindings = Arc::new(RwLock::new(bindings_map));
// ws path bindings // ws path bindings
let mut ws_bindings_map: Router<BoundWsPath> = Router::new(); let ws_path_bindings: WsPathBindings = Arc::new(RwLock::new(Router::new()));
let ws_path_bindings: WsPathBindings = Arc::new(RwLock::new(ws_bindings_map));
tokio::spawn(serve( tokio::spawn(serve(
our_name.clone(), our_name.clone(),
@ -152,6 +151,7 @@ async fn serve(
let cloned_print_tx = print_tx.clone(); let cloned_print_tx = print_tx.clone();
let ws_route = warp::ws() let ws_route = warp::ws()
.and(warp::path::full()) .and(warp::path::full())
.and(warp::filters::host::optional())
.and(warp::filters::header::headers_cloned()) .and(warp::filters::header::headers_cloned())
.and(warp::any().map(move || cloned_our.clone())) .and(warp::any().map(move || cloned_our.clone()))
.and(warp::any().map(move || cloned_jwt_secret_bytes.clone())) .and(warp::any().map(move || cloned_jwt_secret_bytes.clone()))
@ -159,7 +159,7 @@ async fn serve(
.and(warp::any().map(move || ws_path_bindings.clone())) .and(warp::any().map(move || ws_path_bindings.clone()))
.and(warp::any().map(move || cloned_msg_tx.clone())) .and(warp::any().map(move || cloned_msg_tx.clone()))
.and(warp::any().map(move || cloned_print_tx.clone())) .and(warp::any().map(move || cloned_print_tx.clone()))
.map(ws_handler); .and_then(ws_handler);
// filter to receive and handle login requests // filter to receive and handle login requests
let cloned_our = our.clone(); let cloned_our = our.clone();
@ -245,9 +245,10 @@ async fn login_handler(
} }
} }
async fn ws_handler ( async fn ws_handler(
ws_connection: Ws, ws_connection: Ws,
path: warp::path::FullPath, path: warp::path::FullPath,
host: Option<Authority>,
headers: warp::http::HeaderMap, headers: warp::http::HeaderMap,
our: Arc<String>, our: Arc<String>,
jwt_secret_bytes: Arc<Vec<u8>>, jwt_secret_bytes: Arc<Vec<u8>>,
@ -255,135 +256,67 @@ async fn ws_handler (
ws_path_bindings: WsPathBindings, ws_path_bindings: WsPathBindings,
send_to_loop: MessageSender, send_to_loop: MessageSender,
print_tx: PrintSender, print_tx: PrintSender,
) { ) -> Result<impl warp::Reply, warp::Rejection> {
let original_path = normalize_path(path.as_str()); let original_path = normalize_path(path.as_str());
let _ = print_tx.send(Printout{ let _ = print_tx.send(Printout {
verbosity: 1, verbosity: 1,
content: format!("got ws request for {original_path}") content: format!("got ws request for {original_path}"),
}); });
let serialized_headers = serialize_headers(&headers); let serialized_headers = serialize_headers(&headers);
let ws_path_bindings = ws_path_bindings.read().await; let ws_path_bindings = ws_path_bindings.read().await;
if let Ok(route) = ws_path_bindings.recognize(&original_path) { let Ok(route) = ws_path_bindings.recognize(&original_path) else {
let bound_path = route.handler(); return Err(warp::reject::not_found());
};
if bound_path.authenticated { let bound_path = route.handler();
match serialized_headers.get("cookie") { if let Some(ref subdomain) = bound_path.secure_subdomain {
Some(auth_token) => { let _ = print_tx
if let Ok(our_name) = verify_auth_token(&auth_token, &jwt_secret_bytes) { .send(Printout {
if our_name != *our { verbosity: 1,
return; content: format!(
} "got request for path {original_path} bound by subdomain {subdomain}"
} else { ),
return; })
} .await;
} // assert that host matches what this app wants it to be
None => { if host.is_none() {
return; return Err(warp::reject::not_found());
} }
} let host = host.as_ref().unwrap();
// parse out subdomain from host (there can only be one)
let request_subdomain = host.host().split('.').next().unwrap_or("");
if request_subdomain != subdomain {
return Err(warp::reject::not_found());
} }
ws_connection.on_upgrade(move |ws: WebSocket| async move {
let (mut write_stream, mut read_stream) = ws.split();
let _ = print_tx.send(Printout{
verbosity: 1,
content: format!("got new client websocket connection")
}).await;
let ws_channel_id: u32 = rand::random();
let (ws_sender, mut ws_receiver) = tokio::sync::mpsc::channel(100);
ws_senders.insert(ws_channel_id, (bound_path.app.clone(), ws_sender));
let _ = send_to_loop
.send(KernelMessage {
id: rand::random(),
source: Address {
node: our.to_string(),
process: HTTP_SERVER_PROCESS_ID.clone(),
},
target: Address {
node: our.clone().to_string(),
process: bound_path.app.clone(),
},
rsvp: None,
message: Message::Request(Request {
inherit: false,
expects_response: None,
ipc: serde_json::to_vec(&HttpServerRequest::WebSocketOpen(ws_channel_id)).unwrap(),
metadata: Some("ws".into()),
}),
payload: None,
signed_capabilities: None,
}).await;
let _ = print_tx
.send(Printout {
verbosity: 1,
content: format!("websocket channel {ws_channel_id} opened"),
});
loop {
tokio::select! {
read = read_stream.next() => {
match read {
_ => {
websocket_close(ws_channel_id, bound_path.app.clone(), &ws_senders, &send_to_loop).await;
}
Some(Ok(msg)) => {
let _ = send_to_loop.send(KernelMessage {
id: rand::random(),
source: Address {
node: our.to_string(),
process: HTTP_SERVER_PROCESS_ID.clone(),
},
target: Address {
node: our.to_string(),
process: bound_path.app.clone(),
},
rsvp: None,
message: Message::Request(Request {
inherit: false,
expects_response: None,
ipc: serde_json::to_vec(&HttpServerRequest::WebSocketPush {
channel_id: ws_channel_id,
message_type: WsMessageType::Binary,
}).unwrap(),
metadata: Some("ws".into()),
}),
payload: Some(Payload {
mime: None,
bytes: msg.into_bytes(),
}),
signed_capabilities: None,
});
}
}
}
Some(outgoing) = ws_receiver.recv() => {
match write_stream.send(outgoing).await {
Ok(()) => continue,
Err(_) => {
websocket_close(ws_channel_id, bound_path.app.clone(), &ws_senders, &send_to_loop).await;
}
}
}
}
}
let stream = write_stream.reunite(read_stream).unwrap();
let _ = stream.close().await;
});
} else {
return;
} }
if bound_path.authenticated {
let Some(auth_token) = serialized_headers.get("cookie") else {
return Err(warp::reject::not_found());
};
let Ok(our_name) = verify_auth_token(&auth_token, &jwt_secret_bytes) else {
return Err(warp::reject::not_found());
};
if our_name != *our {
return Err(warp::reject::not_found());
}
}
let app = bound_path.app.clone();
Ok(ws_connection.on_upgrade(move |ws: WebSocket| async move {
maintain_websocket(
ws,
our.clone(),
app,
jwt_secret_bytes.clone(),
ws_senders.clone(),
send_to_loop.clone(),
print_tx.clone(),
)
.await;
}))
} }
async fn http_handler( async fn http_handler(
@ -658,17 +591,13 @@ async fn handle_rpc_message(
async fn maintain_websocket( async fn maintain_websocket(
ws: WebSocket, ws: WebSocket,
our: Arc<String>, our: Arc<String>,
jwt_secret_bytes: Arc<Vec<u8>>, app: ProcessId,
_jwt_secret_bytes: Arc<Vec<u8>>, // TODO use for encrypted channels
ws_senders: WebSocketSenders, ws_senders: WebSocketSenders,
path_bindings: PathBindings,
send_to_loop: MessageSender, send_to_loop: MessageSender,
print_tx: PrintSender, print_tx: PrintSender,
) { ) {
let (mut write_stream, mut read_stream) = ws.split(); let (mut write_stream, mut read_stream) = ws.split();
// first, receive a message from client that contains the target process
// and the auth token
let _ = print_tx let _ = print_tx
.send(Printout { .send(Printout {
verbosity: 1, verbosity: 1,
@ -676,72 +605,10 @@ async fn maintain_websocket(
}) })
.await; .await;
let Some(Ok(register_msg)) = read_stream.next().await else {
// stream closed, exit
let _ = print_tx
.send(Printout {
verbosity: 1,
content: format!("client failed to send registration message"),
})
.await;
let stream = write_stream.reunite(read_stream).unwrap();
let _ = stream.close().await;
return;
};
let Ok(ws_register) = serde_json::from_slice::<WsRegister>(register_msg.as_bytes()) else {
// stream error, exit
let _ = print_tx
.send(Printout {
verbosity: 1,
content: format!("couldn't parse registration message from client"),
})
.await;
let stream = write_stream.reunite(read_stream).unwrap();
let _ = stream.close().await;
return;
};
let Ok(owner_process) = ProcessId::from_str(&ws_register.target_process) else {
// invalid process id, exit
let _ = print_tx
.send(Printout {
verbosity: 1,
content: format!("client sent malformed process ID"),
})
.await;
let stream = write_stream.reunite(read_stream).unwrap();
let _ = stream.close().await;
return;
};
let _ = print_tx
.send(Printout {
verbosity: 1,
content: format!("channel is intended for {owner_process}"),
})
.await;
let Ok(our_name) = verify_auth_token(&ws_register.auth_token, &jwt_secret_bytes) else {
// invalid auth token, exit
let stream = write_stream.reunite(read_stream).unwrap();
let _ = stream.close().await;
return;
};
if our_name != *our {
// invalid auth token, exit
let stream = write_stream.reunite(read_stream).unwrap();
let _ = stream.close().await;
return;
}
let ws_channel_id: u32 = rand::random(); let ws_channel_id: u32 = rand::random();
let (ws_sender, mut ws_receiver) = tokio::sync::mpsc::channel(100); let (ws_sender, mut ws_receiver) = tokio::sync::mpsc::channel(100);
ws_senders.insert(ws_channel_id, (owner_process.clone(), ws_sender)); ws_senders.insert(ws_channel_id, (app.clone(), ws_sender));
// send a message to the process associated with this channel
// notifying them that the channel is now open
let _ = send_to_loop let _ = send_to_loop
.send(KernelMessage { .send(KernelMessage {
id: rand::random(), id: rand::random(),
@ -750,8 +617,8 @@ async fn maintain_websocket(
process: HTTP_SERVER_PROCESS_ID.clone(), process: HTTP_SERVER_PROCESS_ID.clone(),
}, },
target: Address { target: Address {
node: our.to_string(), node: our.clone().to_string(),
process: owner_process.clone(), process: app.clone(),
}, },
rsvp: None, rsvp: None,
message: Message::Request(Request { message: Message::Request(Request {
@ -765,83 +632,53 @@ async fn maintain_websocket(
}) })
.await; .await;
// respond to the client notifying them that the channel is now open let _ = print_tx.send(Printout {
let Ok(()) = write_stream verbosity: 1,
.send(warp::ws::Message::text( content: format!("websocket channel {ws_channel_id} opened"),
serde_json::to_string(&WsRegisterResponse { });
channel_id: ws_channel_id,
})
.unwrap(),
))
.await
else {
// stream error, exit
let stream = write_stream.reunite(read_stream).unwrap();
let _ = stream.close().await;
return;
};
let _ = print_tx
.send(Printout {
verbosity: 1,
content: format!("websocket channel {ws_channel_id} opened"),
})
.await;
loop { loop {
tokio::select! { tokio::select! {
read = read_stream.next() => { read = read_stream.next() => {
match read { match read {
None => {
// stream closed, remove and exit
websocket_close(ws_channel_id, owner_process, &ws_senders, &send_to_loop).await;
break;
}
Some(Err(_e)) => {
// stream error, remove and exit
websocket_close(ws_channel_id, owner_process, &ws_senders, &send_to_loop).await;
break;
}
Some(Ok(msg)) => { Some(Ok(msg)) => {
// forward message to process associated with this channel let _ = send_to_loop.send(KernelMessage {
let _ = send_to_loop id: rand::random(),
.send(KernelMessage { source: Address {
id: rand::random(), node: our.to_string(),
source: Address { process: HTTP_SERVER_PROCESS_ID.clone(),
node: our.to_string(), },
process: HTTP_SERVER_PROCESS_ID.clone(), target: Address {
}, node: our.to_string(),
target: Address { process: app.clone(),
node: our.to_string(), },
process: owner_process.clone(), rsvp: None,
}, message: Message::Request(Request {
rsvp: None, inherit: false,
message: Message::Request(Request { expects_response: None,
inherit: false, ipc: serde_json::to_vec(&HttpServerRequest::WebSocketPush {
expects_response: None, channel_id: ws_channel_id,
ipc: serde_json::to_vec(&HttpServerRequest::WebSocketPush { message_type: WsMessageType::Binary,
channel_id: ws_channel_id, }).unwrap(),
message_type: WsMessageType::Binary, metadata: Some("ws".into()),
}).unwrap(), }),
metadata: Some("ws".into()), payload: Some(Payload {
}), mime: None,
payload: Some(Payload { bytes: msg.into_bytes(),
mime: None, }),
bytes: msg.into_bytes(), signed_capabilities: None,
}), });
signed_capabilities: None, }
}) _ => {
.await; websocket_close(ws_channel_id, app.clone(), &ws_senders, &send_to_loop).await;
break;
} }
} }
} }
Some(outgoing) = ws_receiver.recv() => { Some(outgoing) = ws_receiver.recv() => {
// forward message to websocket
match write_stream.send(outgoing).await { match write_stream.send(outgoing).await {
Ok(()) => continue, Ok(()) => continue,
Err(_e) => { Err(_) => {
// stream error, remove and exit websocket_close(ws_channel_id, app.clone(), &ws_senders, &send_to_loop).await;
websocket_close(ws_channel_id, owner_process, &ws_senders, &send_to_loop).await;
break; break;
} }
} }
@ -962,21 +799,6 @@ async fn handle_app_message(
return; return;
}; };
match message { match message {
HttpServerAction::BindWs {
mut path,
authenticated,
local_only,
} => {
let mut ws_path_bindings = ws_path_bindings.write().await;
ws_path_bindings.add(
&normalize_path(&path),
BoundWsPath {
app: km.source.process.clone(),
authenticated,
local_only,
}
);
}
HttpServerAction::Bind { HttpServerAction::Bind {
mut path, mut path,
authenticated, authenticated,
@ -1074,7 +896,38 @@ async fn handle_app_message(
} }
send_action_response(km.id, km.source, &send_to_loop, Ok(())).await; send_action_response(km.id, km.source, &send_to_loop, Ok(())).await;
} }
HttpServerAction::WebSocketOpen(_) => { HttpServerAction::WebSocketBind {
path,
authenticated,
encrypted,
} => {
let mut ws_path_bindings = ws_path_bindings.write().await;
ws_path_bindings.add(
&normalize_path(&path),
BoundWsPath {
app: km.source.process.clone(),
secure_subdomain: None,
authenticated,
encrypted,
},
);
}
HttpServerAction::WebSocketSecureBind { path, encrypted } => {
let process_id_hash =
format!("{:x}", Sha256::digest(km.source.process.to_string()));
let subdomain = process_id_hash.split_at(32).0.to_owned();
let mut ws_path_bindings = ws_path_bindings.write().await;
ws_path_bindings.add(
&normalize_path(&path),
BoundWsPath {
app: km.source.process.clone(),
secure_subdomain: Some(subdomain),
authenticated: true,
encrypted,
},
);
}
HttpServerAction::WebSocketOpen { .. } => {
// we cannot receive these, only send them to processes // we cannot receive these, only send them to processes
send_action_response( send_action_response(
km.id, km.id,

View File

@ -83,13 +83,6 @@ pub enum HttpClientError {
/// with the shape Result<(), HttpServerActionError> serialized to JSON. /// with the shape Result<(), HttpServerActionError> serialized to JSON.
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
pub enum HttpServerAction { pub enum HttpServerAction {
// Binding an address for websockets. Doesn't need a cache since does not serve assets
// and might not need local_only.
BindWs {
path: String,
authenticated: bool,
local_only: bool,
},
/// Bind expects a payload if and only if `cache` is TRUE. The payload should /// Bind expects a payload if and only if `cache` is TRUE. The payload should
/// be the static file to serve at this path. /// be the static file to serve at this path.
Bind { Bind {
@ -118,10 +111,30 @@ pub enum HttpServerAction {
/// payload bytes and serve them as the response to any request to this path. /// payload bytes and serve them as the response to any request to this path.
cache: bool, cache: bool,
}, },
/// Bind a path to receive incoming WebSocket connections.
/// Doesn't need a cache since does not serve assets.
WebSocketBind {
path: String,
authenticated: bool,
encrypted: bool,
},
/// SecureBind is the same as Bind, except that it forces new connections to be made
/// from the unique subdomain of the process that bound the path. These are *always*
/// authenticated. Since the subdomain is unique, it will require the user to be
/// logged in separately to the general domain authentication.
WebSocketSecureBind {
path: String,
encrypted: bool,
},
/// Processes will RECEIVE this kind of request when a client connects to them. /// Processes will RECEIVE this kind of request when a client connects to them.
/// If a process does not want this websocket open, they should issue a *request* /// If a process does not want this websocket open, they should issue a *request*
/// containing a [`type@HttpServerAction::WebSocketClose`] message and this channel ID. /// containing a [`type@HttpServerAction::WebSocketClose`] message and this channel ID.
WebSocketOpen(u32), WebSocketOpen {
path: String,
channel_id: u32,
authenticated: bool,
encrypted: bool,
},
/// When sent, expects a payload containing the WebSocket message bytes to send. /// When sent, expects a payload containing the WebSocket message bytes to send.
WebSocketPush { WebSocketPush {
channel_id: u32, channel_id: u32,