chess working with new websockets!

This commit is contained in:
dr-frmr 2023-12-18 20:24:19 -05:00
parent 45f508a4d9
commit 3775162876
No known key found for this signature in database
7 changed files with 87 additions and 53 deletions

View File

@ -632,7 +632,7 @@ checksum = "f962df74c8c05a667b5ee8bcf162993134c104e96440b663c8daa176dc772d8c"
[[package]]
name = "uqbar_process_lib"
version = "0.4.0"
source = "git+ssh://git@github.com/uqbar-dao/process_lib.git?rev=1ce0d41#1ce0d412169e795c2a99464563b42ae2a2628d77"
source = "git+ssh://git@github.com/uqbar-dao/process_lib.git?rev=65e07e4#65e07e49553a5fa3340ff7c3d9cf3340a53610c8"
dependencies = [
"anyhow",
"bincode",

View File

@ -16,7 +16,7 @@ pleco = "0.5"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
url = "*"
uqbar_process_lib = { git = "ssh://git@github.com/uqbar-dao/process_lib.git", rev = "1ce0d41" }
uqbar_process_lib = { git = "ssh://git@github.com/uqbar-dao/process_lib.git", rev = "65e07e4" }
wit-bindgen = { git = "https://github.com/bytecodealliance/wit-bindgen", rev = "efcc759" }
[lib]

File diff suppressed because one or more lines are too long

View File

@ -147,6 +147,8 @@ fn initialize(our: Address) {
)
.unwrap();
http::bind_http_path("/games", true, false).unwrap();
// Allow websockets to be opened at / (our process ID will be prepended).
http::bind_ws_path("/", true, false).unwrap();
// Grab our state, then enter the main event loop.
let mut state: ChessState = load_chess_state();
@ -178,7 +180,7 @@ fn handle_request(our: &Address, message: &Message, state: &mut ChessState) -> a
// chess protocol request, we *await* its response in-place. This is appropriate
// for direct node<>node comms, less appropriate for other circumstances...
if !message.is_request() {
return Err(anyhow::anyhow!("message was response"));
return Ok(());
}
// If the request is from another node, handle it as an incoming request.
// Note that we can enforce the ProcessId as well, but it shouldn't be a trusted
@ -221,8 +223,10 @@ fn handle_request(our: &Address, message: &Message, state: &mut ChessState) -> a
}
}
}
http::HttpServerRequest::WebSocketOpen(channel_id) => {
// client frontend opened a websocket
http::HttpServerRequest::WebSocketOpen { path, channel_id } => {
// We know this is authenticated and unencrypted because we only
// bound one path, the root path. So we know that client
// frontend opened a websocket and can send updates
state.clients.insert(channel_id);
Ok(())
}
@ -295,7 +299,7 @@ fn handle_chess_request(
// If we don't have a game with them, reject the move.
return Response::new()
.ipc(serde_json::to_vec(&ChessResponse::MoveRejected)?)
.send()
.send();
};
// Convert the saved board to one we can manipulate.
let mut board = Board::from_fen(&game.board).unwrap();
@ -359,9 +363,12 @@ fn handle_local_request(
let Ok(Message::Response { ref ipc, .. }) = Request::new()
.target((game_id.as_ref(), our.process.clone()))
.ipc(serde_json::to_vec(&action)?)
.send_and_await_response(5)? else {
return Err(anyhow::anyhow!("other player did not respond properly to new game request"))
};
.send_and_await_response(5)?
else {
return Err(anyhow::anyhow!(
"other player did not respond properly to new game request"
));
};
// If they accept, create a new game -- otherwise, error out.
if serde_json::from_slice::<ChessResponse>(ipc)? != ChessResponse::NewGameAccepted {
return Err(anyhow::anyhow!("other player rejected new game request!"));
@ -402,9 +409,12 @@ fn handle_local_request(
let Ok(Message::Response { ref ipc, .. }) = Request::new()
.target((game_id.as_ref(), our.process.clone()))
.ipc(serde_json::to_vec(&action)?)
.send_and_await_response(5)? else {
return Err(anyhow::anyhow!("other player did not respond properly to our move"))
};
.send_and_await_response(5)?
else {
return Err(anyhow::anyhow!(
"other player did not respond properly to our move"
));
};
if serde_json::from_slice::<ChessResponse>(ipc)? != ChessResponse::MoveAccepted {
return Err(anyhow::anyhow!("other player rejected our move"));
}
@ -487,9 +497,12 @@ fn handle_http_request(
white: player_white.clone(),
black: player_black.clone(),
})?)
.send_and_await_response(5)? else {
return Err(anyhow::anyhow!("other player did not respond properly to new game request"))
};
.send_and_await_response(5)?
else {
return Err(anyhow::anyhow!(
"other player did not respond properly to new game request"
));
};
// if they accept, create a new game
// otherwise, should surface error to FE...
if serde_json::from_slice::<ChessResponse>(msg.ipc())? != ChessResponse::NewGameAccepted
@ -553,9 +566,12 @@ fn handle_http_request(
game_id: game_id.to_string(),
move_str: move_str.to_string(),
})?)
.send_and_await_response(5)? else {
return Err(anyhow::anyhow!("other player did not respond properly to our move"))
};
.send_and_await_response(5)?
else {
return Err(anyhow::anyhow!(
"other player did not respond properly to our move"
));
};
if serde_json::from_slice::<ChessResponse>(msg.ipc())? != ChessResponse::MoveAccepted {
return Err(anyhow::anyhow!("other player rejected our move"));
}

