Merge pull request #81 from uqbar-dao/da/ws-fix

websockets fixed
This commit is contained in:
tadad 2023-11-30 22:18:08 +02:00 committed by GitHub
commit dbcf5757d4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 21 additions and 9 deletions

View File

@ -25,7 +25,7 @@ type HttpSender = tokio::sync::oneshot::Sender<(HttpResponse, Vec<u8>)>;
/// mapping from an open websocket connection to a channel that will ingest /// mapping from an open websocket connection to a channel that will ingest
/// WebSocketPush messages from the app that handles the connection, and /// WebSocketPush messages from the app that handles the connection, and
/// send them to the connection. /// send them to the connection.
type WebSocketSenders = Arc<DashMap<u64, (ProcessId, WebSocketSender)>>; type WebSocketSenders = Arc<DashMap<u32, (ProcessId, WebSocketSender)>>;
type WebSocketSender = tokio::sync::mpsc::Sender<warp::ws::Message>; type WebSocketSender = tokio::sync::mpsc::Sender<warp::ws::Message>;
type PathBindings = Arc<RwLock<Router<BoundPath>>>; type PathBindings = Arc<RwLock<Router<BoundPath>>>;
@ -127,20 +127,31 @@ async fn serve(
let cloned_msg_tx = send_to_loop.clone(); let cloned_msg_tx = send_to_loop.clone();
let cloned_our = our.clone(); let cloned_our = our.clone();
let cloned_jwt_secret_bytes = jwt_secret_bytes.clone(); let cloned_jwt_secret_bytes = jwt_secret_bytes.clone();
let cloned_print_tx = print_tx.clone();
let ws_route = warp::path::end() let ws_route = warp::path::end()
.and(warp::ws()) .and(warp::ws())
.and(warp::any().map(move || cloned_our.clone())) .and(warp::any().map(move || cloned_our.clone()))
.and(warp::any().map(move || cloned_jwt_secret_bytes.clone())) .and(warp::any().map(move || cloned_jwt_secret_bytes.clone()))
.and(warp::any().map(move || ws_senders.clone())) .and(warp::any().map(move || ws_senders.clone()))
.and(warp::any().map(move || cloned_msg_tx.clone())) .and(warp::any().map(move || cloned_msg_tx.clone()))
.and(warp::any().map(move || cloned_print_tx.clone()))
.map( .map(
|ws_connection: Ws, |ws_connection: Ws,
our: Arc<String>, our: Arc<String>,
jwt_secret_bytes: Arc<Vec<u8>>, jwt_secret_bytes: Arc<Vec<u8>>,
ws_senders: WebSocketSenders, ws_senders: WebSocketSenders,
send_to_loop: MessageSender| { send_to_loop: MessageSender,
print_tx: PrintSender| {
ws_connection.on_upgrade(move |ws: WebSocket| async move { ws_connection.on_upgrade(move |ws: WebSocket| async move {
maintain_websocket(ws, our, jwt_secret_bytes, ws_senders, send_to_loop).await maintain_websocket(
ws,
our,
jwt_secret_bytes,
ws_senders,
send_to_loop,
print_tx,
)
.await
}) })
}, },
); );
@ -379,6 +390,7 @@ async fn maintain_websocket(
jwt_secret_bytes: Arc<Vec<u8>>, jwt_secret_bytes: Arc<Vec<u8>>,
ws_senders: WebSocketSenders, ws_senders: WebSocketSenders,
send_to_loop: MessageSender, send_to_loop: MessageSender,
_print_tx: PrintSender,
) { ) {
let (mut write_stream, mut read_stream) = ws.split(); let (mut write_stream, mut read_stream) = ws.split();
@ -420,7 +432,7 @@ async fn maintain_websocket(
return; return;
} }
let ws_channel_id: u64 = rand::random(); let ws_channel_id: u32 = rand::random();
let (ws_sender, mut ws_receiver) = tokio::sync::mpsc::channel(100); let (ws_sender, mut ws_receiver) = tokio::sync::mpsc::channel(100);
ws_senders.insert(ws_channel_id, (owner_process.clone(), ws_sender)); ws_senders.insert(ws_channel_id, (owner_process.clone(), ws_sender));
@ -505,7 +517,7 @@ async fn maintain_websocket(
} }
async fn websocket_close( async fn websocket_close(
channel_id: u64, channel_id: u32,
process: ProcessId, process: ProcessId,
ws_senders: &WebSocketSenders, ws_senders: &WebSocketSenders,
send_to_loop: &MessageSender, send_to_loop: &MessageSender,

View File

@ -74,17 +74,17 @@ pub enum HttpServerAction {
/// Processes will RECEIVE this kind of request when a client connects to them. /// Processes will RECEIVE this kind of request when a client connects to them.
/// If a process does not want this websocket open, they can respond with an /// If a process does not want this websocket open, they can respond with an
/// [`enum@HttpServerAction::WebSocketClose`] message. /// [`enum@HttpServerAction::WebSocketClose`] message.
WebSocketOpen(u64), WebSocketOpen(u32),
/// Processes can both SEND and RECEIVE this kind of request. /// Processes can both SEND and RECEIVE this kind of request.
/// When sent, expects a payload containing the WebSocket message bytes to send. /// When sent, expects a payload containing the WebSocket message bytes to send.
WebSocketPush { WebSocketPush {
channel_id: u64, channel_id: u32,
message_type: WsMessageType, message_type: WsMessageType,
}, },
/// Processes can both SEND and RECEIVE this kind of request. Sending will /// Processes can both SEND and RECEIVE this kind of request. Sending will
/// close a socket the process controls. Receiving will indicate that the /// close a socket the process controls. Receiving will indicate that the
/// client closed the socket. /// client closed the socket.
WebSocketClose(u64), WebSocketClose(u32),
} }
/// The possible message types for WebSocketPush. Ping and Pong are limited to 125 bytes /// The possible message types for WebSocketPush. Ping and Pong are limited to 125 bytes
@ -129,7 +129,7 @@ pub struct WsRegister {
/// Structure sent from this server to client websocket upon opening a new connection. /// Structure sent from this server to client websocket upon opening a new connection.
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
pub struct WsRegisterResponse { pub struct WsRegisterResponse {
pub channel_id: u64, pub channel_id: u32,
// TODO symmetric key exchange here // TODO symmetric key exchange here
} }