diff --git a/Cargo.lock b/Cargo.lock index c9e8c7fe..576a4b35 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -36,6 +36,15 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" +[[package]] +name = "aead" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b613b8e1e3cf911a086f53f03bf286f52fd7a7258e4fa606f0ef220d39d8877" +dependencies = [ + "generic-array", +] + [[package]] name = "aead" version = "0.5.2" @@ -46,6 +55,18 @@ dependencies = [ "generic-array", ] +[[package]] +name = "aes" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e8b47f52ea9bae42228d07ec09eb676433d7c4ed1ebdf0f1d1c29ed446f1ab8" +dependencies = [ + "cfg-if", + "cipher 0.3.0", + "cpufeatures", + "opaque-debug", +] + [[package]] name = "aes" version = "0.8.3" @@ -53,21 +74,35 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac1f845298e95f983ff1944b728ae08b8cebab80d684f0a832ed0fc74dfa27e2" dependencies = [ "cfg-if", - "cipher", + "cipher 0.4.4", "cpufeatures", ] +[[package]] +name = "aes-gcm" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df5f85a83a7d8b0442b6aa7b504b8212c1733da07b98aae43d4bc21b2cb3cdf6" +dependencies = [ + "aead 0.4.3", + "aes 0.7.5", + "cipher 0.3.0", + "ctr 0.8.0", + "ghash 0.4.4", + "subtle", +] + [[package]] name = "aes-gcm" version = "0.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "209b47e8954a928e1d72e86eca7000ebb6655fe1436d33eefc2201cad027e237" dependencies = [ - "aead", - "aes", - "cipher", - "ctr", - "ghash", + "aead 0.5.2", + "aes 0.8.3", + "cipher 0.4.4", + "ctr 0.9.2", + "ghash 0.5.0", "subtle", ] @@ -154,17 +189,6 @@ dependencies = [ "term", ] -[[package]] -name = "async-recursion" -version = "1.0.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5fd55a5ba1179988837d24ab4c7cc8ed6efdeff578ede0416b4225a5fca35bd0" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.32", -] - [[package]] name = "async-trait" version = "0.1.73" @@ -298,6 +322,15 @@ dependencies = [ "wyz", ] +[[package]] +name = "blake2" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "46502ad458c9a52b69d4d4d32775c788b7a1b85e8bc9d482d92250fc0e3f8efe" +dependencies = [ + "digest 0.10.7", +] + [[package]] name = "blake3" version = "1.4.1" @@ -499,6 +532,18 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "chacha20" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c80e5460aa66fe3b91d40bcbdab953a597b60053e34d684ac6903f863b680a6" +dependencies = [ + "cfg-if", + "cipher 0.3.0", + "cpufeatures", + "zeroize", +] + [[package]] name = "chacha20" version = "0.9.1" @@ -506,20 +551,33 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c3613f74bd2eac03dad61bd53dbe620703d4371614fe0bc3b9f04dd36fe4e818" dependencies = [ "cfg-if", - "cipher", + "cipher 0.4.4", "cpufeatures", ] +[[package]] +name = "chacha20poly1305" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a18446b09be63d457bbec447509e85f662f32952b035ce892290396bc0b0cff5" +dependencies = [ + "aead 0.4.3", + "chacha20 0.8.2", + "cipher 0.3.0", + "poly1305 0.7.2", + "zeroize", +] + [[package]] name = "chacha20poly1305" version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "10cd79432192d1c0f4e1a0fef9527696cc039165d729fb41b3f4f4f354c2dc35" dependencies = [ - "aead", - "chacha20", - "cipher", - "poly1305", + "aead 0.5.2", + "chacha20 0.9.1", + "cipher 0.4.4", + "poly1305 0.8.0", "zeroize", ] @@ -538,6 +596,15 @@ dependencies = [ "windows-targets", ] +[[package]] +name = "cipher" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ee52072ec15386f770805afd189a01c8841be8696bed250fa2f13c4c0d6dfb7" +dependencies = [ + "generic-array", +] + [[package]] name = "cipher" version = "0.4.4" @@ -549,17 +616,6 @@ dependencies = [ "zeroize", ] -[[package]] -name = "cita_trie" -version = "4.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8c3d2abadaa28e0d277f9f6d07a2052544f045d929cd4d6f7bcfb43567c9767" -dependencies = [ - "hasher", - "parking_lot", - "rlp", -] - [[package]] name = "coins-bip32" version = "0.8.7" @@ -901,13 +957,49 @@ dependencies = [ "subtle", ] +[[package]] +name = "ctr" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "049bb91fb4aaf0e3c7efa6cd5ef877dbbbd15b39dad06d9948de4ec8a75761ea" +dependencies = [ + "cipher 0.3.0", +] + [[package]] name = "ctr" version = "0.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0369ee1ad671834580515889b80f2ea915f23b8be8d0daa4bbaf2ac5c7590835" dependencies = [ - "cipher", + "cipher 0.4.4", +] + +[[package]] +name = "curve25519-dalek" +version = "4.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e89b8c6a2e4b1f45971ad09761aafb85514a84744b67a95e32c3cc1352d1f65c" +dependencies = [ + "cfg-if", + "cpufeatures", + "curve25519-dalek-derive", + "fiat-crypto", + "platforms", + "rustc_version", + "subtle", + "zeroize", +] + +[[package]] +name = "curve25519-dalek-derive" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "83fdaf97f4804dcebfa5862639bc9ce4121e82140bec2a987ac5140294865b5b" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.32", ] [[package]] @@ -1277,8 +1369,8 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1fda3bf123be441da5260717e0661c25a2fd9cb2b2c1d20bf2e05580047158ab" dependencies = [ - "aes", - "ctr", + "aes 0.8.3", + "ctr 0.9.2", "digest 0.10.7", "hex", "hmac 0.12.1", @@ -1632,6 +1724,12 @@ dependencies = [ "subtle", ] +[[package]] +name = "fiat-crypto" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a481586acf778f1b1455424c343f71124b048ffa5f4fc3f8f6ae9dc432dcb3c7" + [[package]] name = "file-per-thread-logger" version = "0.2.0" @@ -1880,6 +1978,16 @@ dependencies = [ "wasi", ] +[[package]] +name = "ghash" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1583cc1656d7839fd3732b80cf4f38850336cdb9b8ded1cd399ca62958de3c99" +dependencies = [ + "opaque-debug", + "polyval 0.5.3", +] + [[package]] name = "ghash" version = "0.5.0" @@ -1887,7 +1995,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d930750de5717d2dd0b8c0d42c076c0e884c81a73e6cab859bbd2339c71e3e40" dependencies = [ "opaque-debug", - "polyval", + "polyval 0.6.1", ] [[package]] @@ -1976,15 +2084,6 @@ version = "0.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2c6201b9ff9fd90a5a3bac2e56a830d0caa509576f0e503818ee82c181b3437a" -[[package]] -name = "hasher" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bbbba678b6567f27ce22870d951f4208e1dc2fef64993bd4521b1d497ef8a3aa" -dependencies = [ - "tiny-keccak", -] - [[package]] name = "hashers" version = "1.0.1" @@ -3205,6 +3304,23 @@ version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "26072860ba924cbfa98ea39c8c19b4dd6a4a25423dbdf219c1eca91aa0cf6964" +[[package]] +name = "platforms" +version = "3.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4503fa043bf02cee09a9582e9554b4c6403b2ef55e4612e96561d294419429f8" + +[[package]] +name = "poly1305" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "048aeb476be11a4b6ca432ca569e375810de9294ae78f4774e78ea98a9246ede" +dependencies = [ + "cpufeatures", + "opaque-debug", + "universal-hash 0.4.1", +] + [[package]] name = "poly1305" version = "0.8.0" @@ -3213,7 +3329,19 @@ checksum = "8159bd90725d2df49889a078b54f4f79e87f1f8a8444194cdca81d38f5393abf" dependencies = [ "cpufeatures", "opaque-debug", - "universal-hash", + "universal-hash 0.5.1", +] + +[[package]] +name = "polyval" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8419d2b623c7c0896ff2d5d96e2cb4ede590fed28fcc34934f4c33c036e620a1" +dependencies = [ + "cfg-if", + "cpufeatures", + "opaque-debug", + "universal-hash 0.4.1", ] [[package]] @@ -3225,7 +3353,7 @@ dependencies = [ "cfg-if", "cpufeatures", "opaque-debug", - "universal-hash", + "universal-hash 0.5.1", ] [[package]] @@ -3600,6 +3728,28 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "rmp" +version = "0.8.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f9860a6cc38ed1da53456442089b4dfa35e7cedaa326df63017af88385e6b20" +dependencies = [ + "byteorder", + "num-traits", + "paste", +] + +[[package]] +name = "rmp-serde" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bffea85eea980d8a74453e5d02a8d93028f3c34725de143085a844ebe953258a" +dependencies = [ + "byteorder", + "rmp", + "serde", +] + [[package]] name = "route-recognizer" version = "0.3.1" @@ -3825,7 +3975,7 @@ version = "0.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97a22f5af31f73a954c10289c93e8a50cc23d971e80ee446f1f6f7137a088213" dependencies = [ - "cipher", + "cipher 0.4.4", ] [[package]] @@ -4173,6 +4323,23 @@ version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "62bb4feee49fdd9f707ef802e22365a35de4b7b299de4763d44bfea899442ff9" +[[package]] +name = "snow" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c9d1425eb528a21de2755c75af4c9b5d57f50a0d4c3b7f1828a4cd03f8ba155" +dependencies = [ + "aes-gcm 0.9.4", + "blake2", + "chacha20poly1305 0.9.1", + "curve25519-dalek", + "rand_core", + "ring", + "rustc_version", + "sha2 0.10.7", + "subtle", +] + [[package]] name = "socket2" version = "0.4.9" @@ -4814,6 +4981,16 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f962df74c8c05a667b5ee8bcf162993134c104e96440b663c8daa176dc772d8c" +[[package]] +name = "universal-hash" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f214e8f697e925001e66ec2c6e37a4ef93f0f78c2eed7814394e10c62025b05" +dependencies = [ + "generic-array", + "subtle", +] + [[package]] name = "universal-hash" version = "0.5.1" @@ -4834,18 +5011,16 @@ checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" name = "uqbar" version = "0.1.0" dependencies = [ - "aes-gcm", + "aes-gcm 0.10.2", "anyhow", - "async-recursion", "async-trait", "base64 0.13.1", "bincode", "blake3", "bytes", "cap-std", - "chacha20poly1305", + "chacha20poly1305 0.10.1", "chrono", - "cita_trie", "crossterm", "digest 0.10.7", "dotenv", @@ -4856,7 +5031,6 @@ dependencies = [ "futures", "generic-array", "getrandom", - "hasher", "hex", "hkdf", "hmac 0.12.1", @@ -4870,6 +5044,7 @@ dependencies = [ "rand", "reqwest", "ring", + "rmp-serde", "route-recognizer", "rsa", "rusoto_core", @@ -4879,6 +5054,7 @@ dependencies = [ "serde_json", "serde_urlencoded", "sha2 0.10.7", + "snow", "thiserror", "tokio", "tokio-tungstenite 0.20.0", @@ -5808,7 +5984,7 @@ version = "0.6.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "760394e246e4c28189f19d488c058bf16f564016aefac5d32bb1f3b51d5e9261" dependencies = [ - "aes", + "aes 0.8.3", "byteorder", "bzip2", "constant_time_eq 0.1.5", diff --git a/Cargo.toml b/Cargo.toml index 733d3121..de9dc564 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,7 +14,6 @@ zip = "0.6" [dependencies] aes-gcm = "0.10.2" anyhow = "1.0.71" -async-recursion = "1.0.4" async-trait = "0.1.71" base64 = "0.13" bincode = "1.3.3" @@ -23,7 +22,6 @@ bytes = "1.4.0" cap-std = "2.0.0" chacha20poly1305 = "0.10.1" chrono = "0.4.31" -cita_trie = "4.0.0" crossterm = { version = "0.26.1", features = ["event-stream", "bracketed-paste"] } digest = "0.10" dotenv = "0.15.0" @@ -34,7 +32,6 @@ flate2 = "1.0" futures = "0.3" generic-array = "0.14" getrandom = "0.2.10" -hasher = "*" hex = "0.4.3" hkdf = "0.12.3" hmac = "0.12" @@ -48,6 +45,7 @@ public-ip = "0.2.2" rand = "0.8.4" reqwest = "0.11.18" ring = "0.16.20" +rmp-serde = "1.1.2" route-recognizer = "0.3.1" rsa = "0.9" rusoto_core = "0.48.0" @@ -57,6 +55,7 @@ serde = {version = "1.0", features = ["derive"] } serde_json = "1.0" serde_urlencoded = "0.7" sha2 = "0.10" +snow = { version = "0.9.3", features = ["ring-resolver"] } thiserror = "1.0.43" tokio = { version = "1.28", features = ["fs", "macros", "rt-multi-thread", "sync"] } tokio-tungstenite = "*" diff --git a/README.md b/README.md index 0de1952b..a0b3275a 100644 --- a/README.md +++ b/README.md @@ -63,4 +63,8 @@ On boot you will be prompted to navigate to `localhost:8080`. Make sure your eth ## Example usage -TODO +Download and install an app: +``` +!message our main:app_store:uqbar {"Download": {"package": {"package_name": "", "publisher_node": ""}, "install_from": ""}} +!message our main:app_store:uqbar {"Install": {"package_name": "", "publisher_node": ""}} +``` diff --git a/modules/qns_indexer/Cargo.lock b/modules/qns_indexer/Cargo.lock index 1e1724b7..dac49915 100644 --- a/modules/qns_indexer/Cargo.lock +++ b/modules/qns_indexer/Cargo.lock @@ -453,6 +453,7 @@ dependencies = [ "bincode", "cargo-component-bindings", "hex", + "rmp-serde", "serde", "serde_json", "thiserror", @@ -528,6 +529,28 @@ version = "0.6.29" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" +[[package]] +name = "rmp" +version = "0.8.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f9860a6cc38ed1da53456442089b4dfa35e7cedaa326df63017af88385e6b20" +dependencies = [ + "byteorder", + "num-traits", + "paste", +] + +[[package]] +name = "rmp-serde" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bffea85eea980d8a74453e5d02a8d93028f3c34725de143085a844ebe953258a" +dependencies = [ + "byteorder", + "rmp", + "serde", +] + [[package]] name = "ruint" version = "1.10.1" diff --git a/modules/qns_indexer/Cargo.toml b/modules/qns_indexer/Cargo.toml index f9dbd05a..88206382 100644 --- a/modules/qns_indexer/Cargo.toml +++ b/modules/qns_indexer/Cargo.toml @@ -12,6 +12,7 @@ lto = true [dependencies] cargo-component-bindings = { git = "https://github.com/bytecodealliance/cargo-component" } +rmp-serde = "1.1.2" serde_json = "1.0" serde = {version = "1.0", features = ["derive"] } wit-bindgen = { version = "0.11.0", default_features = false } diff --git a/modules/qns_indexer/src/lib.rs b/modules/qns_indexer/src/lib.rs index 66265913..d9a14a11 100644 --- a/modules/qns_indexer/src/lib.rs +++ b/modules/qns_indexer/src/lib.rs @@ -129,7 +129,7 @@ impl UqProcess for Component { inherit: false, expects_response: None, metadata: None, - ipc: serde_json::to_vec(&NetActions::QnsBatchUpdate( + ipc: rmp_serde::to_vec(&NetActions::QnsBatchUpdate( state.nodes.values().cloned().collect::>(), )) .unwrap(), @@ -326,7 +326,7 @@ impl UqProcess for Component { inherit: false, expects_response: None, metadata: None, - ipc: serde_json::to_vec(&NetActions::QnsUpdate(update.clone())) + ipc: rmp_serde::to_vec(&NetActions::QnsUpdate(update.clone())) .unwrap(), }, None, diff --git a/src/main.rs b/src/main.rs index 8d101ad9..c2bf5030 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,5 @@ use anyhow::Result; use dotenv; -use ethers::prelude::namehash; use std::env; use std::sync::Arc; use tokio::sync::{mpsc, oneshot}; @@ -37,6 +36,11 @@ const CAP_CHANNEL_CAPACITY: usize = 1_000; const VERSION: &str = env!("CARGO_PKG_VERSION"); +/// This can and should be an environment variable / setting. It configures networking +/// such that indirect nodes always use routers, even when target is a direct node, +/// such that only their routers can ever see their physical networking details. +const REVEAL_IP: bool = true; + #[tokio::main] async fn main() { // For use with https://github.com/tokio-rs/console @@ -265,6 +269,7 @@ async fn main() { print_sender.clone(), net_message_sender, net_message_receiver, + REVEAL_IP, )); tasks.spawn(filesystem::fs_sender( our.name.clone(), @@ -322,10 +327,10 @@ async fn main() { "\x1b[38;5;196muh oh, a kernel process crashed: {}\x1b[0m", e ) - // TODO restart the task + // TODO restart the task? } else { format!("what does this mean???") - // TODO restart the task + // TODO restart the task? } } quit = terminal::terminal( diff --git a/src/net/connections.rs b/src/net/connections.rs deleted file mode 100644 index d6b70f85..00000000 --- a/src/net/connections.rs +++ /dev/null @@ -1,599 +0,0 @@ -use crate::net::*; -use chacha20poly1305::{ - aead::{Aead, AeadCore, KeyInit, OsRng}, - XChaCha20Poly1305, XNonce, -}; -use elliptic_curve::ecdh::SharedSecret; -use futures::{SinkExt, StreamExt}; -use ring::signature::Ed25519KeyPair; -use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; -use tokio::task::JoinHandle; -use tokio_tungstenite::tungstenite; - -pub async fn build_connection( - our: Identity, - keypair: Arc, - pki: OnchainPKI, - keys: PeerKeys, - peers: Peers, - websocket: WebSocket, - kernel_message_tx: MessageSender, - net_message_tx: MessageSender, - network_error_tx: NetworkErrorSender, - with: Option, -) -> ( - UnboundedSender<(NetworkMessage, Option)>, - JoinHandle>, -) { - // println!("building new connection\r"); - let (message_tx, message_rx) = unbounded_channel::<(NetworkMessage, Option)>(); - let handle = tokio::spawn(maintain_connection( - our, - with, - keypair, - pki, - keys, - peers, - websocket, - message_tx.clone(), - message_rx, - kernel_message_tx, - net_message_tx, - network_error_tx, - )); - return (message_tx, handle); -} - -/// Keeps a connection alive and handles sending and receiving of NetworkMessages through it. -/// TODO add a keepalive PING/PONG system -/// TODO kill this after a certain amount of inactivity -pub async fn maintain_connection( - our: Identity, - with: Option, - keypair: Arc, - pki: OnchainPKI, - keys: PeerKeys, - peers: Peers, - websocket: WebSocket, - message_tx: UnboundedSender<(NetworkMessage, Option)>, - mut message_rx: UnboundedReceiver<(NetworkMessage, Option)>, - kernel_message_tx: MessageSender, - net_message_tx: MessageSender, - network_error_tx: NetworkErrorSender, -) -> Option { - // let conn_id: u64 = rand::random(); - // println!("maintaining connection {conn_id}\r"); - - // accept messages on the websocket in one task, and send messages in another - let (mut write_stream, mut read_stream) = websocket.split(); - - let (forwarding_ack_tx, mut forwarding_ack_rx) = unbounded_channel::(); - // manage outstanding ACKs from messages sent over the connection - // TODO replace with more performant data structure - let ack_map = Arc::new(RwLock::new(HashMap::::new())); - let sender_ack_map = ack_map.clone(); - - let last_pong = Arc::new(RwLock::new(tokio::time::Instant::now())); - let ping_last_pong = last_pong.clone(); - let ping_tx = message_tx.clone(); - - // Ping/Pong keepalive task - let ping_task = tokio::spawn(async move { - loop { - tokio::time::sleep(tokio::time::Duration::from_secs(30)).await; - if ping_last_pong.read().await.elapsed() > tokio::time::Duration::from_secs(60) { - break; - } - if let Err(_) = ping_tx.send((NetworkMessage::Ping, None)) { - // Failed to send Ping, kill the connection - break; - } - } - }); - - let forwarder_message_tx = message_tx.clone(); - let ack_forwarder = tokio::spawn(async move { - while let Some(result) = forwarding_ack_rx.recv().await { - match result { - Ok(NetworkMessage::Ack(id)) => { - // println!("net: got forwarding ack for message {}\r", id); - forwarder_message_tx - .send((NetworkMessage::Ack(id), None)) - .unwrap(); - } - Ok(NetworkMessage::Nack(id)) => { - // println!("net: got forwarding nack for message {}\r", id); - forwarder_message_tx - .send((NetworkMessage::Nack(id), None)) - .unwrap(); - } - Ok(NetworkMessage::HandshakeAck(handshake)) => { - // println!( - // "net: got forwarding handshakeAck for message {}\r", - // handshake.id - // ); - forwarder_message_tx - .send((NetworkMessage::HandshakeAck(handshake), None)) - .unwrap(); - } - Err((message_id, _e)) => { - // println!("net: got forwarding error from ack_rx: {:?}\r", e); - // what do we do here? - forwarder_message_tx - .send((NetworkMessage::Nack(message_id), None)) - .unwrap(); - } - _ => { - // println!("net: weird none ack\r"); - } - } - } - }); - - // receive messages from over the websocket and route them to the correct peer handler, - // or create it, if necessary. - let ws_receiver = tokio::spawn(async move { - while let Some(incoming) = read_stream.next().await { - let Ok(tungstenite::Message::Binary(bin)) = incoming else { - if let Ok(tungstenite::Message::Ping(_)) = incoming { - // let _ = write_stream.send(tungstenite::Message::Pong(vec![])).await; - } - continue; - }; - // TODO use a language-netural serialization format here! - let Ok(net_message) = bincode::deserialize::(&bin) else { - // just kill the connection if we get a non-Uqbar message - break; - }; - match net_message { - NetworkMessage::Pong => { - *last_pong.write().await = tokio::time::Instant::now(); - continue; - } - NetworkMessage::Ping => { - // respond with a Pong - let _ = message_tx.send((NetworkMessage::Pong, None)); - continue; - } - NetworkMessage::Ack(id) => { - let Some(result_tx) = ack_map.write().await.remove(&id) else { - // println!("conn {conn_id}: got unexpected Ack {id}\r"); - continue; - }; - // println!("conn {conn_id}: got Ack {id}\r"); - let _ = result_tx.send(Ok(net_message)); - continue; - } - NetworkMessage::Nack(id) => { - let Some(result_tx) = ack_map.write().await.remove(&id) else { - // println!("net: got unexpected Nack\r"); - continue; - }; - let _ = result_tx.send(Ok(net_message)); - continue; - } - NetworkMessage::Msg { - ref id, - ref from, - ref to, - ref contents, - } => { - // println!("conn {conn_id}: handling msg {id}\r"); - // if the message is *directed to us*, try to handle with the - // matching peer handler "decrypter". - // - if to == &our.name { - // if we have the peer, send the message to them. - if let Some(peer) = peers.read().await.get(from) { - let _ = peer - .decrypter - .send((contents.to_owned(), forwarding_ack_tx.clone())); - continue; - } - // if we don't have the peer, see if we have the keys to create them. - // if we don't have their keys, throw a nack. - if let Some((peer_id, secret)) = keys.read().await.get(from) { - let new_peer = create_new_peer( - our.clone(), - peer_id.clone(), - peers.clone(), - keys.clone(), - secret.clone(), - message_tx.clone(), - kernel_message_tx.clone(), - net_message_tx.clone(), - network_error_tx.clone(), - ); - let _ = new_peer - .decrypter - .send((contents.to_owned(), forwarding_ack_tx.clone())); - peers.write().await.insert(peer_id.name.clone(), new_peer); - } else { - // println!("net: nacking message {id}\r"); - message_tx.send((NetworkMessage::Nack(*id), None)).unwrap(); - } - } else { - // if the message is *directed to someone else*, try to handle - // with the matching peer handler "sender". - // - if let Some(peer) = peers.read().await.get(to) { - let id = *id; - let to = to.clone(); - match peer.sender.send(( - PeerMessage::Net(net_message), - Some(forwarding_ack_tx.clone()), - )) { - Ok(_) => {} - Err(_) => { - peers.write().await.remove(&to); - message_tx.send((NetworkMessage::Nack(id), None)).unwrap(); - } - } - } else { - // if we don't have the peer, throw a nack. - // println!("net: nacking message with id {id}\r"); - message_tx.send((NetworkMessage::Nack(*id), None)).unwrap(); - } - } - } - NetworkMessage::Handshake(ref handshake) => { - // when we get a handshake, if we are the target, - // 1. verify it against the PKI - // 2. send a response handshakeAck - // 3. create a Peer and save, replacing old one if it existed - // as long as we are the target, we also get to kill this connection - // if the handshake is invalid, since it must be directly "to" us. - if handshake.target == our.name { - let Some(peer_id) = pki.read().await.get(&handshake.from).cloned() else { - // println!( - // "net: failed handshake with unknown node {}\r", - // handshake.from - // ); - message_tx - .send((NetworkMessage::Nack(handshake.id), None)) - .unwrap(); - break; - }; - let their_ephemeral_pk = match validate_handshake(&handshake, &peer_id) { - Ok(pk) => pk, - Err(e) => { - println!("net: invalid handshake from {}: {}\r", handshake.from, e); - message_tx - .send((NetworkMessage::Nack(handshake.id), None)) - .unwrap(); - break; - } - }; - let (secret, handshake) = make_secret_and_handshake( - &our, - keypair.clone(), - &handshake.from, - Some(handshake.id), - ); - message_tx - .send((NetworkMessage::HandshakeAck(handshake), None)) - .unwrap(); - let secret = Arc::new(secret.diffie_hellman(&their_ephemeral_pk)); - // save the handshake to our Keys map - keys.write() - .await - .insert(peer_id.name.clone(), (peer_id.clone(), secret.clone())); - let new_peer = create_new_peer( - our.clone(), - peer_id.clone(), - peers.clone(), - keys.clone(), - secret, - message_tx.clone(), - kernel_message_tx.clone(), - net_message_tx.clone(), - network_error_tx.clone(), - ); - // we might be replacing an old peer, so we need to remove it first - // we can't rely on the hashmap for this, because the dropped peer - // will trigger a drop of the sender, which will kill the peer_handler - peers.write().await.remove(&peer_id.name); - peers.write().await.insert(peer_id.name.clone(), new_peer); - } else { - // if we are NOT the target, - // try to send it to the matching peer handler "sender" - if let Some(peer) = peers.read().await.get(&handshake.target) { - let _ = peer.sender.send(( - PeerMessage::Net(net_message), - Some(forwarding_ack_tx.clone()), - )); - } else { - // if we don't have the peer, throw a nack. - // println!("net: nacking handshake with id {}\r", handshake.id); - message_tx - .send((NetworkMessage::Nack(handshake.id), None)) - .unwrap(); - } - } - } - NetworkMessage::HandshakeAck(ref handshake) => { - let Some(result_tx) = ack_map.write().await.remove(&handshake.id) else { - continue; - }; - let _ = result_tx.send(Ok(net_message)); - } - } - } - }); - - tokio::select! { - _ = ws_receiver => { - // println!("ws_receiver died\r"); - }, - _ = ack_forwarder => { - // println!("ack_forwarder died\r"); - }, - _ = ping_task => { - // println!("ping_task died\r"); - }, - // receive messages we would like to send to peers along this connection - // and send them to the websocket - _ = async { - while let Some((message, result_tx)) = message_rx.recv().await { - // TODO use a language-netural serialization format here! - if let Ok(bytes) = bincode::serialize::(&message) { - match &message { - NetworkMessage::Msg { id, .. } => { - // println!("conn {conn_id}: piping msg {id}\r"); - sender_ack_map.write().await.insert(*id, result_tx.unwrap()); - } - NetworkMessage::Handshake(h) => { - sender_ack_map.write().await.insert(h.id, result_tx.unwrap()); - } - _ => {} - } - match write_stream.send(tungstenite::Message::Binary(bytes)).await { - Ok(()) => {} - Err(e) => { - // println!("net: send error: {:?}\r", e); - let id = match &message { - NetworkMessage::Msg { id, .. } => id, - NetworkMessage::Handshake(h) => &h.id, - _ => continue, - }; - let Some(result_tx) = sender_ack_map.write().await.remove(&id) else { - continue; - }; - // TODO learn how to handle other non-fatal websocket errors. - match e { - tungstenite::error::Error::Capacity(_) - | tungstenite::Error::Io(_) => { - let _ = result_tx.send(Err((*id, SendErrorKind::Timeout))); - } - _ => { - let _ = result_tx.send(Ok(NetworkMessage::Nack(*id))); - } - } - } - } - } - } - } => { - // println!("ws_sender died\r"); - }, - }; - return with; -} - -/// After a successful handshake, use information to spawn a new `peer_handler` task -/// and save a `Peer` in our peers mapping. Returns a sender to use for sending messages -/// to this peer, which will also be saved in its Peer struct. -pub fn create_new_peer( - our: Identity, - new_peer_id: Identity, - peers: Peers, - keys: PeerKeys, - secret: Arc>, - conn_sender: UnboundedSender<(NetworkMessage, Option)>, - kernel_message_tx: MessageSender, - net_message_tx: MessageSender, - network_error_tx: NetworkErrorSender, -) -> Peer { - let (message_tx, message_rx) = unbounded_channel::<(PeerMessage, Option)>(); - let (decrypter_tx, decrypter_rx) = unbounded_channel::<(Vec, ErrorShuttle)>(); - let peer_id_name = new_peer_id.name.clone(); - let peer_conn_sender = conn_sender.clone(); - tokio::spawn(async move { - match peer_handler( - our, - peer_id_name.clone(), - secret, - message_rx, - decrypter_rx, - peer_conn_sender, - kernel_message_tx, - network_error_tx, - ) - .await - { - None => { - // println!("net: dropping peer handler but not deleting\r"); - } - Some(km) => { - // println!("net: ok actually deleting peer+keys now and retrying send\r"); - peers.write().await.remove(&peer_id_name); - keys.write().await.remove(&peer_id_name); - let _ = net_message_tx.send(km).await; - } - } - }); - return Peer { - identity: new_peer_id, - sender: message_tx, - decrypter: decrypter_tx, - socket_tx: conn_sender, - }; -} - -/// 1. take in messages from a specific peer, decrypt them, and send to kernel -/// 2. take in messages targeted at specific peer and either: -/// - encrypt them, and send to proper connection -/// - forward them untouched along the connection -async fn peer_handler( - our: Identity, - who: String, - secret: Arc>, - mut message_rx: UnboundedReceiver<(PeerMessage, Option)>, - mut decrypter_rx: UnboundedReceiver<(Vec, ErrorShuttle)>, - socket_tx: UnboundedSender<(NetworkMessage, Option)>, - kernel_message_tx: MessageSender, - network_error_tx: NetworkErrorSender, -) -> Option { - // println!("peer_handler\r"); - let mut key = [0u8; 32]; - secret - .extract::(None) - .expand(&[], &mut key) - .unwrap(); - let cipher = XChaCha20Poly1305::new(generic_array::GenericArray::from_slice(&key)); - - let (ack_tx, mut ack_rx) = unbounded_channel::(); - // TODO use a more efficient data structure - let ack_map = Arc::new(RwLock::new(HashMap::::new())); - let recv_ack_map = ack_map.clone(); - tokio::select! { - // - // take in messages from a specific peer, decrypt them, and send to kernel - // - _ = async { - while let Some((encrypted_bytes, result_tx)) = decrypter_rx.recv().await { - let nonce = XNonce::from_slice(&encrypted_bytes[..24]); - if let Ok(decrypted) = cipher.decrypt(&nonce, &encrypted_bytes[24..]) { - if let Ok(message) = bincode::deserialize::(&decrypted) { - if message.source.node == who { - // println!("net: got peer message {}, acking\r", message.id); - let _ = result_tx.send(Ok(NetworkMessage::Ack(message.id))); - let _ = kernel_message_tx.send(message).await; - continue; - } - println!("net: got message 'from' wrong person! cheater/liar!\r"); - break; - } - println!("net: failed to deserialize message from {}\r", who); - break; - } - println!("net: failed to decrypt message from {}, could be spoofer\r", who); - break; - } - } => { - // println!("net: lost peer {who}\r"); - return None - } - // - // take in messages targeted at specific peer and either: - // - encrypt them, and send to proper connection - // - forward them untouched along the connection - // - _ = async { - // if we get a result_tx, rather than track it here, let a different - // part of the code handle whatever comes back from the socket. - while let Some((message, maybe_result_tx)) = message_rx.recv().await { - // if message is raw, we should encrypt. - // otherwise, simply send - match message { - PeerMessage::Raw(message) => { - let id = message.id; - if let Ok(bytes) = bincode::serialize::(&message) { - // generating a random nonce for each message. - // this isn't really as secure as we could get: should - // add a counter and then throw away the key when we hit a - // certain # of messages. TODO. - let nonce = XChaCha20Poly1305::generate_nonce(&mut OsRng); - if let Ok(encrypted) = cipher.encrypt(&nonce, bytes.as_ref()) { - if maybe_result_tx.is_none() { - ack_map.write().await.insert(id, message); - } - match socket_tx.send(( - NetworkMessage::Msg { - from: our.name.clone(), - to: who.clone(), - id: id, - contents: [nonce.to_vec(), encrypted].concat(), - }, - Some(maybe_result_tx.unwrap_or(ack_tx.clone())), - )) { - Ok(()) => tokio::task::yield_now().await, - Err(tokio::sync::mpsc::error::SendError((_, result_tx))) => { - // println!("net: lost socket with {who}\r"); - let _ = result_tx.unwrap().send(Ok(NetworkMessage::Nack(id))); - }, - } - } - } - } - PeerMessage::Net(net_message) => { - match socket_tx.send((net_message, Some(maybe_result_tx.unwrap_or(ack_tx.clone())))) { - Ok(()) => continue, - Err(tokio::sync::mpsc::error::SendError((net_message, result_tx))) => { - // println!("net: lost *forwarding* socket with {who}\r"); - let id = match net_message { - NetworkMessage::Msg { id, .. } => id, - NetworkMessage::Handshake(h) => h.id, - _ => continue, - }; - let _ = result_tx.unwrap().send(Ok(NetworkMessage::Nack(id))); - break; - }, - } - } - } - } - } => return None, - // - // receive acks and nacks from our socket - // throw away acks, but kill this peer and retry the send on nacks. - // - maybe_km = async { - while let Some(result) = ack_rx.recv().await { - match result { - Ok(NetworkMessage::Ack(id)) => { - // println!("net: got ack for message {}\r", id); - recv_ack_map.write().await.remove(&id); - continue; - } - Ok(NetworkMessage::Nack(id)) => { - // println!("net: got nack for message {}\r", id); - let Some(km) = recv_ack_map.write().await.remove(&id) else { - continue; - }; - // when we get a Nack, **delete this peer** and try to send the message again! - return Some(km) - } - Err((message_id, e)) => { - // println!("net: got error from ack_rx: {:?}\r", e); - // in practice this is always a timeout in current implementation - let Some(km) = recv_ack_map.write().await.remove(&message_id) else { - continue; - }; - let _ = network_error_tx - .send(WrappedSendError { - id: km.id, - source: km.source, - error: SendError { - kind: e, - target: km.target, - message: km.message, - payload: km.payload, - }, - }) - .await; - return None - } - _ => { - // println!("net: weird none ack\r"); - return None - } - } - } - return None; - } => { - // println!("net: exiting peer due to nackage\r"); - return maybe_km - }, - } -} diff --git a/src/net/mod.rs b/src/net/mod.rs index fefe7f7f..5e926123 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -1,23 +1,30 @@ -use crate::net::connections::*; -use crate::net::types::*; +use crate::net::{types::*, utils::*}; use crate::types::*; -use anyhow::Result; -use elliptic_curve::ecdh::EphemeralSecret; -use elliptic_curve::PublicKey; -use ethers::prelude::k256::{self, Secp256k1}; -use ring::signature::{self, Ed25519KeyPair}; -use std::{collections::HashMap, sync::Arc}; +use anyhow::{anyhow, Result}; +use futures::{SinkExt, StreamExt}; +use rand::seq::SliceRandom; +use ring::signature::Ed25519KeyPair; +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, +}; use tokio::net::TcpListener; use tokio::sync::{mpsc::unbounded_channel, RwLock}; use tokio::task::JoinSet; -use tokio::time::timeout; -use tokio_tungstenite::{accept_async, connect_async, MaybeTlsStream}; +use tokio::time; +use tokio_tungstenite::{ + accept_async, connect_async, tungstenite, MaybeTlsStream, WebSocketStream, +}; -mod connections; mod types; +mod utils; // only used in connection initialization, otherwise, nacks and Responses are only used for "timeouts" -const TIMEOUT: std::time::Duration = std::time::Duration::from_secs(15); +const TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5); + +/// 10 MB -- TODO analyze as desired, apps can always chunk data into many messages +/// note that this only applies to cross-network messages, not local ones. +const MESSAGE_MAX_SIZE: u32 = 10_485_800; /// Entry point from the main kernel task. Runs forever, spawns listener and sender tasks. pub async fn networking( @@ -27,676 +34,812 @@ pub async fn networking( kernel_message_tx: MessageSender, network_error_tx: NetworkErrorSender, print_tx: PrintSender, - message_tx: MessageSender, - mut message_rx: MessageReceiver, + self_message_tx: MessageSender, + message_rx: MessageReceiver, + reveal_ip: bool, ) -> Result<()> { - // TODO persist this here - let pki: OnchainPKI = Arc::new(RwLock::new(HashMap::new())); - let keys: PeerKeys = Arc::new(RwLock::new(HashMap::new())); - // mapping from QNS namehash to username - let names: PKINames = Arc::new(RwLock::new(HashMap::new())); - - // this only lives during a given run of the kernel - let peers: Peers = Arc::new(RwLock::new(HashMap::new())); - - // listener task either kickstarts our networking by establishing active connections - // with one or more routers, or spawns a websocket listener if we are a direct node. - let listener = match &our.ws_routing { + // branch on whether we are a direct or indirect node + match &our.ws_routing { None => { - // indirect node: connect to router(s) - tokio::spawn(connect_to_routers( - our.clone(), - keypair.clone(), - our_ip.clone(), - pki.clone(), - keys.clone(), - peers.clone(), - kernel_message_tx.clone(), - message_tx.clone(), - network_error_tx.clone(), - print_tx.clone(), - )) + // indirect node: run the indirect networking strategy + indirect_networking( + our, + our_ip, + keypair, + kernel_message_tx, + network_error_tx, + print_tx, + self_message_tx, + message_rx, + reveal_ip, + ) + .await } - Some((_ip, port)) => { - // direct node: spawn the websocket listener - tokio::spawn(receive_incoming_connections( - our.clone(), - keypair.clone(), - *port, - pki.clone(), - keys.clone(), - peers.clone(), - kernel_message_tx.clone(), - message_tx.clone(), - network_error_tx.clone(), - )) - } - }; - - let _ = tokio::join!(listener, async { - while let Some(km) = message_rx.recv().await { - // got a message from kernel to send out over the network - let target = &km.target.node; - // if the message is for us, it's either a protocol-level "hello" message, - // or a debugging command issued from our terminal. handle it here: - if target == &our.name { - handle_incoming_message( - &our, - km, - peers.clone(), - keys.clone(), - pki.clone(), - names.clone(), - kernel_message_tx.clone(), - print_tx.clone(), - ) - .await; - continue; - } - let peers_read = peers.read().await; - // - // we have the target as an active peer, meaning we can send the message directly - // - if let Some(peer) = peers_read.get(target) { - // println!("net: direct send to known peer\r"); - match peer.sender.send((PeerMessage::Raw(km.clone()), None)) { - Ok(_) => continue, - Err(_) => { - // println!("net: failed to send to known peer\r"); - drop(peers_read); - peers.write().await.remove(target); - error_offline(km, &network_error_tx).await; - continue; - } - } - } - drop(peers_read); - // - // we don't have the target as a peer yet, but we have shaken hands with them - // before, and can try to reuse that shared secret to send a message. - // first, we'll need to open a websocket and create a Peer struct for them. - // - if let Some((peer_id, secret)) = keys.read().await.get(target).cloned() { - // - // we can establish a connection directly with this peer - // - if let Some((ref ip, ref port)) = peer_id.ws_routing { - // println!("net: creating direct connection to peer with known keys\r"); - let Ok(ws_url) = make_ws_url(&our_ip, ip, port) else { - error_offline(km, &network_error_tx).await; - continue; - }; - let Ok(Ok((websocket, _response))) = - timeout(TIMEOUT, connect_async(ws_url)).await - else { - error_offline(km, &network_error_tx).await; - continue; - }; - let (socket_tx, conn_handle) = build_connection( - our.clone(), - keypair.clone(), - pki.clone(), - keys.clone(), - peers.clone(), - websocket, - kernel_message_tx.clone(), - message_tx.clone(), - network_error_tx.clone(), - None, - ) - .await; - let new_peer = create_new_peer( - our.clone(), - peer_id.clone(), - peers.clone(), - keys.clone(), - secret, - socket_tx.clone(), - kernel_message_tx.clone(), - message_tx.clone(), - network_error_tx.clone(), - ); - let (temp_result_tx, mut temp_result_rx) = unbounded_channel::(); - let _ = new_peer - .sender - .send((PeerMessage::Raw(km.clone()), Some(temp_result_tx))); - match timeout(TIMEOUT, temp_result_rx.recv()).await { - Ok(Some(Ok(NetworkMessage::Ack(_id)))) => { - peers.write().await.insert(peer_id.name.clone(), new_peer); - continue; - } - _ => { - // instead of throwing Offline now, throw away their keys and - // try again. - keys.write().await.remove(target); - conn_handle.abort(); - } - } - } - // - // need to find a router that will connect to this peer! - // - else { - // println!("net: finding router for peer with known keys\r"); - // get their ID from PKI so we have their most up-to-date router list - let Some(peer_id) = pki.read().await.get(target).cloned() else { - // this target cannot be found in the PKI! - // throw an Offline error. - error_offline(km, &network_error_tx).await; - continue; - }; - let mut success = false; - for router_namehash in &peer_id.allowed_routers { - let km = km.clone(); - let Some(router_name) = names.read().await.get(router_namehash).cloned() - else { - continue; - }; - let Some(router_id) = pki.read().await.get(&router_name).cloned() else { - continue; - }; - let Some((ref ip, ref port)) = router_id.ws_routing else { - continue; - }; - // - // otherwise, attempt to connect to the router's IP+port and send through that - // - // if we already have this router as a peer, use that socket_tx - let (socket_tx, maybe_conn_handle) = - if let Some(router) = peers.read().await.get(&router_name) { - (router.socket_tx.clone(), None) - } else { - let Ok(ws_url) = make_ws_url(&our_ip, ip, port) else { - continue; - }; - let Ok(Ok((websocket, _response))) = - timeout(TIMEOUT, connect_async(ws_url)).await - else { - continue; - }; - let (socket_tx, conn_handle) = build_connection( - our.clone(), - keypair.clone(), - pki.clone(), - keys.clone(), - peers.clone(), - websocket, - kernel_message_tx.clone(), - message_tx.clone(), - network_error_tx.clone(), - None, - ) - .await; - (socket_tx, Some(conn_handle)) - }; - let new_peer = create_new_peer( - our.clone(), - peer_id.clone(), - peers.clone(), - keys.clone(), - secret.clone(), - socket_tx.clone(), - kernel_message_tx.clone(), - message_tx.clone(), - network_error_tx.clone(), - ); - let (temp_result_tx, mut temp_result_rx) = - unbounded_channel::(); - let _ = new_peer - .sender - .send((PeerMessage::Raw(km.clone()), Some(temp_result_tx))); - match timeout(TIMEOUT, temp_result_rx.recv()).await { - Ok(Some(Ok(NetworkMessage::Ack(_id)))) => { - peers.write().await.insert(peer_id.name.clone(), new_peer); - success = true; - break; - } - _ => { - if let Some(conn_handle) = maybe_conn_handle { - conn_handle.abort(); - } - continue; - } - } - } - if !success { - // instead of throwing Offline now, throw away their keys and - // try again. - keys.write().await.remove(target); - } - } - } - // - // sending a message to a peer for which we don't have active networking info. - // this means that we need to search the PKI for the peer, and then attempt to - // exchange handshakes with them. - // - let Some(peer_id) = pki.read().await.get(target).cloned() else { - // this target cannot be found in the PKI! - // throw an Offline error. - error_offline(km, &network_error_tx).await; - continue; - }; - // - // we can establish a connection directly with this peer, then send a handshake - // - if let Some((ref ip, ref port)) = peer_id.ws_routing { - // println!("net: creating direct connection to peer in PKI\r"); - let Ok(ws_url) = make_ws_url(&our_ip, ip, port) else { - error_offline(km, &network_error_tx).await; - continue; - }; - let Ok(Ok((websocket, _response))) = timeout(TIMEOUT, connect_async(ws_url)).await - else { - error_offline(km, &network_error_tx).await; - continue; - }; - let (socket_tx, conn_handle) = build_connection( - our.clone(), - keypair.clone(), - pki.clone(), - keys.clone(), - peers.clone(), - websocket, - kernel_message_tx.clone(), - message_tx.clone(), - network_error_tx.clone(), - None, - ) - .await; - let (secret, handshake) = - make_secret_and_handshake(&our, keypair.clone(), target, None); - let (handshake_tx, mut handshake_rx) = unbounded_channel::(); - socket_tx - .send((NetworkMessage::Handshake(handshake), Some(handshake_tx))) - .unwrap(); - let response_shake = match timeout(TIMEOUT, handshake_rx.recv()).await { - Ok(Some(Ok(NetworkMessage::HandshakeAck(shake)))) => shake, - _ => { - // println!("net: failed handshake with {target}\r"); - error_offline(km, &network_error_tx).await; - conn_handle.abort(); - continue; - } - }; - let Ok(their_ephemeral_pk) = validate_handshake(&response_shake, &peer_id) else { - // println!("net: failed handshake with {target}\r"); - error_offline(km, &network_error_tx).await; - conn_handle.abort(); - continue; - }; - let secret = Arc::new(secret.diffie_hellman(&their_ephemeral_pk)); - // save the handshake to our Keys map - keys.write() - .await - .insert(peer_id.name.clone(), (peer_id.clone(), secret.clone())); - let new_peer = create_new_peer( - our.clone(), - peer_id.clone(), - peers.clone(), - keys.clone(), - secret, - socket_tx.clone(), - kernel_message_tx.clone(), - message_tx.clone(), - network_error_tx.clone(), - ); - // can't do a self_tx.send here because we need to maintain ordering of messages - // already queued. - let (temp_result_tx, mut temp_result_rx) = unbounded_channel::(); - let _ = new_peer - .sender - .send((PeerMessage::Raw(km.clone()), Some(temp_result_tx))); - match timeout(TIMEOUT, temp_result_rx.recv()).await { - Ok(Some(Ok(NetworkMessage::Ack(_id)))) => { - peers.write().await.insert(peer_id.name.clone(), new_peer); - continue; - } - _ => { - // instead of throwing Offline now, throw away their keys and - // try again. - keys.write().await.remove(target); - conn_handle.abort(); - } - } - } - // - // need to find a router that will connect to this peer, then do a handshake - // - else { - // println!("net: looking for router to create connection to peer in PKI\r"); - let Some(peer_id) = pki.read().await.get(target).cloned() else { - // this target cannot be found in the PKI! - // throw an Offline error. - error_offline(km, &network_error_tx).await; - continue; - }; - let mut success = false; - for router_namehash in &peer_id.allowed_routers { - let km = km.clone(); - let Some(router_name) = names.read().await.get(router_namehash).cloned() else { - continue; - }; - if router_name == our.name { - // don't try to connect to ourselves! - continue; - } - let Some(router_id) = pki.read().await.get(&router_name).cloned() else { - continue; - }; - let Some((ref ip, ref port)) = router_id.ws_routing else { - continue; - }; - // - // attempt to connect to the router's IP+port and send through that - // if we already have this router as a peer, use that socket_tx - let (socket_tx, maybe_conn_handle) = - if let Some(router) = peers.read().await.get(&router_name) { - (router.socket_tx.clone(), None) - } else { - let Ok(ws_url) = make_ws_url(&our_ip, ip, port) else { - continue; - }; - let Ok(Ok((websocket, _response))) = - timeout(TIMEOUT, connect_async(ws_url)).await - else { - continue; - }; - let (socket_tx, conn_handle) = build_connection( - our.clone(), - keypair.clone(), - pki.clone(), - keys.clone(), - peers.clone(), - websocket, - kernel_message_tx.clone(), - message_tx.clone(), - network_error_tx.clone(), - None, - ) - .await; - (socket_tx, Some(conn_handle)) - }; - let (secret, handshake) = - make_secret_and_handshake(&our, keypair.clone(), target, None); - let (handshake_tx, mut handshake_rx) = unbounded_channel::(); - socket_tx - .send((NetworkMessage::Handshake(handshake), Some(handshake_tx))) - .unwrap(); - let response_shake = match timeout(TIMEOUT, handshake_rx.recv()).await { - Ok(Some(Ok(NetworkMessage::HandshakeAck(shake)))) => shake, - _ => { - if let Some(conn_handle) = maybe_conn_handle { - conn_handle.abort(); - } - continue; - } - }; - let Ok(their_ephemeral_pk) = validate_handshake(&response_shake, &peer_id) - else { - if let Some(conn_handle) = maybe_conn_handle { - conn_handle.abort(); - } - continue; - }; - let secret = Arc::new(secret.diffie_hellman(&their_ephemeral_pk)); - // save the handshake to our Keys map - keys.write() - .await - .insert(peer_id.name.clone(), (peer_id.clone(), secret.clone())); - let new_peer = create_new_peer( - our.clone(), - peer_id.clone(), - peers.clone(), - keys.clone(), - secret, - socket_tx.clone(), - kernel_message_tx.clone(), - message_tx.clone(), - network_error_tx.clone(), - ); - let (temp_result_tx, mut temp_result_rx) = unbounded_channel::(); - let _ = new_peer - .sender - .send((PeerMessage::Raw(km.clone()), Some(temp_result_tx))); - match timeout(TIMEOUT, temp_result_rx.recv()).await { - Ok(Some(Ok(NetworkMessage::Ack(_id)))) => { - peers.write().await.insert(peer_id.name.clone(), new_peer); - success = true; - break; - } - _ => { - if let Some(conn_handle) = maybe_conn_handle { - conn_handle.abort(); - } - continue; - } - } - } - if !success { - error_offline(km, &network_error_tx).await; - continue; - } - } - } - }); - Err(anyhow::anyhow!("networking task exited")) -} - -async fn error_offline(km: KernelMessage, network_error_tx: &NetworkErrorSender) { - let _ = network_error_tx - .send(WrappedSendError { - id: km.id, - source: km.source, - error: SendError { - kind: SendErrorKind::Offline, - target: km.target, - message: km.message, - payload: km.payload, - }, - }) - .await; -} - -/// Only used if an indirect node. -async fn connect_to_routers( - our: Identity, - keypair: Arc, - our_ip: String, - pki: OnchainPKI, - keys: PeerKeys, - peers: Peers, - kernel_message_tx: MessageSender, - net_message_tx: MessageSender, - network_error_tx: NetworkErrorSender, - print_tx: PrintSender, -) { - // as soon as we boot, need to try and connect to all of our allowed_routers - // we can "throw away" routers that have a bad URL setup - // - // any time we *lose* a router, we need to try and reconnect on a loop - // we should always be trying to connect to all "good" routers we don't already have - - let (routers_to_try_tx, mut routers_to_try_rx) = unbounded_channel::(); - let mut active_routers = JoinSet::, tokio::task::JoinError>>::new(); - - // at start, add all our routers to list - for router_name in our.allowed_routers.clone() { - routers_to_try_tx.send(router_name).unwrap(); - } - - loop { - // we sleep here in order not to BLAST routers with connections - // if we have a PKI mismatch with them for some amount of time. - tokio::time::sleep(std::time::Duration::from_secs(2)).await; - tokio::select! { - Some(Ok(Ok(Some(dead_router)))) = active_routers.join_next() => { - let _ = print_tx - .send(Printout { - verbosity: 0, - content: format!("net: connection to router {dead_router} died"), - }) - .await; - peers.write().await.remove(&dead_router); - if active_routers.is_empty() { - let _ = print_tx - .send(Printout { - verbosity: 0, - content: format!("net: no working routers, we are offline!"), - }) - .await; - } - let _ = routers_to_try_tx.send(dead_router); - } - Some(router_name) = routers_to_try_rx.recv() => { - if peers.read().await.contains_key(&router_name) { - continue; - } - let Some(router_id) = pki.read().await.get(&router_name).cloned() else { - let _ = routers_to_try_tx.send(router_name); - continue; - }; - let Some((ref ip, ref port)) = router_id.ws_routing else { - // this is a bad router, can remove from our list - continue; - }; - let Ok(ws_url) = make_ws_url(&our_ip, ip, port) else { - // this is a bad router, can remove from our list - continue; - }; - let Ok(Ok((websocket, _response))) = timeout(TIMEOUT, connect_async(ws_url)).await else { - let _ = routers_to_try_tx.send(router_name); - continue; - }; - // we never try to reuse keys with routers because we need to make - // sure we have a live connection with them. - // connect to their websocket and then send a handshake. - // save the handshake in our keys map, then save them as an active peer. - let (socket_tx, conn_handle) = build_connection( - our.clone(), - keypair.clone(), - pki.clone(), - keys.clone(), - peers.clone(), - websocket, - kernel_message_tx.clone(), - net_message_tx.clone(), - network_error_tx.clone(), - Some(router_name.clone()), - ) - .await; - let (secret, handshake) = - make_secret_and_handshake(&our, keypair.clone(), &router_name, None); - let (handshake_tx, mut handshake_rx) = unbounded_channel::(); - socket_tx - .send((NetworkMessage::Handshake(handshake), Some(handshake_tx))) - .unwrap(); - let response_shake = match timeout(TIMEOUT, handshake_rx.recv()).await { - Ok(Some(Ok(NetworkMessage::HandshakeAck(shake)))) => shake, - _ => { - // println!("net: failed handshake with {router_name}\r"); - conn_handle.abort(); - let _ = routers_to_try_tx.send(router_name); - continue; - } - }; - let Ok(their_ephemeral_pk) = validate_handshake(&response_shake, &router_id) else { - // println!("net: failed handshake with {router_name}\r"); - conn_handle.abort(); - let _ = routers_to_try_tx.send(router_name); - continue; - }; - let secret = Arc::new(secret.diffie_hellman(&their_ephemeral_pk)); - // save the handshake to our Keys map - keys.write().await.insert( - router_id.name.clone(), - (router_id.clone(), secret.clone()), - ); - let new_peer = create_new_peer( - our.clone(), - router_id.clone(), - peers.clone(), - keys.clone(), - secret, - socket_tx.clone(), - kernel_message_tx.clone(), - net_message_tx.clone(), - network_error_tx.clone(), - ); - let _ = print_tx - .send(Printout { - verbosity: 0, - content: format!("net: connected to router {router_name}"), - }) - .await; - peers.write().await.insert(router_id.name.clone(), new_peer); - active_routers.spawn(conn_handle); - } - } - } -} - -/// only used if direct. should live forever -async fn receive_incoming_connections( - our: Identity, - keypair: Arc, - port: u16, - pki: OnchainPKI, - keys: PeerKeys, - peers: Peers, - kernel_message_tx: MessageSender, - net_message_tx: MessageSender, - network_error_tx: NetworkErrorSender, -) { - let tcp = TcpListener::bind(format!("0.0.0.0:{}", port)) - .await - .expect(format!("net: fatal error: can't listen on port {port}. change port onchain or free up this port!").as_str()); - - while let Ok((stream, _socket_addr)) = tcp.accept().await { - // TODO we can perform some amount of validation here - // to prevent some amount of potential DDoS attacks. - // can block based on socket_addr, but not QNS ID. - match accept_async(MaybeTlsStream::Plain(stream)).await { - Ok(websocket) => { - // println!("received incoming connection\r"); - tokio::spawn(build_connection( - our.clone(), - keypair.clone(), - pki.clone(), - keys.clone(), - peers.clone(), - websocket, - kernel_message_tx.clone(), - net_message_tx.clone(), - network_error_tx.clone(), - None, + Some((ip, port)) => { + // direct node: run the direct networking strategy + if &our_ip != ip { + return Err(anyhow!( + "net: fatal error: IP address mismatch: {} != {}, update your QNS identity", + our_ip, + ip )); } - // ignore connections we failed to accept - Err(_) => {} + let tcp = match TcpListener::bind(format!("0.0.0.0:{}", port)).await { + Ok(tcp) => tcp, + Err(_e) => { + return Err(anyhow!( + "net: fatal error: can't listen on port {}, update your QNS identity or free up that port", + port, + )); + } + }; + direct_networking( + our, + our_ip, + tcp, + keypair, + kernel_message_tx, + network_error_tx, + print_tx, + self_message_tx, + message_rx, + ) + .await } } } -/// net module only handles requests, will never return a response -async fn handle_incoming_message( +async fn indirect_networking( + our: Identity, + our_ip: String, + keypair: Arc, + kernel_message_tx: MessageSender, + network_error_tx: NetworkErrorSender, + print_tx: PrintSender, + self_message_tx: MessageSender, + mut message_rx: MessageReceiver, + reveal_ip: bool, +) -> Result<()> { + let mut pki: OnchainPKI = HashMap::new(); + let mut peers: Peers = HashMap::new(); + // mapping from QNS namehash to username + let mut names: PKINames = HashMap::new(); + let mut peer_connections = JoinSet::<(NodeId, Option)>::new(); + // indirect-specific structure + let mut active_routers = HashSet::::new(); + + loop { + tokio::select! { + // 1. receive messages from kernel and send out over connections, + // making new connections through our router-set as needed + Some(km) = message_rx.recv() => { + // got a message from kernel to send out over the network + let target = &km.target.node; + // if the message is for us, it's either a protocol-level "hello" message, + // or a debugging command issued from our terminal. handle it here: + if target == &our.name { + match handle_local_message( + &our, + &our_ip, + &keypair, + km, + &mut peers, + &mut pki, + &mut peer_connections, + None, + None, + Some(&active_routers), + &mut names, + &kernel_message_tx, + &print_tx, + ) + .await { + Ok(()) => {}, + Err(e) => { + print_tx.send(Printout { + verbosity: 0, + content: format!("net: error handling local message: {e}") + }).await?; + } + } + } + // if the message is for a peer we currently have a connection with, + // try to send it to them + else if let Some(peer) = peers.get_mut(target) { + peer.write().await.sender.send(km)?; + } + else if let Some(peer_id) = pki.get(target) { + // if the message is for a *direct* peer we don't have a connection with, + // try to establish a connection with them + // here, we can *choose* to use our routers so as not to reveal + // networking information about ourselves to the target. + if peer_id.ws_routing.is_some() && reveal_ip { + match init_connection(&our, &our_ip, peer_id, &keypair, None, false).await { + Ok(direct_conn) => { + save_new_peer( + peer_id, + false, + &mut peers, + &mut peer_connections, + direct_conn, + Some(km), + &kernel_message_tx, + &print_tx, + ).await?; + } + Err(_) => { + error_offline(km, &network_error_tx).await?; + } + } + } + // if the message is for an *indirect* peer we don't have a connection with, + // or we want to protect our node's physical networking details from non-routers, + // do some routing: in a randomized order, go through their listed routers + // on chain and try to get one of them to build a proxied connection to + // this node for you + else { + let sent = time::timeout(TIMEOUT, + init_connection_via_router( + &our, + &our_ip, + &keypair, + km.clone(), + peer_id, + &pki, + &names, + &mut peers, + &mut peer_connections, + kernel_message_tx.clone(), + print_tx.clone(), + )).await; + if !sent.unwrap_or(false) { + // none of the routers worked! + error_offline(km, &network_error_tx).await?; + } + } + } + // peer cannot be found in PKI, throw an offline error + else { + error_offline(km, &network_error_tx).await?; + } + } + // 2. deal with active connections that die by removing the associated peer + // if the peer is one of our routers, remove them from router-set + Some(Ok((dead_peer, maybe_resend))) = peer_connections.join_next() => { + peers.remove(&dead_peer); + active_routers.remove(&dead_peer); + match maybe_resend { + None => {}, + Some(km) => { + self_message_tx.send(km).await?; + } + } + } + // 3. periodically attempt to connect to any allowed routers that we + // are not connected to + _ = time::sleep(time::Duration::from_secs(3)) => { + if active_routers.len() == our.allowed_routers.len() { + continue; + } + for router in &our.allowed_routers { + if active_routers.contains(router) { + continue; + } + let Some(router_id) = pki.get(router) else { + continue; + }; + match init_connection(&our, &our_ip, router_id, &keypair, None, true).await { + Ok(direct_conn) => { + print_tx.send(Printout { + verbosity: 0, + content: format!("now connected to router {}", router_id.name), + }).await?; + active_routers.insert(router_id.name.clone()); + save_new_peer( + router_id, + false, + &mut peers, + &mut peer_connections, + direct_conn, + None, + &kernel_message_tx, + &print_tx, + ).await?; + } + Err(_e) => continue, + } + } + } + } + } +} + +async fn direct_networking( + our: Identity, + our_ip: String, + tcp: TcpListener, + keypair: Arc, + kernel_message_tx: MessageSender, + network_error_tx: NetworkErrorSender, + print_tx: PrintSender, + self_message_tx: MessageSender, + mut message_rx: MessageReceiver, +) -> Result<()> { + let mut pki: OnchainPKI = HashMap::new(); + let mut peers: Peers = HashMap::new(); + // mapping from QNS namehash to username + let mut names: PKINames = HashMap::new(); + let mut peer_connections = JoinSet::<(NodeId, Option)>::new(); + // direct-specific structures + let mut forwarding_connections = JoinSet::<()>::new(); + let mut pending_passthroughs: PendingPassthroughs = HashMap::new(); + + loop { + tokio::select! { + // 1. receive messages from kernel and send out over our connections, + // making new connections as needed + Some(km) = message_rx.recv() => { + // got a message from kernel to send out over the network + let target = &km.target.node; + // if the message is for us, it's either a protocol-level "hello" message, + // or a debugging command issued from our terminal. handle it here: + if target == &our.name { + match handle_local_message( + &our, + &our_ip, + &keypair, + km, + &mut peers, + &mut pki, + &mut peer_connections, + Some(&mut pending_passthroughs), + Some(&forwarding_connections), + None, + &mut names, + &kernel_message_tx, + &print_tx, + ) + .await { + Ok(()) => {}, + Err(e) => { + print_tx.send(Printout { + verbosity: 0, + content: format!("net: error handling local message: {}", e) + }).await?; + } + } + } + // if the message is for a peer we currently have a connection with, + // try to send it to them + else if let Some(peer) = peers.get_mut(target) { + peer.write().await.sender.send(km)?; + } + else if let Some(peer_id) = pki.get(target) { + // if the message is for a *direct* peer we don't have a connection with, + // try to establish a connection with them + if peer_id.ws_routing.is_some() { + match init_connection(&our, &our_ip, peer_id, &keypair, None, false).await { + Ok(direct_conn) => { + save_new_peer( + peer_id, + false, + &mut peers, + &mut peer_connections, + direct_conn, + Some(km), + &kernel_message_tx, + &print_tx, + ).await?; + } + Err(_) => { + error_offline(km, &network_error_tx).await?; + } + } + } + // if the message is for an *indirect* peer we don't have a connection with, + // do some routing: in a randomized order, go through their listed routers + // on chain and try to get one of them to build a proxied connection to + // this node for you + else { + let sent = time::timeout(TIMEOUT, + init_connection_via_router( + &our, + &our_ip, + &keypair, + km.clone(), + peer_id, + &pki, + &names, + &mut peers, + &mut peer_connections, + kernel_message_tx.clone(), + print_tx.clone(), + )).await; + if !sent.unwrap_or(false) { + // none of the routers worked! + error_offline(km, &network_error_tx).await?; + } + } + } + // peer cannot be found in PKI, throw an offline error + else { + error_offline(km, &network_error_tx).await?; + } + } + // 2. receive incoming TCP connections + Ok((stream, _socket_addr)) = tcp.accept() => { + // TODO we can perform some amount of validation here + // to prevent some amount of potential DDoS attacks. + // can also block based on socket_addr + match accept_async(MaybeTlsStream::Plain(stream)).await { + Ok(websocket) => { + let (peer_id, routing_for, conn) = + match recv_connection( + &our, + &our_ip, + &pki, + &peers, + &mut pending_passthroughs, + &keypair, + websocket).await + { + Ok(res) => res, + Err(e) => { + print_tx.send(Printout { + verbosity: 0, + content: format!("net: recv_connection failed: {e}"), + }).await?; + continue; + } + }; + // TODO if their handshake indicates they want us to proxy + // for them (aka act as a router for them) we can choose + // whether to do so here! + // if conn is direct, add peer. if passthrough, add to our + // forwarding connections joinset + match conn { + Connection::Peer(peer_conn) => { + let (peer_tx, peer_rx) = unbounded_channel::(); + let peer = Arc::new(RwLock::new(Peer { + identity: peer_id, + routing_for, + sender: peer_tx, + })); + peers.insert(peer.read().await.identity.name.clone(), peer.clone()); + peer_connections.spawn(maintain_connection( + peer, + peer_conn, + peer_rx, + kernel_message_tx.clone(), + print_tx.clone(), + )); + } + Connection::Passthrough(passthrough_conn) => { + forwarding_connections.spawn(maintain_passthrough( + passthrough_conn, + )); + } + Connection::PendingPassthrough(pending_conn) => { + pending_passthroughs.insert( + (peer_id.name.clone(), pending_conn.target.clone()), + pending_conn + ); + } + } + } + // ignore connections we failed to accept...? + Err(_) => {} + } + } + // 3. deal with active connections that die by removing the associated peer + Some(Ok((dead_peer, maybe_resend))) = peer_connections.join_next() => { + peers.remove(&dead_peer); + match maybe_resend { + None => {}, + Some(km) => { + self_message_tx.send(km).await?; + } + } + } + } + } +} + +async fn init_connection_via_router( our: &Identity, + our_ip: &str, + keypair: &Ed25519KeyPair, km: KernelMessage, - peers: Peers, - keys: PeerKeys, - pki: OnchainPKI, - names: PKINames, + peer_id: &Identity, + pki: &OnchainPKI, + names: &PKINames, + peers: &mut Peers, + peer_connections: &mut JoinSet<(NodeId, Option)>, kernel_message_tx: MessageSender, print_tx: PrintSender, -) { +) -> bool { + let routers_shuffled = { + let mut routers = peer_id.allowed_routers.clone(); + routers.shuffle(&mut rand::thread_rng()); + routers + }; + for router_namehash in &routers_shuffled { + let Some(router_name) = names.get(router_namehash) else { + continue; + }; + let router_id = match pki.get(router_name) { + None => continue, + Some(id) => id, + }; + match init_connection(&our, &our_ip, peer_id, &keypair, Some(router_id), false).await { + Ok(direct_conn) => { + return save_new_peer( + peer_id, + false, + peers, + peer_connections, + direct_conn, + Some(km), + &kernel_message_tx, + &print_tx, + ) + .await + .is_ok() + } + Err(_) => continue, + } + } + return false; +} + +async fn recv_connection( + our: &Identity, + our_ip: &str, + pki: &OnchainPKI, + peers: &Peers, + pending_passthroughs: &mut PendingPassthroughs, + keypair: &Ed25519KeyPair, + websocket: WebSocketStream>, +) -> Result<(Identity, bool, Connection)> { + let mut buf = vec![0u8; 65535]; + let (mut noise, our_static_key) = build_responder(); + let (mut write_stream, mut read_stream) = websocket.split(); + + // before we begin XX handshake pattern, check first message over socket + let first_message = &ws_recv(&mut read_stream).await?; + + // if the first message contains a "routing request", + // we see if the target is someone we are actively routing for, + // and create a Passthrough connection if so. + // a Noise 'e' message with have len 32 + if first_message.len() != 32 { + let (their_id, target_name) = validate_routing_request(&our.name, &first_message, pki)?; + let (id, conn) = create_passthrough( + our, + our_ip, + their_id, + target_name, + pki, + peers, + pending_passthroughs, + write_stream, + read_stream, + ) + .await?; + return Ok((id, false, conn)); + } + + // <- e + noise.read_message(first_message, &mut buf)?; + + // -> e, ee, s, es + send_uqbar_handshake( + &our, + keypair, + &our_static_key, + &mut noise, + &mut buf, + &mut write_stream, + false, + ) + .await?; + + // <- s, se + let their_handshake = recv_uqbar_handshake(&mut noise, &mut buf, &mut read_stream).await?; + + // now validate this handshake payload against the QNS PKI + let their_id = pki + .get(&their_handshake.name) + .ok_or(anyhow!("unknown QNS name"))?; + validate_handshake( + &their_handshake, + noise + .get_remote_static() + .ok_or(anyhow!("noise error: missing remote pubkey"))?, + their_id, + )?; + + Ok(( + their_id.clone(), + their_handshake.proxy_request, + Connection::Peer(PeerConnection { + noise: noise.into_transport_mode()?, + buf, + write_stream, + read_stream, + }), + )) +} + +async fn recv_connection_via_router( + our: &Identity, + our_ip: &str, + their_name: &str, + pki: &OnchainPKI, + keypair: &Ed25519KeyPair, + router: &Identity, +) -> Result<(Identity, PeerConnection)> { + let mut buf = vec![0u8; 65535]; + let (mut noise, our_static_key) = build_responder(); + + let Some((ref ip, ref port)) = router.ws_routing else { + return Err(anyhow!("router has no routing information")); + }; + let Ok(ws_url) = make_ws_url(our_ip, ip, port) else { + return Err(anyhow!("failed to parse websocket url")); + }; + let Ok(Ok((websocket, _response))) = time::timeout(TIMEOUT, connect_async(ws_url)).await else { + return Err(anyhow!("failed to connect to target")); + }; + let (mut write_stream, mut read_stream) = websocket.split(); + + // before beginning XX handshake pattern, send a routing request + let req = rmp_serde::to_vec(&RoutingRequest { + protocol_version: 1, + source: our.name.clone(), + signature: keypair + .sign([their_name, router.name.as_str()].concat().as_bytes()) + .as_ref() + .to_vec(), + target: their_name.to_string(), + })?; + write_stream.send(tungstenite::Message::binary(req)).await?; + // <- e + noise.read_message(&ws_recv(&mut read_stream).await?, &mut buf)?; + + // -> e, ee, s, es + send_uqbar_handshake( + &our, + keypair, + &our_static_key, + &mut noise, + &mut buf, + &mut write_stream, + false, + ) + .await?; + + // <- s, se + let their_handshake = recv_uqbar_handshake(&mut noise, &mut buf, &mut read_stream).await?; + + // now validate this handshake payload against the QNS PKI + let their_id = pki + .get(&their_handshake.name) + .ok_or(anyhow!("unknown QNS name"))?; + validate_handshake( + &their_handshake, + noise + .get_remote_static() + .ok_or(anyhow!("noise error: missing remote pubkey"))?, + their_id, + )?; + + Ok(( + their_id.clone(), + PeerConnection { + noise: noise.into_transport_mode()?, + buf, + write_stream, + read_stream, + }, + )) +} + +async fn init_connection( + our: &Identity, + our_ip: &str, + peer_id: &Identity, + keypair: &Ed25519KeyPair, + use_router: Option<&Identity>, + proxy_request: bool, +) -> Result { + let mut buf = vec![0u8; 65535]; + let (mut noise, our_static_key) = build_initiator(); + + let (ref ip, ref port) = match use_router { + None => peer_id + .ws_routing + .as_ref() + .ok_or(anyhow!("target has no routing information"))?, + Some(router_id) => router_id + .ws_routing + .as_ref() + .ok_or(anyhow!("target has no routing information"))?, + }; + let ws_url = make_ws_url(our_ip, ip, port)?; + let Ok(Ok((websocket, _response))) = time::timeout(TIMEOUT, connect_async(ws_url)).await else { + return Err(anyhow!("failed to connect to target")); + }; + let (mut write_stream, mut read_stream) = websocket.split(); + + // if this is a routed request, before starting XX handshake pattern, send a + // routing request message over socket + if use_router.is_some() { + let req = rmp_serde::to_vec(&RoutingRequest { + protocol_version: 1, + source: our.name.clone(), + signature: keypair + .sign( + [&peer_id.name, use_router.unwrap().name.as_str()] + .concat() + .as_bytes(), + ) + .as_ref() + .to_vec(), + target: peer_id.name.clone(), + })?; + write_stream.send(tungstenite::Message::binary(req)).await?; + } + + // -> e + let len = noise.write_message(&[], &mut buf)?; + write_stream + .send(tungstenite::Message::binary(&buf[..len])) + .await?; + + // <- e, ee, s, es + let their_handshake = recv_uqbar_handshake(&mut noise, &mut buf, &mut read_stream).await?; + + // now validate this handshake payload against the QNS PKI + validate_handshake( + &their_handshake, + noise + .get_remote_static() + .ok_or(anyhow!("noise error: missing remote pubkey"))?, + peer_id, + )?; + + // -> s, se + send_uqbar_handshake( + &our, + keypair, + &our_static_key, + &mut noise, + &mut buf, + &mut write_stream, + proxy_request, + ) + .await?; + + Ok(PeerConnection { + noise: noise.into_transport_mode()?, + buf, + write_stream, + read_stream, + }) +} + +/// net module only handles incoming local requests, will never return a response +async fn handle_local_message( + our: &Identity, + our_ip: &str, + keypair: &Ed25519KeyPair, + km: KernelMessage, + peers: &mut Peers, + pki: &mut OnchainPKI, + peer_connections: &mut JoinSet<(NodeId, Option)>, + pending_passthroughs: Option<&mut PendingPassthroughs>, + forwarding_connections: Option<&JoinSet<()>>, + active_routers: Option<&HashSet>, + names: &mut PKINames, + kernel_message_tx: &MessageSender, + print_tx: &PrintSender, +) -> Result<()> { let ipc = match km.message { - Message::Response(_) => return, Message::Request(request) => request.ipc, + Message::Response((response, _context)) => { + // these are received as a router, when we send ConnectionRequests + // to a node we do routing for. + match rmp_serde::from_slice::(&response.ipc) { + Ok(NetResponses::Accepted(_)) => { + // TODO anything here? + } + Ok(NetResponses::Rejected(to)) => { + // drop from our pending map + // this will drop the socket, causing initiator to see it as failed + pending_passthroughs + .ok_or(anyhow!("got net response as non-router"))? + .remove(&(to, km.source.node)); + } + Err(_) => { + // this is usually the "delivered" response to a raw message + } + } + return Ok(()); + } }; if km.source.node != our.name { + if let Ok(act) = rmp_serde::from_slice::(&ipc) { + match act { + NetActions::QnsBatchUpdate(_) | NetActions::QnsUpdate(_) => { + // for now, we don't get these from remote. + } + NetActions::ConnectionRequest(from) => { + // someone wants to open a passthrough with us through a router! + // if we are an indirect node, and source is one of our routers, + // respond by attempting to init a matching passthrough. + let res: Result = if our.allowed_routers.contains(&km.source.node) + { + let router_id = peers + .get(&km.source.node) + .ok_or(anyhow!("unknown router"))? + .read() + .await + .identity + .clone(); + let (peer_id, peer_conn) = time::timeout( + TIMEOUT, + recv_connection_via_router( + our, our_ip, &from, pki, keypair, &router_id, + ), + ) + .await??; + save_new_peer( + &peer_id, + false, + peers, + peer_connections, + peer_conn, + None, + &kernel_message_tx, + &print_tx, + ) + .await?; + Ok(NetResponses::Accepted(from.clone())) + } else { + Ok(NetResponses::Rejected(from.clone())) + }; + kernel_message_tx + .send(KernelMessage { + id: km.id, + source: Address { + node: our.name.clone(), + process: ProcessId::from_str("net:sys:uqbar").unwrap(), + }, + target: km.rsvp.unwrap_or(km.source), + rsvp: None, + message: Message::Response(( + Response { + inherit: false, + ipc: rmp_serde::to_vec( + &res.unwrap_or(NetResponses::Rejected(from)), + )?, + metadata: None, + }, + None, + )), + payload: None, + signed_capabilities: None, + }) + .await?; + } + } + return Ok(()); + }; + // if we can't parse this to a netaction, treat it as a hello and print it // respond to a text message with a simple "delivered" response - let _ = print_tx + print_tx .send(Printout { verbosity: 0, content: format!( @@ -705,8 +848,8 @@ async fn handle_incoming_message( std::str::from_utf8(&ipc).unwrap_or("!!message parse error!!") ), }) - .await; - let _ = kernel_message_tx + .await?; + kernel_message_tx .send(KernelMessage { id: km.id, source: Address { @@ -726,67 +869,61 @@ async fn handle_incoming_message( payload: None, signed_capabilities: None, }) - .await; + .await?; + Ok(()) } else { - // available commands: "peers", "QnsUpdate" (see qns_indexer module) + // available commands: "peers", "pki", "names", "diagnostics" // first parse as raw string, then deserialize to NetActions object + let mut printout = String::new(); match std::str::from_utf8(&ipc) { Ok("peers") => { - let peer_read = peers.read().await; - let _ = print_tx - .send(Printout { - verbosity: 0, - content: format!("{:?}", peer_read.keys()), - }) - .await; - } - Ok("keys") => { - let keys_read = keys.read().await; - let _ = print_tx - .send(Printout { - verbosity: 0, - content: format!("{:?}", keys_read.keys()), - }) - .await; + printout.push_str(&format!("{:#?}", peers.keys())); } Ok("pki") => { - let pki_read = pki.read().await; - let _ = print_tx - .send(Printout { - verbosity: 0, - content: format!("{:?}", pki_read), - }) - .await; + printout.push_str(&format!("{:#?}", pki)); } Ok("names") => { - let names_read = names.read().await; - let _ = print_tx - .send(Printout { - verbosity: 0, - content: format!("{:?}", names_read), - }) - .await; + printout.push_str(&format!("{:#?}", names)); + } + Ok("diagnostics") => { + printout.push_str(&format!("our Identity: {:#?}\r\n", our)); + printout.push_str(&format!( + "we have connections with peers: {:#?}\r\n", + peers.keys() + )); + printout.push_str(&format!("we have {} entries in the PKI\r\n", pki.len())); + printout.push_str(&format!( + "we have {} open peer connections\r\n", + peer_connections.len() + )); + + if pending_passthroughs.is_some() { + printout.push_str(&format!( + "we have {} pending passthrough connections\r\n", + pending_passthroughs.unwrap().len() + )); + } + if forwarding_connections.is_some() { + printout.push_str(&format!( + "we have {} open passthrough connections\r\n", + forwarding_connections.unwrap().len() + )); + } + if active_routers.is_some() { + printout.push_str(&format!( + "we have {} active routers\r\n", + active_routers.unwrap().len() + )); + } } _ => { - let Ok(act) = serde_json::from_slice::(&ipc) else { - let _ = print_tx - .send(Printout { - verbosity: 0, - content: "net: got unknown command".into(), - }) - .await; - return; - }; - match act { + match rmp_serde::from_slice::(&ipc)? { + NetActions::ConnectionRequest(_) => { + // we shouldn't receive these from ourselves. + } NetActions::QnsUpdate(log) => { - let _ = print_tx - .send(Printout { - verbosity: 1, - content: format!("net: got QNS update for {}", log.name), - }) - .await; - - let _ = pki.write().await.insert( + // printout.push_str(&format!("net: got QNS update for {}", log.name)); + pki.insert( log.name.clone(), Identity { name: log.name.clone(), @@ -799,20 +936,15 @@ async fn handle_incoming_message( allowed_routers: log.routers, }, ); - let _ = names.write().await.insert(log.node, log.name); + names.insert(log.node, log.name); } NetActions::QnsBatchUpdate(log_list) => { - let _ = print_tx - .send(Printout { - verbosity: 1, - content: format!( - "net: got QNS update with {} peers", - log_list.len() - ), - }) - .await; + // printout.push_str(&format!( + // "net: got QNS update with {} peers", + // log_list.len() + // )); for log in log_list { - let _ = pki.write().await.insert( + pki.insert( log.name.clone(), Identity { name: log.name.clone(), @@ -826,125 +958,20 @@ async fn handle_incoming_message( allowed_routers: log.routers, }, ); - let _ = names.write().await.insert(log.node, log.name); + names.insert(log.node, log.name); } } } } } - } -} - -/* - * networking utils - */ - -fn make_ws_url(our_ip: &str, ip: &str, port: &u16) -> Result { - // if we have the same public IP as target, route locally, - // otherwise they will appear offline due to loopback stuff - let ip = if our_ip == ip { "localhost" } else { ip }; - match url::Url::parse(&format!("ws://{}:{}/ws", ip, port)) { - Ok(v) => Ok(v), - Err(_) => Err(SendErrorKind::Offline), - } -} - -/* - * handshake utils - */ - -/// take in handshake and PKI identity, and confirm that the handshake is valid. -/// takes in optional nonce, which must be the one that connection initiator created. -fn validate_handshake( - handshake: &Handshake, - their_id: &Identity, -) -> Result>, String> { - let their_networking_key = signature::UnparsedPublicKey::new( - &signature::ED25519, - hex::decode(&strip_0x(&their_id.networking_key)) - .map_err(|_| "failed to decode networking key")?, - ); - - if !(their_networking_key - .verify( - // TODO use language-neutral serialization here too - &bincode::serialize(their_id).map_err(|_| "failed to serialize their identity")?, - &handshake.id_signature, - ) - .is_ok() - && their_networking_key - .verify( - &handshake.ephemeral_public_key, - &handshake.ephemeral_public_key_signature, - ) - .is_ok()) - { - // improper signatures on identity info, close connection - return Err("got improperly signed networking info".into()); - } - - match PublicKey::::from_sec1_bytes(&handshake.ephemeral_public_key) { - Ok(v) => return Ok(Arc::new(v)), - Err(_) => return Err("error".into()), - }; -} - -/// given an identity and networking key-pair, produces a handshake message along -/// with an ephemeral secret to be used in a specific connection. -fn make_secret_and_handshake( - our: &Identity, - keypair: Arc, - target: &str, - id: Option, -) -> (Arc>, Handshake) { - // produce ephemeral keys for DH exchange and subsequent symmetric encryption - let ephemeral_secret = Arc::new(EphemeralSecret::::random( - &mut rand::rngs::OsRng, - )); - let ephemeral_public_key = ephemeral_secret.public_key(); - // sign the ephemeral public key with our networking management key - let signed_pk = keypair - .sign(&ephemeral_public_key.to_sec1_bytes()) - .as_ref() - .to_vec(); - - // before signing our identity, convert router names to namehashes - // to match the exact onchain representation of our identity - let mut our_onchain_id = our.clone(); - our_onchain_id.allowed_routers = our - .allowed_routers - .clone() - .into_iter() - .map(|namehash| { - let hash = crate::namehash(&namehash); - let mut result = [0u8; 32]; - result.copy_from_slice(hash.as_bytes()); - format!("0x{}", hex::encode(result)) - }) - .collect(); - - // TODO use language-neutral serialization here too - let signed_id = keypair - .sign(&bincode::serialize(&our_onchain_id).unwrap()) - .as_ref() - .to_vec(); - - let handshake = Handshake { - id: id.unwrap_or(rand::random()), - from: our.name.clone(), - target: target.to_string(), - id_signature: signed_id, - ephemeral_public_key: ephemeral_public_key.to_sec1_bytes().to_vec(), - ephemeral_public_key_signature: signed_pk, - }; - - (ephemeral_secret, handshake) -} - -fn strip_0x(s: &str) -> String { - if s.starts_with("0x") { - s[2..].to_string() - } else { - s.to_string() + if !printout.is_empty() { + print_tx + .send(Printout { + verbosity: 0, + content: printout, + }) + .await?; + } + Ok(()) } } diff --git a/src/net/types.rs b/src/net/types.rs index 251c40f2..9d2ab882 100644 --- a/src/net/types.rs +++ b/src/net/types.rs @@ -1,71 +1,110 @@ use crate::types::*; -use anyhow::Result; -use elliptic_curve::ecdh::SharedSecret; -use ethers::prelude::k256::Secp256k1; +use futures::stream::{SplitSink, SplitStream}; use serde::{Deserialize, Serialize}; use std::{collections::HashMap, sync::Arc}; use tokio::net::TcpStream; -use tokio::sync::{mpsc, RwLock}; -use tokio_tungstenite::{MaybeTlsStream, WebSocketStream}; +use tokio::sync::{mpsc::UnboundedSender, RwLock}; +use tokio_tungstenite::{tungstenite, MaybeTlsStream, WebSocketStream}; -pub type PeerKeys = Arc>)>>>; -pub type Peers = Arc>>; -pub type WebSocket = WebSocketStream>; -pub type MessageResult = Result; -pub type ErrorShuttle = mpsc::UnboundedSender; +/// Sent to a node when you want to connect directly to them. +/// Sent in the 'e, ee, s, es' and 's, se' phases of XX noise protocol pattern. +#[derive(Debug, Deserialize, Serialize)] +pub struct HandshakePayload { + pub protocol_version: u8, + pub name: NodeId, + // signature is created by their networking key, of their static key + // someone could reuse this signature, but then they will be unable + // to encrypt messages to us. + pub signature: Vec, + /// Set to true when you want them to act as a router for you, sending + /// messages from potentially many remote sources over this connection, + /// including from the router itself. + /// This is not relevant in a handshake sent from the receiver side. + pub proxy_request: bool, +} + +/// Sent to a node when you want them to connect you to an indirect node. +/// If the receiver of the request has an open connection to your target, +/// and is willing, they will send a message to the target prompting them +/// to build the other side of the connection, at which point they will +/// hold open a Passthrough for you two. +/// +/// Alternatively, if the receiver does not have an open connection but the +/// target is a direct node, they can create a Passthrough for you two if +/// they are willing to proxy for you. +/// +/// Sent in the 'e' phase of XX noise protocol pattern. +#[derive(Debug, Deserialize, Serialize)] +pub struct RoutingRequest { + pub protocol_version: u8, + pub source: NodeId, + // signature is created by their networking key, of the [target, router name].concat() + // someone could reuse this signature, and TODO need to make sure that's useless. + pub signature: Vec, + pub target: NodeId, +} + +pub enum Connection { + Peer(PeerConnection), + Passthrough(PassthroughConnection), + PendingPassthrough(PendingPassthroughConnection), +} + +pub struct PeerConnection { + pub noise: snow::TransportState, + pub buf: Vec, + pub write_stream: SplitSink>, tungstenite::Message>, + pub read_stream: SplitStream>>, +} + +pub struct PassthroughConnection { + pub write_stream_1: SplitSink>, tungstenite::Message>, + pub read_stream_1: SplitStream>>, + pub write_stream_2: SplitSink>, tungstenite::Message>, + pub read_stream_2: SplitStream>>, +} + +pub struct PendingPassthroughConnection { + pub target: NodeId, + pub write_stream: SplitSink>, tungstenite::Message>, + pub read_stream: SplitStream>>, +} + +// TODO upgrade from hashmaps +pub type Peers = HashMap>>; +pub type PKINames = HashMap; +pub type OnchainPKI = HashMap; +pub type PendingPassthroughs = HashMap<(NodeId, NodeId), PendingPassthroughConnection>; -/// stored in mapping by their username pub struct Peer { pub identity: Identity, - // send messages here to have them encrypted and sent across an active connection - pub sender: mpsc::UnboundedSender<(PeerMessage, Option)>, - // send encrypted messages from this peer here to have them decrypted and sent to kernel - pub decrypter: mpsc::UnboundedSender<(Vec, ErrorShuttle)>, - pub socket_tx: mpsc::UnboundedSender<(NetworkMessage, Option)>, -} - -/// parsed from Binary websocket message -/// TODO add a version number somewhere in the serialized format!! -#[derive(Clone, Debug, Serialize, Deserialize)] -pub enum NetworkMessage { - Ack(u64), - Nack(u64), - Msg { - id: u64, - from: String, - to: String, - contents: Vec, - }, - Handshake(Handshake), - HandshakeAck(Handshake), - // only used in implementation, not part of protocol - Ping, - Pong, -} - -pub enum PeerMessage { - Raw(KernelMessage), - Net(NetworkMessage), -} - -/// contains identity and encryption keys, used in initial handshake. -/// parsed from Text websocket message -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct Handshake { - pub id: u64, - pub from: String, - pub target: String, - pub id_signature: Vec, - pub ephemeral_public_key: Vec, - pub ephemeral_public_key_signature: Vec, + /// If true, we are routing for them and have a RoutingClientConnection + /// associated with them. We can send them prompts to establish Passthroughs. + pub routing_for: bool, + pub sender: UnboundedSender, } +/// Must be parsed from message pack vector. #[derive(Clone, Debug, Serialize, Deserialize)] pub enum NetActions { + /// Received from a router of ours when they have a new pending passthrough for us. + /// We should respond (if we desire) by using them to initialize a routed connection + /// with the NodeId given. + ConnectionRequest(NodeId), + /// can only receive from trusted source, for now just ourselves locally, + /// in the future could get from remote provider QnsUpdate(QnsUpdate), QnsBatchUpdate(Vec), } +/// For now, only sent in response to a ConnectionRequest. +/// Must be parsed from message pack vector +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum NetResponses { + Accepted(NodeId), + Rejected(NodeId), +} + #[derive(Clone, Debug, Serialize, Deserialize)] pub struct QnsUpdate { pub name: String, // actual username / domain name diff --git a/src/net/utils.rs b/src/net/utils.rs new file mode 100644 index 00000000..66384de3 --- /dev/null +++ b/src/net/utils.rs @@ -0,0 +1,409 @@ +use crate::net::{types::*, MESSAGE_MAX_SIZE, TIMEOUT}; +use crate::types::*; +use anyhow::{anyhow, Result}; +use futures::stream::{SplitSink, SplitStream}; +use futures::{SinkExt, StreamExt}; +use ring::signature::{self, Ed25519KeyPair}; +use snow::params::NoiseParams; +use std::sync::Arc; +use tokio::net::TcpStream; +use tokio::sync::{ + mpsc::{unbounded_channel, UnboundedReceiver}, + RwLock, +}; +use tokio::task::JoinSet; +use tokio::time::timeout; +use tokio_tungstenite::{connect_async, tungstenite, MaybeTlsStream, WebSocketStream}; + +lazy_static::lazy_static! { + static ref PARAMS: NoiseParams = "Noise_XX_25519_ChaChaPoly_BLAKE2s" + .parse() + .expect("net: couldn't build noise params?"); +} + +pub async fn save_new_peer( + identity: &Identity, + routing_for: bool, + peers: &mut Peers, + peer_connections: &mut JoinSet<(String, Option)>, + conn: PeerConnection, + km: Option, + kernel_message_tx: &MessageSender, + print_tx: &PrintSender, +) -> Result<()> { + let peer_name = identity.name.clone(); + let (peer_tx, peer_rx) = unbounded_channel::(); + if km.is_some() { + peer_tx.send(km.unwrap())? + } + let peer = Arc::new(RwLock::new(Peer { + identity: identity.clone(), + routing_for, + sender: peer_tx, + })); + peers.insert(peer_name, peer.clone()); + peer_connections.spawn(maintain_connection( + peer, + conn, + peer_rx, + kernel_message_tx.clone(), + print_tx.clone(), + )); + Ok(()) +} + +/// should always be spawned on its own task +pub async fn maintain_connection( + peer: Arc>, + mut conn: PeerConnection, + mut peer_rx: UnboundedReceiver, + kernel_message_tx: MessageSender, + print_tx: PrintSender, +) -> (NodeId, Option) { + let peer_name = peer.read().await.identity.name.clone(); + loop { + tokio::select! { + recv_result = recv_uqbar_message(&mut conn) => { + match recv_result { + Ok(km) => { + if km.source.node != peer_name { + let _ = print_tx.send(Printout { + verbosity: 0, + content: format!("net: got message with spoofed source from {peer_name}") + }).await; + return (peer_name, None) + } + kernel_message_tx.send(km).await.expect("net error: fatal: kernel died"); + } + Err(_) => { + return (peer_name, None) + } + } + }, + maybe_recv = peer_rx.recv() => { + match maybe_recv { + Some(km) => { + match send_uqbar_message(&km, &mut conn).await { + Ok(()) => continue, + Err(e) => { + if e.to_string() == "message too large" { + // this will result in a Timeout if the message + // requested a response, otherwise nothing. so, + // we should always print something to terminal + let _ = print_tx.send(Printout { + verbosity: 0, + content: format!( + "net: tried to send too-large message, limit is {:.2}mb", + MESSAGE_MAX_SIZE as f64 / 1_048_576.0 + ), + }).await; + return (peer_name, None) + } + return (peer_name, Some(km)) + } + } + } + None => return (peer_name, None), + } + }, + } + } +} + +/// cross the streams +pub async fn maintain_passthrough(mut conn: PassthroughConnection) { + loop { + tokio::select! { + maybe_recv = conn.read_stream_1.next() => { + match maybe_recv { + Some(Ok(msg)) => { + conn.write_stream_2.send(msg).await.expect("net error: fatal: kernel died"); + } + _ => return, + } + }, + maybe_recv = conn.read_stream_2.next() => { + match maybe_recv { + Some(Ok(msg)) => { + conn.write_stream_1.send(msg).await.expect("net error: fatal: kernel died"); + } + _ => return, + } + }, + } + } +} + +pub async fn create_passthrough( + our: &Identity, + our_ip: &str, + from_id: Identity, + to_name: NodeId, + pki: &OnchainPKI, + peers: &Peers, + pending_passthroughs: &mut PendingPassthroughs, + write_stream_1: SplitSink>, tungstenite::Message>, + read_stream_1: SplitStream>>, +) -> Result<(Identity, Connection)> { + // if the target has already generated a pending passthrough for this source, + // immediately match them + if let Some(pending) = pending_passthroughs.remove(&(to_name.clone(), from_id.name.clone())) { + return Ok(( + from_id, + Connection::Passthrough(PassthroughConnection { + write_stream_1, + read_stream_1, + write_stream_2: pending.write_stream, + read_stream_2: pending.read_stream, + }), + )); + } + let to_id = pki.get(&to_name).ok_or(anyhow!("unknown QNS name"))?; + let Some((ref ip, ref port)) = to_id.ws_routing else { + // create passthrough to indirect node that we do routing for + // + let target_peer = peers + .get(&to_name) + .ok_or(anyhow!("can't route to that indirect node"))?; + if !target_peer.read().await.routing_for { + return Err(anyhow!("we don't route for that indirect node")); + } + // send their net:sys:uqbar process a message, notifying it to create a *matching* + // passthrough request, which we can pair with this pending one. + target_peer.write().await.sender.send(KernelMessage { + id: rand::random(), + source: Address { + node: our.name.clone(), + process: ProcessId::from_str("net:sys:uqbar").unwrap(), + }, + target: Address { + node: to_name.clone(), + process: ProcessId::from_str("net:sys:uqbar").unwrap(), + }, + rsvp: None, + message: Message::Request(Request { + inherit: false, + expects_response: Some(5), + ipc: rmp_serde::to_vec(&NetActions::ConnectionRequest(from_id.name.clone()))?, + metadata: None, + }), + payload: None, + signed_capabilities: None, + })?; + + return Ok(( + from_id, + Connection::PendingPassthrough(PendingPassthroughConnection { + target: to_name, + write_stream: write_stream_1, + read_stream: read_stream_1, + }), + )); + }; + // create passthrough to direct node + // + let ws_url = make_ws_url(our_ip, ip, port)?; + let Ok(Ok((websocket, _response))) = timeout(TIMEOUT, connect_async(ws_url)).await else { + return Err(anyhow!("failed to connect to target")); + }; + let (write_stream_2, read_stream_2) = websocket.split(); + Ok(( + from_id, + Connection::Passthrough(PassthroughConnection { + write_stream_1, + read_stream_1, + write_stream_2, + read_stream_2, + }), + )) +} + +pub fn validate_routing_request( + our_name: &str, + buf: &[u8], + pki: &OnchainPKI, +) -> Result<(Identity, NodeId)> { + let routing_request: RoutingRequest = rmp_serde::from_slice(buf)?; + let their_id = pki + .get(&routing_request.source) + .ok_or(anyhow!("unknown QNS name"))?; + let their_networking_key = signature::UnparsedPublicKey::new( + &signature::ED25519, + hex::decode(&strip_0x(&their_id.networking_key))?, + ); + their_networking_key.verify( + &[&routing_request.target, our_name].concat().as_bytes(), + &routing_request.signature, + )?; + if routing_request.target == routing_request.source { + return Err(anyhow!("can't route to self")); + } + Ok((their_id.clone(), routing_request.target)) +} + +pub fn validate_handshake( + handshake: &HandshakePayload, + their_static_key: &[u8], + their_id: &Identity, +) -> Result<()> { + if handshake.protocol_version != 1 { + return Err(anyhow!("handshake protocol version mismatch")); + } + // verify their signature of their static key + let their_networking_key = signature::UnparsedPublicKey::new( + &signature::ED25519, + hex::decode(&strip_0x(&their_id.networking_key))?, + ); + their_networking_key.verify(their_static_key, &handshake.signature)?; + Ok(()) +} + +pub async fn send_uqbar_message(km: &KernelMessage, conn: &mut PeerConnection) -> Result<()> { + let serialized = rmp_serde::to_vec(km)?; + if serialized.len() > MESSAGE_MAX_SIZE as usize { + return Err(anyhow!("message too large")); + } + + let len = (serialized.len() as u32).to_be_bytes(); + let with_length_prefix = [len.to_vec(), serialized].concat(); + + // 65519 = 65535 - 16 (TAGLEN) + for payload in with_length_prefix.chunks(65519) { + let len = conn.noise.write_message(payload, &mut conn.buf)?; + conn.write_stream + .feed(tungstenite::Message::binary(&conn.buf[..len])) + .await?; + } + conn.write_stream.flush().await?; + Ok(()) +} + +/// any error in receiving a message will result in the connection being closed. +pub async fn recv_uqbar_message(conn: &mut PeerConnection) -> Result { + let outer_len = conn + .noise + .read_message(&ws_recv(&mut conn.read_stream).await?, &mut conn.buf)?; + if outer_len < 4 { + return Err(anyhow!("uqbar message too small!")); + } + + let length_bytes = [conn.buf[0], conn.buf[1], conn.buf[2], conn.buf[3]]; + let msg_len = u32::from_be_bytes(length_bytes); + if msg_len > MESSAGE_MAX_SIZE { + return Err(anyhow!("message too large")); + } + + let mut msg = Vec::with_capacity(msg_len as usize); + msg.extend_from_slice(&conn.buf[4..outer_len]); + + while msg.len() < msg_len as usize { + let len = conn + .noise + .read_message(&ws_recv(&mut conn.read_stream).await?, &mut conn.buf)?; + msg.extend_from_slice(&conn.buf[..len]); + } + + Ok(rmp_serde::from_slice(&msg)?) +} + +pub async fn send_uqbar_handshake( + our: &Identity, + keypair: &Ed25519KeyPair, + noise_static_key: &[u8], + noise: &mut snow::HandshakeState, + buf: &mut Vec, + write_stream: &mut SplitSink>, tungstenite::Message>, + proxy_request: bool, +) -> Result<()> { + let our_hs = rmp_serde::to_vec(&HandshakePayload { + protocol_version: 1, + name: our.name.clone(), + signature: keypair.sign(noise_static_key).as_ref().to_vec(), + proxy_request, + }) + .expect("failed to serialize handshake payload"); + + let len = noise.write_message(&our_hs, buf)?; + write_stream + .send(tungstenite::Message::binary(&buf[..len])) + .await?; + Ok(()) +} + +pub async fn recv_uqbar_handshake( + noise: &mut snow::HandshakeState, + buf: &mut Vec, + read_stream: &mut SplitStream>>, +) -> Result { + let len = noise.read_message(&ws_recv(read_stream).await?, buf)?; + Ok(rmp_serde::from_slice(&buf[..len])?) +} + +pub async fn ws_recv( + read_stream: &mut SplitStream>>, +) -> Result> { + let Some(Ok(tungstenite::Message::Binary(bin))) = read_stream.next().await else { + return Err(anyhow!("websocket closed")); + }; + Ok(bin) +} + +pub fn build_responder() -> (snow::HandshakeState, Vec) { + let builder: snow::Builder<'_> = snow::Builder::new(PARAMS.clone()); + let keypair = builder + .generate_keypair() + .expect("net: couldn't generate keypair?"); + ( + builder + .local_private_key(&keypair.private) + .build_responder() + .expect("net: couldn't build responder?"), + keypair.public, + ) +} + +pub fn build_initiator() -> (snow::HandshakeState, Vec) { + let builder: snow::Builder<'_> = snow::Builder::new(PARAMS.clone()); + let keypair = builder + .generate_keypair() + .expect("net: couldn't generate keypair?"); + ( + builder + .local_private_key(&keypair.private) + .build_initiator() + .expect("net: couldn't build responder?"), + keypair.public, + ) +} + +pub fn make_ws_url(our_ip: &str, ip: &str, port: &u16) -> Result { + // if we have the same public IP as target, route locally, + // otherwise they will appear offline due to loopback stuff + let ip = if our_ip == ip { "localhost" } else { ip }; + let url = url::Url::parse(&format!("ws://{}:{}/ws", ip, port))?; + Ok(url) +} + +pub async fn error_offline(km: KernelMessage, network_error_tx: &NetworkErrorSender) -> Result<()> { + network_error_tx + .send(WrappedSendError { + id: km.id, + source: km.source, + error: SendError { + kind: SendErrorKind::Offline, + target: km.target, + message: km.message, + payload: km.payload, + }, + }) + .await?; + Ok(()) +} + +fn strip_0x(s: &str) -> String { + if s.starts_with("0x") { + s[2..].to_string() + } else { + s.to_string() + } +} diff --git a/src/register.rs b/src/register.rs index 71f7a655..9b55fa68 100644 --- a/src/register.rs +++ b/src/register.rs @@ -256,14 +256,18 @@ async fn handle_info( // TODO: if IP is localhost, assign a router... let ws_port = http_server::find_open_port(9000).await.unwrap(); + // this is NOT our real identity. it's stuff we give to the frontend + // to match on let our = Identity { networking_key: format!("0x{}", public_key), name: String::new(), ws_routing: Some((ip.clone(), ws_port)), allowed_routers: vec![ - "uqbar-router-1.uq".into(), // "0x8d9e54427c50660c6d4802f63edca86a9ca5fd6a78070c4635950e9d149ed441".into(), - "uqbar-router-2.uq".into(), // "0x06d331ed65843ecf0860c73292005d8103af20820546b2f8f9007d01f60595b1".into(), - "uqbar-router-3.uq".into(), // "0xe6ab611eb62e8aee0460295667f8179cda4315982717db4b0b3da6022deecac1".into(), + "testnode101.uq".into(), + "testnode102.uq".into(), + // "uqbar-router-1.uq".into(), // "0x8d9e54427c50660c6d4802f63edca86a9ca5fd6a78070c4635950e9d149ed441".into(), + // "uqbar-router-2.uq".into(), // "0x06d331ed65843ecf0860c73292005d8103af20820546b2f8f9007d01f60595b1".into(), + // "uqbar-router-3.uq".into(), // "0xe6ab611eb62e8aee0460295667f8179cda4315982717db4b0b3da6022deecac1".into(), ], }; diff --git a/src/types.rs b/src/types.rs index 379f9b31..76105987 100644 --- a/src/types.rs +++ b/src/types.rs @@ -43,8 +43,6 @@ pub type CapMessageReceiver = tokio::sync::mpsc::Receiver; // types used for UQI: uqbar's identity system // pub type NodeId = String; -pub type PKINames = Arc>>; // TODO maybe U256 to String -pub type OnchainPKI = Arc>>; #[derive(Debug, Serialize, Deserialize)] pub struct Registration {