View File

@ -299,10 +299,7 @@ async fn ws_handler(
let Some(auth_token) = serialized_headers.get("cookie") else {
return Err(warp::reject::not_found());
};
let Ok(our_name) = verify_auth_token(&auth_token, &jwt_secret_bytes) else {
return Err(warp::reject::not_found());
};
if our_name != *our {
if !auth_cookie_valid(&our, &auth_token, &jwt_secret_bytes) {
return Err(warp::reject::not_found());
}
}
@ -313,6 +310,13 @@ async fn ws_handler(
ws,
our.clone(),
app,
// remove process id from beginning of path by splitting into segments
// separated by "/" and taking all but the first
original_path
.split('/')
.skip(1)
.collect::<Vec<&str>>()
.join("/"),
jwt_secret_bytes.clone(),
ws_senders.clone(),
send_to_loop.clone(),
@ -595,6 +599,7 @@ async fn maintain_websocket(
ws: WebSocket,
our: Arc<String>,
app: ProcessId,
path: String,
_jwt_secret_bytes: Arc<Vec<u8>>, // TODO use for encrypted channels
ws_senders: WebSocketSenders,
send_to_loop: MessageSender,
@ -608,9 +613,9 @@ async fn maintain_websocket(
})
.await;
let ws_channel_id: u32 = rand::random();
let channel_id: u32 = rand::random();
let (ws_sender, mut ws_receiver) = tokio::sync::mpsc::channel(100);
ws_senders.insert(ws_channel_id, (app.clone(), ws_sender));
ws_senders.insert(channel_id, (app.clone(), ws_sender));
let _ = send_to_loop
.send(KernelMessage {
@ -627,7 +632,8 @@ async fn maintain_websocket(
message: Message::Request(Request {
inherit: false,
expects_response: None,
ipc: serde_json::to_vec(&HttpServerRequest::WebSocketOpen(ws_channel_id)).unwrap(),
ipc: serde_json::to_vec(&HttpServerRequest::WebSocketOpen { path, channel_id })
.unwrap(),
metadata: Some("ws".into()),
}),
payload: None,
@ -637,7 +643,7 @@ async fn maintain_websocket(
let _ = print_tx.send(Printout {
verbosity: 1,
content: format!("websocket channel {ws_channel_id} opened"),
content: format!("websocket channel {channel_id} opened"),
});
loop {
tokio::select! {
@ -659,7 +665,7 @@ async fn maintain_websocket(
inherit: false,
expects_response: None,
ipc: serde_json::to_vec(&HttpServerRequest::WebSocketPush {
channel_id: ws_channel_id,
channel_id,
message_type: WsMessageType::Binary,
}).unwrap(),
metadata: Some("ws".into()),
@ -672,7 +678,7 @@ async fn maintain_websocket(
});
}
_ => {
websocket_close(ws_channel_id, app.clone(), &ws_senders, &send_to_loop).await;
websocket_close(channel_id, app.clone(), &ws_senders, &send_to_loop).await;
break;
}
}
@ -681,7 +687,7 @@ async fn maintain_websocket(
match write_stream.send(outgoing).await {
Ok(()) => continue,
Err(_) => {
websocket_close(ws_channel_id, app.clone(), &ws_senders, &send_to_loop).await;
websocket_close(channel_id, app.clone(), &ws_senders, &send_to_loop).await;
break;
}
}
@ -900,10 +906,15 @@ async fn handle_app_message(
send_action_response(km.id, km.source, &send_to_loop, Ok(())).await;
}
HttpServerAction::WebSocketBind {
path,
mut path,
authenticated,
encrypted,
} => {
path = if path.starts_with('/') {
format!("/{}{}", km.source.process, path)
} else {
format!("/{}/{}", km.source.process, path)
};
let mut ws_path_bindings = ws_path_bindings.write().await;
ws_path_bindings.add(
&normalize_path(&path),
@ -914,8 +925,17 @@ async fn handle_app_message(
encrypted,
},
);
send_action_response(km.id, km.source, &send_to_loop, Ok(())).await;
}
HttpServerAction::WebSocketSecureBind { path, encrypted } => {
HttpServerAction::WebSocketSecureBind {
mut path,
encrypted,
} => {
path = if path.starts_with('/') {
format!("/{}{}", km.source.process, path)
} else {
format!("/{}/{}", km.source.process, path)
};
let process_id_hash =
format!("{:x}", Sha256::digest(km.source.process.to_string()));
let subdomain = process_id_hash.split_at(32).0.to_owned();
@ -929,6 +949,7 @@ async fn handle_app_message(
encrypted,
},
);
send_action_response(km.id, km.source, &send_to_loop, Ok(())).await;
}
HttpServerAction::WebSocketOpen { .. } => {
// we cannot receive these, only send them to processes

View File

@ -11,7 +11,7 @@ pub enum HttpServerRequest {
/// Processes will receive this kind of request when a client connects to them.
/// If a process does not want this websocket open, they should issue a *request*
/// containing a [`type@HttpServerAction::WebSocketClose`] message and this channel ID.
WebSocketOpen(u32),
WebSocketOpen { path: String, channel_id: u32 },
/// Processes can both SEND and RECEIVE this kind of request
/// (send as [`type@HttpServerAction::WebSocketPush`]).
/// When received, will contain the message bytes as payload.
@ -126,12 +126,7 @@ pub enum HttpServerAction {
/// Processes will RECEIVE this kind of request when a client connects to them.
/// If a process does not want this websocket open, they should issue a *request*
/// containing a [`type@HttpServerAction::WebSocketClose`] message and this channel ID.
WebSocketOpen {
path: String,
channel_id: u32,
authenticated: bool,
encrypted: bool,
},
WebSocketOpen { path: String, channel_id: u32 },
/// When sent, expects a payload containing the WebSocket message bytes to send.
WebSocketPush {
channel_id: u32,

View File

@ -21,11 +21,13 @@ pub struct RpcMessage {
}
/// Ingest an auth token given from client and return the node name or an error.
pub fn verify_auth_token(auth_token: &str, jwt_secret: &[u8]) -> Result<String, jwt::Error> {
pub fn _verify_auth_token(auth_token: &str, jwt_secret: &[u8]) -> Result<String, jwt::Error> {
let Ok(secret) = Hmac::<Sha256>::new_from_slice(jwt_secret) else {
return Err(jwt::Error::Format);
};
println!("hello\r");
let claims: Result<JwtClaims, jwt::Error> = auth_token.verify_with_key(&secret);
match claims {