Merge pull request #43 from uqbar-dao/dr/networking-next

Networking rewrite to layer in Noise protocol and generally improve
This commit is contained in:
dr-frmr 2023-11-06 09:34:13 -06:00 committed by GitHub
commit 30100dc98c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 1647 additions and 1561 deletions

286
Cargo.lock generated
View File

@ -36,6 +36,15 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
[[package]]
name = "aead"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b613b8e1e3cf911a086f53f03bf286f52fd7a7258e4fa606f0ef220d39d8877"
dependencies = [
"generic-array",
]
[[package]]
name = "aead"
version = "0.5.2"
@ -46,6 +55,18 @@ dependencies = [
"generic-array",
]
[[package]]
name = "aes"
version = "0.7.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e8b47f52ea9bae42228d07ec09eb676433d7c4ed1ebdf0f1d1c29ed446f1ab8"
dependencies = [
"cfg-if",
"cipher 0.3.0",
"cpufeatures",
"opaque-debug",
]
[[package]]
name = "aes"
version = "0.8.3"
@ -53,21 +74,35 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac1f845298e95f983ff1944b728ae08b8cebab80d684f0a832ed0fc74dfa27e2"
dependencies = [
"cfg-if",
"cipher",
"cipher 0.4.4",
"cpufeatures",
]
[[package]]
name = "aes-gcm"
version = "0.9.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df5f85a83a7d8b0442b6aa7b504b8212c1733da07b98aae43d4bc21b2cb3cdf6"
dependencies = [
"aead 0.4.3",
"aes 0.7.5",
"cipher 0.3.0",
"ctr 0.8.0",
"ghash 0.4.4",
"subtle",
]
[[package]]
name = "aes-gcm"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "209b47e8954a928e1d72e86eca7000ebb6655fe1436d33eefc2201cad027e237"
dependencies = [
"aead",
"aes",
"cipher",
"ctr",
"ghash",
"aead 0.5.2",
"aes 0.8.3",
"cipher 0.4.4",
"ctr 0.9.2",
"ghash 0.5.0",
"subtle",
]
@ -154,17 +189,6 @@ dependencies = [
"term",
]
[[package]]
name = "async-recursion"
version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5fd55a5ba1179988837d24ab4c7cc8ed6efdeff578ede0416b4225a5fca35bd0"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.32",
]
[[package]]
name = "async-trait"
version = "0.1.73"
@ -298,6 +322,15 @@ dependencies = [
"wyz",
]
[[package]]
name = "blake2"
version = "0.10.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "46502ad458c9a52b69d4d4d32775c788b7a1b85e8bc9d482d92250fc0e3f8efe"
dependencies = [
"digest 0.10.7",
]
[[package]]
name = "blake3"
version = "1.4.1"
@ -499,6 +532,18 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "chacha20"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c80e5460aa66fe3b91d40bcbdab953a597b60053e34d684ac6903f863b680a6"
dependencies = [
"cfg-if",
"cipher 0.3.0",
"cpufeatures",
"zeroize",
]
[[package]]
name = "chacha20"
version = "0.9.1"
@ -506,20 +551,33 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c3613f74bd2eac03dad61bd53dbe620703d4371614fe0bc3b9f04dd36fe4e818"
dependencies = [
"cfg-if",
"cipher",
"cipher 0.4.4",
"cpufeatures",
]
[[package]]
name = "chacha20poly1305"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a18446b09be63d457bbec447509e85f662f32952b035ce892290396bc0b0cff5"
dependencies = [
"aead 0.4.3",
"chacha20 0.8.2",
"cipher 0.3.0",
"poly1305 0.7.2",
"zeroize",
]
[[package]]
name = "chacha20poly1305"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "10cd79432192d1c0f4e1a0fef9527696cc039165d729fb41b3f4f4f354c2dc35"
dependencies = [
"aead",
"chacha20",
"cipher",
"poly1305",
"aead 0.5.2",
"chacha20 0.9.1",
"cipher 0.4.4",
"poly1305 0.8.0",
"zeroize",
]
@ -538,6 +596,15 @@ dependencies = [
"windows-targets",
]
[[package]]
name = "cipher"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ee52072ec15386f770805afd189a01c8841be8696bed250fa2f13c4c0d6dfb7"
dependencies = [
"generic-array",
]
[[package]]
name = "cipher"
version = "0.4.4"
@ -549,17 +616,6 @@ dependencies = [
"zeroize",
]
[[package]]
name = "cita_trie"
version = "4.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8c3d2abadaa28e0d277f9f6d07a2052544f045d929cd4d6f7bcfb43567c9767"
dependencies = [
"hasher",
"parking_lot",
"rlp",
]
[[package]]
name = "coins-bip32"
version = "0.8.7"
@ -901,13 +957,49 @@ dependencies = [
"subtle",
]
[[package]]
name = "ctr"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "049bb91fb4aaf0e3c7efa6cd5ef877dbbbd15b39dad06d9948de4ec8a75761ea"
dependencies = [
"cipher 0.3.0",
]
[[package]]
name = "ctr"
version = "0.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0369ee1ad671834580515889b80f2ea915f23b8be8d0daa4bbaf2ac5c7590835"
dependencies = [
"cipher",
"cipher 0.4.4",
]
[[package]]
name = "curve25519-dalek"
version = "4.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e89b8c6a2e4b1f45971ad09761aafb85514a84744b67a95e32c3cc1352d1f65c"
dependencies = [
"cfg-if",
"cpufeatures",
"curve25519-dalek-derive",
"fiat-crypto",
"platforms",
"rustc_version",
"subtle",
"zeroize",
]
[[package]]
name = "curve25519-dalek-derive"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "83fdaf97f4804dcebfa5862639bc9ce4121e82140bec2a987ac5140294865b5b"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.32",
]
[[package]]
@ -1277,8 +1369,8 @@ version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1fda3bf123be441da5260717e0661c25a2fd9cb2b2c1d20bf2e05580047158ab"
dependencies = [
"aes",
"ctr",
"aes 0.8.3",
"ctr 0.9.2",
"digest 0.10.7",
"hex",
"hmac 0.12.1",
@ -1632,6 +1724,12 @@ dependencies = [
"subtle",
]
[[package]]
name = "fiat-crypto"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a481586acf778f1b1455424c343f71124b048ffa5f4fc3f8f6ae9dc432dcb3c7"
[[package]]
name = "file-per-thread-logger"
version = "0.2.0"
@ -1880,6 +1978,16 @@ dependencies = [
"wasi",
]
[[package]]
name = "ghash"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1583cc1656d7839fd3732b80cf4f38850336cdb9b8ded1cd399ca62958de3c99"
dependencies = [
"opaque-debug",
"polyval 0.5.3",
]
[[package]]
name = "ghash"
version = "0.5.0"
@ -1887,7 +1995,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d930750de5717d2dd0b8c0d42c076c0e884c81a73e6cab859bbd2339c71e3e40"
dependencies = [
"opaque-debug",
"polyval",
"polyval 0.6.1",
]
[[package]]
@ -1976,15 +2084,6 @@ version = "0.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2c6201b9ff9fd90a5a3bac2e56a830d0caa509576f0e503818ee82c181b3437a"
[[package]]
name = "hasher"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbbba678b6567f27ce22870d951f4208e1dc2fef64993bd4521b1d497ef8a3aa"
dependencies = [
"tiny-keccak",
]
[[package]]
name = "hashers"
version = "1.0.1"
@ -3205,6 +3304,23 @@ version = "0.3.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26072860ba924cbfa98ea39c8c19b4dd6a4a25423dbdf219c1eca91aa0cf6964"
[[package]]
name = "platforms"
version = "3.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4503fa043bf02cee09a9582e9554b4c6403b2ef55e4612e96561d294419429f8"
[[package]]
name = "poly1305"
version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "048aeb476be11a4b6ca432ca569e375810de9294ae78f4774e78ea98a9246ede"
dependencies = [
"cpufeatures",
"opaque-debug",
"universal-hash 0.4.1",
]
[[package]]
name = "poly1305"
version = "0.8.0"
@ -3213,7 +3329,19 @@ checksum = "8159bd90725d2df49889a078b54f4f79e87f1f8a8444194cdca81d38f5393abf"
dependencies = [
"cpufeatures",
"opaque-debug",
"universal-hash",
"universal-hash 0.5.1",
]
[[package]]
name = "polyval"
version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8419d2b623c7c0896ff2d5d96e2cb4ede590fed28fcc34934f4c33c036e620a1"
dependencies = [
"cfg-if",
"cpufeatures",
"opaque-debug",
"universal-hash 0.4.1",
]
[[package]]
@ -3225,7 +3353,7 @@ dependencies = [
"cfg-if",
"cpufeatures",
"opaque-debug",
"universal-hash",
"universal-hash 0.5.1",
]
[[package]]
@ -3600,6 +3728,28 @@ dependencies = [
"syn 1.0.109",
]
[[package]]
name = "rmp"
version = "0.8.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f9860a6cc38ed1da53456442089b4dfa35e7cedaa326df63017af88385e6b20"
dependencies = [
"byteorder",
"num-traits",
"paste",
]
[[package]]
name = "rmp-serde"
version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bffea85eea980d8a74453e5d02a8d93028f3c34725de143085a844ebe953258a"
dependencies = [
"byteorder",
"rmp",
"serde",
]
[[package]]
name = "route-recognizer"
version = "0.3.1"
@ -3825,7 +3975,7 @@ version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97a22f5af31f73a954c10289c93e8a50cc23d971e80ee446f1f6f7137a088213"
dependencies = [
"cipher",
"cipher 0.4.4",
]
[[package]]
@ -4173,6 +4323,23 @@ version = "1.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "62bb4feee49fdd9f707ef802e22365a35de4b7b299de4763d44bfea899442ff9"
[[package]]
name = "snow"
version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c9d1425eb528a21de2755c75af4c9b5d57f50a0d4c3b7f1828a4cd03f8ba155"
dependencies = [
"aes-gcm 0.9.4",
"blake2",
"chacha20poly1305 0.9.1",
"curve25519-dalek",
"rand_core",
"ring",
"rustc_version",
"sha2 0.10.7",
"subtle",
]
[[package]]
name = "socket2"
version = "0.4.9"
@ -4814,6 +4981,16 @@ version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f962df74c8c05a667b5ee8bcf162993134c104e96440b663c8daa176dc772d8c"
[[package]]
name = "universal-hash"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9f214e8f697e925001e66ec2c6e37a4ef93f0f78c2eed7814394e10c62025b05"
dependencies = [
"generic-array",
"subtle",
]
[[package]]
name = "universal-hash"
version = "0.5.1"
@ -4834,18 +5011,16 @@ checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a"
name = "uqbar"
version = "0.1.0"
dependencies = [
"aes-gcm",
"aes-gcm 0.10.2",
"anyhow",
"async-recursion",
"async-trait",
"base64 0.13.1",
"bincode",
"blake3",
"bytes",
"cap-std",
"chacha20poly1305",
"chacha20poly1305 0.10.1",
"chrono",
"cita_trie",
"crossterm",
"digest 0.10.7",
"dotenv",
@ -4856,7 +5031,6 @@ dependencies = [
"futures",
"generic-array",
"getrandom",
"hasher",
"hex",
"hkdf",
"hmac 0.12.1",
@ -4870,6 +5044,7 @@ dependencies = [
"rand",
"reqwest",
"ring",
"rmp-serde",
"route-recognizer",
"rsa",
"rusoto_core",
@ -4879,6 +5054,7 @@ dependencies = [
"serde_json",
"serde_urlencoded",
"sha2 0.10.7",
"snow",
"thiserror",
"tokio",
"tokio-tungstenite 0.20.0",
@ -5808,7 +5984,7 @@ version = "0.6.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "760394e246e4c28189f19d488c058bf16f564016aefac5d32bb1f3b51d5e9261"
dependencies = [
"aes",
"aes 0.8.3",
"byteorder",
"bzip2",
"constant_time_eq 0.1.5",

View File

@ -14,7 +14,6 @@ zip = "0.6"
[dependencies]
aes-gcm = "0.10.2"
anyhow = "1.0.71"
async-recursion = "1.0.4"
async-trait = "0.1.71"
base64 = "0.13"
bincode = "1.3.3"
@ -23,7 +22,6 @@ bytes = "1.4.0"
cap-std = "2.0.0"
chacha20poly1305 = "0.10.1"
chrono = "0.4.31"
cita_trie = "4.0.0"
crossterm = { version = "0.26.1", features = ["event-stream", "bracketed-paste"] }
digest = "0.10"
dotenv = "0.15.0"
@ -34,7 +32,6 @@ flate2 = "1.0"
futures = "0.3"
generic-array = "0.14"
getrandom = "0.2.10"
hasher = "*"
hex = "0.4.3"
hkdf = "0.12.3"
hmac = "0.12"
@ -48,6 +45,7 @@ public-ip = "0.2.2"
rand = "0.8.4"
reqwest = "0.11.18"
ring = "0.16.20"
rmp-serde = "1.1.2"
route-recognizer = "0.3.1"
rsa = "0.9"
rusoto_core = "0.48.0"
@ -57,6 +55,7 @@ serde = {version = "1.0", features = ["derive"] }
serde_json = "1.0"
serde_urlencoded = "0.7"
sha2 = "0.10"
snow = { version = "0.9.3", features = ["ring-resolver"] }
thiserror = "1.0.43"
tokio = { version = "1.28", features = ["fs", "macros", "rt-multi-thread", "sync"] }
tokio-tungstenite = "*"

View File

@ -63,4 +63,8 @@ On boot you will be prompted to navigate to `localhost:8080`. Make sure your eth
## Example usage
TODO
Download and install an app:
```
!message our main:app_store:uqbar {"Download": {"package": {"package_name": "<pkg>", "publisher_node": "<node>"}, "install_from": "<node>"}}
!message our main:app_store:uqbar {"Install": {"package_name": "<pkg>", "publisher_node": "<node>"}}
```

View File

@ -453,6 +453,7 @@ dependencies = [
"bincode",
"cargo-component-bindings",
"hex",
"rmp-serde",
"serde",
"serde_json",
"thiserror",
@ -528,6 +529,28 @@ version = "0.6.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1"
[[package]]
name = "rmp"
version = "0.8.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f9860a6cc38ed1da53456442089b4dfa35e7cedaa326df63017af88385e6b20"
dependencies = [
"byteorder",
"num-traits",
"paste",
]
[[package]]
name = "rmp-serde"
version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bffea85eea980d8a74453e5d02a8d93028f3c34725de143085a844ebe953258a"
dependencies = [
"byteorder",
"rmp",
"serde",
]
[[package]]
name = "ruint"
version = "1.10.1"

View File

@ -12,6 +12,7 @@ lto = true
[dependencies]
cargo-component-bindings = { git = "https://github.com/bytecodealliance/cargo-component" }
rmp-serde = "1.1.2"
serde_json = "1.0"
serde = {version = "1.0", features = ["derive"] }
wit-bindgen = { version = "0.11.0", default_features = false }

View File

@ -129,7 +129,7 @@ impl UqProcess for Component {
inherit: false,
expects_response: None,
metadata: None,
ipc: serde_json::to_vec(&NetActions::QnsBatchUpdate(
ipc: rmp_serde::to_vec(&NetActions::QnsBatchUpdate(
state.nodes.values().cloned().collect::<Vec<_>>(),
))
.unwrap(),
@ -326,7 +326,7 @@ impl UqProcess for Component {
inherit: false,
expects_response: None,
metadata: None,
ipc: serde_json::to_vec(&NetActions::QnsUpdate(update.clone()))
ipc: rmp_serde::to_vec(&NetActions::QnsUpdate(update.clone()))
.unwrap(),
},
None,

View File

@ -1,6 +1,5 @@
use anyhow::Result;
use dotenv;
use ethers::prelude::namehash;
use std::env;
use std::sync::Arc;
use tokio::sync::{mpsc, oneshot};
@ -37,6 +36,11 @@ const CAP_CHANNEL_CAPACITY: usize = 1_000;
const VERSION: &str = env!("CARGO_PKG_VERSION");
/// This can and should be an environment variable / setting. It configures networking
/// such that indirect nodes always use routers, even when target is a direct node,
/// such that only their routers can ever see their physical networking details.
const REVEAL_IP: bool = true;
#[tokio::main]
async fn main() {
// For use with https://github.com/tokio-rs/console
@ -265,6 +269,7 @@ async fn main() {
print_sender.clone(),
net_message_sender,
net_message_receiver,
REVEAL_IP,
));
tasks.spawn(filesystem::fs_sender(
our.name.clone(),
@ -322,10 +327,10 @@ async fn main() {
"\x1b[38;5;196muh oh, a kernel process crashed: {}\x1b[0m",
e
)
// TODO restart the task
// TODO restart the task?
} else {
format!("what does this mean???")
// TODO restart the task
// TODO restart the task?
}
}
quit = terminal::terminal(

View File

@ -1,599 +0,0 @@
use crate::net::*;
use chacha20poly1305::{
aead::{Aead, AeadCore, KeyInit, OsRng},
XChaCha20Poly1305, XNonce,
};
use elliptic_curve::ecdh::SharedSecret;
use futures::{SinkExt, StreamExt};
use ring::signature::Ed25519KeyPair;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tokio::task::JoinHandle;
use tokio_tungstenite::tungstenite;
pub async fn build_connection(
our: Identity,
keypair: Arc<Ed25519KeyPair>,
pki: OnchainPKI,
keys: PeerKeys,
peers: Peers,
websocket: WebSocket,
kernel_message_tx: MessageSender,
net_message_tx: MessageSender,
network_error_tx: NetworkErrorSender,
with: Option<String>,
) -> (
UnboundedSender<(NetworkMessage, Option<ErrorShuttle>)>,
JoinHandle<Option<String>>,
) {
// println!("building new connection\r");
let (message_tx, message_rx) = unbounded_channel::<(NetworkMessage, Option<ErrorShuttle>)>();
let handle = tokio::spawn(maintain_connection(
our,
with,
keypair,
pki,
keys,
peers,
websocket,
message_tx.clone(),
message_rx,
kernel_message_tx,
net_message_tx,
network_error_tx,
));
return (message_tx, handle);
}
/// Keeps a connection alive and handles sending and receiving of NetworkMessages through it.
/// TODO add a keepalive PING/PONG system
/// TODO kill this after a certain amount of inactivity
pub async fn maintain_connection(
our: Identity,
with: Option<String>,
keypair: Arc<Ed25519KeyPair>,
pki: OnchainPKI,
keys: PeerKeys,
peers: Peers,
websocket: WebSocket,
message_tx: UnboundedSender<(NetworkMessage, Option<ErrorShuttle>)>,
mut message_rx: UnboundedReceiver<(NetworkMessage, Option<ErrorShuttle>)>,
kernel_message_tx: MessageSender,
net_message_tx: MessageSender,
network_error_tx: NetworkErrorSender,
) -> Option<String> {
// let conn_id: u64 = rand::random();
// println!("maintaining connection {conn_id}\r");
// accept messages on the websocket in one task, and send messages in another
let (mut write_stream, mut read_stream) = websocket.split();
let (forwarding_ack_tx, mut forwarding_ack_rx) = unbounded_channel::<MessageResult>();
// manage outstanding ACKs from messages sent over the connection
// TODO replace with more performant data structure
let ack_map = Arc::new(RwLock::new(HashMap::<u64, ErrorShuttle>::new()));
let sender_ack_map = ack_map.clone();
let last_pong = Arc::new(RwLock::new(tokio::time::Instant::now()));
let ping_last_pong = last_pong.clone();
let ping_tx = message_tx.clone();
// Ping/Pong keepalive task
let ping_task = tokio::spawn(async move {
loop {
tokio::time::sleep(tokio::time::Duration::from_secs(30)).await;
if ping_last_pong.read().await.elapsed() > tokio::time::Duration::from_secs(60) {
break;
}
if let Err(_) = ping_tx.send((NetworkMessage::Ping, None)) {
// Failed to send Ping, kill the connection
break;
}
}
});
let forwarder_message_tx = message_tx.clone();
let ack_forwarder = tokio::spawn(async move {
while let Some(result) = forwarding_ack_rx.recv().await {
match result {
Ok(NetworkMessage::Ack(id)) => {
// println!("net: got forwarding ack for message {}\r", id);
forwarder_message_tx
.send((NetworkMessage::Ack(id), None))
.unwrap();
}
Ok(NetworkMessage::Nack(id)) => {
// println!("net: got forwarding nack for message {}\r", id);
forwarder_message_tx
.send((NetworkMessage::Nack(id), None))
.unwrap();
}
Ok(NetworkMessage::HandshakeAck(handshake)) => {
// println!(
// "net: got forwarding handshakeAck for message {}\r",
// handshake.id
// );
forwarder_message_tx
.send((NetworkMessage::HandshakeAck(handshake), None))
.unwrap();
}
Err((message_id, _e)) => {
// println!("net: got forwarding error from ack_rx: {:?}\r", e);
// what do we do here?
forwarder_message_tx
.send((NetworkMessage::Nack(message_id), None))
.unwrap();
}
_ => {
// println!("net: weird none ack\r");
}
}
}
});
// receive messages from over the websocket and route them to the correct peer handler,
// or create it, if necessary.
let ws_receiver = tokio::spawn(async move {
while let Some(incoming) = read_stream.next().await {
let Ok(tungstenite::Message::Binary(bin)) = incoming else {
if let Ok(tungstenite::Message::Ping(_)) = incoming {
// let _ = write_stream.send(tungstenite::Message::Pong(vec![])).await;
}
continue;
};
// TODO use a language-netural serialization format here!
let Ok(net_message) = bincode::deserialize::<NetworkMessage>(&bin) else {
// just kill the connection if we get a non-Uqbar message
break;
};
match net_message {
NetworkMessage::Pong => {
*last_pong.write().await = tokio::time::Instant::now();
continue;
}
NetworkMessage::Ping => {
// respond with a Pong
let _ = message_tx.send((NetworkMessage::Pong, None));
continue;
}
NetworkMessage::Ack(id) => {
let Some(result_tx) = ack_map.write().await.remove(&id) else {
// println!("conn {conn_id}: got unexpected Ack {id}\r");
continue;
};
// println!("conn {conn_id}: got Ack {id}\r");
let _ = result_tx.send(Ok(net_message));
continue;
}
NetworkMessage::Nack(id) => {
let Some(result_tx) = ack_map.write().await.remove(&id) else {
// println!("net: got unexpected Nack\r");
continue;
};
let _ = result_tx.send(Ok(net_message));
continue;
}
NetworkMessage::Msg {
ref id,
ref from,
ref to,
ref contents,
} => {
// println!("conn {conn_id}: handling msg {id}\r");
// if the message is *directed to us*, try to handle with the
// matching peer handler "decrypter".
//
if to == &our.name {
// if we have the peer, send the message to them.
if let Some(peer) = peers.read().await.get(from) {
let _ = peer
.decrypter
.send((contents.to_owned(), forwarding_ack_tx.clone()));
continue;
}
// if we don't have the peer, see if we have the keys to create them.
// if we don't have their keys, throw a nack.
if let Some((peer_id, secret)) = keys.read().await.get(from) {
let new_peer = create_new_peer(
our.clone(),
peer_id.clone(),
peers.clone(),
keys.clone(),
secret.clone(),
message_tx.clone(),
kernel_message_tx.clone(),
net_message_tx.clone(),
network_error_tx.clone(),
);
let _ = new_peer
.decrypter
.send((contents.to_owned(), forwarding_ack_tx.clone()));
peers.write().await.insert(peer_id.name.clone(), new_peer);
} else {
// println!("net: nacking message {id}\r");
message_tx.send((NetworkMessage::Nack(*id), None)).unwrap();
}
} else {
// if the message is *directed to someone else*, try to handle
// with the matching peer handler "sender".
//
if let Some(peer) = peers.read().await.get(to) {
let id = *id;
let to = to.clone();
match peer.sender.send((
PeerMessage::Net(net_message),
Some(forwarding_ack_tx.clone()),
)) {
Ok(_) => {}
Err(_) => {
peers.write().await.remove(&to);
message_tx.send((NetworkMessage::Nack(id), None)).unwrap();
}
}
} else {
// if we don't have the peer, throw a nack.
// println!("net: nacking message with id {id}\r");
message_tx.send((NetworkMessage::Nack(*id), None)).unwrap();
}
}
}
NetworkMessage::Handshake(ref handshake) => {
// when we get a handshake, if we are the target,
// 1. verify it against the PKI
// 2. send a response handshakeAck
// 3. create a Peer and save, replacing old one if it existed
// as long as we are the target, we also get to kill this connection
// if the handshake is invalid, since it must be directly "to" us.
if handshake.target == our.name {
let Some(peer_id) = pki.read().await.get(&handshake.from).cloned() else {
// println!(
// "net: failed handshake with unknown node {}\r",
// handshake.from
// );
message_tx
.send((NetworkMessage::Nack(handshake.id), None))
.unwrap();
break;
};
let their_ephemeral_pk = match validate_handshake(&handshake, &peer_id) {
Ok(pk) => pk,
Err(e) => {
println!("net: invalid handshake from {}: {}\r", handshake.from, e);
message_tx
.send((NetworkMessage::Nack(handshake.id), None))
.unwrap();
break;
}
};
let (secret, handshake) = make_secret_and_handshake(
&our,
keypair.clone(),
&handshake.from,
Some(handshake.id),
);
message_tx
.send((NetworkMessage::HandshakeAck(handshake), None))
.unwrap();
let secret = Arc::new(secret.diffie_hellman(&their_ephemeral_pk));
// save the handshake to our Keys map
keys.write()
.await
.insert(peer_id.name.clone(), (peer_id.clone(), secret.clone()));
let new_peer = create_new_peer(
our.clone(),
peer_id.clone(),
peers.clone(),
keys.clone(),
secret,
message_tx.clone(),
kernel_message_tx.clone(),
net_message_tx.clone(),
network_error_tx.clone(),
);
// we might be replacing an old peer, so we need to remove it first
// we can't rely on the hashmap for this, because the dropped peer
// will trigger a drop of the sender, which will kill the peer_handler
peers.write().await.remove(&peer_id.name);
peers.write().await.insert(peer_id.name.clone(), new_peer);
} else {
// if we are NOT the target,
// try to send it to the matching peer handler "sender"
if let Some(peer) = peers.read().await.get(&handshake.target) {
let _ = peer.sender.send((
PeerMessage::Net(net_message),
Some(forwarding_ack_tx.clone()),
));
} else {
// if we don't have the peer, throw a nack.
// println!("net: nacking handshake with id {}\r", handshake.id);
message_tx
.send((NetworkMessage::Nack(handshake.id), None))
.unwrap();
}
}
}
NetworkMessage::HandshakeAck(ref handshake) => {
let Some(result_tx) = ack_map.write().await.remove(&handshake.id) else {
continue;
};
let _ = result_tx.send(Ok(net_message));
}
}
}
});
tokio::select! {
_ = ws_receiver => {
// println!("ws_receiver died\r");
},
_ = ack_forwarder => {
// println!("ack_forwarder died\r");
},
_ = ping_task => {
// println!("ping_task died\r");
},
// receive messages we would like to send to peers along this connection
// and send them to the websocket
_ = async {
while let Some((message, result_tx)) = message_rx.recv().await {
// TODO use a language-netural serialization format here!
if let Ok(bytes) = bincode::serialize::<NetworkMessage>(&message) {
match &message {
NetworkMessage::Msg { id, .. } => {
// println!("conn {conn_id}: piping msg {id}\r");
sender_ack_map.write().await.insert(*id, result_tx.unwrap());
}
NetworkMessage::Handshake(h) => {
sender_ack_map.write().await.insert(h.id, result_tx.unwrap());
}
_ => {}
}
match write_stream.send(tungstenite::Message::Binary(bytes)).await {
Ok(()) => {}
Err(e) => {
// println!("net: send error: {:?}\r", e);
let id = match &message {
NetworkMessage::Msg { id, .. } => id,
NetworkMessage::Handshake(h) => &h.id,
_ => continue,
};
let Some(result_tx) = sender_ack_map.write().await.remove(&id) else {
continue;
};
// TODO learn how to handle other non-fatal websocket errors.
match e {
tungstenite::error::Error::Capacity(_)
| tungstenite::Error::Io(_) => {
let _ = result_tx.send(Err((*id, SendErrorKind::Timeout)));
}
_ => {
let _ = result_tx.send(Ok(NetworkMessage::Nack(*id)));
}
}
}
}
}
}
} => {
// println!("ws_sender died\r");
},
};
return with;
}
/// After a successful handshake, use information to spawn a new `peer_handler` task
/// and save a `Peer` in our peers mapping. Returns a sender to use for sending messages
/// to this peer, which will also be saved in its Peer struct.
pub fn create_new_peer(
our: Identity,
new_peer_id: Identity,
peers: Peers,
keys: PeerKeys,
secret: Arc<SharedSecret<Secp256k1>>,
conn_sender: UnboundedSender<(NetworkMessage, Option<ErrorShuttle>)>,
kernel_message_tx: MessageSender,
net_message_tx: MessageSender,
network_error_tx: NetworkErrorSender,
) -> Peer {
let (message_tx, message_rx) = unbounded_channel::<(PeerMessage, Option<ErrorShuttle>)>();
let (decrypter_tx, decrypter_rx) = unbounded_channel::<(Vec<u8>, ErrorShuttle)>();
let peer_id_name = new_peer_id.name.clone();
let peer_conn_sender = conn_sender.clone();
tokio::spawn(async move {
match peer_handler(
our,
peer_id_name.clone(),
secret,
message_rx,
decrypter_rx,
peer_conn_sender,
kernel_message_tx,
network_error_tx,
)
.await
{
None => {
// println!("net: dropping peer handler but not deleting\r");
}
Some(km) => {
// println!("net: ok actually deleting peer+keys now and retrying send\r");
peers.write().await.remove(&peer_id_name);
keys.write().await.remove(&peer_id_name);
let _ = net_message_tx.send(km).await;
}
}
});
return Peer {
identity: new_peer_id,
sender: message_tx,
decrypter: decrypter_tx,
socket_tx: conn_sender,
};
}
/// 1. take in messages from a specific peer, decrypt them, and send to kernel
/// 2. take in messages targeted at specific peer and either:
/// - encrypt them, and send to proper connection
/// - forward them untouched along the connection
async fn peer_handler(
our: Identity,
who: String,
secret: Arc<SharedSecret<Secp256k1>>,
mut message_rx: UnboundedReceiver<(PeerMessage, Option<ErrorShuttle>)>,
mut decrypter_rx: UnboundedReceiver<(Vec<u8>, ErrorShuttle)>,
socket_tx: UnboundedSender<(NetworkMessage, Option<ErrorShuttle>)>,
kernel_message_tx: MessageSender,
network_error_tx: NetworkErrorSender,
) -> Option<KernelMessage> {
// println!("peer_handler\r");
let mut key = [0u8; 32];
secret
.extract::<sha2::Sha256>(None)
.expand(&[], &mut key)
.unwrap();
let cipher = XChaCha20Poly1305::new(generic_array::GenericArray::from_slice(&key));
let (ack_tx, mut ack_rx) = unbounded_channel::<MessageResult>();
// TODO use a more efficient data structure
let ack_map = Arc::new(RwLock::new(HashMap::<u64, KernelMessage>::new()));
let recv_ack_map = ack_map.clone();
tokio::select! {
//
// take in messages from a specific peer, decrypt them, and send to kernel
//
_ = async {
while let Some((encrypted_bytes, result_tx)) = decrypter_rx.recv().await {
let nonce = XNonce::from_slice(&encrypted_bytes[..24]);
if let Ok(decrypted) = cipher.decrypt(&nonce, &encrypted_bytes[24..]) {
if let Ok(message) = bincode::deserialize::<KernelMessage>(&decrypted) {
if message.source.node == who {
// println!("net: got peer message {}, acking\r", message.id);
let _ = result_tx.send(Ok(NetworkMessage::Ack(message.id)));
let _ = kernel_message_tx.send(message).await;
continue;
}
println!("net: got message 'from' wrong person! cheater/liar!\r");
break;
}
println!("net: failed to deserialize message from {}\r", who);
break;
}
println!("net: failed to decrypt message from {}, could be spoofer\r", who);
break;
}
} => {
// println!("net: lost peer {who}\r");
return None
}
//
// take in messages targeted at specific peer and either:
// - encrypt them, and send to proper connection
// - forward them untouched along the connection
//
_ = async {
// if we get a result_tx, rather than track it here, let a different
// part of the code handle whatever comes back from the socket.
while let Some((message, maybe_result_tx)) = message_rx.recv().await {
// if message is raw, we should encrypt.
// otherwise, simply send
match message {
PeerMessage::Raw(message) => {
let id = message.id;
if let Ok(bytes) = bincode::serialize::<KernelMessage>(&message) {
// generating a random nonce for each message.
// this isn't really as secure as we could get: should
// add a counter and then throw away the key when we hit a
// certain # of messages. TODO.
let nonce = XChaCha20Poly1305::generate_nonce(&mut OsRng);
if let Ok(encrypted) = cipher.encrypt(&nonce, bytes.as_ref()) {
if maybe_result_tx.is_none() {
ack_map.write().await.insert(id, message);
}
match socket_tx.send((
NetworkMessage::Msg {
from: our.name.clone(),
to: who.clone(),
id: id,
contents: [nonce.to_vec(), encrypted].concat(),
},
Some(maybe_result_tx.unwrap_or(ack_tx.clone())),
)) {
Ok(()) => tokio::task::yield_now().await,
Err(tokio::sync::mpsc::error::SendError((_, result_tx))) => {
// println!("net: lost socket with {who}\r");
let _ = result_tx.unwrap().send(Ok(NetworkMessage::Nack(id)));
},
}
}
}
}
PeerMessage::Net(net_message) => {
match socket_tx.send((net_message, Some(maybe_result_tx.unwrap_or(ack_tx.clone())))) {
Ok(()) => continue,
Err(tokio::sync::mpsc::error::SendError((net_message, result_tx))) => {
// println!("net: lost *forwarding* socket with {who}\r");
let id = match net_message {
NetworkMessage::Msg { id, .. } => id,
NetworkMessage::Handshake(h) => h.id,
_ => continue,
};
let _ = result_tx.unwrap().send(Ok(NetworkMessage::Nack(id)));
break;
},
}
}
}
}
} => return None,
//
// receive acks and nacks from our socket
// throw away acks, but kill this peer and retry the send on nacks.
//
maybe_km = async {
while let Some(result) = ack_rx.recv().await {
match result {
Ok(NetworkMessage::Ack(id)) => {
// println!("net: got ack for message {}\r", id);
recv_ack_map.write().await.remove(&id);
continue;
}
Ok(NetworkMessage::Nack(id)) => {
// println!("net: got nack for message {}\r", id);
let Some(km) = recv_ack_map.write().await.remove(&id) else {
continue;
};
// when we get a Nack, **delete this peer** and try to send the message again!
return Some(km)
}
Err((message_id, e)) => {
// println!("net: got error from ack_rx: {:?}\r", e);
// in practice this is always a timeout in current implementation
let Some(km) = recv_ack_map.write().await.remove(&message_id) else {
continue;
};
let _ = network_error_tx
.send(WrappedSendError {
id: km.id,
source: km.source,
error: SendError {
kind: e,
target: km.target,
message: km.message,
payload: km.payload,
},
})
.await;
return None
}
_ => {
// println!("net: weird none ack\r");
return None
}
}
}
return None;
} => {
// println!("net: exiting peer due to nackage\r");
return maybe_km
},
}
}

File diff suppressed because it is too large Load Diff

View File

@ -1,71 +1,110 @@
use crate::types::*;
use anyhow::Result;
use elliptic_curve::ecdh::SharedSecret;
use ethers::prelude::k256::Secp256k1;
use futures::stream::{SplitSink, SplitStream};
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, sync::Arc};
use tokio::net::TcpStream;
use tokio::sync::{mpsc, RwLock};
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
use tokio::sync::{mpsc::UnboundedSender, RwLock};
use tokio_tungstenite::{tungstenite, MaybeTlsStream, WebSocketStream};
pub type PeerKeys = Arc<RwLock<HashMap<String, (Identity, Arc<SharedSecret<Secp256k1>>)>>>;
pub type Peers = Arc<RwLock<HashMap<String, Peer>>>;
pub type WebSocket = WebSocketStream<MaybeTlsStream<TcpStream>>;
pub type MessageResult = Result<NetworkMessage, (u64, SendErrorKind)>;
pub type ErrorShuttle = mpsc::UnboundedSender<MessageResult>;
/// Sent to a node when you want to connect directly to them.
/// Sent in the 'e, ee, s, es' and 's, se' phases of XX noise protocol pattern.
#[derive(Debug, Deserialize, Serialize)]
pub struct HandshakePayload {
pub protocol_version: u8,
pub name: NodeId,
// signature is created by their networking key, of their static key
// someone could reuse this signature, but then they will be unable
// to encrypt messages to us.
pub signature: Vec<u8>,
/// Set to true when you want them to act as a router for you, sending
/// messages from potentially many remote sources over this connection,
/// including from the router itself.
/// This is not relevant in a handshake sent from the receiver side.
pub proxy_request: bool,
}
/// Sent to a node when you want them to connect you to an indirect node.
/// If the receiver of the request has an open connection to your target,
/// and is willing, they will send a message to the target prompting them
/// to build the other side of the connection, at which point they will
/// hold open a Passthrough for you two.
///
/// Alternatively, if the receiver does not have an open connection but the
/// target is a direct node, they can create a Passthrough for you two if
/// they are willing to proxy for you.
///
/// Sent in the 'e' phase of XX noise protocol pattern.
#[derive(Debug, Deserialize, Serialize)]
pub struct RoutingRequest {
pub protocol_version: u8,
pub source: NodeId,
// signature is created by their networking key, of the [target, router name].concat()
// someone could reuse this signature, and TODO need to make sure that's useless.
pub signature: Vec<u8>,
pub target: NodeId,
}
pub enum Connection {
Peer(PeerConnection),
Passthrough(PassthroughConnection),
PendingPassthrough(PendingPassthroughConnection),
}
pub struct PeerConnection {
pub noise: snow::TransportState,
pub buf: Vec<u8>,
pub write_stream: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::Message>,
pub read_stream: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
}
pub struct PassthroughConnection {
pub write_stream_1: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::Message>,
pub read_stream_1: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
pub write_stream_2: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::Message>,
pub read_stream_2: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
}
pub struct PendingPassthroughConnection {
pub target: NodeId,
pub write_stream: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::Message>,
pub read_stream: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
}
// TODO upgrade from hashmaps
pub type Peers = HashMap<String, Arc<RwLock<Peer>>>;
pub type PKINames = HashMap<String, NodeId>;
pub type OnchainPKI = HashMap<String, Identity>;
pub type PendingPassthroughs = HashMap<(NodeId, NodeId), PendingPassthroughConnection>;
/// stored in mapping by their username
pub struct Peer {
pub identity: Identity,
// send messages here to have them encrypted and sent across an active connection
pub sender: mpsc::UnboundedSender<(PeerMessage, Option<ErrorShuttle>)>,
// send encrypted messages from this peer here to have them decrypted and sent to kernel
pub decrypter: mpsc::UnboundedSender<(Vec<u8>, ErrorShuttle)>,
pub socket_tx: mpsc::UnboundedSender<(NetworkMessage, Option<ErrorShuttle>)>,
}
/// parsed from Binary websocket message
/// TODO add a version number somewhere in the serialized format!!
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum NetworkMessage {
Ack(u64),
Nack(u64),
Msg {
id: u64,
from: String,
to: String,
contents: Vec<u8>,
},
Handshake(Handshake),
HandshakeAck(Handshake),
// only used in implementation, not part of protocol
Ping,
Pong,
}
pub enum PeerMessage {
Raw(KernelMessage),
Net(NetworkMessage),
}
/// contains identity and encryption keys, used in initial handshake.
/// parsed from Text websocket message
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Handshake {
pub id: u64,
pub from: String,
pub target: String,
pub id_signature: Vec<u8>,
pub ephemeral_public_key: Vec<u8>,
pub ephemeral_public_key_signature: Vec<u8>,
/// If true, we are routing for them and have a RoutingClientConnection
/// associated with them. We can send them prompts to establish Passthroughs.
pub routing_for: bool,
pub sender: UnboundedSender<KernelMessage>,
}
/// Must be parsed from message pack vector.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum NetActions {
/// Received from a router of ours when they have a new pending passthrough for us.
/// We should respond (if we desire) by using them to initialize a routed connection
/// with the NodeId given.
ConnectionRequest(NodeId),
/// can only receive from trusted source, for now just ourselves locally,
/// in the future could get from remote provider
QnsUpdate(QnsUpdate),
QnsBatchUpdate(Vec<QnsUpdate>),
}
/// For now, only sent in response to a ConnectionRequest.
/// Must be parsed from message pack vector
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum NetResponses {
Accepted(NodeId),
Rejected(NodeId),
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct QnsUpdate {
pub name: String, // actual username / domain name

409
src/net/utils.rs Normal file
View File

@ -0,0 +1,409 @@
use crate::net::{types::*, MESSAGE_MAX_SIZE, TIMEOUT};
use crate::types::*;
use anyhow::{anyhow, Result};
use futures::stream::{SplitSink, SplitStream};
use futures::{SinkExt, StreamExt};
use ring::signature::{self, Ed25519KeyPair};
use snow::params::NoiseParams;
use std::sync::Arc;
use tokio::net::TcpStream;
use tokio::sync::{
mpsc::{unbounded_channel, UnboundedReceiver},
RwLock,
};
use tokio::task::JoinSet;
use tokio::time::timeout;
use tokio_tungstenite::{connect_async, tungstenite, MaybeTlsStream, WebSocketStream};
lazy_static::lazy_static! {
static ref PARAMS: NoiseParams = "Noise_XX_25519_ChaChaPoly_BLAKE2s"
.parse()
.expect("net: couldn't build noise params?");
}
pub async fn save_new_peer(
identity: &Identity,
routing_for: bool,
peers: &mut Peers,
peer_connections: &mut JoinSet<(String, Option<KernelMessage>)>,
conn: PeerConnection,
km: Option<KernelMessage>,
kernel_message_tx: &MessageSender,
print_tx: &PrintSender,
) -> Result<()> {
let peer_name = identity.name.clone();
let (peer_tx, peer_rx) = unbounded_channel::<KernelMessage>();
if km.is_some() {
peer_tx.send(km.unwrap())?
}
let peer = Arc::new(RwLock::new(Peer {
identity: identity.clone(),
routing_for,
sender: peer_tx,
}));
peers.insert(peer_name, peer.clone());
peer_connections.spawn(maintain_connection(
peer,
conn,
peer_rx,
kernel_message_tx.clone(),
print_tx.clone(),
));
Ok(())
}
/// should always be spawned on its own task
pub async fn maintain_connection(
peer: Arc<RwLock<Peer>>,
mut conn: PeerConnection,
mut peer_rx: UnboundedReceiver<KernelMessage>,
kernel_message_tx: MessageSender,
print_tx: PrintSender,
) -> (NodeId, Option<KernelMessage>) {
let peer_name = peer.read().await.identity.name.clone();
loop {
tokio::select! {
recv_result = recv_uqbar_message(&mut conn) => {
match recv_result {
Ok(km) => {
if km.source.node != peer_name {
let _ = print_tx.send(Printout {
verbosity: 0,
content: format!("net: got message with spoofed source from {peer_name}")
}).await;
return (peer_name, None)
}
kernel_message_tx.send(km).await.expect("net error: fatal: kernel died");
}
Err(_) => {
return (peer_name, None)
}
}
},
maybe_recv = peer_rx.recv() => {
match maybe_recv {
Some(km) => {
match send_uqbar_message(&km, &mut conn).await {
Ok(()) => continue,
Err(e) => {
if e.to_string() == "message too large" {
// this will result in a Timeout if the message
// requested a response, otherwise nothing. so,
// we should always print something to terminal
let _ = print_tx.send(Printout {
verbosity: 0,
content: format!(
"net: tried to send too-large message, limit is {:.2}mb",
MESSAGE_MAX_SIZE as f64 / 1_048_576.0
),
}).await;
return (peer_name, None)
}
return (peer_name, Some(km))
}
}
}
None => return (peer_name, None),
}
},
}
}
}
/// cross the streams
pub async fn maintain_passthrough(mut conn: PassthroughConnection) {
loop {
tokio::select! {
maybe_recv = conn.read_stream_1.next() => {
match maybe_recv {
Some(Ok(msg)) => {
conn.write_stream_2.send(msg).await.expect("net error: fatal: kernel died");
}
_ => return,
}
},
maybe_recv = conn.read_stream_2.next() => {
match maybe_recv {
Some(Ok(msg)) => {
conn.write_stream_1.send(msg).await.expect("net error: fatal: kernel died");
}
_ => return,
}
},
}
}
}
pub async fn create_passthrough(
our: &Identity,
our_ip: &str,
from_id: Identity,
to_name: NodeId,
pki: &OnchainPKI,
peers: &Peers,
pending_passthroughs: &mut PendingPassthroughs,
write_stream_1: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::Message>,
read_stream_1: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
) -> Result<(Identity, Connection)> {
// if the target has already generated a pending passthrough for this source,
// immediately match them
if let Some(pending) = pending_passthroughs.remove(&(to_name.clone(), from_id.name.clone())) {
return Ok((
from_id,
Connection::Passthrough(PassthroughConnection {
write_stream_1,
read_stream_1,
write_stream_2: pending.write_stream,
read_stream_2: pending.read_stream,
}),
));
}
let to_id = pki.get(&to_name).ok_or(anyhow!("unknown QNS name"))?;
let Some((ref ip, ref port)) = to_id.ws_routing else {
// create passthrough to indirect node that we do routing for
//
let target_peer = peers
.get(&to_name)
.ok_or(anyhow!("can't route to that indirect node"))?;
if !target_peer.read().await.routing_for {
return Err(anyhow!("we don't route for that indirect node"));
}
// send their net:sys:uqbar process a message, notifying it to create a *matching*
// passthrough request, which we can pair with this pending one.
target_peer.write().await.sender.send(KernelMessage {
id: rand::random(),
source: Address {
node: our.name.clone(),
process: ProcessId::from_str("net:sys:uqbar").unwrap(),
},
target: Address {
node: to_name.clone(),
process: ProcessId::from_str("net:sys:uqbar").unwrap(),
},
rsvp: None,
message: Message::Request(Request {
inherit: false,
expects_response: Some(5),
ipc: rmp_serde::to_vec(&NetActions::ConnectionRequest(from_id.name.clone()))?,
metadata: None,
}),
payload: None,
signed_capabilities: None,
})?;
return Ok((
from_id,
Connection::PendingPassthrough(PendingPassthroughConnection {
target: to_name,
write_stream: write_stream_1,
read_stream: read_stream_1,
}),
));
};
// create passthrough to direct node
//
let ws_url = make_ws_url(our_ip, ip, port)?;
let Ok(Ok((websocket, _response))) = timeout(TIMEOUT, connect_async(ws_url)).await else {
return Err(anyhow!("failed to connect to target"));
};
let (write_stream_2, read_stream_2) = websocket.split();
Ok((
from_id,
Connection::Passthrough(PassthroughConnection {
write_stream_1,
read_stream_1,
write_stream_2,
read_stream_2,
}),
))
}
pub fn validate_routing_request(
our_name: &str,
buf: &[u8],
pki: &OnchainPKI,
) -> Result<(Identity, NodeId)> {
let routing_request: RoutingRequest = rmp_serde::from_slice(buf)?;
let their_id = pki
.get(&routing_request.source)
.ok_or(anyhow!("unknown QNS name"))?;
let their_networking_key = signature::UnparsedPublicKey::new(
&signature::ED25519,
hex::decode(&strip_0x(&their_id.networking_key))?,
);
their_networking_key.verify(
&[&routing_request.target, our_name].concat().as_bytes(),
&routing_request.signature,
)?;
if routing_request.target == routing_request.source {
return Err(anyhow!("can't route to self"));
}
Ok((their_id.clone(), routing_request.target))
}
pub fn validate_handshake(
handshake: &HandshakePayload,
their_static_key: &[u8],
their_id: &Identity,
) -> Result<()> {
if handshake.protocol_version != 1 {
return Err(anyhow!("handshake protocol version mismatch"));
}
// verify their signature of their static key
let their_networking_key = signature::UnparsedPublicKey::new(
&signature::ED25519,
hex::decode(&strip_0x(&their_id.networking_key))?,
);
their_networking_key.verify(their_static_key, &handshake.signature)?;
Ok(())
}
pub async fn send_uqbar_message(km: &KernelMessage, conn: &mut PeerConnection) -> Result<()> {
let serialized = rmp_serde::to_vec(km)?;
if serialized.len() > MESSAGE_MAX_SIZE as usize {
return Err(anyhow!("message too large"));
}
let len = (serialized.len() as u32).to_be_bytes();
let with_length_prefix = [len.to_vec(), serialized].concat();
// 65519 = 65535 - 16 (TAGLEN)
for payload in with_length_prefix.chunks(65519) {
let len = conn.noise.write_message(payload, &mut conn.buf)?;
conn.write_stream
.feed(tungstenite::Message::binary(&conn.buf[..len]))
.await?;
}
conn.write_stream.flush().await?;
Ok(())
}
/// any error in receiving a message will result in the connection being closed.
pub async fn recv_uqbar_message(conn: &mut PeerConnection) -> Result<KernelMessage> {
let outer_len = conn
.noise
.read_message(&ws_recv(&mut conn.read_stream).await?, &mut conn.buf)?;
if outer_len < 4 {
return Err(anyhow!("uqbar message too small!"));
}
let length_bytes = [conn.buf[0], conn.buf[1], conn.buf[2], conn.buf[3]];
let msg_len = u32::from_be_bytes(length_bytes);
if msg_len > MESSAGE_MAX_SIZE {
return Err(anyhow!("message too large"));
}
let mut msg = Vec::with_capacity(msg_len as usize);
msg.extend_from_slice(&conn.buf[4..outer_len]);
while msg.len() < msg_len as usize {
let len = conn
.noise
.read_message(&ws_recv(&mut conn.read_stream).await?, &mut conn.buf)?;
msg.extend_from_slice(&conn.buf[..len]);
}
Ok(rmp_serde::from_slice(&msg)?)
}
pub async fn send_uqbar_handshake(
our: &Identity,
keypair: &Ed25519KeyPair,
noise_static_key: &[u8],
noise: &mut snow::HandshakeState,
buf: &mut Vec<u8>,
write_stream: &mut SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::Message>,
proxy_request: bool,
) -> Result<()> {
let our_hs = rmp_serde::to_vec(&HandshakePayload {
protocol_version: 1,
name: our.name.clone(),
signature: keypair.sign(noise_static_key).as_ref().to_vec(),
proxy_request,
})
.expect("failed to serialize handshake payload");
let len = noise.write_message(&our_hs, buf)?;
write_stream
.send(tungstenite::Message::binary(&buf[..len]))
.await?;
Ok(())
}
pub async fn recv_uqbar_handshake(
noise: &mut snow::HandshakeState,
buf: &mut Vec<u8>,
read_stream: &mut SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
) -> Result<HandshakePayload> {
let len = noise.read_message(&ws_recv(read_stream).await?, buf)?;
Ok(rmp_serde::from_slice(&buf[..len])?)
}
pub async fn ws_recv(
read_stream: &mut SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
) -> Result<Vec<u8>> {
let Some(Ok(tungstenite::Message::Binary(bin))) = read_stream.next().await else {
return Err(anyhow!("websocket closed"));
};
Ok(bin)
}
pub fn build_responder() -> (snow::HandshakeState, Vec<u8>) {
let builder: snow::Builder<'_> = snow::Builder::new(PARAMS.clone());
let keypair = builder
.generate_keypair()
.expect("net: couldn't generate keypair?");
(
builder
.local_private_key(&keypair.private)
.build_responder()
.expect("net: couldn't build responder?"),
keypair.public,
)
}
pub fn build_initiator() -> (snow::HandshakeState, Vec<u8>) {
let builder: snow::Builder<'_> = snow::Builder::new(PARAMS.clone());
let keypair = builder
.generate_keypair()
.expect("net: couldn't generate keypair?");
(
builder
.local_private_key(&keypair.private)
.build_initiator()
.expect("net: couldn't build responder?"),
keypair.public,
)
}
pub fn make_ws_url(our_ip: &str, ip: &str, port: &u16) -> Result<url::Url> {
// if we have the same public IP as target, route locally,
// otherwise they will appear offline due to loopback stuff
let ip = if our_ip == ip { "localhost" } else { ip };
let url = url::Url::parse(&format!("ws://{}:{}/ws", ip, port))?;
Ok(url)
}
pub async fn error_offline(km: KernelMessage, network_error_tx: &NetworkErrorSender) -> Result<()> {
network_error_tx
.send(WrappedSendError {
id: km.id,
source: km.source,
error: SendError {
kind: SendErrorKind::Offline,
target: km.target,
message: km.message,
payload: km.payload,
},
})
.await?;
Ok(())
}
fn strip_0x(s: &str) -> String {
if s.starts_with("0x") {
s[2..].to_string()
} else {
s.to_string()
}
}

View File

@ -256,14 +256,18 @@ async fn handle_info(
// TODO: if IP is localhost, assign a router...
let ws_port = http_server::find_open_port(9000).await.unwrap();
// this is NOT our real identity. it's stuff we give to the frontend
// to match on
let our = Identity {
networking_key: format!("0x{}", public_key),
name: String::new(),
ws_routing: Some((ip.clone(), ws_port)),
allowed_routers: vec![
"uqbar-router-1.uq".into(), // "0x8d9e54427c50660c6d4802f63edca86a9ca5fd6a78070c4635950e9d149ed441".into(),
"uqbar-router-2.uq".into(), // "0x06d331ed65843ecf0860c73292005d8103af20820546b2f8f9007d01f60595b1".into(),
"uqbar-router-3.uq".into(), // "0xe6ab611eb62e8aee0460295667f8179cda4315982717db4b0b3da6022deecac1".into(),
"testnode101.uq".into(),
"testnode102.uq".into(),
// "uqbar-router-1.uq".into(), // "0x8d9e54427c50660c6d4802f63edca86a9ca5fd6a78070c4635950e9d149ed441".into(),
// "uqbar-router-2.uq".into(), // "0x06d331ed65843ecf0860c73292005d8103af20820546b2f8f9007d01f60595b1".into(),
// "uqbar-router-3.uq".into(), // "0xe6ab611eb62e8aee0460295667f8179cda4315982717db4b0b3da6022deecac1".into(),
],
};

View File

@ -43,8 +43,6 @@ pub type CapMessageReceiver = tokio::sync::mpsc::Receiver<CapMessage>;
// types used for UQI: uqbar's identity system
//
pub type NodeId = String;
pub type PKINames = Arc<RwLock<HashMap<String, NodeId>>>; // TODO maybe U256 to String
pub type OnchainPKI = Arc<RwLock<HashMap<String, Identity>>>;
#[derive(Debug, Serialize, Deserialize)]
pub struct Registration {