switch networking protocol to messagepack!

This commit is contained in:
dr-frmr 2023-10-31 16:42:46 -04:00
parent 2ab3bf4e4b
commit 64ced6e659
No known key found for this signature in database
8 changed files with 84 additions and 15 deletions

23
Cargo.lock generated
View File

@ -3759,6 +3759,28 @@ dependencies = [
"syn 1.0.109",
]
[[package]]
name = "rmp"
version = "0.8.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f9860a6cc38ed1da53456442089b4dfa35e7cedaa326df63017af88385e6b20"
dependencies = [
"byteorder",
"num-traits",
"paste",
]
[[package]]
name = "rmp-serde"
version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bffea85eea980d8a74453e5d02a8d93028f3c34725de143085a844ebe953258a"
dependencies = [
"byteorder",
"rmp",
"serde",
]
[[package]]
name = "route-recognizer"
version = "0.3.1"
@ -5056,6 +5078,7 @@ dependencies = [
"rand",
"reqwest",
"ring",
"rmp-serde",
"route-recognizer",
"rsa",
"rusoto_core",

View File

@ -48,6 +48,7 @@ public-ip = "0.2.2"
rand = "0.8.4"
reqwest = "0.11.18"
ring = "0.16.20"
rmp-serde = "1.1.2"
route-recognizer = "0.3.1"
rsa = "0.9"
rusoto_core = "0.48.0"

View File

@ -453,6 +453,7 @@ dependencies = [
"bincode",
"cargo-component-bindings",
"hex",
"rmp-serde",
"serde",
"serde_json",
"thiserror",
@ -528,6 +529,28 @@ version = "0.6.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1"
[[package]]
name = "rmp"
version = "0.8.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f9860a6cc38ed1da53456442089b4dfa35e7cedaa326df63017af88385e6b20"
dependencies = [
"byteorder",
"num-traits",
"paste",
]
[[package]]
name = "rmp-serde"
version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bffea85eea980d8a74453e5d02a8d93028f3c34725de143085a844ebe953258a"
dependencies = [
"byteorder",
"rmp",
"serde",
]
[[package]]
name = "ruint"
version = "1.10.1"

View File

@ -12,6 +12,7 @@ lto = true
[dependencies]
cargo-component-bindings = { git = "https://github.com/bytecodealliance/cargo-component" }
rmp-serde = "1.1.2"
serde_json = "1.0"
serde = {version = "1.0", features = ["derive"] }
wit-bindgen = { version = "0.11.0", default_features = false }

View File

@ -129,7 +129,7 @@ impl UqProcess for Component {
inherit: false,
expects_response: None,
metadata: None,
ipc: serde_json::to_vec(&NetActions::QnsBatchUpdate(
ipc: rmp_serde::to_vec(&NetActions::QnsBatchUpdate(
state.nodes.values().cloned().collect::<Vec<_>>(),
))
.unwrap(),
@ -326,7 +326,7 @@ impl UqProcess for Component {
inherit: false,
expects_response: None,
metadata: None,
ipc: serde_json::to_vec(&NetActions::QnsUpdate(update.clone()))
ipc: rmp_serde::to_vec(&NetActions::QnsUpdate(update.clone()))
.unwrap(),
},
None,

View File

@ -708,7 +708,7 @@ async fn recv_connection_via_router(
let (mut write_stream, mut read_stream) = websocket.split();
// before beginning XX handshake pattern, send a routing request
let message = bincode::serialize(&RoutingRequest {
let message = rmp_serde::to_vec(&RoutingRequest {
source: our.name.clone(),
signature: keypair
.sign([their_name, router.name.as_str()].concat().as_bytes())
@ -810,7 +810,7 @@ async fn init_connection(
// if this is a routed request, before starting XX handshake pattern, send a
// routing request message over socket
if use_router.is_some() {
let message = bincode::serialize(&RoutingRequest {
let message = rmp_serde::to_vec(&RoutingRequest {
source: our.name.clone(),
signature: keypair
.sign(
@ -890,8 +890,8 @@ async fn handle_local_message(
Message::Response((response, _context)) => {
// these are received as a router, when we send ConnectionRequests
// to a node we do routing for.
match serde_json::from_slice::<NetResponses>(&response.ipc)? {
NetResponses::Attempting(_) => {
match rmp_serde::from_slice::<NetResponses>(&response.ipc)? {
NetResponses::Accepted(_) => {
// TODO anything here?
}
NetResponses::Rejected(to) => {
@ -907,7 +907,7 @@ async fn handle_local_message(
};
if km.source.node != our.name {
if let Ok(act) = serde_json::from_slice::<NetActions>(&ipc) {
if let Ok(act) = rmp_serde::from_slice::<NetActions>(&ipc) {
match act {
NetActions::QnsBatchUpdate(_) | NetActions::QnsUpdate(_) => {
// for now, we don't get these from remote.
@ -949,6 +949,27 @@ async fn handle_local_message(
kernel_message_tx.clone(),
print_tx.clone(),
));
kernel_message_tx
.send(KernelMessage {
id: km.id,
source: Address {
node: our.name.clone(),
process: ProcessId::from_str("net:sys:uqbar").unwrap(),
},
target: km.rsvp.unwrap_or(km.source),
rsvp: None,
message: Message::Response((
Response {
inherit: false,
ipc: rmp_serde::to_vec(&NetResponses::Accepted(from))?,
metadata: None,
},
None,
)),
payload: None,
signed_capabilities: None,
})
.await?;
} else {
kernel_message_tx
.send(KernelMessage {
@ -962,7 +983,7 @@ async fn handle_local_message(
message: Message::Response((
Response {
inherit: false,
ipc: serde_json::to_vec(&NetResponses::Rejected(from))?,
ipc: rmp_serde::to_vec(&NetResponses::Rejected(from))?,
metadata: None,
},
None,

View File

@ -99,7 +99,7 @@ pub enum NetActions {
/// For now, only sent in response to a ConnectionRequest
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum NetResponses {
Attempting(NodeId),
Accepted(NodeId),
Rejected(NodeId),
}

View File

@ -68,7 +68,7 @@ pub async fn create_passthrough(
message: Message::Request(Request {
inherit: false,
expects_response: Some(5),
ipc: serde_json::to_vec(&NetActions::ConnectionRequest(from_id.name.clone()))?,
ipc: rmp_serde::to_vec(&NetActions::ConnectionRequest(from_id.name.clone()))?,
metadata: None,
}),
payload: None,
@ -111,7 +111,7 @@ pub fn validate_routing_request(
pki: &OnchainPKI,
) -> Result<(Identity, NodeId)> {
println!("validate_routing_request\r");
let routing_request: RoutingRequest = bincode::deserialize(buf)?;
let routing_request: RoutingRequest = rmp_serde::from_slice(buf)?;
println!("routing request: {:?}\r", routing_request);
let their_id = pki
.get(&routing_request.source)
@ -149,7 +149,7 @@ pub fn validate_handshake(
}
pub async fn send_uqbar_message(km: &KernelMessage, conn: &mut PeerConnection) -> Result<()> {
let serialized = bincode::serialize(km)?;
let serialized = rmp_serde::to_vec(km)?;
if serialized.len() > MESSAGE_MAX_SIZE as usize {
return Err(anyhow!("message too large"));
}
@ -186,7 +186,7 @@ pub async fn recv_uqbar_message(conn: &mut PeerConnection) -> Result<KernelMessa
msg.extend_from_slice(&conn.buf[..len]);
}
Ok(bincode::deserialize(&msg)?)
Ok(rmp_serde::from_slice(&msg)?)
}
pub async fn send_uqbar_handshake(
@ -199,7 +199,7 @@ pub async fn send_uqbar_handshake(
proxy_request: bool,
) -> Result<()> {
println!("send_uqbar_handshake\r");
let our_hs = bincode::serialize(&HandshakePayload {
let our_hs = rmp_serde::to_vec(&HandshakePayload {
name: our.name.clone(),
signature: keypair.sign(noise_static_key).as_ref().to_vec(),
protocol_version: 1,
@ -226,7 +226,7 @@ pub async fn recv_uqbar_handshake(
// 2. a signature by their published networking key that signs the
// static key they will be using for this handshake
// 3. the version number of the networking protocol (so we can upgrade it)
Ok(bincode::deserialize(&buf[..len])?)
Ok(rmp_serde::from_slice(&buf[..len])?)
}
pub async fn ws_recv(