wip websocket binding

This commit is contained in:
commercium-sys 2023-12-18 17:41:13 -05:00
parent 851f6fac44
commit 724f4837e9
2 changed files with 185 additions and 22 deletions

View File

@ -33,6 +33,7 @@ type WebSocketSenders = Arc<DashMap<u32, (ProcessId, WebSocketSender)>>;
type WebSocketSender = tokio::sync::mpsc::Sender<warp::ws::Message>; type WebSocketSender = tokio::sync::mpsc::Sender<warp::ws::Message>;
type PathBindings = Arc<RwLock<Router<BoundPath>>>; type PathBindings = Arc<RwLock<Router<BoundPath>>>;
type WsPathBindings = Arc<RwLock<Router<BoundWsPath>>>;
struct BoundPath { struct BoundPath {
pub app: ProcessId, pub app: ProcessId,
@ -42,6 +43,12 @@ struct BoundPath {
pub static_content: Option<Payload>, // TODO store in filesystem and cache pub static_content: Option<Payload>, // TODO store in filesystem and cache
} }
struct BoundWsPath {
pub app: ProcessId,
pub authenticated: bool,
pub local_only: bool,
}
/// HTTP server: a runtime module that handles HTTP requests at a given port. /// HTTP server: a runtime module that handles HTTP requests at a given port.
/// The server accepts bindings-requests from apps. These can be used in two ways: /// The server accepts bindings-requests from apps. These can be used in two ways:
/// ///
@ -84,11 +91,17 @@ pub async fn http_server(
bindings_map.add("/rpc:sys:uqbar/message", rpc_bound_path); bindings_map.add("/rpc:sys:uqbar/message", rpc_bound_path);
let path_bindings: PathBindings = Arc::new(RwLock::new(bindings_map)); let path_bindings: PathBindings = Arc::new(RwLock::new(bindings_map));
// ws path bindings
let mut ws_bindings_map: Router<BoundWsPath> = Router::new();
let ws_path_bindings: WsPathBindings = Arc::new(RwLock::new(ws_bindings_map));
tokio::spawn(serve( tokio::spawn(serve(
our_name.clone(), our_name.clone(),
our_port, our_port,
http_response_senders.clone(), http_response_senders.clone(),
path_bindings.clone(), path_bindings.clone(),
ws_path_bindings.clone(),
ws_senders.clone(), ws_senders.clone(),
encoded_keyfile.clone(), encoded_keyfile.clone(),
jwt_secret_bytes.clone(), jwt_secret_bytes.clone(),
@ -102,6 +115,7 @@ pub async fn http_server(
km, km,
http_response_senders.clone(), http_response_senders.clone(),
path_bindings.clone(), path_bindings.clone(),
ws_path_bindings.clone(),
ws_senders.clone(), ws_senders.clone(),
send_to_loop.clone(), send_to_loop.clone(),
) )
@ -117,6 +131,7 @@ async fn serve(
our_port: u16, our_port: u16,
http_response_senders: HttpResponseSenders, http_response_senders: HttpResponseSenders,
path_bindings: PathBindings, path_bindings: PathBindings,
ws_path_bindings: WsPathBindings,
ws_senders: WebSocketSenders, ws_senders: WebSocketSenders,
encoded_keyfile: Arc<Vec<u8>>, encoded_keyfile: Arc<Vec<u8>>,
jwt_secret_bytes: Arc<Vec<u8>>, jwt_secret_bytes: Arc<Vec<u8>>,
@ -135,33 +150,16 @@ async fn serve(
let cloned_our = our.clone(); let cloned_our = our.clone();
let cloned_jwt_secret_bytes = jwt_secret_bytes.clone(); let cloned_jwt_secret_bytes = jwt_secret_bytes.clone();
let cloned_print_tx = print_tx.clone(); let cloned_print_tx = print_tx.clone();
let ws_route = warp::path::end() let ws_route = warp::ws()
.and(warp::ws()) .and(warp::path::full())
.and(warp::filters::header::headers_cloned())
.and(warp::any().map(move || cloned_our.clone())) .and(warp::any().map(move || cloned_our.clone()))
.and(warp::any().map(move || cloned_jwt_secret_bytes.clone())) .and(warp::any().map(move || cloned_jwt_secret_bytes.clone()))
.and(warp::any().map(move || ws_senders.clone())) .and(warp::any().map(move || ws_senders.clone()))
.and(warp::any().map(move || ws_path_bindings.clone()))
.and(warp::any().map(move || cloned_msg_tx.clone())) .and(warp::any().map(move || cloned_msg_tx.clone()))
.and(warp::any().map(move || cloned_print_tx.clone())) .and(warp::any().map(move || cloned_print_tx.clone()))
.map( .map(ws_handler);
|ws_connection: Ws,
our: Arc<String>,
jwt_secret_bytes: Arc<Vec<u8>>,
ws_senders: WebSocketSenders,
send_to_loop: MessageSender,
print_tx: PrintSender| {
ws_connection.on_upgrade(move |ws: WebSocket| async move {
maintain_websocket(
ws,
our,
jwt_secret_bytes,
ws_senders,
send_to_loop,
print_tx,
)
.await
})
},
);
// filter to receive and handle login requests // filter to receive and handle login requests
let cloned_our = our.clone(); let cloned_our = our.clone();
@ -247,6 +245,147 @@ async fn login_handler(
} }
} }
async fn ws_handler (
ws_connection: Ws,
path: warp::path::FullPath,
headers: warp::http::HeaderMap,
our: Arc<String>,
jwt_secret_bytes: Arc<Vec<u8>>,
ws_senders: WebSocketSenders,
ws_path_bindings: WsPathBindings,
send_to_loop: MessageSender,
print_tx: PrintSender,
) {
let original_path = normalize_path(path.as_str());
let _ = print_tx.send(Printout{
verbosity: 1,
content: format!("got ws request for {original_path}")
});
let serialized_headers = serialize_headers(&headers);
let ws_path_bindings = ws_path_bindings.read().await;
if let Ok(route) = ws_path_bindings.recognize(&original_path) {
let bound_path = route.handler();
if bound_path.authenticated {
match serialized_headers.get("cookie") {
Some(auth_token) => {
if let Ok(our_name) = verify_auth_token(&auth_token, &jwt_secret_bytes) {
if our_name != *our {
return;
}
} else {
return;
}
}
None => {
return;
}
}
}
ws_connection.on_upgrade(move |ws: WebSocket| async move {
let (mut write_stream, mut read_stream) = ws.split();
let _ = print_tx.send(Printout{
verbosity: 1,
content: format!("got new client websocket connection")
}).await;
let ws_channel_id: u32 = rand::random();
let (ws_sender, mut ws_receiver) = tokio::sync::mpsc::channel(100);
ws_senders.insert(ws_channel_id, (bound_path.app.clone(), ws_sender));
let _ = send_to_loop
.send(KernelMessage {
id: rand::random(),
source: Address {
node: our.to_string(),
process: HTTP_SERVER_PROCESS_ID.clone(),
},
target: Address {
node: our.clone().to_string(),
process: bound_path.app.clone(),
},
rsvp: None,
message: Message::Request(Request {
inherit: false,
expects_response: None,
ipc: serde_json::to_vec(&HttpServerRequest::WebSocketOpen(ws_channel_id)).unwrap(),
metadata: Some("ws".into()),
}),
payload: None,
signed_capabilities: None,
}).await;
let _ = print_tx
.send(Printout {
verbosity: 1,
content: format!("websocket channel {ws_channel_id} opened"),
});
loop {
tokio::select! {
read = read_stream.next() => {
match read {
_ => {
websocket_close(ws_channel_id, bound_path.app.clone(), &ws_senders, &send_to_loop).await;
}
Some(Ok(msg)) => {
let _ = send_to_loop.send(KernelMessage {
id: rand::random(),
source: Address {
node: our.to_string(),
process: HTTP_SERVER_PROCESS_ID.clone(),
},
target: Address {
node: our.to_string(),
process: bound_path.app.clone(),
},
rsvp: None,
message: Message::Request(Request {
inherit: false,
expects_response: None,
ipc: serde_json::to_vec(&HttpServerRequest::WebSocketPush {
channel_id: ws_channel_id,
message_type: WsMessageType::Binary,
}).unwrap(),
metadata: Some("ws".into()),
}),
payload: Some(Payload {
mime: None,
bytes: msg.into_bytes(),
}),
signed_capabilities: None,
});
}
}
}
Some(outgoing) = ws_receiver.recv() => {
match write_stream.send(outgoing).await {
Ok(()) => continue,
Err(_) => {
websocket_close(ws_channel_id, bound_path.app.clone(), &ws_senders, &send_to_loop).await;
}
}
}
}
}
let stream = write_stream.reunite(read_stream).unwrap();
let _ = stream.close().await;
});
} else {
return;
}
}
async fn http_handler( async fn http_handler(
method: warp::http::Method, method: warp::http::Method,
socket_addr: Option<SocketAddr>, socket_addr: Option<SocketAddr>,
@ -521,6 +660,7 @@ async fn maintain_websocket(
our: Arc<String>, our: Arc<String>,
jwt_secret_bytes: Arc<Vec<u8>>, jwt_secret_bytes: Arc<Vec<u8>>,
ws_senders: WebSocketSenders, ws_senders: WebSocketSenders,
path_bindings: PathBindings,
send_to_loop: MessageSender, send_to_loop: MessageSender,
print_tx: PrintSender, print_tx: PrintSender,
) { ) {
@ -754,6 +894,7 @@ async fn handle_app_message(
km: KernelMessage, km: KernelMessage,
http_response_senders: HttpResponseSenders, http_response_senders: HttpResponseSenders,
path_bindings: PathBindings, path_bindings: PathBindings,
ws_path_bindings: WsPathBindings,
ws_senders: WebSocketSenders, ws_senders: WebSocketSenders,
send_to_loop: MessageSender, send_to_loop: MessageSender,
) { ) {
@ -821,6 +962,21 @@ async fn handle_app_message(
return; return;
}; };
match message { match message {
HttpServerAction::BindWs {
mut path,
authenticated,
local_only,
} => {
let mut ws_path_bindings = ws_path_bindings.write().await;
ws_path_bindings.add(
&normalize_path(&path),
BoundWsPath {
app: km.source.process.clone(),
authenticated,
local_only,
}
);
}
HttpServerAction::Bind { HttpServerAction::Bind {
mut path, mut path,
authenticated, authenticated,

View File

@ -83,6 +83,13 @@ pub enum HttpClientError {
/// with the shape Result<(), HttpServerActionError> serialized to JSON. /// with the shape Result<(), HttpServerActionError> serialized to JSON.
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
pub enum HttpServerAction { pub enum HttpServerAction {
// Binding an address for websockets. Doesn't need a cache since does not serve assets
// and might not need local_only.
BindWs {
path: String,
authenticated: bool,
local_only: bool,
},
/// Bind expects a payload if and only if `cache` is TRUE. The payload should /// Bind expects a payload if and only if `cache` is TRUE. The payload should
/// be the static file to serve at this path. /// be the static file to serve at this path.
Bind { Bind {