diff --git a/Cargo.lock b/Cargo.lock index 7b02bfbd..e8d96dd1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3759,6 +3759,28 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "rmp" +version = "0.8.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f9860a6cc38ed1da53456442089b4dfa35e7cedaa326df63017af88385e6b20" +dependencies = [ + "byteorder", + "num-traits", + "paste", +] + +[[package]] +name = "rmp-serde" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bffea85eea980d8a74453e5d02a8d93028f3c34725de143085a844ebe953258a" +dependencies = [ + "byteorder", + "rmp", + "serde", +] + [[package]] name = "route-recognizer" version = "0.3.1" @@ -5056,6 +5078,7 @@ dependencies = [ "rand", "reqwest", "ring", + "rmp-serde", "route-recognizer", "rsa", "rusoto_core", diff --git a/Cargo.toml b/Cargo.toml index 5686b52d..a9d2f704 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -48,6 +48,7 @@ public-ip = "0.2.2" rand = "0.8.4" reqwest = "0.11.18" ring = "0.16.20" +rmp-serde = "1.1.2" route-recognizer = "0.3.1" rsa = "0.9" rusoto_core = "0.48.0" diff --git a/modules/qns_indexer/Cargo.lock b/modules/qns_indexer/Cargo.lock index 1e1724b7..dac49915 100644 --- a/modules/qns_indexer/Cargo.lock +++ b/modules/qns_indexer/Cargo.lock @@ -453,6 +453,7 @@ dependencies = [ "bincode", "cargo-component-bindings", "hex", + "rmp-serde", "serde", "serde_json", "thiserror", @@ -528,6 +529,28 @@ version = "0.6.29" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" +[[package]] +name = "rmp" +version = "0.8.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f9860a6cc38ed1da53456442089b4dfa35e7cedaa326df63017af88385e6b20" +dependencies = [ + "byteorder", + "num-traits", + "paste", +] + +[[package]] +name = "rmp-serde" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bffea85eea980d8a74453e5d02a8d93028f3c34725de143085a844ebe953258a" +dependencies = [ + "byteorder", + "rmp", + "serde", +] + [[package]] name = "ruint" version = "1.10.1" diff --git a/modules/qns_indexer/Cargo.toml b/modules/qns_indexer/Cargo.toml index f9dbd05a..88206382 100644 --- a/modules/qns_indexer/Cargo.toml +++ b/modules/qns_indexer/Cargo.toml @@ -12,6 +12,7 @@ lto = true [dependencies] cargo-component-bindings = { git = "https://github.com/bytecodealliance/cargo-component" } +rmp-serde = "1.1.2" serde_json = "1.0" serde = {version = "1.0", features = ["derive"] } wit-bindgen = { version = "0.11.0", default_features = false } diff --git a/modules/qns_indexer/src/lib.rs b/modules/qns_indexer/src/lib.rs index 66265913..d9a14a11 100644 --- a/modules/qns_indexer/src/lib.rs +++ b/modules/qns_indexer/src/lib.rs @@ -129,7 +129,7 @@ impl UqProcess for Component { inherit: false, expects_response: None, metadata: None, - ipc: serde_json::to_vec(&NetActions::QnsBatchUpdate( + ipc: rmp_serde::to_vec(&NetActions::QnsBatchUpdate( state.nodes.values().cloned().collect::>(), )) .unwrap(), @@ -326,7 +326,7 @@ impl UqProcess for Component { inherit: false, expects_response: None, metadata: None, - ipc: serde_json::to_vec(&NetActions::QnsUpdate(update.clone())) + ipc: rmp_serde::to_vec(&NetActions::QnsUpdate(update.clone())) .unwrap(), }, None, diff --git a/src/net/mod.rs b/src/net/mod.rs index 36563344..436fe75f 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -708,7 +708,7 @@ async fn recv_connection_via_router( let (mut write_stream, mut read_stream) = websocket.split(); // before beginning XX handshake pattern, send a routing request - let message = bincode::serialize(&RoutingRequest { + let message = rmp_serde::to_vec(&RoutingRequest { source: our.name.clone(), signature: keypair .sign([their_name, router.name.as_str()].concat().as_bytes()) @@ -810,7 +810,7 @@ async fn init_connection( // if this is a routed request, before starting XX handshake pattern, send a // routing request message over socket if use_router.is_some() { - let message = bincode::serialize(&RoutingRequest { + let message = rmp_serde::to_vec(&RoutingRequest { source: our.name.clone(), signature: keypair .sign( @@ -890,8 +890,8 @@ async fn handle_local_message( Message::Response((response, _context)) => { // these are received as a router, when we send ConnectionRequests // to a node we do routing for. - match serde_json::from_slice::(&response.ipc)? { - NetResponses::Attempting(_) => { + match rmp_serde::from_slice::(&response.ipc)? { + NetResponses::Accepted(_) => { // TODO anything here? } NetResponses::Rejected(to) => { @@ -907,7 +907,7 @@ async fn handle_local_message( }; if km.source.node != our.name { - if let Ok(act) = serde_json::from_slice::(&ipc) { + if let Ok(act) = rmp_serde::from_slice::(&ipc) { match act { NetActions::QnsBatchUpdate(_) | NetActions::QnsUpdate(_) => { // for now, we don't get these from remote. @@ -949,6 +949,27 @@ async fn handle_local_message( kernel_message_tx.clone(), print_tx.clone(), )); + kernel_message_tx + .send(KernelMessage { + id: km.id, + source: Address { + node: our.name.clone(), + process: ProcessId::from_str("net:sys:uqbar").unwrap(), + }, + target: km.rsvp.unwrap_or(km.source), + rsvp: None, + message: Message::Response(( + Response { + inherit: false, + ipc: rmp_serde::to_vec(&NetResponses::Accepted(from))?, + metadata: None, + }, + None, + )), + payload: None, + signed_capabilities: None, + }) + .await?; } else { kernel_message_tx .send(KernelMessage { @@ -962,7 +983,7 @@ async fn handle_local_message( message: Message::Response(( Response { inherit: false, - ipc: serde_json::to_vec(&NetResponses::Rejected(from))?, + ipc: rmp_serde::to_vec(&NetResponses::Rejected(from))?, metadata: None, }, None, diff --git a/src/net/types.rs b/src/net/types.rs index 6a7737c1..bce52e09 100644 --- a/src/net/types.rs +++ b/src/net/types.rs @@ -99,7 +99,7 @@ pub enum NetActions { /// For now, only sent in response to a ConnectionRequest #[derive(Clone, Debug, Serialize, Deserialize)] pub enum NetResponses { - Attempting(NodeId), + Accepted(NodeId), Rejected(NodeId), } diff --git a/src/net/utils.rs b/src/net/utils.rs index 25c69f52..8f33ffa0 100644 --- a/src/net/utils.rs +++ b/src/net/utils.rs @@ -68,7 +68,7 @@ pub async fn create_passthrough( message: Message::Request(Request { inherit: false, expects_response: Some(5), - ipc: serde_json::to_vec(&NetActions::ConnectionRequest(from_id.name.clone()))?, + ipc: rmp_serde::to_vec(&NetActions::ConnectionRequest(from_id.name.clone()))?, metadata: None, }), payload: None, @@ -111,7 +111,7 @@ pub fn validate_routing_request( pki: &OnchainPKI, ) -> Result<(Identity, NodeId)> { println!("validate_routing_request\r"); - let routing_request: RoutingRequest = bincode::deserialize(buf)?; + let routing_request: RoutingRequest = rmp_serde::from_slice(buf)?; println!("routing request: {:?}\r", routing_request); let their_id = pki .get(&routing_request.source) @@ -149,7 +149,7 @@ pub fn validate_handshake( } pub async fn send_uqbar_message(km: &KernelMessage, conn: &mut PeerConnection) -> Result<()> { - let serialized = bincode::serialize(km)?; + let serialized = rmp_serde::to_vec(km)?; if serialized.len() > MESSAGE_MAX_SIZE as usize { return Err(anyhow!("message too large")); } @@ -186,7 +186,7 @@ pub async fn recv_uqbar_message(conn: &mut PeerConnection) -> Result Result<()> { println!("send_uqbar_handshake\r"); - let our_hs = bincode::serialize(&HandshakePayload { + let our_hs = rmp_serde::to_vec(&HandshakePayload { name: our.name.clone(), signature: keypair.sign(noise_static_key).as_ref().to_vec(), protocol_version: 1, @@ -226,7 +226,7 @@ pub async fn recv_uqbar_handshake( // 2. a signature by their published networking key that signs the // static key they will be using for this handshake // 3. the version number of the networking protocol (so we can upgrade it) - Ok(bincode::deserialize(&buf[..len])?) + Ok(rmp_serde::from_slice(&buf[..len])?) } pub async fn ws_recv(