mirror of
https://github.com/uqbar-dao/nectar.git
synced 2024-11-23 03:44:04 +03:00
Merge branch 'v0.4.0' into bp/fs2
This commit is contained in:
commit
7141338b45
@ -334,15 +334,21 @@ fn handle_local_request(
|
|||||||
|
|
||||||
// add zip bytes
|
// add zip bytes
|
||||||
payload.mime = Some("application/zip".to_string());
|
payload.mime = Some("application/zip".to_string());
|
||||||
Request::new()
|
let response = Request::new()
|
||||||
.target(Address::from_str("our@vfs:sys:uqbar")?)
|
.target(Address::from_str("our@vfs:sys:uqbar")?)
|
||||||
.ipc(serde_json::to_vec(&kt::VfsRequest {
|
.ipc(serde_json::to_vec(&kt::VfsRequest {
|
||||||
path: drive.clone(),
|
path: drive.clone(),
|
||||||
action: kt::VfsAction::AddZip,
|
action: kt::VfsAction::AddZip,
|
||||||
})?)
|
})?)
|
||||||
.payload(payload.clone())
|
.payload(payload.clone())
|
||||||
.send_and_await_response(5)?
|
.send_and_await_response(5)?.unwrap();
|
||||||
.unwrap();
|
let Message::Response { ipc: ref vfs_ipc, .. } = response else {
|
||||||
|
panic!("app_store: send_and_await_response must return Response");
|
||||||
|
};
|
||||||
|
let vfs_ipc = serde_json::from_slice::<serde_json::Value>(vfs_ipc)?;
|
||||||
|
if vfs_ipc == serde_json::json!({"Err": "NoCap"}) {
|
||||||
|
return Err(anyhow::anyhow!("cannot add NewPackage: do not have capability to access vfs"));
|
||||||
|
}
|
||||||
|
|
||||||
// save the zip file itself in VFS for sharing with other nodes
|
// save the zip file itself in VFS for sharing with other nodes
|
||||||
// call it <package>.zip
|
// call it <package>.zip
|
||||||
|
2
modules/chess/Cargo.lock
generated
2
modules/chess/Cargo.lock
generated
@ -632,7 +632,7 @@ checksum = "f962df74c8c05a667b5ee8bcf162993134c104e96440b663c8daa176dc772d8c"
|
|||||||
[[package]]
|
[[package]]
|
||||||
name = "uqbar_process_lib"
|
name = "uqbar_process_lib"
|
||||||
version = "0.4.0"
|
version = "0.4.0"
|
||||||
source = "git+ssh://git@github.com/uqbar-dao/process_lib.git?rev=1ce0d41#1ce0d412169e795c2a99464563b42ae2a2628d77"
|
source = "git+ssh://git@github.com/uqbar-dao/process_lib.git?rev=65e07e4#65e07e49553a5fa3340ff7c3d9cf3340a53610c8"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"bincode",
|
"bincode",
|
||||||
|
@ -16,7 +16,7 @@ pleco = "0.5"
|
|||||||
serde = { version = "1.0", features = ["derive"] }
|
serde = { version = "1.0", features = ["derive"] }
|
||||||
serde_json = "1.0"
|
serde_json = "1.0"
|
||||||
url = "*"
|
url = "*"
|
||||||
uqbar_process_lib = { git = "ssh://git@github.com/uqbar-dao/process_lib.git", rev = "1ce0d41" }
|
uqbar_process_lib = { git = "ssh://git@github.com/uqbar-dao/process_lib.git", rev = "65e07e4" }
|
||||||
wit-bindgen = { git = "https://github.com/bytecodealliance/wit-bindgen", rev = "efcc759" }
|
wit-bindgen = { git = "https://github.com/bytecodealliance/wit-bindgen", rev = "efcc759" }
|
||||||
|
|
||||||
[lib]
|
[lib]
|
||||||
|
File diff suppressed because one or more lines are too long
@ -147,6 +147,8 @@ fn initialize(our: Address) {
|
|||||||
)
|
)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
http::bind_http_path("/games", true, false).unwrap();
|
http::bind_http_path("/games", true, false).unwrap();
|
||||||
|
// Allow websockets to be opened at / (our process ID will be prepended).
|
||||||
|
http::bind_ws_path("/", true, false).unwrap();
|
||||||
|
|
||||||
// Grab our state, then enter the main event loop.
|
// Grab our state, then enter the main event loop.
|
||||||
let mut state: ChessState = load_chess_state();
|
let mut state: ChessState = load_chess_state();
|
||||||
@ -178,7 +180,7 @@ fn handle_request(our: &Address, message: &Message, state: &mut ChessState) -> a
|
|||||||
// chess protocol request, we *await* its response in-place. This is appropriate
|
// chess protocol request, we *await* its response in-place. This is appropriate
|
||||||
// for direct node<>node comms, less appropriate for other circumstances...
|
// for direct node<>node comms, less appropriate for other circumstances...
|
||||||
if !message.is_request() {
|
if !message.is_request() {
|
||||||
return Err(anyhow::anyhow!("message was response"));
|
return Ok(());
|
||||||
}
|
}
|
||||||
// If the request is from another node, handle it as an incoming request.
|
// If the request is from another node, handle it as an incoming request.
|
||||||
// Note that we can enforce the ProcessId as well, but it shouldn't be a trusted
|
// Note that we can enforce the ProcessId as well, but it shouldn't be a trusted
|
||||||
@ -221,8 +223,10 @@ fn handle_request(our: &Address, message: &Message, state: &mut ChessState) -> a
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
http::HttpServerRequest::WebSocketOpen(channel_id) => {
|
http::HttpServerRequest::WebSocketOpen { path, channel_id } => {
|
||||||
// client frontend opened a websocket
|
// We know this is authenticated and unencrypted because we only
|
||||||
|
// bound one path, the root path. So we know that client
|
||||||
|
// frontend opened a websocket and can send updates
|
||||||
state.clients.insert(channel_id);
|
state.clients.insert(channel_id);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@ -295,7 +299,7 @@ fn handle_chess_request(
|
|||||||
// If we don't have a game with them, reject the move.
|
// If we don't have a game with them, reject the move.
|
||||||
return Response::new()
|
return Response::new()
|
||||||
.ipc(serde_json::to_vec(&ChessResponse::MoveRejected)?)
|
.ipc(serde_json::to_vec(&ChessResponse::MoveRejected)?)
|
||||||
.send()
|
.send();
|
||||||
};
|
};
|
||||||
// Convert the saved board to one we can manipulate.
|
// Convert the saved board to one we can manipulate.
|
||||||
let mut board = Board::from_fen(&game.board).unwrap();
|
let mut board = Board::from_fen(&game.board).unwrap();
|
||||||
@ -359,9 +363,12 @@ fn handle_local_request(
|
|||||||
let Ok(Message::Response { ref ipc, .. }) = Request::new()
|
let Ok(Message::Response { ref ipc, .. }) = Request::new()
|
||||||
.target((game_id.as_ref(), our.process.clone()))
|
.target((game_id.as_ref(), our.process.clone()))
|
||||||
.ipc(serde_json::to_vec(&action)?)
|
.ipc(serde_json::to_vec(&action)?)
|
||||||
.send_and_await_response(5)? else {
|
.send_and_await_response(5)?
|
||||||
return Err(anyhow::anyhow!("other player did not respond properly to new game request"))
|
else {
|
||||||
};
|
return Err(anyhow::anyhow!(
|
||||||
|
"other player did not respond properly to new game request"
|
||||||
|
));
|
||||||
|
};
|
||||||
// If they accept, create a new game -- otherwise, error out.
|
// If they accept, create a new game -- otherwise, error out.
|
||||||
if serde_json::from_slice::<ChessResponse>(ipc)? != ChessResponse::NewGameAccepted {
|
if serde_json::from_slice::<ChessResponse>(ipc)? != ChessResponse::NewGameAccepted {
|
||||||
return Err(anyhow::anyhow!("other player rejected new game request!"));
|
return Err(anyhow::anyhow!("other player rejected new game request!"));
|
||||||
@ -402,9 +409,12 @@ fn handle_local_request(
|
|||||||
let Ok(Message::Response { ref ipc, .. }) = Request::new()
|
let Ok(Message::Response { ref ipc, .. }) = Request::new()
|
||||||
.target((game_id.as_ref(), our.process.clone()))
|
.target((game_id.as_ref(), our.process.clone()))
|
||||||
.ipc(serde_json::to_vec(&action)?)
|
.ipc(serde_json::to_vec(&action)?)
|
||||||
.send_and_await_response(5)? else {
|
.send_and_await_response(5)?
|
||||||
return Err(anyhow::anyhow!("other player did not respond properly to our move"))
|
else {
|
||||||
};
|
return Err(anyhow::anyhow!(
|
||||||
|
"other player did not respond properly to our move"
|
||||||
|
));
|
||||||
|
};
|
||||||
if serde_json::from_slice::<ChessResponse>(ipc)? != ChessResponse::MoveAccepted {
|
if serde_json::from_slice::<ChessResponse>(ipc)? != ChessResponse::MoveAccepted {
|
||||||
return Err(anyhow::anyhow!("other player rejected our move"));
|
return Err(anyhow::anyhow!("other player rejected our move"));
|
||||||
}
|
}
|
||||||
@ -487,9 +497,12 @@ fn handle_http_request(
|
|||||||
white: player_white.clone(),
|
white: player_white.clone(),
|
||||||
black: player_black.clone(),
|
black: player_black.clone(),
|
||||||
})?)
|
})?)
|
||||||
.send_and_await_response(5)? else {
|
.send_and_await_response(5)?
|
||||||
return Err(anyhow::anyhow!("other player did not respond properly to new game request"))
|
else {
|
||||||
};
|
return Err(anyhow::anyhow!(
|
||||||
|
"other player did not respond properly to new game request"
|
||||||
|
));
|
||||||
|
};
|
||||||
// if they accept, create a new game
|
// if they accept, create a new game
|
||||||
// otherwise, should surface error to FE...
|
// otherwise, should surface error to FE...
|
||||||
if serde_json::from_slice::<ChessResponse>(msg.ipc())? != ChessResponse::NewGameAccepted
|
if serde_json::from_slice::<ChessResponse>(msg.ipc())? != ChessResponse::NewGameAccepted
|
||||||
@ -553,9 +566,12 @@ fn handle_http_request(
|
|||||||
game_id: game_id.to_string(),
|
game_id: game_id.to_string(),
|
||||||
move_str: move_str.to_string(),
|
move_str: move_str.to_string(),
|
||||||
})?)
|
})?)
|
||||||
.send_and_await_response(5)? else {
|
.send_and_await_response(5)?
|
||||||
return Err(anyhow::anyhow!("other player did not respond properly to our move"))
|
else {
|
||||||
};
|
return Err(anyhow::anyhow!(
|
||||||
|
"other player did not respond properly to our move"
|
||||||
|
));
|
||||||
|
};
|
||||||
if serde_json::from_slice::<ChessResponse>(msg.ipc())? != ChessResponse::MoveAccepted {
|
if serde_json::from_slice::<ChessResponse>(msg.ipc())? != ChessResponse::MoveAccepted {
|
||||||
return Err(anyhow::anyhow!("other player rejected our move"));
|
return Err(anyhow::anyhow!("other player rejected our move"));
|
||||||
}
|
}
|
||||||
|
@ -36,6 +36,7 @@ type WebSocketSenders = Arc<DashMap<u32, (ProcessId, WebSocketSender)>>;
|
|||||||
type WebSocketSender = tokio::sync::mpsc::Sender<warp::ws::Message>;
|
type WebSocketSender = tokio::sync::mpsc::Sender<warp::ws::Message>;
|
||||||
|
|
||||||
type PathBindings = Arc<RwLock<Router<BoundPath>>>;
|
type PathBindings = Arc<RwLock<Router<BoundPath>>>;
|
||||||
|
type WsPathBindings = Arc<RwLock<Router<BoundWsPath>>>;
|
||||||
|
|
||||||
struct BoundPath {
|
struct BoundPath {
|
||||||
pub app: ProcessId,
|
pub app: ProcessId,
|
||||||
@ -45,6 +46,13 @@ struct BoundPath {
|
|||||||
pub static_content: Option<Payload>, // TODO store in filesystem and cache
|
pub static_content: Option<Payload>, // TODO store in filesystem and cache
|
||||||
}
|
}
|
||||||
|
|
||||||
|
struct BoundWsPath {
|
||||||
|
pub app: ProcessId,
|
||||||
|
pub secure_subdomain: Option<String>,
|
||||||
|
pub authenticated: bool,
|
||||||
|
pub encrypted: bool, // TODO use
|
||||||
|
}
|
||||||
|
|
||||||
/// HTTP server: a runtime module that handles HTTP requests at a given port.
|
/// HTTP server: a runtime module that handles HTTP requests at a given port.
|
||||||
/// The server accepts bindings-requests from apps. These can be used in two ways:
|
/// The server accepts bindings-requests from apps. These can be used in two ways:
|
||||||
///
|
///
|
||||||
@ -87,11 +95,15 @@ pub async fn http_server(
|
|||||||
bindings_map.add("/rpc:sys:uqbar/message", rpc_bound_path);
|
bindings_map.add("/rpc:sys:uqbar/message", rpc_bound_path);
|
||||||
let path_bindings: PathBindings = Arc::new(RwLock::new(bindings_map));
|
let path_bindings: PathBindings = Arc::new(RwLock::new(bindings_map));
|
||||||
|
|
||||||
|
// ws path bindings
|
||||||
|
let ws_path_bindings: WsPathBindings = Arc::new(RwLock::new(Router::new()));
|
||||||
|
|
||||||
tokio::spawn(serve(
|
tokio::spawn(serve(
|
||||||
our_name.clone(),
|
our_name.clone(),
|
||||||
our_port,
|
our_port,
|
||||||
http_response_senders.clone(),
|
http_response_senders.clone(),
|
||||||
path_bindings.clone(),
|
path_bindings.clone(),
|
||||||
|
ws_path_bindings.clone(),
|
||||||
ws_senders.clone(),
|
ws_senders.clone(),
|
||||||
encoded_keyfile.clone(),
|
encoded_keyfile.clone(),
|
||||||
jwt_secret_bytes.clone(),
|
jwt_secret_bytes.clone(),
|
||||||
@ -105,6 +117,7 @@ pub async fn http_server(
|
|||||||
km,
|
km,
|
||||||
http_response_senders.clone(),
|
http_response_senders.clone(),
|
||||||
path_bindings.clone(),
|
path_bindings.clone(),
|
||||||
|
ws_path_bindings.clone(),
|
||||||
ws_senders.clone(),
|
ws_senders.clone(),
|
||||||
send_to_loop.clone(),
|
send_to_loop.clone(),
|
||||||
)
|
)
|
||||||
@ -120,6 +133,7 @@ async fn serve(
|
|||||||
our_port: u16,
|
our_port: u16,
|
||||||
http_response_senders: HttpResponseSenders,
|
http_response_senders: HttpResponseSenders,
|
||||||
path_bindings: PathBindings,
|
path_bindings: PathBindings,
|
||||||
|
ws_path_bindings: WsPathBindings,
|
||||||
ws_senders: WebSocketSenders,
|
ws_senders: WebSocketSenders,
|
||||||
encoded_keyfile: Arc<Vec<u8>>,
|
encoded_keyfile: Arc<Vec<u8>>,
|
||||||
jwt_secret_bytes: Arc<Vec<u8>>,
|
jwt_secret_bytes: Arc<Vec<u8>>,
|
||||||
@ -138,33 +152,17 @@ async fn serve(
|
|||||||
let cloned_our = our.clone();
|
let cloned_our = our.clone();
|
||||||
let cloned_jwt_secret_bytes = jwt_secret_bytes.clone();
|
let cloned_jwt_secret_bytes = jwt_secret_bytes.clone();
|
||||||
let cloned_print_tx = print_tx.clone();
|
let cloned_print_tx = print_tx.clone();
|
||||||
let ws_route = warp::path::end()
|
let ws_route = warp::ws()
|
||||||
.and(warp::ws())
|
.and(warp::path::full())
|
||||||
|
.and(warp::filters::host::optional())
|
||||||
|
.and(warp::filters::header::headers_cloned())
|
||||||
.and(warp::any().map(move || cloned_our.clone()))
|
.and(warp::any().map(move || cloned_our.clone()))
|
||||||
.and(warp::any().map(move || cloned_jwt_secret_bytes.clone()))
|
.and(warp::any().map(move || cloned_jwt_secret_bytes.clone()))
|
||||||
.and(warp::any().map(move || ws_senders.clone()))
|
.and(warp::any().map(move || ws_senders.clone()))
|
||||||
|
.and(warp::any().map(move || ws_path_bindings.clone()))
|
||||||
.and(warp::any().map(move || cloned_msg_tx.clone()))
|
.and(warp::any().map(move || cloned_msg_tx.clone()))
|
||||||
.and(warp::any().map(move || cloned_print_tx.clone()))
|
.and(warp::any().map(move || cloned_print_tx.clone()))
|
||||||
.map(
|
.and_then(ws_handler);
|
||||||
|ws_connection: Ws,
|
|
||||||
our: Arc<String>,
|
|
||||||
jwt_secret_bytes: Arc<Vec<u8>>,
|
|
||||||
ws_senders: WebSocketSenders,
|
|
||||||
send_to_loop: MessageSender,
|
|
||||||
print_tx: PrintSender| {
|
|
||||||
ws_connection.on_upgrade(move |ws: WebSocket| async move {
|
|
||||||
maintain_websocket(
|
|
||||||
ws,
|
|
||||||
our,
|
|
||||||
jwt_secret_bytes,
|
|
||||||
ws_senders,
|
|
||||||
send_to_loop,
|
|
||||||
print_tx,
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
})
|
|
||||||
},
|
|
||||||
);
|
|
||||||
|
|
||||||
// filter to receive and handle login requests
|
// filter to receive and handle login requests
|
||||||
let cloned_our = our.clone();
|
let cloned_our = our.clone();
|
||||||
@ -250,6 +248,84 @@ async fn login_handler(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn ws_handler(
|
||||||
|
ws_connection: Ws,
|
||||||
|
path: warp::path::FullPath,
|
||||||
|
host: Option<Authority>,
|
||||||
|
headers: warp::http::HeaderMap,
|
||||||
|
our: Arc<String>,
|
||||||
|
jwt_secret_bytes: Arc<Vec<u8>>,
|
||||||
|
ws_senders: WebSocketSenders,
|
||||||
|
ws_path_bindings: WsPathBindings,
|
||||||
|
send_to_loop: MessageSender,
|
||||||
|
print_tx: PrintSender,
|
||||||
|
) -> Result<impl warp::Reply, warp::Rejection> {
|
||||||
|
let original_path = normalize_path(path.as_str());
|
||||||
|
let _ = print_tx.send(Printout {
|
||||||
|
verbosity: 1,
|
||||||
|
content: format!("got ws request for {original_path}"),
|
||||||
|
});
|
||||||
|
|
||||||
|
let serialized_headers = serialize_headers(&headers);
|
||||||
|
let ws_path_bindings = ws_path_bindings.read().await;
|
||||||
|
|
||||||
|
let Ok(route) = ws_path_bindings.recognize(&original_path) else {
|
||||||
|
return Err(warp::reject::not_found());
|
||||||
|
};
|
||||||
|
|
||||||
|
let bound_path = route.handler();
|
||||||
|
if let Some(ref subdomain) = bound_path.secure_subdomain {
|
||||||
|
let _ = print_tx
|
||||||
|
.send(Printout {
|
||||||
|
verbosity: 1,
|
||||||
|
content: format!(
|
||||||
|
"got request for path {original_path} bound by subdomain {subdomain}"
|
||||||
|
),
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
// assert that host matches what this app wants it to be
|
||||||
|
if host.is_none() {
|
||||||
|
return Err(warp::reject::not_found());
|
||||||
|
}
|
||||||
|
let host = host.as_ref().unwrap();
|
||||||
|
// parse out subdomain from host (there can only be one)
|
||||||
|
let request_subdomain = host.host().split('.').next().unwrap_or("");
|
||||||
|
if request_subdomain != subdomain {
|
||||||
|
return Err(warp::reject::not_found());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if bound_path.authenticated {
|
||||||
|
let Some(auth_token) = serialized_headers.get("cookie") else {
|
||||||
|
return Err(warp::reject::not_found());
|
||||||
|
};
|
||||||
|
if !auth_cookie_valid(&our, &auth_token, &jwt_secret_bytes) {
|
||||||
|
return Err(warp::reject::not_found());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let app = bound_path.app.clone();
|
||||||
|
Ok(ws_connection.on_upgrade(move |ws: WebSocket| async move {
|
||||||
|
maintain_websocket(
|
||||||
|
ws,
|
||||||
|
our.clone(),
|
||||||
|
app,
|
||||||
|
// remove process id from beginning of path by splitting into segments
|
||||||
|
// separated by "/" and taking all but the first
|
||||||
|
original_path
|
||||||
|
.split('/')
|
||||||
|
.skip(1)
|
||||||
|
.collect::<Vec<&str>>()
|
||||||
|
.join("/"),
|
||||||
|
jwt_secret_bytes.clone(),
|
||||||
|
ws_senders.clone(),
|
||||||
|
send_to_loop.clone(),
|
||||||
|
print_tx.clone(),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
async fn http_handler(
|
async fn http_handler(
|
||||||
method: warp::http::Method,
|
method: warp::http::Method,
|
||||||
socket_addr: Option<SocketAddr>,
|
socket_addr: Option<SocketAddr>,
|
||||||
@ -522,16 +598,14 @@ async fn handle_rpc_message(
|
|||||||
async fn maintain_websocket(
|
async fn maintain_websocket(
|
||||||
ws: WebSocket,
|
ws: WebSocket,
|
||||||
our: Arc<String>,
|
our: Arc<String>,
|
||||||
jwt_secret_bytes: Arc<Vec<u8>>,
|
app: ProcessId,
|
||||||
|
path: String,
|
||||||
|
_jwt_secret_bytes: Arc<Vec<u8>>, // TODO use for encrypted channels
|
||||||
ws_senders: WebSocketSenders,
|
ws_senders: WebSocketSenders,
|
||||||
send_to_loop: MessageSender,
|
send_to_loop: MessageSender,
|
||||||
print_tx: PrintSender,
|
print_tx: PrintSender,
|
||||||
) {
|
) {
|
||||||
let (mut write_stream, mut read_stream) = ws.split();
|
let (mut write_stream, mut read_stream) = ws.split();
|
||||||
|
|
||||||
// first, receive a message from client that contains the target process
|
|
||||||
// and the auth token
|
|
||||||
|
|
||||||
let _ = print_tx
|
let _ = print_tx
|
||||||
.send(Printout {
|
.send(Printout {
|
||||||
verbosity: 1,
|
verbosity: 1,
|
||||||
@ -539,72 +613,10 @@ async fn maintain_websocket(
|
|||||||
})
|
})
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
let Some(Ok(register_msg)) = read_stream.next().await else {
|
let channel_id: u32 = rand::random();
|
||||||
// stream closed, exit
|
|
||||||
let _ = print_tx
|
|
||||||
.send(Printout {
|
|
||||||
verbosity: 1,
|
|
||||||
content: format!("client failed to send registration message"),
|
|
||||||
})
|
|
||||||
.await;
|
|
||||||
let stream = write_stream.reunite(read_stream).unwrap();
|
|
||||||
let _ = stream.close().await;
|
|
||||||
return;
|
|
||||||
};
|
|
||||||
|
|
||||||
let Ok(ws_register) = serde_json::from_slice::<WsRegister>(register_msg.as_bytes()) else {
|
|
||||||
// stream error, exit
|
|
||||||
let _ = print_tx
|
|
||||||
.send(Printout {
|
|
||||||
verbosity: 1,
|
|
||||||
content: format!("couldn't parse registration message from client"),
|
|
||||||
})
|
|
||||||
.await;
|
|
||||||
let stream = write_stream.reunite(read_stream).unwrap();
|
|
||||||
let _ = stream.close().await;
|
|
||||||
return;
|
|
||||||
};
|
|
||||||
|
|
||||||
let Ok(owner_process) = ProcessId::from_str(&ws_register.target_process) else {
|
|
||||||
// invalid process id, exit
|
|
||||||
let _ = print_tx
|
|
||||||
.send(Printout {
|
|
||||||
verbosity: 1,
|
|
||||||
content: format!("client sent malformed process ID"),
|
|
||||||
})
|
|
||||||
.await;
|
|
||||||
let stream = write_stream.reunite(read_stream).unwrap();
|
|
||||||
let _ = stream.close().await;
|
|
||||||
return;
|
|
||||||
};
|
|
||||||
|
|
||||||
let _ = print_tx
|
|
||||||
.send(Printout {
|
|
||||||
verbosity: 1,
|
|
||||||
content: format!("channel is intended for {owner_process}"),
|
|
||||||
})
|
|
||||||
.await;
|
|
||||||
|
|
||||||
let Ok(our_name) = verify_auth_token(&ws_register.auth_token, &jwt_secret_bytes) else {
|
|
||||||
// invalid auth token, exit
|
|
||||||
let stream = write_stream.reunite(read_stream).unwrap();
|
|
||||||
let _ = stream.close().await;
|
|
||||||
return;
|
|
||||||
};
|
|
||||||
|
|
||||||
if our_name != *our {
|
|
||||||
// invalid auth token, exit
|
|
||||||
let stream = write_stream.reunite(read_stream).unwrap();
|
|
||||||
let _ = stream.close().await;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
let ws_channel_id: u32 = rand::random();
|
|
||||||
let (ws_sender, mut ws_receiver) = tokio::sync::mpsc::channel(100);
|
let (ws_sender, mut ws_receiver) = tokio::sync::mpsc::channel(100);
|
||||||
ws_senders.insert(ws_channel_id, (owner_process.clone(), ws_sender));
|
ws_senders.insert(channel_id, (app.clone(), ws_sender));
|
||||||
|
|
||||||
// send a message to the process associated with this channel
|
|
||||||
// notifying them that the channel is now open
|
|
||||||
let _ = send_to_loop
|
let _ = send_to_loop
|
||||||
.send(KernelMessage {
|
.send(KernelMessage {
|
||||||
id: rand::random(),
|
id: rand::random(),
|
||||||
@ -613,14 +625,15 @@ async fn maintain_websocket(
|
|||||||
process: HTTP_SERVER_PROCESS_ID.clone(),
|
process: HTTP_SERVER_PROCESS_ID.clone(),
|
||||||
},
|
},
|
||||||
target: Address {
|
target: Address {
|
||||||
node: our.to_string(),
|
node: our.clone().to_string(),
|
||||||
process: owner_process.clone(),
|
process: app.clone(),
|
||||||
},
|
},
|
||||||
rsvp: None,
|
rsvp: None,
|
||||||
message: Message::Request(Request {
|
message: Message::Request(Request {
|
||||||
inherit: false,
|
inherit: false,
|
||||||
expects_response: None,
|
expects_response: None,
|
||||||
ipc: serde_json::to_vec(&HttpServerRequest::WebSocketOpen(ws_channel_id)).unwrap(),
|
ipc: serde_json::to_vec(&HttpServerRequest::WebSocketOpen { path, channel_id })
|
||||||
|
.unwrap(),
|
||||||
metadata: Some("ws".into()),
|
metadata: Some("ws".into()),
|
||||||
}),
|
}),
|
||||||
payload: None,
|
payload: None,
|
||||||
@ -628,83 +641,53 @@ async fn maintain_websocket(
|
|||||||
})
|
})
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
// respond to the client notifying them that the channel is now open
|
let _ = print_tx.send(Printout {
|
||||||
let Ok(()) = write_stream
|
verbosity: 1,
|
||||||
.send(warp::ws::Message::text(
|
content: format!("websocket channel {channel_id} opened"),
|
||||||
serde_json::to_string(&WsRegisterResponse {
|
});
|
||||||
channel_id: ws_channel_id,
|
|
||||||
})
|
|
||||||
.unwrap(),
|
|
||||||
))
|
|
||||||
.await
|
|
||||||
else {
|
|
||||||
// stream error, exit
|
|
||||||
let stream = write_stream.reunite(read_stream).unwrap();
|
|
||||||
let _ = stream.close().await;
|
|
||||||
return;
|
|
||||||
};
|
|
||||||
|
|
||||||
let _ = print_tx
|
|
||||||
.send(Printout {
|
|
||||||
verbosity: 1,
|
|
||||||
content: format!("websocket channel {ws_channel_id} opened"),
|
|
||||||
})
|
|
||||||
.await;
|
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
read = read_stream.next() => {
|
read = read_stream.next() => {
|
||||||
match read {
|
match read {
|
||||||
None => {
|
|
||||||
// stream closed, remove and exit
|
|
||||||
websocket_close(ws_channel_id, owner_process, &ws_senders, &send_to_loop).await;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
Some(Err(_e)) => {
|
|
||||||
// stream error, remove and exit
|
|
||||||
websocket_close(ws_channel_id, owner_process, &ws_senders, &send_to_loop).await;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
Some(Ok(msg)) => {
|
Some(Ok(msg)) => {
|
||||||
// forward message to process associated with this channel
|
let _ = send_to_loop.send(KernelMessage {
|
||||||
let _ = send_to_loop
|
id: rand::random(),
|
||||||
.send(KernelMessage {
|
source: Address {
|
||||||
id: rand::random(),
|
node: our.to_string(),
|
||||||
source: Address {
|
process: HTTP_SERVER_PROCESS_ID.clone(),
|
||||||
node: our.to_string(),
|
},
|
||||||
process: HTTP_SERVER_PROCESS_ID.clone(),
|
target: Address {
|
||||||
},
|
node: our.to_string(),
|
||||||
target: Address {
|
process: app.clone(),
|
||||||
node: our.to_string(),
|
},
|
||||||
process: owner_process.clone(),
|
rsvp: None,
|
||||||
},
|
message: Message::Request(Request {
|
||||||
rsvp: None,
|
inherit: false,
|
||||||
message: Message::Request(Request {
|
expects_response: None,
|
||||||
inherit: false,
|
ipc: serde_json::to_vec(&HttpServerRequest::WebSocketPush {
|
||||||
expects_response: None,
|
channel_id,
|
||||||
ipc: serde_json::to_vec(&HttpServerRequest::WebSocketPush {
|
message_type: WsMessageType::Binary,
|
||||||
channel_id: ws_channel_id,
|
}).unwrap(),
|
||||||
message_type: WsMessageType::Binary,
|
metadata: Some("ws".into()),
|
||||||
}).unwrap(),
|
}),
|
||||||
metadata: Some("ws".into()),
|
payload: Some(Payload {
|
||||||
}),
|
mime: None,
|
||||||
payload: Some(Payload {
|
bytes: msg.into_bytes(),
|
||||||
mime: None,
|
}),
|
||||||
bytes: msg.into_bytes(),
|
signed_capabilities: None,
|
||||||
}),
|
});
|
||||||
signed_capabilities: None,
|
}
|
||||||
})
|
_ => {
|
||||||
.await;
|
websocket_close(channel_id, app.clone(), &ws_senders, &send_to_loop).await;
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Some(outgoing) = ws_receiver.recv() => {
|
Some(outgoing) = ws_receiver.recv() => {
|
||||||
// forward message to websocket
|
|
||||||
match write_stream.send(outgoing).await {
|
match write_stream.send(outgoing).await {
|
||||||
Ok(()) => continue,
|
Ok(()) => continue,
|
||||||
Err(_e) => {
|
Err(_) => {
|
||||||
// stream error, remove and exit
|
websocket_close(channel_id, app.clone(), &ws_senders, &send_to_loop).await;
|
||||||
websocket_close(ws_channel_id, owner_process, &ws_senders, &send_to_loop).await;
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -757,6 +740,7 @@ async fn handle_app_message(
|
|||||||
km: KernelMessage,
|
km: KernelMessage,
|
||||||
http_response_senders: HttpResponseSenders,
|
http_response_senders: HttpResponseSenders,
|
||||||
path_bindings: PathBindings,
|
path_bindings: PathBindings,
|
||||||
|
ws_path_bindings: WsPathBindings,
|
||||||
ws_senders: WebSocketSenders,
|
ws_senders: WebSocketSenders,
|
||||||
send_to_loop: MessageSender,
|
send_to_loop: MessageSender,
|
||||||
) {
|
) {
|
||||||
@ -921,7 +905,53 @@ async fn handle_app_message(
|
|||||||
}
|
}
|
||||||
send_action_response(km.id, km.source, &send_to_loop, Ok(())).await;
|
send_action_response(km.id, km.source, &send_to_loop, Ok(())).await;
|
||||||
}
|
}
|
||||||
HttpServerAction::WebSocketOpen(_) => {
|
HttpServerAction::WebSocketBind {
|
||||||
|
mut path,
|
||||||
|
authenticated,
|
||||||
|
encrypted,
|
||||||
|
} => {
|
||||||
|
path = if path.starts_with('/') {
|
||||||
|
format!("/{}{}", km.source.process, path)
|
||||||
|
} else {
|
||||||
|
format!("/{}/{}", km.source.process, path)
|
||||||
|
};
|
||||||
|
let mut ws_path_bindings = ws_path_bindings.write().await;
|
||||||
|
ws_path_bindings.add(
|
||||||
|
&normalize_path(&path),
|
||||||
|
BoundWsPath {
|
||||||
|
app: km.source.process.clone(),
|
||||||
|
secure_subdomain: None,
|
||||||
|
authenticated,
|
||||||
|
encrypted,
|
||||||
|
},
|
||||||
|
);
|
||||||
|
send_action_response(km.id, km.source, &send_to_loop, Ok(())).await;
|
||||||
|
}
|
||||||
|
HttpServerAction::WebSocketSecureBind {
|
||||||
|
mut path,
|
||||||
|
encrypted,
|
||||||
|
} => {
|
||||||
|
path = if path.starts_with('/') {
|
||||||
|
format!("/{}{}", km.source.process, path)
|
||||||
|
} else {
|
||||||
|
format!("/{}/{}", km.source.process, path)
|
||||||
|
};
|
||||||
|
let process_id_hash =
|
||||||
|
format!("{:x}", Sha256::digest(km.source.process.to_string()));
|
||||||
|
let subdomain = process_id_hash.split_at(32).0.to_owned();
|
||||||
|
let mut ws_path_bindings = ws_path_bindings.write().await;
|
||||||
|
ws_path_bindings.add(
|
||||||
|
&normalize_path(&path),
|
||||||
|
BoundWsPath {
|
||||||
|
app: km.source.process.clone(),
|
||||||
|
secure_subdomain: Some(subdomain),
|
||||||
|
authenticated: true,
|
||||||
|
encrypted,
|
||||||
|
},
|
||||||
|
);
|
||||||
|
send_action_response(km.id, km.source, &send_to_loop, Ok(())).await;
|
||||||
|
}
|
||||||
|
HttpServerAction::WebSocketOpen { .. } => {
|
||||||
// we cannot receive these, only send them to processes
|
// we cannot receive these, only send them to processes
|
||||||
send_action_response(
|
send_action_response(
|
||||||
km.id,
|
km.id,
|
||||||
|
@ -11,7 +11,10 @@ pub enum HttpServerRequest {
|
|||||||
/// Processes will receive this kind of request when a client connects to them.
|
/// Processes will receive this kind of request when a client connects to them.
|
||||||
/// If a process does not want this websocket open, they should issue a *request*
|
/// If a process does not want this websocket open, they should issue a *request*
|
||||||
/// containing a [`type@HttpServerAction::WebSocketClose`] message and this channel ID.
|
/// containing a [`type@HttpServerAction::WebSocketClose`] message and this channel ID.
|
||||||
WebSocketOpen(u32),
|
WebSocketOpen {
|
||||||
|
path: String,
|
||||||
|
channel_id: u32,
|
||||||
|
},
|
||||||
/// Processes can both SEND and RECEIVE this kind of request
|
/// Processes can both SEND and RECEIVE this kind of request
|
||||||
/// (send as [`type@HttpServerAction::WebSocketPush`]).
|
/// (send as [`type@HttpServerAction::WebSocketPush`]).
|
||||||
/// When received, will contain the message bytes as payload.
|
/// When received, will contain the message bytes as payload.
|
||||||
@ -111,11 +114,23 @@ pub enum HttpServerAction {
|
|||||||
/// payload bytes and serve them as the response to any request to this path.
|
/// payload bytes and serve them as the response to any request to this path.
|
||||||
cache: bool,
|
cache: bool,
|
||||||
},
|
},
|
||||||
|
/// Bind a path to receive incoming WebSocket connections.
|
||||||
|
/// Doesn't need a cache since does not serve assets.
|
||||||
|
WebSocketBind {
|
||||||
|
path: String,
|
||||||
|
authenticated: bool,
|
||||||
|
encrypted: bool,
|
||||||
|
},
|
||||||
|
/// SecureBind is the same as Bind, except that it forces new connections to be made
|
||||||
|
/// from the unique subdomain of the process that bound the path. These are *always*
|
||||||
|
/// authenticated. Since the subdomain is unique, it will require the user to be
|
||||||
|
/// logged in separately to the general domain authentication.
|
||||||
|
WebSocketSecureBind { path: String, encrypted: bool },
|
||||||
/// Processes will RECEIVE this kind of request when a client connects to them.
|
/// Processes will RECEIVE this kind of request when a client connects to them.
|
||||||
|
|
||||||
/// If a process does not want this websocket open, they should issue a *request*
|
/// If a process does not want this websocket open, they should issue a *request*
|
||||||
/// containing a [`type@HttpServerAction::WebSocketClose`] message and this channel ID.
|
/// containing a [`type@HttpServerAction::WebSocketClose`] message and this channel ID.
|
||||||
WebSocketOpen(u32),
|
WebSocketOpen { path: String, channel_id: u32 },
|
||||||
/// When sent, expects a payload containing the WebSocket message bytes to send.
|
/// When sent, expects a payload containing the WebSocket message bytes to send.
|
||||||
WebSocketPush {
|
WebSocketPush {
|
||||||
channel_id: u32,
|
channel_id: u32,
|
||||||
|
@ -21,11 +21,13 @@ pub struct RpcMessage {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Ingest an auth token given from client and return the node name or an error.
|
/// Ingest an auth token given from client and return the node name or an error.
|
||||||
pub fn verify_auth_token(auth_token: &str, jwt_secret: &[u8]) -> Result<String, jwt::Error> {
|
pub fn _verify_auth_token(auth_token: &str, jwt_secret: &[u8]) -> Result<String, jwt::Error> {
|
||||||
let Ok(secret) = Hmac::<Sha256>::new_from_slice(jwt_secret) else {
|
let Ok(secret) = Hmac::<Sha256>::new_from_slice(jwt_secret) else {
|
||||||
return Err(jwt::Error::Format);
|
return Err(jwt::Error::Format);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
println!("hello\r");
|
||||||
|
|
||||||
let claims: Result<JwtClaims, jwt::Error> = auth_token.verify_with_key(&secret);
|
let claims: Result<JwtClaims, jwt::Error> = auth_token.verify_with_key(&secret);
|
||||||
|
|
||||||
match claims {
|
match claims {
|
||||||
|
Loading…
Reference in New Issue
Block a user