diff --git a/Cargo.lock b/Cargo.lock index dc9b26ab..711addd3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -36,6 +36,15 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" +[[package]] +name = "aead" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b613b8e1e3cf911a086f53f03bf286f52fd7a7258e4fa606f0ef220d39d8877" +dependencies = [ + "generic-array", +] + [[package]] name = "aead" version = "0.5.2" @@ -46,6 +55,18 @@ dependencies = [ "generic-array", ] +[[package]] +name = "aes" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e8b47f52ea9bae42228d07ec09eb676433d7c4ed1ebdf0f1d1c29ed446f1ab8" +dependencies = [ + "cfg-if", + "cipher 0.3.0", + "cpufeatures", + "opaque-debug", +] + [[package]] name = "aes" version = "0.8.3" @@ -53,21 +74,35 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac1f845298e95f983ff1944b728ae08b8cebab80d684f0a832ed0fc74dfa27e2" dependencies = [ "cfg-if", - "cipher", + "cipher 0.4.4", "cpufeatures", ] +[[package]] +name = "aes-gcm" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df5f85a83a7d8b0442b6aa7b504b8212c1733da07b98aae43d4bc21b2cb3cdf6" +dependencies = [ + "aead 0.4.3", + "aes 0.7.5", + "cipher 0.3.0", + "ctr 0.8.0", + "ghash 0.4.4", + "subtle", +] + [[package]] name = "aes-gcm" version = "0.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "209b47e8954a928e1d72e86eca7000ebb6655fe1436d33eefc2201cad027e237" dependencies = [ - "aead", - "aes", - "cipher", - "ctr", - "ghash", + "aead 0.5.2", + "aes 0.8.3", + "cipher 0.4.4", + "ctr 0.9.2", + "ghash 0.5.0", "subtle", ] @@ -298,6 +333,15 @@ dependencies = [ "wyz", ] +[[package]] +name = "blake2" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "46502ad458c9a52b69d4d4d32775c788b7a1b85e8bc9d482d92250fc0e3f8efe" +dependencies = [ + "digest 0.10.7", +] + [[package]] name = "blake3" version = "1.4.1" @@ -499,6 +543,18 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "chacha20" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c80e5460aa66fe3b91d40bcbdab953a597b60053e34d684ac6903f863b680a6" +dependencies = [ + "cfg-if", + "cipher 0.3.0", + "cpufeatures", + "zeroize", +] + [[package]] name = "chacha20" version = "0.9.1" @@ -506,20 +562,33 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c3613f74bd2eac03dad61bd53dbe620703d4371614fe0bc3b9f04dd36fe4e818" dependencies = [ "cfg-if", - "cipher", + "cipher 0.4.4", "cpufeatures", ] +[[package]] +name = "chacha20poly1305" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a18446b09be63d457bbec447509e85f662f32952b035ce892290396bc0b0cff5" +dependencies = [ + "aead 0.4.3", + "chacha20 0.8.2", + "cipher 0.3.0", + "poly1305 0.7.2", + "zeroize", +] + [[package]] name = "chacha20poly1305" version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "10cd79432192d1c0f4e1a0fef9527696cc039165d729fb41b3f4f4f354c2dc35" dependencies = [ - "aead", - "chacha20", - "cipher", - "poly1305", + "aead 0.5.2", + "chacha20 0.9.1", + "cipher 0.4.4", + "poly1305 0.8.0", "zeroize", ] @@ -538,6 +607,15 @@ dependencies = [ "windows-targets", ] +[[package]] +name = "cipher" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ee52072ec15386f770805afd189a01c8841be8696bed250fa2f13c4c0d6dfb7" +dependencies = [ + "generic-array", +] + [[package]] name = "cipher" version = "0.4.4" @@ -901,13 +979,49 @@ dependencies = [ "subtle", ] +[[package]] +name = "ctr" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "049bb91fb4aaf0e3c7efa6cd5ef877dbbbd15b39dad06d9948de4ec8a75761ea" +dependencies = [ + "cipher 0.3.0", +] + [[package]] name = "ctr" version = "0.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0369ee1ad671834580515889b80f2ea915f23b8be8d0daa4bbaf2ac5c7590835" dependencies = [ - "cipher", + "cipher 0.4.4", +] + +[[package]] +name = "curve25519-dalek" +version = "4.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e89b8c6a2e4b1f45971ad09761aafb85514a84744b67a95e32c3cc1352d1f65c" +dependencies = [ + "cfg-if", + "cpufeatures", + "curve25519-dalek-derive", + "fiat-crypto", + "platforms", + "rustc_version", + "subtle", + "zeroize", +] + +[[package]] +name = "curve25519-dalek-derive" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "83fdaf97f4804dcebfa5862639bc9ce4121e82140bec2a987ac5140294865b5b" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.32", ] [[package]] @@ -1277,8 +1391,8 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1fda3bf123be441da5260717e0661c25a2fd9cb2b2c1d20bf2e05580047158ab" dependencies = [ - "aes", - "ctr", + "aes 0.8.3", + "ctr 0.9.2", "digest 0.10.7", "hex", "hmac 0.12.1", @@ -1632,6 +1746,12 @@ dependencies = [ "subtle", ] +[[package]] +name = "fiat-crypto" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a481586acf778f1b1455424c343f71124b048ffa5f4fc3f8f6ae9dc432dcb3c7" + [[package]] name = "file-per-thread-logger" version = "0.2.0" @@ -1880,6 +2000,16 @@ dependencies = [ "wasi", ] +[[package]] +name = "ghash" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1583cc1656d7839fd3732b80cf4f38850336cdb9b8ded1cd399ca62958de3c99" +dependencies = [ + "opaque-debug", + "polyval 0.5.3", +] + [[package]] name = "ghash" version = "0.5.0" @@ -1887,7 +2017,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d930750de5717d2dd0b8c0d42c076c0e884c81a73e6cab859bbd2339c71e3e40" dependencies = [ "opaque-debug", - "polyval", + "polyval 0.6.1", ] [[package]] @@ -3208,6 +3338,23 @@ version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "26072860ba924cbfa98ea39c8c19b4dd6a4a25423dbdf219c1eca91aa0cf6964" +[[package]] +name = "platforms" +version = "3.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4503fa043bf02cee09a9582e9554b4c6403b2ef55e4612e96561d294419429f8" + +[[package]] +name = "poly1305" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "048aeb476be11a4b6ca432ca569e375810de9294ae78f4774e78ea98a9246ede" +dependencies = [ + "cpufeatures", + "opaque-debug", + "universal-hash 0.4.1", +] + [[package]] name = "poly1305" version = "0.8.0" @@ -3216,7 +3363,19 @@ checksum = "8159bd90725d2df49889a078b54f4f79e87f1f8a8444194cdca81d38f5393abf" dependencies = [ "cpufeatures", "opaque-debug", - "universal-hash", + "universal-hash 0.5.1", +] + +[[package]] +name = "polyval" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8419d2b623c7c0896ff2d5d96e2cb4ede590fed28fcc34934f4c33c036e620a1" +dependencies = [ + "cfg-if", + "cpufeatures", + "opaque-debug", + "universal-hash 0.4.1", ] [[package]] @@ -3228,7 +3387,7 @@ dependencies = [ "cfg-if", "cpufeatures", "opaque-debug", - "universal-hash", + "universal-hash 0.5.1", ] [[package]] @@ -3828,7 +3987,7 @@ version = "0.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97a22f5af31f73a954c10289c93e8a50cc23d971e80ee446f1f6f7137a088213" dependencies = [ - "cipher", + "cipher 0.4.4", ] [[package]] @@ -4176,6 +4335,23 @@ version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "62bb4feee49fdd9f707ef802e22365a35de4b7b299de4763d44bfea899442ff9" +[[package]] +name = "snow" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c9d1425eb528a21de2755c75af4c9b5d57f50a0d4c3b7f1828a4cd03f8ba155" +dependencies = [ + "aes-gcm 0.9.4", + "blake2", + "chacha20poly1305 0.9.1", + "curve25519-dalek", + "rand_core", + "ring", + "rustc_version", + "sha2 0.10.7", + "subtle", +] + [[package]] name = "socket2" version = "0.4.9" @@ -4832,6 +5008,16 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f962df74c8c05a667b5ee8bcf162993134c104e96440b663c8daa176dc772d8c" +[[package]] +name = "universal-hash" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f214e8f697e925001e66ec2c6e37a4ef93f0f78c2eed7814394e10c62025b05" +dependencies = [ + "generic-array", + "subtle", +] + [[package]] name = "universal-hash" version = "0.5.1" @@ -4852,7 +5038,7 @@ checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" name = "uqbar" version = "0.1.0" dependencies = [ - "aes-gcm", + "aes-gcm 0.10.2", "anyhow", "async-recursion", "async-trait", @@ -4861,7 +5047,7 @@ dependencies = [ "blake3", "bytes", "cap-std", - "chacha20poly1305", + "chacha20poly1305 0.10.1", "chrono", "cita_trie", "crossterm", @@ -4897,6 +5083,7 @@ dependencies = [ "serde_json", "serde_urlencoded", "sha2 0.10.7", + "snow", "thiserror", "tokio", "tokio-tungstenite 0.20.0", @@ -5955,7 +6142,7 @@ version = "0.6.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "760394e246e4c28189f19d488c058bf16f564016aefac5d32bb1f3b51d5e9261" dependencies = [ - "aes", + "aes 0.8.3", "byteorder", "bzip2", "constant_time_eq 0.1.5", diff --git a/Cargo.toml b/Cargo.toml index 0e80f3e6..6f38ebbd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -57,6 +57,7 @@ serde = {version = "1.0", features = ["derive"] } serde_json = "1.0" serde_urlencoded = "0.7" sha2 = "0.10" +snow = { version = "0.9.3", features = ["ring-resolver"] } thiserror = "1.0.43" tokio = { version = "1.28", features = ["fs", "macros", "rt-multi-thread", "sync"] } tokio-tungstenite = "*" diff --git a/src/main.rs b/src/main.rs index 0b38c532..c5685f89 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,7 +1,6 @@ use crate::types::*; use anyhow::Result; use dotenv; -use ethers::prelude::namehash; use std::env; use std::sync::Arc; use tokio::sync::{mpsc, oneshot}; @@ -14,7 +13,7 @@ mod http_client; mod http_server; mod kernel; mod keygen; -mod net; +mod net2; mod register; mod terminal; mod types; @@ -255,7 +254,7 @@ async fn main() { vfs_message_sender, encryptor_sender, )); - tasks.spawn(net::networking( + tasks.spawn(net2::networking( our.clone(), our_ip.to_string(), networking_keypair_arc.clone(), diff --git a/src/net2/mod.rs b/src/net2/mod.rs new file mode 100644 index 00000000..d45ba782 --- /dev/null +++ b/src/net2/mod.rs @@ -0,0 +1,733 @@ +use crate::types::*; +use anyhow::{anyhow, Result}; +use futures::stream::{SplitSink, SplitStream}; +use futures::{SinkExt, StreamExt}; +use ring::signature::{self, Ed25519KeyPair}; +use serde::{Deserialize, Serialize}; +use snow::params::NoiseParams; +use std::{collections::HashMap, sync::Arc}; +use tokio::net::{TcpListener, TcpStream}; +use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; +use tokio::task::JoinSet; +use tokio::time::timeout; +use tokio_tungstenite::{ + accept_async, connect_async, tungstenite, MaybeTlsStream, WebSocketStream, +}; + +lazy_static::lazy_static! { + static ref PARAMS: NoiseParams = "Noise_XX_25519_ChaChaPoly_BLAKE2s" + .parse() + .expect("net: couldn't build noise params?"); +} + +// only used in connection initialization, otherwise, nacks and Responses are only used for "timeouts" +const TIMEOUT: std::time::Duration = std::time::Duration::from_secs(15); + +const MESSAGE_MAX_SIZE: u32 = 104_858_000; // 100 MB -- TODO analyze as desired, apps can always chunk data into many messages + +#[derive(Clone, Debug, Serialize, Deserialize)] +enum NetActions { + QnsUpdate(QnsUpdate), + QnsBatchUpdate(Vec), +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +struct QnsUpdate { + pub name: String, // actual username / domain name + pub owner: String, + pub node: String, // hex namehash of node + pub public_key: String, + pub ip: String, + pub port: u16, + pub routers: Vec, +} + +#[derive(Debug, Deserialize, Serialize)] +struct HandshakePayload { + pub name: NodeId, + pub signature: Vec, + pub protocol_version: u8, +} + +struct OpenConnection { + pub noise: snow::TransportState, + pub buf: Vec, + pub write_stream: SplitSink>, tungstenite::Message>, + pub read_stream: SplitStream>>, +} + +type Peers = HashMap>; +type PKINames = HashMap; // TODO maybe U256 to String +type OnchainPKI = HashMap; + +struct Peer { + pub identity: Identity, + pub sender: UnboundedSender, +} + +/// Entry point from the main kernel task. Runs forever, spawns listener and sender tasks. +pub async fn networking( + our: Identity, + our_ip: String, + keypair: Arc, + kernel_message_tx: MessageSender, + network_error_tx: NetworkErrorSender, + print_tx: PrintSender, + self_message_tx: MessageSender, + message_rx: MessageReceiver, +) -> Result<()> { + println!("networking\r"); + println!("our identity: {:#?}\r", our); + let our = Arc::new(our); + // branch on whether we are a direct or indirect node + match &our.ws_routing { + None => { + // indirect node: run the indirect networking strategy + todo!("TODO implement indirect networking strategy") + } + Some((ip, port)) => { + // direct node: run the direct networking strategy + if &our_ip != ip { + return Err(anyhow!( + "net: fatal error: IP address mismatch: {} != {}, update your QNS identity", + our_ip, + ip + )); + } + let tcp = match TcpListener::bind(format!("0.0.0.0:{}", port)).await { + Ok(tcp) => tcp, + Err(_e) => { + return Err(anyhow!( + "net: fatal error: can't listen on port {}, update your QNS identity or free up that port", + port, + )); + } + }; + direct_networking( + our.clone(), + our_ip, + tcp, + keypair, + kernel_message_tx, + network_error_tx, + print_tx, + self_message_tx, + message_rx, + ) + .await + } + } +} + +async fn direct_networking( + our: Arc, + our_ip: String, + tcp: TcpListener, + keypair: Arc, + kernel_message_tx: MessageSender, + network_error_tx: NetworkErrorSender, + print_tx: PrintSender, + self_message_tx: MessageSender, + mut message_rx: MessageReceiver, +) -> Result<()> { + println!("direct_networking\r"); + let mut pki: OnchainPKI = HashMap::new(); + let mut peers: Peers = HashMap::new(); + // mapping from QNS namehash to username + let mut names: PKINames = HashMap::new(); + + let mut active_connections = JoinSet::<(String, Option)>::new(); + + // 1. receive messages from kernel and send out over our connections + // 2. receive incoming TCP connections + // 3. deal with active connections that die by removing the associated peer + loop { + tokio::select! { + Some(km) = message_rx.recv() => { + // got a message from kernel to send out over the network + let target = &km.target.node; + // if the message is for us, it's either a protocol-level "hello" message, + // or a debugging command issued from our terminal. handle it here: + if target == &our.name { + match handle_local_message( + &our, + km, + &peers, + &mut pki, + &mut names, + &kernel_message_tx, + &print_tx, + ) + .await { + Ok(()) => {}, + Err(e) => { + print_tx.send(Printout { + verbosity: 0, + content: format!("net: error handling local message: {}", e) + }).await?; + } + } + } + // if the message is for a peer we currently have a connection with, + // try to send it to them + else if let Some(peer) = peers.get_mut(target) { + peer.sender.send(km)?; + } + else if let Some(peer_id) = pki.get(target) { + // if the message is for a *direct* peer we don't have a connection with, + // try to establish a connection with them + if peer_id.ws_routing.is_some() { + match init_connection(&our, &our_ip, peer_id, &keypair).await { + Ok((peer_name, conn)) => { + let (peer_tx, peer_rx) = unbounded_channel::(); + let peer = Arc::new(Peer { + identity: peer_id.clone(), + sender: peer_tx, + }); + peers.insert(peer_name, peer.clone()); + peer.sender.send(km)?; + active_connections.spawn(maintain_connection( + peer, + conn, + peer_rx, + kernel_message_tx.clone(), + )); + } + Err(e) => { + println!("net: error initializing connection: {}", e); + error_offline(km, &network_error_tx).await?; + } + } + } + // if the message is for an *indirect* peer we don't have a connection with, + // do some routing and shit + else { + todo!() + } + } + // peer cannot be found in PKI, throw an offline error + else { + error_offline(km, &network_error_tx).await?; + } + } + Ok((stream, _socket_addr)) = tcp.accept() => { + // TODO we can perform some amount of validation here + // to prevent some amount of potential DDoS attacks. + // can also block based on socket_addr + match accept_async(MaybeTlsStream::Plain(stream)).await { + Ok(websocket) => { + let (peer_name, conn) = recv_connection(&our, &pki, &keypair, websocket).await?; + let (peer_tx, peer_rx) = unbounded_channel::(); + let peer = Arc::new(Peer { + identity: pki.get(&peer_name).ok_or(anyhow!("jej"))?.clone(), + sender: peer_tx, + }); + peers.insert(peer_name, peer.clone()); + println!("received incoming connection\r"); + active_connections.spawn(maintain_connection( + peer, + conn, + peer_rx, + kernel_message_tx.clone(), + )); + } + // ignore connections we failed to accept...? + Err(_) => {} + } + } + Some(Ok((dead_peer, maybe_resend))) = active_connections.join_next() => { + peers.remove(&dead_peer); + match maybe_resend { + None => {}, + Some(km) => { + self_message_tx.send(km).await?; + } + } + } + } + } +} + +async fn maintain_connection( + peer: Arc, + mut conn: OpenConnection, + mut peer_rx: UnboundedReceiver, + kernel_message_tx: MessageSender, + // network_error_tx: NetworkErrorSender, + // print_tx: PrintSender, +) -> (String, Option) { + println!("maintain_connection\r"); + loop { + tokio::select! { + recv_result = recv_uqbar_message(&mut conn) => { + match recv_result { + Ok(km) => { + if km.source.node != peer.identity.name { + println!("net: got message with spoofed source\r"); + return (peer.identity.name.clone(), None) + } + kernel_message_tx.send(km).await.expect("net error: fatal: kernel died"); + } + Err(e) => { + println!("net: error receiving message: {}\r", e); + return (peer.identity.name.clone(), None) + } + } + }, + maybe_recv = peer_rx.recv() => { + match maybe_recv { + Some(km) => { + // TODO error handle + match send_uqbar_message(&km, &mut conn).await { + Ok(()) => continue, + Err(e) => { + println!("net: error sending message: {}\r", e); + return (peer.identity.name.clone(), Some(km)) + } + } + } + None => { + println!("net: peer disconnected\r"); + return (peer.identity.name.clone(), None) + } + } + }, + } + } +} + +async fn recv_connection( + our: &Identity, + pki: &OnchainPKI, + keypair: &Ed25519KeyPair, + websocket: WebSocketStream>, +) -> Result<(String, OpenConnection)> { + println!("recv_connection\r"); + let mut buf = vec![0u8; 65535]; + let (mut noise, our_static_key) = build_responder(); + let (mut write_stream, mut read_stream) = websocket.split(); + + // <- e + noise.read_message(&ws_recv(&mut read_stream).await?, &mut buf)?; + + // -> e, ee, s, es + send_uqbar_handshake( + &our, + keypair, + &our_static_key, + &mut noise, + &mut buf, + &mut write_stream, + ) + .await?; + + // <- s, se + let their_handshake = recv_uqbar_handshake(&mut noise, &mut buf, &mut read_stream).await?; + + // now validate this handshake payload against the QNS PKI + validate_handshake( + &their_handshake, + noise + .get_remote_static() + .ok_or(anyhow!("noise error: missing remote pubkey"))?, + pki.get(&their_handshake.name) + .ok_or(anyhow!("unknown QNS name"))?, + )?; + + // Transition the state machine into transport mode now that the handshake is complete. + let noise = noise.into_transport_mode()?; + println!("handshake complete, noise session received\r"); + + Ok(( + their_handshake.name, + OpenConnection { + noise, + buf, + write_stream, + read_stream, + }, + )) +} + +async fn init_connection( + our: &Identity, + our_ip: &str, + peer_id: &Identity, + keypair: &Ed25519KeyPair, +) -> Result<(String, OpenConnection)> { + println!("init_connection\r"); + let mut buf = vec![0u8; 65535]; + let (mut noise, our_static_key) = build_initiator(); + + let Some((ref ip, ref port)) = peer_id.ws_routing else { + return Err(anyhow!("target has no routing information")) + }; + let Ok(ws_url) = make_ws_url(our_ip, ip, port) else { + return Err(anyhow!("failed to parse websocket url")) + }; + let Ok(Ok((websocket, _response))) = timeout(TIMEOUT, connect_async(ws_url)).await else { + return Err(anyhow!("failed to connect to target")) + }; + let (mut write_stream, mut read_stream) = websocket.split(); + + // -> e + let len = noise.write_message(&[], &mut buf)?; + ws_send(&mut write_stream, &buf[..len]).await?; + + // <- e, ee, s, es + let their_handshake = recv_uqbar_handshake(&mut noise, &mut buf, &mut read_stream).await?; + + // now validate this handshake payload against the QNS PKI + validate_handshake( + &their_handshake, + noise + .get_remote_static() + .ok_or(anyhow!("noise error: missing remote pubkey"))?, + peer_id, + )?; + + // -> s, se + send_uqbar_handshake( + &our, + keypair, + &our_static_key, + &mut noise, + &mut buf, + &mut write_stream, + ) + .await?; + + let noise = noise.into_transport_mode()?; + println!("handshake complete, noise session initiated\r"); + + Ok(( + their_handshake.name, + OpenConnection { + noise, + buf, + write_stream, + read_stream, + }, + )) +} + +fn validate_handshake( + handshake: &HandshakePayload, + their_static_key: &[u8], + their_id: &Identity, +) -> Result<()> { + println!("validate_handshake\r"); + if handshake.protocol_version != 1 { + return Err(anyhow!("handshake protocol version mismatch")); + } + // verify their signature of their static key + let their_networking_key = signature::UnparsedPublicKey::new( + &signature::ED25519, + hex::decode(&strip_0x(&their_id.networking_key))?, + ); + their_networking_key.verify(their_static_key, &handshake.signature)?; + Ok(()) +} + +async fn send_uqbar_message(km: &KernelMessage, conn: &mut OpenConnection) -> Result<()> { + let serialized = bincode::serialize(km)?; + if serialized.len() > MESSAGE_MAX_SIZE as usize { + return Err(anyhow!("uqbar message too large")); + } + + let len = (serialized.len() as u32).to_be_bytes(); + let with_length_prefix = [len.to_vec(), serialized].concat(); + + for payload in with_length_prefix.chunks(65519) { // 65535 - 16 (TAGLEN) + let len = conn.noise.write_message(payload, &mut conn.buf)?; + ws_send(&mut conn.write_stream, &conn.buf[..len]).await?; + } + Ok(()) +} + +async fn recv_uqbar_message(conn: &mut OpenConnection) -> Result { + let outer_len = conn + .noise + .read_message(&ws_recv(&mut conn.read_stream).await?, &mut conn.buf)?; + if outer_len < 4 { + return Err(anyhow!("uqbar message too small!")); + } + + let length_bytes = [conn.buf[0], conn.buf[1], conn.buf[2], conn.buf[3]]; + let msg_len = u32::from_be_bytes(length_bytes); + + let mut msg = Vec::with_capacity(msg_len as usize); + msg.extend_from_slice(&conn.buf[4..outer_len]); + + while msg.len() < msg_len as usize { + let len = conn + .noise + .read_message(&ws_recv(&mut conn.read_stream).await?, &mut conn.buf)?; + msg.extend_from_slice(&conn.buf[..len]); + } + + Ok(bincode::deserialize(&msg)?) +} + +async fn send_uqbar_handshake( + our: &Identity, + keypair: &Ed25519KeyPair, + noise_static_key: &[u8], + noise: &mut snow::HandshakeState, + buf: &mut Vec, + write_stream: &mut SplitSink>, tungstenite::Message>, +) -> Result<()> { + println!("send_uqbar_handshake\r"); + let our_hs = bincode::serialize(&HandshakePayload { + name: our.name.clone(), + signature: keypair.sign(noise_static_key).as_ref().to_vec(), + protocol_version: 1, + }) + .expect("failed to serialize handshake payload"); + + let len = noise.write_message(&our_hs, buf)?; + ws_send(write_stream, &buf[..len]).await?; + + Ok(()) +} + +async fn recv_uqbar_handshake( + noise: &mut snow::HandshakeState, + buf: &mut Vec, + read_stream: &mut SplitStream>>, +) -> Result { + println!("recv_uqbar_handshake\r"); + let len = noise.read_message(&ws_recv(read_stream).await?, buf)?; + + // from buffer, read a sequence of bytes that deserializes to the + // 1. QNS name of the sender + // 2. a signature by their published networking key that signs the + // static key they will be using for this handshake + // 3. the version number of the networking protocol (so we can upgrade it) + Ok(bincode::deserialize(&buf[..len])?) +} + +async fn ws_recv( + read_stream: &mut SplitStream>>, +) -> Result> { + let Some(Ok(tungstenite::Message::Binary(bin))) = read_stream.next().await else { + return Err(anyhow!("websocket closed")); + }; + Ok(bin) +} + +async fn ws_send( + write_stream: &mut SplitSink>, tungstenite::Message>, + msg: &[u8], +) -> Result<()> { + write_stream.send(tungstenite::Message::binary(msg)).await?; + Ok(()) +} + +fn build_responder() -> (snow::HandshakeState, Vec) { + let builder: snow::Builder<'_> = snow::Builder::new(PARAMS.clone()); + let keypair = builder + .generate_keypair() + .expect("net: couldn't generate keypair?"); + ( + builder + .local_private_key(&keypair.private) + .build_responder() + .expect("net: couldn't build responder?"), + keypair.public, + ) +} + +fn build_initiator() -> (snow::HandshakeState, Vec) { + let builder: snow::Builder<'_> = snow::Builder::new(PARAMS.clone()); + let keypair = builder + .generate_keypair() + .expect("net: couldn't generate keypair?"); + ( + builder + .local_private_key(&keypair.private) + .build_initiator() + .expect("net: couldn't build responder?"), + keypair.public, + ) +} + +fn make_ws_url(our_ip: &str, ip: &str, port: &u16) -> Result { + // if we have the same public IP as target, route locally, + // otherwise they will appear offline due to loopback stuff + let ip = if our_ip == ip { "localhost" } else { ip }; + match url::Url::parse(&format!("ws://{}:{}/ws", ip, port)) { + Ok(v) => Ok(v), + Err(_) => Err(SendErrorKind::Offline), + } +} + +async fn error_offline(km: KernelMessage, network_error_tx: &NetworkErrorSender) -> Result<()> { + network_error_tx + .send(WrappedSendError { + id: km.id, + source: km.source, + error: SendError { + kind: SendErrorKind::Offline, + target: km.target, + message: km.message, + payload: km.payload, + }, + }) + .await?; + Ok(()) +} + +fn strip_0x(s: &str) -> String { + if s.starts_with("0x") { + s[2..].to_string() + } else { + s.to_string() + } +} + +/// net module only handles incoming local requests, will never return a response +async fn handle_local_message( + our: &Identity, + km: KernelMessage, + peers: &Peers, + pki: &mut OnchainPKI, + names: &mut PKINames, + kernel_message_tx: &MessageSender, + print_tx: &PrintSender, +) -> Result<()> { + let ipc = match km.message { + Message::Response(_) => return Ok(()), + Message::Request(request) => request.ipc, + }; + + if km.source.node != our.name { + // respond to a text message with a simple "delivered" response + print_tx + .send(Printout { + verbosity: 0, + content: format!( + "\x1b[3;32m{}: {}\x1b[0m", + km.source.node, + std::str::from_utf8(&ipc).unwrap_or("!!message parse error!!") + ), + }) + .await?; + kernel_message_tx + .send(KernelMessage { + id: km.id, + source: Address { + node: our.name.clone(), + process: ProcessId::from_str("net:sys:uqbar").unwrap(), + }, + target: km.rsvp.unwrap_or(km.source), + rsvp: None, + message: Message::Response(( + Response { + inherit: false, + ipc: "delivered".as_bytes().to_vec(), + metadata: None, + }, + None, + )), + payload: None, + signed_capabilities: None, + }) + .await?; + Ok(()) + } else { + // available commands: "peers", "QnsUpdate" (see qns_indexer module) + // first parse as raw string, then deserialize to NetActions object + match std::str::from_utf8(&ipc) { + Ok("peers") => { + print_tx + .send(Printout { + verbosity: 0, + content: format!("{:#?}", peers.keys()), + }) + .await?; + } + Ok("pki") => { + print_tx + .send(Printout { + verbosity: 0, + content: format!("{:#?}", pki), + }) + .await?; + } + Ok("names") => { + print_tx + .send(Printout { + verbosity: 0, + content: format!("{:#?}", names), + }) + .await?; + } + _ => { + let Ok(act) = serde_json::from_slice::(&ipc) else { + print_tx + .send(Printout { + verbosity: 0, + content: "net: got unknown command".into(), + }) + .await?; + return Ok(()); + }; + match act { + NetActions::QnsUpdate(log) => { + print_tx + .send(Printout { + verbosity: 1, + content: format!("net: got QNS update for {}", log.name), + }) + .await?; + + pki.insert( + log.name.clone(), + Identity { + name: log.name.clone(), + networking_key: log.public_key, + ws_routing: if log.ip == "0.0.0.0".to_string() || log.port == 0 { + None + } else { + Some((log.ip, log.port)) + }, + allowed_routers: log.routers, + }, + ); + names.insert(log.node, log.name); + } + NetActions::QnsBatchUpdate(log_list) => { + print_tx + .send(Printout { + verbosity: 1, + content: format!( + "net: got QNS update with {} peers", + log_list.len() + ), + }) + .await?; + for log in log_list { + pki.insert( + log.name.clone(), + Identity { + name: log.name.clone(), + networking_key: log.public_key, + ws_routing: if log.ip == "0.0.0.0".to_string() || log.port == 0 + { + None + } else { + Some((log.ip, log.port)) + }, + allowed_routers: log.routers, + }, + ); + names.insert(log.node, log.name); + } + } + } + } + } + Ok(()) + } +} diff --git a/src/register.rs b/src/register.rs index 71f7a655..ae57fc3d 100644 --- a/src/register.rs +++ b/src/register.rs @@ -159,11 +159,12 @@ async fn handle_boot( ) -> Result { our.name = info.username; - if info.direct { - our.allowed_routers = vec![]; - } else { - our.ws_routing = None; - } + // println!("handle_boot: info.direct: {}\r", info.direct); + // if info.direct { + // our.allowed_routers = vec![]; + // } else { + // our.ws_routing = None; + // } // if keyfile was not present in node and is present from user upload if encoded_keyfile.is_empty() && !info.keyfile.clone().is_empty() { @@ -260,11 +261,7 @@ async fn handle_info( networking_key: format!("0x{}", public_key), name: String::new(), ws_routing: Some((ip.clone(), ws_port)), - allowed_routers: vec![ - "uqbar-router-1.uq".into(), // "0x8d9e54427c50660c6d4802f63edca86a9ca5fd6a78070c4635950e9d149ed441".into(), - "uqbar-router-2.uq".into(), // "0x06d331ed65843ecf0860c73292005d8103af20820546b2f8f9007d01f60595b1".into(), - "uqbar-router-3.uq".into(), // "0xe6ab611eb62e8aee0460295667f8179cda4315982717db4b0b3da6022deecac1".into(), - ], + allowed_routers: vec![], }; *our_arc.lock().unwrap() = Some(our.clone());