mirror of
https://github.com/uqbar-dao/nectar.git
synced 2024-12-26 10:06:44 +03:00
Merge branch 'main' into bp/fs2
This commit is contained in:
commit
5265bc536f
@ -25,7 +25,7 @@ type HttpSender = tokio::sync::oneshot::Sender<(HttpResponse, Vec<u8>)>;
|
|||||||
/// mapping from an open websocket connection to a channel that will ingest
|
/// mapping from an open websocket connection to a channel that will ingest
|
||||||
/// WebSocketPush messages from the app that handles the connection, and
|
/// WebSocketPush messages from the app that handles the connection, and
|
||||||
/// send them to the connection.
|
/// send them to the connection.
|
||||||
type WebSocketSenders = Arc<DashMap<u32, (ProcessId, WebSocketSender)>>;
|
type WebSocketSenders = Arc<DashMap<u64, (ProcessId, WebSocketSender)>>;
|
||||||
type WebSocketSender = tokio::sync::mpsc::Sender<warp::ws::Message>;
|
type WebSocketSender = tokio::sync::mpsc::Sender<warp::ws::Message>;
|
||||||
|
|
||||||
type PathBindings = Arc<RwLock<Router<BoundPath>>>;
|
type PathBindings = Arc<RwLock<Router<BoundPath>>>;
|
||||||
@ -127,31 +127,20 @@ async fn serve(
|
|||||||
let cloned_msg_tx = send_to_loop.clone();
|
let cloned_msg_tx = send_to_loop.clone();
|
||||||
let cloned_our = our.clone();
|
let cloned_our = our.clone();
|
||||||
let cloned_jwt_secret_bytes = jwt_secret_bytes.clone();
|
let cloned_jwt_secret_bytes = jwt_secret_bytes.clone();
|
||||||
let cloned_print_tx = print_tx.clone();
|
|
||||||
let ws_route = warp::path::end()
|
let ws_route = warp::path::end()
|
||||||
.and(warp::ws())
|
.and(warp::ws())
|
||||||
.and(warp::any().map(move || cloned_our.clone()))
|
.and(warp::any().map(move || cloned_our.clone()))
|
||||||
.and(warp::any().map(move || cloned_jwt_secret_bytes.clone()))
|
.and(warp::any().map(move || cloned_jwt_secret_bytes.clone()))
|
||||||
.and(warp::any().map(move || ws_senders.clone()))
|
.and(warp::any().map(move || ws_senders.clone()))
|
||||||
.and(warp::any().map(move || cloned_msg_tx.clone()))
|
.and(warp::any().map(move || cloned_msg_tx.clone()))
|
||||||
.and(warp::any().map(move || cloned_print_tx.clone()))
|
|
||||||
.map(
|
.map(
|
||||||
|ws_connection: Ws,
|
|ws_connection: Ws,
|
||||||
our: Arc<String>,
|
our: Arc<String>,
|
||||||
jwt_secret_bytes: Arc<Vec<u8>>,
|
jwt_secret_bytes: Arc<Vec<u8>>,
|
||||||
ws_senders: WebSocketSenders,
|
ws_senders: WebSocketSenders,
|
||||||
send_to_loop: MessageSender,
|
send_to_loop: MessageSender| {
|
||||||
print_tx: PrintSender| {
|
|
||||||
ws_connection.on_upgrade(move |ws: WebSocket| async move {
|
ws_connection.on_upgrade(move |ws: WebSocket| async move {
|
||||||
maintain_websocket(
|
maintain_websocket(ws, our, jwt_secret_bytes, ws_senders, send_to_loop).await
|
||||||
ws,
|
|
||||||
our,
|
|
||||||
jwt_secret_bytes,
|
|
||||||
ws_senders,
|
|
||||||
send_to_loop,
|
|
||||||
print_tx,
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
})
|
})
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
@ -390,7 +379,6 @@ async fn maintain_websocket(
|
|||||||
jwt_secret_bytes: Arc<Vec<u8>>,
|
jwt_secret_bytes: Arc<Vec<u8>>,
|
||||||
ws_senders: WebSocketSenders,
|
ws_senders: WebSocketSenders,
|
||||||
send_to_loop: MessageSender,
|
send_to_loop: MessageSender,
|
||||||
_print_tx: PrintSender,
|
|
||||||
) {
|
) {
|
||||||
let (mut write_stream, mut read_stream) = ws.split();
|
let (mut write_stream, mut read_stream) = ws.split();
|
||||||
|
|
||||||
@ -432,7 +420,7 @@ async fn maintain_websocket(
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
let ws_channel_id: u32 = rand::random();
|
let ws_channel_id: u64 = rand::random();
|
||||||
let (ws_sender, mut ws_receiver) = tokio::sync::mpsc::channel(100);
|
let (ws_sender, mut ws_receiver) = tokio::sync::mpsc::channel(100);
|
||||||
ws_senders.insert(ws_channel_id, (owner_process.clone(), ws_sender));
|
ws_senders.insert(ws_channel_id, (owner_process.clone(), ws_sender));
|
||||||
|
|
||||||
@ -542,7 +530,7 @@ async fn maintain_websocket(
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn websocket_close(
|
async fn websocket_close(
|
||||||
channel_id: u32,
|
channel_id: u64,
|
||||||
process: ProcessId,
|
process: ProcessId,
|
||||||
ws_senders: &WebSocketSenders,
|
ws_senders: &WebSocketSenders,
|
||||||
send_to_loop: &MessageSender,
|
send_to_loop: &MessageSender,
|
||||||
|
@ -72,19 +72,19 @@ pub enum HttpServerAction {
|
|||||||
cache: bool,
|
cache: bool,
|
||||||
},
|
},
|
||||||
/// Processes will RECEIVE this kind of request when a client connects to them.
|
/// Processes will RECEIVE this kind of request when a client connects to them.
|
||||||
/// If a process does not want this websocket open, they should issue a *request*
|
/// If a process does not want this websocket open, they can respond with an
|
||||||
/// containing a [`enum@HttpServerAction::WebSocketClose`] message and this channel ID.
|
/// [`enum@HttpServerAction::WebSocketClose`] message.
|
||||||
WebSocketOpen(u32),
|
WebSocketOpen(u64),
|
||||||
/// Processes can both SEND and RECEIVE this kind of request.
|
/// Processes can both SEND and RECEIVE this kind of request.
|
||||||
/// When sent, expects a payload containing the WebSocket message bytes to send.
|
/// When sent, expects a payload containing the WebSocket message bytes to send.
|
||||||
WebSocketPush {
|
WebSocketPush {
|
||||||
channel_id: u32,
|
channel_id: u64,
|
||||||
message_type: WsMessageType,
|
message_type: WsMessageType,
|
||||||
},
|
},
|
||||||
/// Processes can both SEND and RECEIVE this kind of request. Sending will
|
/// Processes can both SEND and RECEIVE this kind of request. Sending will
|
||||||
/// close a socket the process controls. Receiving will indicate that the
|
/// close a socket the process controls. Receiving will indicate that the
|
||||||
/// client closed the socket.
|
/// client closed the socket.
|
||||||
WebSocketClose(u32),
|
WebSocketClose(u64),
|
||||||
}
|
}
|
||||||
|
|
||||||
/// The possible message types for WebSocketPush. Ping and Pong are limited to 125 bytes
|
/// The possible message types for WebSocketPush. Ping and Pong are limited to 125 bytes
|
||||||
@ -129,7 +129,7 @@ pub struct WsRegister {
|
|||||||
/// Structure sent from this server to client websocket upon opening a new connection.
|
/// Structure sent from this server to client websocket upon opening a new connection.
|
||||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||||
pub struct WsRegisterResponse {
|
pub struct WsRegisterResponse {
|
||||||
pub channel_id: u32,
|
pub channel_id: u64,
|
||||||
// TODO symmetric key exchange here
|
// TODO symmetric key exchange here
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user