From 4094be9293fbf3e7a96933789643889ab7eb41f4 Mon Sep 17 00:00:00 2001 From: dr-frmr Date: Tue, 31 Oct 2023 15:43:19 -0400 Subject: [PATCH] replace old net with net2 --- src/main.rs | 4 +- src/net/connections.rs | 599 ------------ src/net/mod.rs | 1853 ++++++++++++++++++++---------------- src/net/types.rs | 141 ++- src/{net2 => net}/utils.rs | 2 +- src/net2/mod.rs | 1186 ----------------------- src/net2/types.rs | 115 --- 7 files changed, 1134 insertions(+), 2766 deletions(-) delete mode 100644 src/net/connections.rs rename src/{net2 => net}/utils.rs (99%) delete mode 100644 src/net2/mod.rs delete mode 100644 src/net2/types.rs diff --git a/src/main.rs b/src/main.rs index eeb0733c..5da6fc27 100644 --- a/src/main.rs +++ b/src/main.rs @@ -14,7 +14,7 @@ mod http_client; mod http_server; mod kernel; mod keygen; -mod net2; +mod net; mod register; mod terminal; mod types; @@ -255,7 +255,7 @@ async fn main() { vfs_message_sender, encryptor_sender, )); - tasks.spawn(net2::networking( + tasks.spawn(net::networking( our.clone(), our_ip.to_string(), networking_keypair_arc.clone(), diff --git a/src/net/connections.rs b/src/net/connections.rs deleted file mode 100644 index d6b70f85..00000000 --- a/src/net/connections.rs +++ /dev/null @@ -1,599 +0,0 @@ -use crate::net::*; -use chacha20poly1305::{ - aead::{Aead, AeadCore, KeyInit, OsRng}, - XChaCha20Poly1305, XNonce, -}; -use elliptic_curve::ecdh::SharedSecret; -use futures::{SinkExt, StreamExt}; -use ring::signature::Ed25519KeyPair; -use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; -use tokio::task::JoinHandle; -use tokio_tungstenite::tungstenite; - -pub async fn build_connection( - our: Identity, - keypair: Arc, - pki: OnchainPKI, - keys: PeerKeys, - peers: Peers, - websocket: WebSocket, - kernel_message_tx: MessageSender, - net_message_tx: MessageSender, - network_error_tx: NetworkErrorSender, - with: Option, -) -> ( - UnboundedSender<(NetworkMessage, Option)>, - JoinHandle>, -) { - // println!("building new connection\r"); - let (message_tx, message_rx) = unbounded_channel::<(NetworkMessage, Option)>(); - let handle = tokio::spawn(maintain_connection( - our, - with, - keypair, - pki, - keys, - peers, - websocket, - message_tx.clone(), - message_rx, - kernel_message_tx, - net_message_tx, - network_error_tx, - )); - return (message_tx, handle); -} - -/// Keeps a connection alive and handles sending and receiving of NetworkMessages through it. -/// TODO add a keepalive PING/PONG system -/// TODO kill this after a certain amount of inactivity -pub async fn maintain_connection( - our: Identity, - with: Option, - keypair: Arc, - pki: OnchainPKI, - keys: PeerKeys, - peers: Peers, - websocket: WebSocket, - message_tx: UnboundedSender<(NetworkMessage, Option)>, - mut message_rx: UnboundedReceiver<(NetworkMessage, Option)>, - kernel_message_tx: MessageSender, - net_message_tx: MessageSender, - network_error_tx: NetworkErrorSender, -) -> Option { - // let conn_id: u64 = rand::random(); - // println!("maintaining connection {conn_id}\r"); - - // accept messages on the websocket in one task, and send messages in another - let (mut write_stream, mut read_stream) = websocket.split(); - - let (forwarding_ack_tx, mut forwarding_ack_rx) = unbounded_channel::(); - // manage outstanding ACKs from messages sent over the connection - // TODO replace with more performant data structure - let ack_map = Arc::new(RwLock::new(HashMap::::new())); - let sender_ack_map = ack_map.clone(); - - let last_pong = Arc::new(RwLock::new(tokio::time::Instant::now())); - let ping_last_pong = last_pong.clone(); - let ping_tx = message_tx.clone(); - - // Ping/Pong keepalive task - let ping_task = tokio::spawn(async move { - loop { - tokio::time::sleep(tokio::time::Duration::from_secs(30)).await; - if ping_last_pong.read().await.elapsed() > tokio::time::Duration::from_secs(60) { - break; - } - if let Err(_) = ping_tx.send((NetworkMessage::Ping, None)) { - // Failed to send Ping, kill the connection - break; - } - } - }); - - let forwarder_message_tx = message_tx.clone(); - let ack_forwarder = tokio::spawn(async move { - while let Some(result) = forwarding_ack_rx.recv().await { - match result { - Ok(NetworkMessage::Ack(id)) => { - // println!("net: got forwarding ack for message {}\r", id); - forwarder_message_tx - .send((NetworkMessage::Ack(id), None)) - .unwrap(); - } - Ok(NetworkMessage::Nack(id)) => { - // println!("net: got forwarding nack for message {}\r", id); - forwarder_message_tx - .send((NetworkMessage::Nack(id), None)) - .unwrap(); - } - Ok(NetworkMessage::HandshakeAck(handshake)) => { - // println!( - // "net: got forwarding handshakeAck for message {}\r", - // handshake.id - // ); - forwarder_message_tx - .send((NetworkMessage::HandshakeAck(handshake), None)) - .unwrap(); - } - Err((message_id, _e)) => { - // println!("net: got forwarding error from ack_rx: {:?}\r", e); - // what do we do here? - forwarder_message_tx - .send((NetworkMessage::Nack(message_id), None)) - .unwrap(); - } - _ => { - // println!("net: weird none ack\r"); - } - } - } - }); - - // receive messages from over the websocket and route them to the correct peer handler, - // or create it, if necessary. - let ws_receiver = tokio::spawn(async move { - while let Some(incoming) = read_stream.next().await { - let Ok(tungstenite::Message::Binary(bin)) = incoming else { - if let Ok(tungstenite::Message::Ping(_)) = incoming { - // let _ = write_stream.send(tungstenite::Message::Pong(vec![])).await; - } - continue; - }; - // TODO use a language-netural serialization format here! - let Ok(net_message) = bincode::deserialize::(&bin) else { - // just kill the connection if we get a non-Uqbar message - break; - }; - match net_message { - NetworkMessage::Pong => { - *last_pong.write().await = tokio::time::Instant::now(); - continue; - } - NetworkMessage::Ping => { - // respond with a Pong - let _ = message_tx.send((NetworkMessage::Pong, None)); - continue; - } - NetworkMessage::Ack(id) => { - let Some(result_tx) = ack_map.write().await.remove(&id) else { - // println!("conn {conn_id}: got unexpected Ack {id}\r"); - continue; - }; - // println!("conn {conn_id}: got Ack {id}\r"); - let _ = result_tx.send(Ok(net_message)); - continue; - } - NetworkMessage::Nack(id) => { - let Some(result_tx) = ack_map.write().await.remove(&id) else { - // println!("net: got unexpected Nack\r"); - continue; - }; - let _ = result_tx.send(Ok(net_message)); - continue; - } - NetworkMessage::Msg { - ref id, - ref from, - ref to, - ref contents, - } => { - // println!("conn {conn_id}: handling msg {id}\r"); - // if the message is *directed to us*, try to handle with the - // matching peer handler "decrypter". - // - if to == &our.name { - // if we have the peer, send the message to them. - if let Some(peer) = peers.read().await.get(from) { - let _ = peer - .decrypter - .send((contents.to_owned(), forwarding_ack_tx.clone())); - continue; - } - // if we don't have the peer, see if we have the keys to create them. - // if we don't have their keys, throw a nack. - if let Some((peer_id, secret)) = keys.read().await.get(from) { - let new_peer = create_new_peer( - our.clone(), - peer_id.clone(), - peers.clone(), - keys.clone(), - secret.clone(), - message_tx.clone(), - kernel_message_tx.clone(), - net_message_tx.clone(), - network_error_tx.clone(), - ); - let _ = new_peer - .decrypter - .send((contents.to_owned(), forwarding_ack_tx.clone())); - peers.write().await.insert(peer_id.name.clone(), new_peer); - } else { - // println!("net: nacking message {id}\r"); - message_tx.send((NetworkMessage::Nack(*id), None)).unwrap(); - } - } else { - // if the message is *directed to someone else*, try to handle - // with the matching peer handler "sender". - // - if let Some(peer) = peers.read().await.get(to) { - let id = *id; - let to = to.clone(); - match peer.sender.send(( - PeerMessage::Net(net_message), - Some(forwarding_ack_tx.clone()), - )) { - Ok(_) => {} - Err(_) => { - peers.write().await.remove(&to); - message_tx.send((NetworkMessage::Nack(id), None)).unwrap(); - } - } - } else { - // if we don't have the peer, throw a nack. - // println!("net: nacking message with id {id}\r"); - message_tx.send((NetworkMessage::Nack(*id), None)).unwrap(); - } - } - } - NetworkMessage::Handshake(ref handshake) => { - // when we get a handshake, if we are the target, - // 1. verify it against the PKI - // 2. send a response handshakeAck - // 3. create a Peer and save, replacing old one if it existed - // as long as we are the target, we also get to kill this connection - // if the handshake is invalid, since it must be directly "to" us. - if handshake.target == our.name { - let Some(peer_id) = pki.read().await.get(&handshake.from).cloned() else { - // println!( - // "net: failed handshake with unknown node {}\r", - // handshake.from - // ); - message_tx - .send((NetworkMessage::Nack(handshake.id), None)) - .unwrap(); - break; - }; - let their_ephemeral_pk = match validate_handshake(&handshake, &peer_id) { - Ok(pk) => pk, - Err(e) => { - println!("net: invalid handshake from {}: {}\r", handshake.from, e); - message_tx - .send((NetworkMessage::Nack(handshake.id), None)) - .unwrap(); - break; - } - }; - let (secret, handshake) = make_secret_and_handshake( - &our, - keypair.clone(), - &handshake.from, - Some(handshake.id), - ); - message_tx - .send((NetworkMessage::HandshakeAck(handshake), None)) - .unwrap(); - let secret = Arc::new(secret.diffie_hellman(&their_ephemeral_pk)); - // save the handshake to our Keys map - keys.write() - .await - .insert(peer_id.name.clone(), (peer_id.clone(), secret.clone())); - let new_peer = create_new_peer( - our.clone(), - peer_id.clone(), - peers.clone(), - keys.clone(), - secret, - message_tx.clone(), - kernel_message_tx.clone(), - net_message_tx.clone(), - network_error_tx.clone(), - ); - // we might be replacing an old peer, so we need to remove it first - // we can't rely on the hashmap for this, because the dropped peer - // will trigger a drop of the sender, which will kill the peer_handler - peers.write().await.remove(&peer_id.name); - peers.write().await.insert(peer_id.name.clone(), new_peer); - } else { - // if we are NOT the target, - // try to send it to the matching peer handler "sender" - if let Some(peer) = peers.read().await.get(&handshake.target) { - let _ = peer.sender.send(( - PeerMessage::Net(net_message), - Some(forwarding_ack_tx.clone()), - )); - } else { - // if we don't have the peer, throw a nack. - // println!("net: nacking handshake with id {}\r", handshake.id); - message_tx - .send((NetworkMessage::Nack(handshake.id), None)) - .unwrap(); - } - } - } - NetworkMessage::HandshakeAck(ref handshake) => { - let Some(result_tx) = ack_map.write().await.remove(&handshake.id) else { - continue; - }; - let _ = result_tx.send(Ok(net_message)); - } - } - } - }); - - tokio::select! { - _ = ws_receiver => { - // println!("ws_receiver died\r"); - }, - _ = ack_forwarder => { - // println!("ack_forwarder died\r"); - }, - _ = ping_task => { - // println!("ping_task died\r"); - }, - // receive messages we would like to send to peers along this connection - // and send them to the websocket - _ = async { - while let Some((message, result_tx)) = message_rx.recv().await { - // TODO use a language-netural serialization format here! - if let Ok(bytes) = bincode::serialize::(&message) { - match &message { - NetworkMessage::Msg { id, .. } => { - // println!("conn {conn_id}: piping msg {id}\r"); - sender_ack_map.write().await.insert(*id, result_tx.unwrap()); - } - NetworkMessage::Handshake(h) => { - sender_ack_map.write().await.insert(h.id, result_tx.unwrap()); - } - _ => {} - } - match write_stream.send(tungstenite::Message::Binary(bytes)).await { - Ok(()) => {} - Err(e) => { - // println!("net: send error: {:?}\r", e); - let id = match &message { - NetworkMessage::Msg { id, .. } => id, - NetworkMessage::Handshake(h) => &h.id, - _ => continue, - }; - let Some(result_tx) = sender_ack_map.write().await.remove(&id) else { - continue; - }; - // TODO learn how to handle other non-fatal websocket errors. - match e { - tungstenite::error::Error::Capacity(_) - | tungstenite::Error::Io(_) => { - let _ = result_tx.send(Err((*id, SendErrorKind::Timeout))); - } - _ => { - let _ = result_tx.send(Ok(NetworkMessage::Nack(*id))); - } - } - } - } - } - } - } => { - // println!("ws_sender died\r"); - }, - }; - return with; -} - -/// After a successful handshake, use information to spawn a new `peer_handler` task -/// and save a `Peer` in our peers mapping. Returns a sender to use for sending messages -/// to this peer, which will also be saved in its Peer struct. -pub fn create_new_peer( - our: Identity, - new_peer_id: Identity, - peers: Peers, - keys: PeerKeys, - secret: Arc>, - conn_sender: UnboundedSender<(NetworkMessage, Option)>, - kernel_message_tx: MessageSender, - net_message_tx: MessageSender, - network_error_tx: NetworkErrorSender, -) -> Peer { - let (message_tx, message_rx) = unbounded_channel::<(PeerMessage, Option)>(); - let (decrypter_tx, decrypter_rx) = unbounded_channel::<(Vec, ErrorShuttle)>(); - let peer_id_name = new_peer_id.name.clone(); - let peer_conn_sender = conn_sender.clone(); - tokio::spawn(async move { - match peer_handler( - our, - peer_id_name.clone(), - secret, - message_rx, - decrypter_rx, - peer_conn_sender, - kernel_message_tx, - network_error_tx, - ) - .await - { - None => { - // println!("net: dropping peer handler but not deleting\r"); - } - Some(km) => { - // println!("net: ok actually deleting peer+keys now and retrying send\r"); - peers.write().await.remove(&peer_id_name); - keys.write().await.remove(&peer_id_name); - let _ = net_message_tx.send(km).await; - } - } - }); - return Peer { - identity: new_peer_id, - sender: message_tx, - decrypter: decrypter_tx, - socket_tx: conn_sender, - }; -} - -/// 1. take in messages from a specific peer, decrypt them, and send to kernel -/// 2. take in messages targeted at specific peer and either: -/// - encrypt them, and send to proper connection -/// - forward them untouched along the connection -async fn peer_handler( - our: Identity, - who: String, - secret: Arc>, - mut message_rx: UnboundedReceiver<(PeerMessage, Option)>, - mut decrypter_rx: UnboundedReceiver<(Vec, ErrorShuttle)>, - socket_tx: UnboundedSender<(NetworkMessage, Option)>, - kernel_message_tx: MessageSender, - network_error_tx: NetworkErrorSender, -) -> Option { - // println!("peer_handler\r"); - let mut key = [0u8; 32]; - secret - .extract::(None) - .expand(&[], &mut key) - .unwrap(); - let cipher = XChaCha20Poly1305::new(generic_array::GenericArray::from_slice(&key)); - - let (ack_tx, mut ack_rx) = unbounded_channel::(); - // TODO use a more efficient data structure - let ack_map = Arc::new(RwLock::new(HashMap::::new())); - let recv_ack_map = ack_map.clone(); - tokio::select! { - // - // take in messages from a specific peer, decrypt them, and send to kernel - // - _ = async { - while let Some((encrypted_bytes, result_tx)) = decrypter_rx.recv().await { - let nonce = XNonce::from_slice(&encrypted_bytes[..24]); - if let Ok(decrypted) = cipher.decrypt(&nonce, &encrypted_bytes[24..]) { - if let Ok(message) = bincode::deserialize::(&decrypted) { - if message.source.node == who { - // println!("net: got peer message {}, acking\r", message.id); - let _ = result_tx.send(Ok(NetworkMessage::Ack(message.id))); - let _ = kernel_message_tx.send(message).await; - continue; - } - println!("net: got message 'from' wrong person! cheater/liar!\r"); - break; - } - println!("net: failed to deserialize message from {}\r", who); - break; - } - println!("net: failed to decrypt message from {}, could be spoofer\r", who); - break; - } - } => { - // println!("net: lost peer {who}\r"); - return None - } - // - // take in messages targeted at specific peer and either: - // - encrypt them, and send to proper connection - // - forward them untouched along the connection - // - _ = async { - // if we get a result_tx, rather than track it here, let a different - // part of the code handle whatever comes back from the socket. - while let Some((message, maybe_result_tx)) = message_rx.recv().await { - // if message is raw, we should encrypt. - // otherwise, simply send - match message { - PeerMessage::Raw(message) => { - let id = message.id; - if let Ok(bytes) = bincode::serialize::(&message) { - // generating a random nonce for each message. - // this isn't really as secure as we could get: should - // add a counter and then throw away the key when we hit a - // certain # of messages. TODO. - let nonce = XChaCha20Poly1305::generate_nonce(&mut OsRng); - if let Ok(encrypted) = cipher.encrypt(&nonce, bytes.as_ref()) { - if maybe_result_tx.is_none() { - ack_map.write().await.insert(id, message); - } - match socket_tx.send(( - NetworkMessage::Msg { - from: our.name.clone(), - to: who.clone(), - id: id, - contents: [nonce.to_vec(), encrypted].concat(), - }, - Some(maybe_result_tx.unwrap_or(ack_tx.clone())), - )) { - Ok(()) => tokio::task::yield_now().await, - Err(tokio::sync::mpsc::error::SendError((_, result_tx))) => { - // println!("net: lost socket with {who}\r"); - let _ = result_tx.unwrap().send(Ok(NetworkMessage::Nack(id))); - }, - } - } - } - } - PeerMessage::Net(net_message) => { - match socket_tx.send((net_message, Some(maybe_result_tx.unwrap_or(ack_tx.clone())))) { - Ok(()) => continue, - Err(tokio::sync::mpsc::error::SendError((net_message, result_tx))) => { - // println!("net: lost *forwarding* socket with {who}\r"); - let id = match net_message { - NetworkMessage::Msg { id, .. } => id, - NetworkMessage::Handshake(h) => h.id, - _ => continue, - }; - let _ = result_tx.unwrap().send(Ok(NetworkMessage::Nack(id))); - break; - }, - } - } - } - } - } => return None, - // - // receive acks and nacks from our socket - // throw away acks, but kill this peer and retry the send on nacks. - // - maybe_km = async { - while let Some(result) = ack_rx.recv().await { - match result { - Ok(NetworkMessage::Ack(id)) => { - // println!("net: got ack for message {}\r", id); - recv_ack_map.write().await.remove(&id); - continue; - } - Ok(NetworkMessage::Nack(id)) => { - // println!("net: got nack for message {}\r", id); - let Some(km) = recv_ack_map.write().await.remove(&id) else { - continue; - }; - // when we get a Nack, **delete this peer** and try to send the message again! - return Some(km) - } - Err((message_id, e)) => { - // println!("net: got error from ack_rx: {:?}\r", e); - // in practice this is always a timeout in current implementation - let Some(km) = recv_ack_map.write().await.remove(&message_id) else { - continue; - }; - let _ = network_error_tx - .send(WrappedSendError { - id: km.id, - source: km.source, - error: SendError { - kind: e, - target: km.target, - message: km.message, - payload: km.payload, - }, - }) - .await; - return None - } - _ => { - // println!("net: weird none ack\r"); - return None - } - } - } - return None; - } => { - // println!("net: exiting peer due to nackage\r"); - return maybe_km - }, - } -} diff --git a/src/net/mod.rs b/src/net/mod.rs index fefe7f7f..bcdb31ef 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -1,23 +1,27 @@ -use crate::net::connections::*; -use crate::net::types::*; +use crate::net::{types::*, utils::*}; use crate::types::*; -use anyhow::Result; -use elliptic_curve::ecdh::EphemeralSecret; -use elliptic_curve::PublicKey; -use ethers::prelude::k256::{self, Secp256k1}; -use ring::signature::{self, Ed25519KeyPair}; -use std::{collections::HashMap, sync::Arc}; +use anyhow::{anyhow, Result}; +use futures::{SinkExt, StreamExt}; +use rand::seq::SliceRandom; +use ring::signature::Ed25519KeyPair; +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, +}; use tokio::net::TcpListener; -use tokio::sync::{mpsc::unbounded_channel, RwLock}; +use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver}; use tokio::task::JoinSet; -use tokio::time::timeout; -use tokio_tungstenite::{accept_async, connect_async, MaybeTlsStream}; +use tokio::time; +use tokio_tungstenite::{accept_async, connect_async, MaybeTlsStream, WebSocketStream}; -mod connections; mod types; +mod utils; // only used in connection initialization, otherwise, nacks and Responses are only used for "timeouts" -const TIMEOUT: std::time::Duration = std::time::Duration::from_secs(15); +const TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5); + +// 10 MB -- TODO analyze as desired, apps can always chunk data into many messages +const MESSAGE_MAX_SIZE: u32 = 10_485_800; /// Entry point from the main kernel task. Runs forever, spawns listener and sender tasks. pub async fn networking( @@ -27,676 +31,962 @@ pub async fn networking( kernel_message_tx: MessageSender, network_error_tx: NetworkErrorSender, print_tx: PrintSender, - message_tx: MessageSender, + self_message_tx: MessageSender, + message_rx: MessageReceiver, +) -> Result<()> { + println!("networking!\r"); + println!("our identity: {:#?}\r", our); + // branch on whether we are a direct or indirect node + match &our.ws_routing { + None => { + // indirect node: run the indirect networking strategy + indirect_networking( + our, + our_ip, + keypair, + kernel_message_tx, + network_error_tx, + print_tx, + self_message_tx, + message_rx, + ) + .await + } + Some((ip, port)) => { + // direct node: run the direct networking strategy + if &our_ip != ip { + return Err(anyhow!( + "net: fatal error: IP address mismatch: {} != {}, update your QNS identity", + our_ip, + ip + )); + } + let tcp = match TcpListener::bind(format!("0.0.0.0:{}", port)).await { + Ok(tcp) => tcp, + Err(_e) => { + return Err(anyhow!( + "net: fatal error: can't listen on port {}, update your QNS identity or free up that port", + port, + )); + } + }; + direct_networking( + our, + our_ip, + tcp, + keypair, + kernel_message_tx, + network_error_tx, + print_tx, + self_message_tx, + message_rx, + ) + .await + } + } +} + +async fn indirect_networking( + our: Identity, + our_ip: String, + keypair: Arc, + kernel_message_tx: MessageSender, + network_error_tx: NetworkErrorSender, + print_tx: PrintSender, + self_message_tx: MessageSender, mut message_rx: MessageReceiver, ) -> Result<()> { - // TODO persist this here - let pki: OnchainPKI = Arc::new(RwLock::new(HashMap::new())); - let keys: PeerKeys = Arc::new(RwLock::new(HashMap::new())); + println!("indirect_networking\r"); + let mut pki: OnchainPKI = HashMap::new(); + let mut peers: Peers = HashMap::new(); // mapping from QNS namehash to username - let names: PKINames = Arc::new(RwLock::new(HashMap::new())); + let mut names: PKINames = HashMap::new(); - // this only lives during a given run of the kernel - let peers: Peers = Arc::new(RwLock::new(HashMap::new())); + let mut peer_connections = JoinSet::<(NodeId, Option)>::new(); + let mut active_routers = HashSet::::new(); - // listener task either kickstarts our networking by establishing active connections - // with one or more routers, or spawns a websocket listener if we are a direct node. - let listener = match &our.ws_routing { - None => { - // indirect node: connect to router(s) - tokio::spawn(connect_to_routers( - our.clone(), - keypair.clone(), - our_ip.clone(), - pki.clone(), - keys.clone(), - peers.clone(), - kernel_message_tx.clone(), - message_tx.clone(), - network_error_tx.clone(), - print_tx.clone(), - )) + // before opening up the main loop, go through our allowed routers + // and attempt to connect to all of them, saving the successfully + // connected-to ones in our router-set + connect_to_routers( + &our, + &our_ip, + &keypair, + &mut active_routers, + &pki, + &mut peers, + &mut peer_connections, + kernel_message_tx.clone(), + ) + .await; + + loop { + tokio::select! { + // 1. receive messages from kernel and send out over connections, + // making new connections through our router-set as needed + Some(km) = message_rx.recv() => { + // got a message from kernel to send out over the network + let target = &km.target.node; + // if the message is for us, it's either a protocol-level "hello" message, + // or a debugging command issued from our terminal. handle it here: + if target == &our.name { + match handle_local_message( + &our, + &our_ip, + &keypair, + km, + &mut peers, + &mut pki, + &mut peer_connections, + None, + None, + Some(&active_routers), + &mut names, + &kernel_message_tx, + &print_tx, + ) + .await { + Ok(()) => {}, + Err(e) => { + print_tx.send(Printout { + verbosity: 0, + content: format!("net: error handling local message: {}", e) + }).await?; + } + } + } + // if the message is for a peer we currently have a connection with, + // try to send it to them + else if let Some(peer) = peers.get_mut(target) { + peer.sender.send(km)?; + } + else if let Some(peer_id) = pki.get(target) { + // if the message is for a *direct* peer we don't have a connection with, + // try to establish a connection with them + // TODO: here, we can *choose* to use our routers so as not to reveal + // networking information about ourselves to the target. + if peer_id.ws_routing.is_some() { + match init_connection(&our, &our_ip, peer_id, &keypair, None, false).await { + Ok((peer_name, direct_conn)) => { + let (peer_tx, peer_rx) = unbounded_channel::(); + let peer = Arc::new(Peer { + identity: peer_id.clone(), + routing_for: false, + sender: peer_tx, + }); + peers.insert(peer_name, peer.clone()); + peer.sender.send(km)?; + peer_connections.spawn(maintain_connection( + peer, + direct_conn, + peer_rx, + kernel_message_tx.clone(), + )); + } + Err(e) => { + println!("net: error initializing connection: {}\r", e); + error_offline(km, &network_error_tx).await?; + } + } + } + // if the message is for an *indirect* peer we don't have a connection with, + // do some routing: in a randomized order, go through their listed routers + // on chain and try to get one of them to build a proxied connection to + // this node for you + else { + let sent = time::timeout(TIMEOUT, + init_connection_via_router( + &our, + &our_ip, + &keypair, + km.clone(), + peer_id, + &pki, + &names, + &mut peers, + &mut peer_connections, + kernel_message_tx.clone() + )).await; + if !sent.unwrap_or(false) { + // none of the routers worked! + println!("net: error initializing routed connection\r"); + error_offline(km, &network_error_tx).await?; + } + } + } + // peer cannot be found in PKI, throw an offline error + else { + error_offline(km, &network_error_tx).await?; + } + } + // 2. deal with active connections that die by removing the associated peer + // if the peer is one of our routers, remove them from router-set + Some(Ok((dead_peer, maybe_resend))) = peer_connections.join_next() => { + peers.remove(&dead_peer); + active_routers.remove(&dead_peer); + match maybe_resend { + None => {}, + Some(km) => { + self_message_tx.send(km).await?; + } + } + } + // 3. periodically attempt to connect to any allowed routers that we + // are not connected to + _ = time::sleep(time::Duration::from_secs(3)) => { + connect_to_routers( + &our, + &our_ip, + &keypair, + &mut active_routers, + &pki, + &mut peers, + &mut peer_connections, + kernel_message_tx.clone(), + ) + .await; + } } - Some((_ip, port)) => { - // direct node: spawn the websocket listener - tokio::spawn(receive_incoming_connections( - our.clone(), - keypair.clone(), - *port, - pki.clone(), - keys.clone(), - peers.clone(), - kernel_message_tx.clone(), - message_tx.clone(), - network_error_tx.clone(), - )) + } +} + +async fn connect_to_routers( + our: &Identity, + our_ip: &str, + keypair: &Ed25519KeyPair, + active_routers: &mut HashSet, + pki: &OnchainPKI, + peers: &mut Peers, + peer_connections: &mut JoinSet<(NodeId, Option)>, + kernel_message_tx: MessageSender, +) { + for router in &our.allowed_routers { + if active_routers.contains(router) { + continue; + } + let Some(router_id) = pki.get(router) else { + continue; + }; + match init_connection(our, our_ip, router_id, keypair, None, true).await { + Ok((peer_name, direct_conn)) => { + let (peer_tx, peer_rx) = unbounded_channel::(); + let peer = Arc::new(Peer { + identity: router_id.clone(), + routing_for: false, + sender: peer_tx, + }); + println!("net: connected to router {}\r", peer_name); + peers.insert(peer_name.clone(), peer.clone()); + active_routers.insert(peer_name); + peer_connections.spawn(maintain_connection( + peer, + direct_conn, + peer_rx, + kernel_message_tx.clone(), + )); + } + Err(_e) => continue, + } + } +} + +async fn direct_networking( + our: Identity, + our_ip: String, + tcp: TcpListener, + keypair: Arc, + kernel_message_tx: MessageSender, + network_error_tx: NetworkErrorSender, + print_tx: PrintSender, + self_message_tx: MessageSender, + mut message_rx: MessageReceiver, +) -> Result<()> { + println!("direct_networking\r"); + let mut pki: OnchainPKI = HashMap::new(); + let mut peers: Peers = HashMap::new(); + // mapping from QNS namehash to username + let mut names: PKINames = HashMap::new(); + + let mut peer_connections = JoinSet::<(NodeId, Option)>::new(); + let mut forwarding_connections = JoinSet::<()>::new(); + let mut pending_passthroughs: PendingPassthroughs = HashMap::new(); + + loop { + tokio::select! { + // 1. receive messages from kernel and send out over our connections, + // making new connections as needed + Some(km) = message_rx.recv() => { + // got a message from kernel to send out over the network + let target = &km.target.node; + // if the message is for us, it's either a protocol-level "hello" message, + // or a debugging command issued from our terminal. handle it here: + if target == &our.name { + match handle_local_message( + &our, + &our_ip, + &keypair, + km, + &mut peers, + &mut pki, + &mut peer_connections, + Some(&mut pending_passthroughs), + Some(&forwarding_connections), + None, + &mut names, + &kernel_message_tx, + &print_tx, + ) + .await { + Ok(()) => {}, + Err(e) => { + print_tx.send(Printout { + verbosity: 0, + content: format!("net: error handling local message: {}", e) + }).await?; + } + } + } + // if the message is for a peer we currently have a connection with, + // try to send it to them + else if let Some(peer) = peers.get_mut(target) { + peer.sender.send(km)?; + } + else if let Some(peer_id) = pki.get(target) { + // if the message is for a *direct* peer we don't have a connection with, + // try to establish a connection with them + if peer_id.ws_routing.is_some() { + match init_connection(&our, &our_ip, peer_id, &keypair, None, false).await { + Ok((peer_name, direct_conn)) => { + let (peer_tx, peer_rx) = unbounded_channel::(); + let peer = Arc::new(Peer { + identity: peer_id.clone(), + routing_for: false, + sender: peer_tx, + }); + peers.insert(peer_name, peer.clone()); + peer.sender.send(km)?; + peer_connections.spawn(maintain_connection( + peer, + direct_conn, + peer_rx, + kernel_message_tx.clone(), + )); + } + Err(e) => { + println!("net: error initializing connection: {}\r", e); + error_offline(km, &network_error_tx).await?; + } + } + } + // if the message is for an *indirect* peer we don't have a connection with, + // do some routing: in a randomized order, go through their listed routers + // on chain and try to get one of them to build a proxied connection to + // this node for you + else { + let sent = time::timeout(TIMEOUT, + init_connection_via_router( + &our, + &our_ip, + &keypair, + km.clone(), + peer_id, + &pki, + &names, + &mut peers, + &mut peer_connections, + kernel_message_tx.clone() + )).await; + if !sent.unwrap_or(false) { + // none of the routers worked! + println!("net: error initializing routed connection\r"); + error_offline(km, &network_error_tx).await?; + } + } + } + // peer cannot be found in PKI, throw an offline error + else { + error_offline(km, &network_error_tx).await?; + } + } + // 2. receive incoming TCP connections + Ok((stream, _socket_addr)) = tcp.accept() => { + // TODO we can perform some amount of validation here + // to prevent some amount of potential DDoS attacks. + // can also block based on socket_addr + match accept_async(MaybeTlsStream::Plain(stream)).await { + Ok(websocket) => { + let (peer_id, routing_for, conn) = + match recv_connection( + &our, + &our_ip, + &pki, + &peers, + &mut pending_passthroughs, + &keypair, + websocket).await + { + Ok(res) => res, + Err(e) => { + println!("net: recv_connection failed: {e}\r"); + continue; + } + }; + // if conn is direct, add peer + // if passthrough, add to our forwarding connections joinset + match conn { + Connection::Peer(peer_conn) => { + let (peer_tx, peer_rx) = unbounded_channel::(); + let peer = Arc::new(Peer { + identity: peer_id, + routing_for, + sender: peer_tx, + }); + peers.insert(peer.identity.name.clone(), peer.clone()); + peer_connections.spawn(maintain_connection( + peer, + peer_conn, + peer_rx, + kernel_message_tx.clone(), + )); + } + Connection::Passthrough(passthrough_conn) => { + forwarding_connections.spawn(maintain_passthrough( + passthrough_conn, + )); + } + Connection::PendingPassthrough(pending_conn) => { + pending_passthroughs.insert( + (peer_id.name.clone(), pending_conn.target.clone()), + pending_conn + ); + } + } + } + // ignore connections we failed to accept...? + Err(_) => {} + } + } + // 3. deal with active connections that die by removing the associated peer + Some(Ok((dead_peer, maybe_resend))) = peer_connections.join_next() => { + peers.remove(&dead_peer); + match maybe_resend { + None => {}, + Some(km) => { + self_message_tx.send(km).await?; + } + } + } + } + } +} + +async fn init_connection_via_router( + our: &Identity, + our_ip: &str, + keypair: &Ed25519KeyPair, + km: KernelMessage, + peer_id: &Identity, + pki: &OnchainPKI, + names: &PKINames, + peers: &mut Peers, + peer_connections: &mut JoinSet<(NodeId, Option)>, + kernel_message_tx: MessageSender, +) -> bool { + println!("init_connection_via_router\r"); + let routers_shuffled = { + let mut routers = peer_id.allowed_routers.clone(); + routers.shuffle(&mut rand::thread_rng()); + routers + }; + for router_namehash in &routers_shuffled { + let Some(router_name) = names.get(router_namehash) else { + continue; + }; + let router_id = match pki.get(router_name) { + None => continue, + Some(id) => id, + }; + match init_connection(&our, &our_ip, peer_id, &keypair, Some(router_id), false).await { + Ok((peer_name, direct_conn)) => { + let (peer_tx, peer_rx) = unbounded_channel::(); + let peer = Arc::new(Peer { + identity: peer_id.clone(), + routing_for: false, + sender: peer_tx, + }); + peers.insert(peer_name, peer.clone()); + peer.sender.send(km).unwrap(); + peer_connections.spawn(maintain_connection( + peer, + direct_conn, + peer_rx, + kernel_message_tx.clone(), + )); + return true; + } + Err(_) => continue, + } + } + return false; +} + +async fn maintain_connection( + peer: Arc, + mut conn: PeerConnection, + mut peer_rx: UnboundedReceiver, + kernel_message_tx: MessageSender, + // network_error_tx: NetworkErrorSender, + // print_tx: PrintSender, +) -> (NodeId, Option) { + println!("maintain_connection\r"); + loop { + tokio::select! { + recv_result = recv_uqbar_message(&mut conn) => { + match recv_result { + Ok(km) => { + if km.source.node != peer.identity.name { + println!("net: got message with spoofed source\r"); + return (peer.identity.name.clone(), None) + } + kernel_message_tx.send(km).await.expect("net error: fatal: kernel died"); + } + Err(e) => { + println!("net: error receiving message: {}\r", e); + return (peer.identity.name.clone(), None) + } + } + }, + maybe_recv = peer_rx.recv() => { + match maybe_recv { + Some(km) => { + // TODO error handle + match send_uqbar_message(&km, &mut conn).await { + Ok(()) => continue, + Err(e) => { + println!("net: error sending message: {}\r", e); + return (peer.identity.name.clone(), Some(km)) + } + } + } + None => { + println!("net: peer disconnected\r"); + return (peer.identity.name.clone(), None) + } + } + }, + } + } +} + +/// match the streams +/// TODO optimize performance of this +async fn maintain_passthrough(mut conn: PassthroughConnection) { + println!("maintain_passthrough\r"); + loop { + tokio::select! { + maybe_recv = conn.read_stream_1.next() => { + match maybe_recv { + Some(Ok(msg)) => { + conn.write_stream_2.send(msg).await.expect("net error: fatal: kernel died"); + } + _ => { + println!("net: passthrough broke\r"); + return + } + } + }, + maybe_recv = conn.read_stream_2.next() => { + match maybe_recv { + Some(Ok(msg)) => { + conn.write_stream_1.send(msg).await.expect("net error: fatal: kernel died"); + } + _ => { + println!("net: passthrough broke\r"); + return + } + } + }, + } + } +} + +async fn recv_connection( + our: &Identity, + our_ip: &str, + pki: &OnchainPKI, + peers: &Peers, + pending_passthroughs: &mut PendingPassthroughs, + keypair: &Ed25519KeyPair, + websocket: WebSocketStream>, +) -> Result<(Identity, bool, Connection)> { + println!("recv_connection\r"); + let mut buf = vec![0u8; 65535]; + let (mut noise, our_static_key) = build_responder(); + let (mut write_stream, mut read_stream) = websocket.split(); + + // before we begin XX handshake pattern, check first message over socket + let first_message = &ws_recv(&mut read_stream).await?; + + // if the first message contains a "routing request", + // we see if the target is someone we are actively routing for, + // and create a Passthrough connection if so. + // a Noise 'e' message with have len 32 + if first_message.len() != 32 { + let (their_id, target_name) = validate_routing_request(&our.name, &first_message, pki)?; + let (id, conn) = create_passthrough( + our, + our_ip, + their_id, + target_name, + pki, + peers, + pending_passthroughs, + write_stream, + read_stream, + ) + .await?; + return Ok((id, false, conn)); + } + + // <- e + noise.read_message(first_message, &mut buf)?; + + // -> e, ee, s, es + send_uqbar_handshake( + &our, + keypair, + &our_static_key, + &mut noise, + &mut buf, + &mut write_stream, + false, + ) + .await?; + + // <- s, se + let their_handshake = recv_uqbar_handshake(&mut noise, &mut buf, &mut read_stream).await?; + + // now validate this handshake payload against the QNS PKI + let their_id = pki + .get(&their_handshake.name) + .ok_or(anyhow!("unknown QNS name"))?; + validate_handshake( + &their_handshake, + noise + .get_remote_static() + .ok_or(anyhow!("noise error: missing remote pubkey"))?, + their_id, + )?; + + // Transition the state machine into transport mode now that the handshake is complete. + let noise = noise.into_transport_mode()?; + println!("handshake complete, noise session received\r"); + + // TODO if their handshake indicates they want us to proxy + // for them (aka act as a router for them) we can choose + // whether to do so here. + Ok(( + their_id.clone(), + their_handshake.proxy_request, + Connection::Peer(PeerConnection { + noise, + buf, + write_stream, + read_stream, + }), + )) +} + +async fn recv_connection_via_router( + our: &Identity, + our_ip: &str, + their_name: &str, + pki: &OnchainPKI, + keypair: &Ed25519KeyPair, + router: &Identity, +) -> Result<(Identity, PeerConnection)> { + println!("recv_connection_via_router\r"); + let mut buf = vec![0u8; 65535]; + let (mut noise, our_static_key) = build_responder(); + + let Some((ref ip, ref port)) = router.ws_routing else { + return Err(anyhow!("router has no routing information")); + }; + let Ok(ws_url) = make_ws_url(our_ip, ip, port) else { + return Err(anyhow!("failed to parse websocket url")); + }; + let Ok(Ok((websocket, _response))) = time::timeout(TIMEOUT, connect_async(ws_url)).await + else { + return Err(anyhow!("failed to connect to target")); + }; + let (mut write_stream, mut read_stream) = websocket.split(); + + // before beginning XX handshake pattern, send a routing request + let message = bincode::serialize(&RoutingRequest { + source: our.name.clone(), + signature: keypair + .sign([their_name, router.name.as_str()].concat().as_bytes()) + .as_ref() + .to_vec(), + target: their_name.to_string(), + protocol_version: 1, + })?; + ws_send(&mut write_stream, &message).await?; + + // <- e + noise.read_message(&ws_recv(&mut read_stream).await?, &mut buf)?; + + // -> e, ee, s, es + send_uqbar_handshake( + &our, + keypair, + &our_static_key, + &mut noise, + &mut buf, + &mut write_stream, + false, + ) + .await?; + + // <- s, se + let their_handshake = recv_uqbar_handshake(&mut noise, &mut buf, &mut read_stream).await?; + + // now validate this handshake payload against the QNS PKI + let their_id = pki + .get(&their_handshake.name) + .ok_or(anyhow!("unknown QNS name"))?; + validate_handshake( + &their_handshake, + noise + .get_remote_static() + .ok_or(anyhow!("noise error: missing remote pubkey"))?, + their_id, + )?; + + // Transition the state machine into transport mode now that the handshake is complete. + let noise = noise.into_transport_mode()?; + println!("handshake complete, noise session received\r"); + + Ok(( + their_id.clone(), + PeerConnection { + noise, + buf, + write_stream, + read_stream, + }, + )) +} + +async fn init_connection( + our: &Identity, + our_ip: &str, + peer_id: &Identity, + keypair: &Ed25519KeyPair, + use_router: Option<&Identity>, + proxy_request: bool, +) -> Result<(String, PeerConnection)> { + println!("init_connection\r"); + let mut buf = vec![0u8; 65535]; + let (mut noise, our_static_key) = build_initiator(); + + let (mut write_stream, mut read_stream) = match use_router { + None => { + let Some((ref ip, ref port)) = peer_id.ws_routing else { + return Err(anyhow!("target has no routing information")); + }; + let Ok(ws_url) = make_ws_url(our_ip, ip, port) else { + return Err(anyhow!("failed to parse websocket url")); + }; + let Ok(Ok((websocket, _response))) = time::timeout(TIMEOUT, connect_async(ws_url)).await + else { + return Err(anyhow!("failed to connect to target")); + }; + websocket.split() + } + Some(router_id) => { + let Some((ref ip, ref port)) = router_id.ws_routing else { + return Err(anyhow!("router has no routing information")); + }; + let Ok(ws_url) = make_ws_url(our_ip, ip, port) else { + return Err(anyhow!("failed to parse websocket url")); + }; + let Ok(Ok((websocket, _response))) = time::timeout(TIMEOUT, connect_async(ws_url)).await + else { + return Err(anyhow!("failed to connect to target")); + }; + websocket.split() } }; - let _ = tokio::join!(listener, async { - while let Some(km) = message_rx.recv().await { - // got a message from kernel to send out over the network - let target = &km.target.node; - // if the message is for us, it's either a protocol-level "hello" message, - // or a debugging command issued from our terminal. handle it here: - if target == &our.name { - handle_incoming_message( - &our, - km, - peers.clone(), - keys.clone(), - pki.clone(), - names.clone(), - kernel_message_tx.clone(), - print_tx.clone(), + // if this is a routed request, before starting XX handshake pattern, send a + // routing request message over socket + if use_router.is_some() { + let message = bincode::serialize(&RoutingRequest { + source: our.name.clone(), + signature: keypair + .sign( + [&peer_id.name, use_router.unwrap().name.as_str()] + .concat() + .as_bytes(), ) - .await; - continue; - } - let peers_read = peers.read().await; - // - // we have the target as an active peer, meaning we can send the message directly - // - if let Some(peer) = peers_read.get(target) { - // println!("net: direct send to known peer\r"); - match peer.sender.send((PeerMessage::Raw(km.clone()), None)) { - Ok(_) => continue, - Err(_) => { - // println!("net: failed to send to known peer\r"); - drop(peers_read); - peers.write().await.remove(target); - error_offline(km, &network_error_tx).await; - continue; - } - } - } - drop(peers_read); - // - // we don't have the target as a peer yet, but we have shaken hands with them - // before, and can try to reuse that shared secret to send a message. - // first, we'll need to open a websocket and create a Peer struct for them. - // - if let Some((peer_id, secret)) = keys.read().await.get(target).cloned() { - // - // we can establish a connection directly with this peer - // - if let Some((ref ip, ref port)) = peer_id.ws_routing { - // println!("net: creating direct connection to peer with known keys\r"); - let Ok(ws_url) = make_ws_url(&our_ip, ip, port) else { - error_offline(km, &network_error_tx).await; - continue; - }; - let Ok(Ok((websocket, _response))) = - timeout(TIMEOUT, connect_async(ws_url)).await - else { - error_offline(km, &network_error_tx).await; - continue; - }; - let (socket_tx, conn_handle) = build_connection( - our.clone(), - keypair.clone(), - pki.clone(), - keys.clone(), - peers.clone(), - websocket, - kernel_message_tx.clone(), - message_tx.clone(), - network_error_tx.clone(), - None, - ) - .await; - let new_peer = create_new_peer( - our.clone(), - peer_id.clone(), - peers.clone(), - keys.clone(), - secret, - socket_tx.clone(), - kernel_message_tx.clone(), - message_tx.clone(), - network_error_tx.clone(), - ); - let (temp_result_tx, mut temp_result_rx) = unbounded_channel::(); - let _ = new_peer - .sender - .send((PeerMessage::Raw(km.clone()), Some(temp_result_tx))); - match timeout(TIMEOUT, temp_result_rx.recv()).await { - Ok(Some(Ok(NetworkMessage::Ack(_id)))) => { - peers.write().await.insert(peer_id.name.clone(), new_peer); - continue; - } - _ => { - // instead of throwing Offline now, throw away their keys and - // try again. - keys.write().await.remove(target); - conn_handle.abort(); - } - } - } - // - // need to find a router that will connect to this peer! - // - else { - // println!("net: finding router for peer with known keys\r"); - // get their ID from PKI so we have their most up-to-date router list - let Some(peer_id) = pki.read().await.get(target).cloned() else { - // this target cannot be found in the PKI! - // throw an Offline error. - error_offline(km, &network_error_tx).await; - continue; - }; - let mut success = false; - for router_namehash in &peer_id.allowed_routers { - let km = km.clone(); - let Some(router_name) = names.read().await.get(router_namehash).cloned() - else { - continue; - }; - let Some(router_id) = pki.read().await.get(&router_name).cloned() else { - continue; - }; - let Some((ref ip, ref port)) = router_id.ws_routing else { - continue; - }; - // - // otherwise, attempt to connect to the router's IP+port and send through that - // - // if we already have this router as a peer, use that socket_tx - let (socket_tx, maybe_conn_handle) = - if let Some(router) = peers.read().await.get(&router_name) { - (router.socket_tx.clone(), None) - } else { - let Ok(ws_url) = make_ws_url(&our_ip, ip, port) else { - continue; - }; - let Ok(Ok((websocket, _response))) = - timeout(TIMEOUT, connect_async(ws_url)).await - else { - continue; - }; - let (socket_tx, conn_handle) = build_connection( - our.clone(), - keypair.clone(), - pki.clone(), - keys.clone(), - peers.clone(), - websocket, - kernel_message_tx.clone(), - message_tx.clone(), - network_error_tx.clone(), - None, - ) - .await; - (socket_tx, Some(conn_handle)) - }; - let new_peer = create_new_peer( - our.clone(), - peer_id.clone(), - peers.clone(), - keys.clone(), - secret.clone(), - socket_tx.clone(), - kernel_message_tx.clone(), - message_tx.clone(), - network_error_tx.clone(), - ); - let (temp_result_tx, mut temp_result_rx) = - unbounded_channel::(); - let _ = new_peer - .sender - .send((PeerMessage::Raw(km.clone()), Some(temp_result_tx))); - match timeout(TIMEOUT, temp_result_rx.recv()).await { - Ok(Some(Ok(NetworkMessage::Ack(_id)))) => { - peers.write().await.insert(peer_id.name.clone(), new_peer); - success = true; - break; - } - _ => { - if let Some(conn_handle) = maybe_conn_handle { - conn_handle.abort(); - } - continue; - } - } - } - if !success { - // instead of throwing Offline now, throw away their keys and - // try again. - keys.write().await.remove(target); - } - } - } - // - // sending a message to a peer for which we don't have active networking info. - // this means that we need to search the PKI for the peer, and then attempt to - // exchange handshakes with them. - // - let Some(peer_id) = pki.read().await.get(target).cloned() else { - // this target cannot be found in the PKI! - // throw an Offline error. - error_offline(km, &network_error_tx).await; - continue; - }; - // - // we can establish a connection directly with this peer, then send a handshake - // - if let Some((ref ip, ref port)) = peer_id.ws_routing { - // println!("net: creating direct connection to peer in PKI\r"); - let Ok(ws_url) = make_ws_url(&our_ip, ip, port) else { - error_offline(km, &network_error_tx).await; - continue; - }; - let Ok(Ok((websocket, _response))) = timeout(TIMEOUT, connect_async(ws_url)).await - else { - error_offline(km, &network_error_tx).await; - continue; - }; - let (socket_tx, conn_handle) = build_connection( - our.clone(), - keypair.clone(), - pki.clone(), - keys.clone(), - peers.clone(), - websocket, - kernel_message_tx.clone(), - message_tx.clone(), - network_error_tx.clone(), - None, - ) - .await; - let (secret, handshake) = - make_secret_and_handshake(&our, keypair.clone(), target, None); - let (handshake_tx, mut handshake_rx) = unbounded_channel::(); - socket_tx - .send((NetworkMessage::Handshake(handshake), Some(handshake_tx))) - .unwrap(); - let response_shake = match timeout(TIMEOUT, handshake_rx.recv()).await { - Ok(Some(Ok(NetworkMessage::HandshakeAck(shake)))) => shake, - _ => { - // println!("net: failed handshake with {target}\r"); - error_offline(km, &network_error_tx).await; - conn_handle.abort(); - continue; - } - }; - let Ok(their_ephemeral_pk) = validate_handshake(&response_shake, &peer_id) else { - // println!("net: failed handshake with {target}\r"); - error_offline(km, &network_error_tx).await; - conn_handle.abort(); - continue; - }; - let secret = Arc::new(secret.diffie_hellman(&their_ephemeral_pk)); - // save the handshake to our Keys map - keys.write() - .await - .insert(peer_id.name.clone(), (peer_id.clone(), secret.clone())); - let new_peer = create_new_peer( - our.clone(), - peer_id.clone(), - peers.clone(), - keys.clone(), - secret, - socket_tx.clone(), - kernel_message_tx.clone(), - message_tx.clone(), - network_error_tx.clone(), - ); - // can't do a self_tx.send here because we need to maintain ordering of messages - // already queued. - let (temp_result_tx, mut temp_result_rx) = unbounded_channel::(); - let _ = new_peer - .sender - .send((PeerMessage::Raw(km.clone()), Some(temp_result_tx))); - match timeout(TIMEOUT, temp_result_rx.recv()).await { - Ok(Some(Ok(NetworkMessage::Ack(_id)))) => { - peers.write().await.insert(peer_id.name.clone(), new_peer); - continue; - } - _ => { - // instead of throwing Offline now, throw away their keys and - // try again. - keys.write().await.remove(target); - conn_handle.abort(); - } - } - } - // - // need to find a router that will connect to this peer, then do a handshake - // - else { - // println!("net: looking for router to create connection to peer in PKI\r"); - let Some(peer_id) = pki.read().await.get(target).cloned() else { - // this target cannot be found in the PKI! - // throw an Offline error. - error_offline(km, &network_error_tx).await; - continue; - }; - let mut success = false; - for router_namehash in &peer_id.allowed_routers { - let km = km.clone(); - let Some(router_name) = names.read().await.get(router_namehash).cloned() else { - continue; - }; - if router_name == our.name { - // don't try to connect to ourselves! - continue; - } - let Some(router_id) = pki.read().await.get(&router_name).cloned() else { - continue; - }; - let Some((ref ip, ref port)) = router_id.ws_routing else { - continue; - }; - // - // attempt to connect to the router's IP+port and send through that - // if we already have this router as a peer, use that socket_tx - let (socket_tx, maybe_conn_handle) = - if let Some(router) = peers.read().await.get(&router_name) { - (router.socket_tx.clone(), None) - } else { - let Ok(ws_url) = make_ws_url(&our_ip, ip, port) else { - continue; - }; - let Ok(Ok((websocket, _response))) = - timeout(TIMEOUT, connect_async(ws_url)).await - else { - continue; - }; - let (socket_tx, conn_handle) = build_connection( - our.clone(), - keypair.clone(), - pki.clone(), - keys.clone(), - peers.clone(), - websocket, - kernel_message_tx.clone(), - message_tx.clone(), - network_error_tx.clone(), - None, - ) - .await; - (socket_tx, Some(conn_handle)) - }; - let (secret, handshake) = - make_secret_and_handshake(&our, keypair.clone(), target, None); - let (handshake_tx, mut handshake_rx) = unbounded_channel::(); - socket_tx - .send((NetworkMessage::Handshake(handshake), Some(handshake_tx))) - .unwrap(); - let response_shake = match timeout(TIMEOUT, handshake_rx.recv()).await { - Ok(Some(Ok(NetworkMessage::HandshakeAck(shake)))) => shake, - _ => { - if let Some(conn_handle) = maybe_conn_handle { - conn_handle.abort(); - } - continue; - } - }; - let Ok(their_ephemeral_pk) = validate_handshake(&response_shake, &peer_id) - else { - if let Some(conn_handle) = maybe_conn_handle { - conn_handle.abort(); - } - continue; - }; - let secret = Arc::new(secret.diffie_hellman(&their_ephemeral_pk)); - // save the handshake to our Keys map - keys.write() - .await - .insert(peer_id.name.clone(), (peer_id.clone(), secret.clone())); - let new_peer = create_new_peer( - our.clone(), - peer_id.clone(), - peers.clone(), - keys.clone(), - secret, - socket_tx.clone(), - kernel_message_tx.clone(), - message_tx.clone(), - network_error_tx.clone(), - ); - let (temp_result_tx, mut temp_result_rx) = unbounded_channel::(); - let _ = new_peer - .sender - .send((PeerMessage::Raw(km.clone()), Some(temp_result_tx))); - match timeout(TIMEOUT, temp_result_rx.recv()).await { - Ok(Some(Ok(NetworkMessage::Ack(_id)))) => { - peers.write().await.insert(peer_id.name.clone(), new_peer); - success = true; - break; - } - _ => { - if let Some(conn_handle) = maybe_conn_handle { - conn_handle.abort(); - } - continue; - } - } - } - if !success { - error_offline(km, &network_error_tx).await; - continue; - } - } - } - }); - Err(anyhow::anyhow!("networking task exited")) -} - -async fn error_offline(km: KernelMessage, network_error_tx: &NetworkErrorSender) { - let _ = network_error_tx - .send(WrappedSendError { - id: km.id, - source: km.source, - error: SendError { - kind: SendErrorKind::Offline, - target: km.target, - message: km.message, - payload: km.payload, - }, - }) - .await; -} - -/// Only used if an indirect node. -async fn connect_to_routers( - our: Identity, - keypair: Arc, - our_ip: String, - pki: OnchainPKI, - keys: PeerKeys, - peers: Peers, - kernel_message_tx: MessageSender, - net_message_tx: MessageSender, - network_error_tx: NetworkErrorSender, - print_tx: PrintSender, -) { - // as soon as we boot, need to try and connect to all of our allowed_routers - // we can "throw away" routers that have a bad URL setup - // - // any time we *lose* a router, we need to try and reconnect on a loop - // we should always be trying to connect to all "good" routers we don't already have - - let (routers_to_try_tx, mut routers_to_try_rx) = unbounded_channel::(); - let mut active_routers = JoinSet::, tokio::task::JoinError>>::new(); - - // at start, add all our routers to list - for router_name in our.allowed_routers.clone() { - routers_to_try_tx.send(router_name).unwrap(); + .as_ref() + .to_vec(), + target: peer_id.name.clone(), + protocol_version: 1, + })?; + ws_send(&mut write_stream, &message).await?; } - loop { - // we sleep here in order not to BLAST routers with connections - // if we have a PKI mismatch with them for some amount of time. - tokio::time::sleep(std::time::Duration::from_secs(2)).await; - tokio::select! { - Some(Ok(Ok(Some(dead_router)))) = active_routers.join_next() => { - let _ = print_tx - .send(Printout { - verbosity: 0, - content: format!("net: connection to router {dead_router} died"), - }) - .await; - peers.write().await.remove(&dead_router); - if active_routers.is_empty() { - let _ = print_tx - .send(Printout { - verbosity: 0, - content: format!("net: no working routers, we are offline!"), - }) - .await; - } - let _ = routers_to_try_tx.send(dead_router); - } - Some(router_name) = routers_to_try_rx.recv() => { - if peers.read().await.contains_key(&router_name) { - continue; - } - let Some(router_id) = pki.read().await.get(&router_name).cloned() else { - let _ = routers_to_try_tx.send(router_name); - continue; - }; - let Some((ref ip, ref port)) = router_id.ws_routing else { - // this is a bad router, can remove from our list - continue; - }; - let Ok(ws_url) = make_ws_url(&our_ip, ip, port) else { - // this is a bad router, can remove from our list - continue; - }; - let Ok(Ok((websocket, _response))) = timeout(TIMEOUT, connect_async(ws_url)).await else { - let _ = routers_to_try_tx.send(router_name); - continue; - }; - // we never try to reuse keys with routers because we need to make - // sure we have a live connection with them. - // connect to their websocket and then send a handshake. - // save the handshake in our keys map, then save them as an active peer. - let (socket_tx, conn_handle) = build_connection( - our.clone(), - keypair.clone(), - pki.clone(), - keys.clone(), - peers.clone(), - websocket, - kernel_message_tx.clone(), - net_message_tx.clone(), - network_error_tx.clone(), - Some(router_name.clone()), - ) - .await; - let (secret, handshake) = - make_secret_and_handshake(&our, keypair.clone(), &router_name, None); - let (handshake_tx, mut handshake_rx) = unbounded_channel::(); - socket_tx - .send((NetworkMessage::Handshake(handshake), Some(handshake_tx))) - .unwrap(); - let response_shake = match timeout(TIMEOUT, handshake_rx.recv()).await { - Ok(Some(Ok(NetworkMessage::HandshakeAck(shake)))) => shake, - _ => { - // println!("net: failed handshake with {router_name}\r"); - conn_handle.abort(); - let _ = routers_to_try_tx.send(router_name); - continue; - } - }; - let Ok(their_ephemeral_pk) = validate_handshake(&response_shake, &router_id) else { - // println!("net: failed handshake with {router_name}\r"); - conn_handle.abort(); - let _ = routers_to_try_tx.send(router_name); - continue; - }; - let secret = Arc::new(secret.diffie_hellman(&their_ephemeral_pk)); - // save the handshake to our Keys map - keys.write().await.insert( - router_id.name.clone(), - (router_id.clone(), secret.clone()), - ); - let new_peer = create_new_peer( - our.clone(), - router_id.clone(), - peers.clone(), - keys.clone(), - secret, - socket_tx.clone(), - kernel_message_tx.clone(), - net_message_tx.clone(), - network_error_tx.clone(), - ); - let _ = print_tx - .send(Printout { - verbosity: 0, - content: format!("net: connected to router {router_name}"), - }) - .await; - peers.write().await.insert(router_id.name.clone(), new_peer); - active_routers.spawn(conn_handle); - } - } - } + // -> e + let len = noise.write_message(&[], &mut buf)?; + ws_send(&mut write_stream, &buf[..len]).await?; + + // <- e, ee, s, es + let their_handshake = recv_uqbar_handshake(&mut noise, &mut buf, &mut read_stream).await?; + + // now validate this handshake payload against the QNS PKI + validate_handshake( + &their_handshake, + noise + .get_remote_static() + .ok_or(anyhow!("noise error: missing remote pubkey"))?, + peer_id, + )?; + + // -> s, se + send_uqbar_handshake( + &our, + keypair, + &our_static_key, + &mut noise, + &mut buf, + &mut write_stream, + proxy_request, + ) + .await?; + + let noise = noise.into_transport_mode()?; + println!("handshake complete, noise session initiated\r"); + + Ok(( + their_handshake.name, + PeerConnection { + noise, + buf, + write_stream, + read_stream, + }, + )) } -/// only used if direct. should live forever -async fn receive_incoming_connections( - our: Identity, - keypair: Arc, - port: u16, - pki: OnchainPKI, - keys: PeerKeys, - peers: Peers, - kernel_message_tx: MessageSender, - net_message_tx: MessageSender, - network_error_tx: NetworkErrorSender, -) { - let tcp = TcpListener::bind(format!("0.0.0.0:{}", port)) - .await - .expect(format!("net: fatal error: can't listen on port {port}. change port onchain or free up this port!").as_str()); - - while let Ok((stream, _socket_addr)) = tcp.accept().await { - // TODO we can perform some amount of validation here - // to prevent some amount of potential DDoS attacks. - // can block based on socket_addr, but not QNS ID. - match accept_async(MaybeTlsStream::Plain(stream)).await { - Ok(websocket) => { - // println!("received incoming connection\r"); - tokio::spawn(build_connection( - our.clone(), - keypair.clone(), - pki.clone(), - keys.clone(), - peers.clone(), - websocket, - kernel_message_tx.clone(), - net_message_tx.clone(), - network_error_tx.clone(), - None, - )); - } - // ignore connections we failed to accept - Err(_) => {} - } - } -} - -/// net module only handles requests, will never return a response -async fn handle_incoming_message( +/// net module only handles incoming local requests, will never return a response +async fn handle_local_message( our: &Identity, + our_ip: &str, + keypair: &Ed25519KeyPair, km: KernelMessage, - peers: Peers, - keys: PeerKeys, - pki: OnchainPKI, - names: PKINames, - kernel_message_tx: MessageSender, - print_tx: PrintSender, -) { + peers: &mut Peers, + pki: &mut OnchainPKI, + peer_connections: &mut JoinSet<(NodeId, Option)>, + pending_passthroughs: Option<&mut PendingPassthroughs>, + forwarding_connections: Option<&JoinSet<()>>, + active_routers: Option<&HashSet>, + names: &mut PKINames, + kernel_message_tx: &MessageSender, + print_tx: &PrintSender, +) -> Result<()> { + println!("handle_local_message\r"); let ipc = match km.message { - Message::Response(_) => return, Message::Request(request) => request.ipc, + Message::Response((response, _context)) => { + // these are received as a router, when we send ConnectionRequests + // to a node we do routing for. + match serde_json::from_slice::(&response.ipc)? { + NetResponses::Attempting(_) => { + // TODO anything here? + } + NetResponses::Rejected(to) => { + // drop from our pending map + // this will drop the socket, causing initiator to see it as failed + pending_passthroughs + .ok_or(anyhow!("got net response as non-router"))? + .remove(&(to, km.source.node)); + } + } + return Ok(()); + } }; if km.source.node != our.name { + if let Ok(act) = serde_json::from_slice::(&ipc) { + match act { + NetActions::QnsBatchUpdate(_) | NetActions::QnsUpdate(_) => { + // for now, we don't get these from remote. + } + NetActions::ConnectionRequest(from) => { + // someone wants to open a passthrough with us through a router! + // if we are an indirect node, and source is one of our routers, + // respond by attempting to init a matching passthrough. + // TODO can discriminate more here.. + if our.allowed_routers.contains(&km.source.node) { + let Ok((peer_id, peer_conn)) = time::timeout(TIMEOUT, + recv_connection_via_router( + our, + our_ip, + &from, + pki, + keypair, + &peers + .get(&km.source.node) + .ok_or(anyhow!("unknown router"))? + .identity, + )).await? else { + return Err(anyhow!("someone tried to connect to us but it timed out")) + }; + let (peer_tx, peer_rx) = unbounded_channel::(); + let peer = Arc::new(Peer { + identity: peer_id, + routing_for: false, + sender: peer_tx, + }); + peers.insert(peer.identity.name.clone(), peer.clone()); + peer_connections.spawn(maintain_connection( + peer, + peer_conn, + peer_rx, + kernel_message_tx.clone(), + )); + } else { + kernel_message_tx + .send(KernelMessage { + id: km.id, + source: Address { + node: our.name.clone(), + process: ProcessId::from_str("net:sys:uqbar").unwrap(), + }, + target: km.rsvp.unwrap_or(km.source), + rsvp: None, + message: Message::Response(( + Response { + inherit: false, + ipc: serde_json::to_vec(&NetResponses::Rejected(from))?, + metadata: None, + }, + None, + )), + payload: None, + signed_capabilities: None, + }) + .await?; + } + } + } + return Ok(()); + }; + // if we can't parse this to a netaction, treat it as a hello and print it // respond to a text message with a simple "delivered" response - let _ = print_tx + print_tx .send(Printout { verbosity: 0, content: format!( @@ -705,8 +995,8 @@ async fn handle_incoming_message( std::str::from_utf8(&ipc).unwrap_or("!!message parse error!!") ), }) - .await; - let _ = kernel_message_tx + .await?; + kernel_message_tx .send(KernelMessage { id: km.id, source: Address { @@ -726,67 +1016,121 @@ async fn handle_incoming_message( payload: None, signed_capabilities: None, }) - .await; + .await?; + Ok(()) } else { - // available commands: "peers", "QnsUpdate" (see qns_indexer module) + // available commands: "peers", "pki", "names", "diagnostics" // first parse as raw string, then deserialize to NetActions object match std::str::from_utf8(&ipc) { Ok("peers") => { - let peer_read = peers.read().await; - let _ = print_tx + print_tx .send(Printout { verbosity: 0, - content: format!("{:?}", peer_read.keys()), + content: format!("{:#?}", peers.keys()), }) - .await; - } - Ok("keys") => { - let keys_read = keys.read().await; - let _ = print_tx - .send(Printout { - verbosity: 0, - content: format!("{:?}", keys_read.keys()), - }) - .await; + .await?; } Ok("pki") => { - let pki_read = pki.read().await; - let _ = print_tx + print_tx .send(Printout { verbosity: 0, - content: format!("{:?}", pki_read), + content: format!("{:#?}", pki), }) - .await; + .await?; } Ok("names") => { - let names_read = names.read().await; - let _ = print_tx + print_tx .send(Printout { verbosity: 0, - content: format!("{:?}", names_read), + content: format!("{:#?}", names), }) - .await; + .await?; + } + Ok("diagnostics") => { + print_tx + .send(Printout { + verbosity: 0, + content: format!("our Identity: {:#?}", our), + }) + .await?; + print_tx + .send(Printout { + verbosity: 0, + content: format!("we have connections with peers: {:#?}", peers.keys()), + }) + .await?; + print_tx + .send(Printout { + verbosity: 0, + content: format!("we have {} entries in the PKI", pki.len()), + }) + .await?; + print_tx + .send(Printout { + verbosity: 0, + content: format!( + "we have {} open peer connections", + peer_connections.len() + ), + }) + .await?; + if pending_passthroughs.is_some() { + print_tx + .send(Printout { + verbosity: 0, + content: format!( + "we have {} pending passthrough connections", + pending_passthroughs.unwrap().len() + ), + }) + .await?; + } + if forwarding_connections.is_some() { + print_tx + .send(Printout { + verbosity: 0, + content: format!( + "we have {} open passthrough connections", + forwarding_connections.unwrap().len() + ), + }) + .await?; + } + if active_routers.is_some() { + print_tx + .send(Printout { + verbosity: 0, + content: format!( + "we have {} active routers", + active_routers.unwrap().len() + ), + }) + .await?; + } } _ => { let Ok(act) = serde_json::from_slice::(&ipc) else { - let _ = print_tx + print_tx .send(Printout { verbosity: 0, content: "net: got unknown command".into(), }) - .await; - return; + .await?; + return Ok(()); }; match act { + NetActions::ConnectionRequest(_) => { + // we shouldn't receive these from ourselves. + } NetActions::QnsUpdate(log) => { - let _ = print_tx + print_tx .send(Printout { verbosity: 1, content: format!("net: got QNS update for {}", log.name), }) - .await; + .await?; - let _ = pki.write().await.insert( + pki.insert( log.name.clone(), Identity { name: log.name.clone(), @@ -799,10 +1143,10 @@ async fn handle_incoming_message( allowed_routers: log.routers, }, ); - let _ = names.write().await.insert(log.node, log.name); + names.insert(log.node, log.name); } NetActions::QnsBatchUpdate(log_list) => { - let _ = print_tx + print_tx .send(Printout { verbosity: 1, content: format!( @@ -810,9 +1154,9 @@ async fn handle_incoming_message( log_list.len() ), }) - .await; + .await?; for log in log_list { - let _ = pki.write().await.insert( + pki.insert( log.name.clone(), Identity { name: log.name.clone(), @@ -826,125 +1170,12 @@ async fn handle_incoming_message( allowed_routers: log.routers, }, ); - let _ = names.write().await.insert(log.node, log.name); + names.insert(log.node, log.name); } } } } } - } -} - -/* - * networking utils - */ - -fn make_ws_url(our_ip: &str, ip: &str, port: &u16) -> Result { - // if we have the same public IP as target, route locally, - // otherwise they will appear offline due to loopback stuff - let ip = if our_ip == ip { "localhost" } else { ip }; - match url::Url::parse(&format!("ws://{}:{}/ws", ip, port)) { - Ok(v) => Ok(v), - Err(_) => Err(SendErrorKind::Offline), - } -} - -/* - * handshake utils - */ - -/// take in handshake and PKI identity, and confirm that the handshake is valid. -/// takes in optional nonce, which must be the one that connection initiator created. -fn validate_handshake( - handshake: &Handshake, - their_id: &Identity, -) -> Result>, String> { - let their_networking_key = signature::UnparsedPublicKey::new( - &signature::ED25519, - hex::decode(&strip_0x(&their_id.networking_key)) - .map_err(|_| "failed to decode networking key")?, - ); - - if !(their_networking_key - .verify( - // TODO use language-neutral serialization here too - &bincode::serialize(their_id).map_err(|_| "failed to serialize their identity")?, - &handshake.id_signature, - ) - .is_ok() - && their_networking_key - .verify( - &handshake.ephemeral_public_key, - &handshake.ephemeral_public_key_signature, - ) - .is_ok()) - { - // improper signatures on identity info, close connection - return Err("got improperly signed networking info".into()); - } - - match PublicKey::::from_sec1_bytes(&handshake.ephemeral_public_key) { - Ok(v) => return Ok(Arc::new(v)), - Err(_) => return Err("error".into()), - }; -} - -/// given an identity and networking key-pair, produces a handshake message along -/// with an ephemeral secret to be used in a specific connection. -fn make_secret_and_handshake( - our: &Identity, - keypair: Arc, - target: &str, - id: Option, -) -> (Arc>, Handshake) { - // produce ephemeral keys for DH exchange and subsequent symmetric encryption - let ephemeral_secret = Arc::new(EphemeralSecret::::random( - &mut rand::rngs::OsRng, - )); - let ephemeral_public_key = ephemeral_secret.public_key(); - // sign the ephemeral public key with our networking management key - let signed_pk = keypair - .sign(&ephemeral_public_key.to_sec1_bytes()) - .as_ref() - .to_vec(); - - // before signing our identity, convert router names to namehashes - // to match the exact onchain representation of our identity - let mut our_onchain_id = our.clone(); - our_onchain_id.allowed_routers = our - .allowed_routers - .clone() - .into_iter() - .map(|namehash| { - let hash = crate::namehash(&namehash); - let mut result = [0u8; 32]; - result.copy_from_slice(hash.as_bytes()); - format!("0x{}", hex::encode(result)) - }) - .collect(); - - // TODO use language-neutral serialization here too - let signed_id = keypair - .sign(&bincode::serialize(&our_onchain_id).unwrap()) - .as_ref() - .to_vec(); - - let handshake = Handshake { - id: id.unwrap_or(rand::random()), - from: our.name.clone(), - target: target.to_string(), - id_signature: signed_id, - ephemeral_public_key: ephemeral_public_key.to_sec1_bytes().to_vec(), - ephemeral_public_key_signature: signed_pk, - }; - - (ephemeral_secret, handshake) -} - -fn strip_0x(s: &str) -> String { - if s.starts_with("0x") { - s[2..].to_string() - } else { - s.to_string() + Ok(()) } } diff --git a/src/net/types.rs b/src/net/types.rs index 251c40f2..4141e718 100644 --- a/src/net/types.rs +++ b/src/net/types.rs @@ -1,71 +1,108 @@ use crate::types::*; -use anyhow::Result; -use elliptic_curve::ecdh::SharedSecret; -use ethers::prelude::k256::Secp256k1; +use futures::stream::{SplitSink, SplitStream}; use serde::{Deserialize, Serialize}; use std::{collections::HashMap, sync::Arc}; use tokio::net::TcpStream; -use tokio::sync::{mpsc, RwLock}; -use tokio_tungstenite::{MaybeTlsStream, WebSocketStream}; +use tokio::sync::mpsc::UnboundedSender; +use tokio_tungstenite::{tungstenite, MaybeTlsStream, WebSocketStream}; -pub type PeerKeys = Arc>)>>>; -pub type Peers = Arc>>; -pub type WebSocket = WebSocketStream>; -pub type MessageResult = Result; -pub type ErrorShuttle = mpsc::UnboundedSender; +/// Sent to a node when you want to connect directly to them. +/// Sent in the 'e, ee, s, es' and 's, se' phases of XX noise protocol pattern. +#[derive(Debug, Deserialize, Serialize)] +pub struct HandshakePayload { + pub name: NodeId, + // signature is created by their networking key, of their static key + // someone could reuse this signature, but then they will be unable + // to encrypt messages to us. + pub signature: Vec, + /// Set to true when you want them to act as a router for you, sending + /// messages from potentially many remote sources over this connection, + /// including from the router itself. + /// This is not relevant in a handshake sent from the receiver side. + pub proxy_request: bool, + pub protocol_version: u8, +} + +/// Sent to a node when you want them to connect you to an indirect node. +/// If the receiver of the request has an open connection to your target, +/// and is willing, they will send a message to the target prompting them +/// to build the other side of the connection, at which point they will +/// hold open a Passthrough for you two. +/// +/// Alternatively, if the receiver does not have an open connection but the +/// target is a direct node, they can create a Passthrough for you two if +/// they are willing to proxy for you. +/// +/// Sent in the 'e' phase of XX noise protocol pattern. +#[derive(Debug, Deserialize, Serialize)] +pub struct RoutingRequest { + pub source: NodeId, + // signature is created by their networking key, of the [target, router name].concat() + // someone could reuse this signature, and TODO need to make sure that's useless. + pub signature: Vec, + pub target: NodeId, + pub protocol_version: u8, +} + +pub enum Connection { + Peer(PeerConnection), + Passthrough(PassthroughConnection), + PendingPassthrough(PendingPassthroughConnection), +} + +pub struct PeerConnection { + pub noise: snow::TransportState, + pub buf: Vec, + pub write_stream: SplitSink>, tungstenite::Message>, + pub read_stream: SplitStream>>, +} + +pub struct PassthroughConnection { + pub write_stream_1: SplitSink>, tungstenite::Message>, + pub read_stream_1: SplitStream>>, + pub write_stream_2: SplitSink>, tungstenite::Message>, + pub read_stream_2: SplitStream>>, +} + +pub struct PendingPassthroughConnection { + pub target: NodeId, + pub write_stream: SplitSink>, tungstenite::Message>, + pub read_stream: SplitStream>>, +} + +// TODO upgrade from hashmaps +pub type Peers = HashMap>; +pub type PKINames = HashMap; // TODO maybe U256 to String +pub type OnchainPKI = HashMap; +pub type PendingPassthroughs = HashMap<(NodeId, NodeId), PendingPassthroughConnection>; -/// stored in mapping by their username pub struct Peer { pub identity: Identity, - // send messages here to have them encrypted and sent across an active connection - pub sender: mpsc::UnboundedSender<(PeerMessage, Option)>, - // send encrypted messages from this peer here to have them decrypted and sent to kernel - pub decrypter: mpsc::UnboundedSender<(Vec, ErrorShuttle)>, - pub socket_tx: mpsc::UnboundedSender<(NetworkMessage, Option)>, -} - -/// parsed from Binary websocket message -/// TODO add a version number somewhere in the serialized format!! -#[derive(Clone, Debug, Serialize, Deserialize)] -pub enum NetworkMessage { - Ack(u64), - Nack(u64), - Msg { - id: u64, - from: String, - to: String, - contents: Vec, - }, - Handshake(Handshake), - HandshakeAck(Handshake), - // only used in implementation, not part of protocol - Ping, - Pong, -} - -pub enum PeerMessage { - Raw(KernelMessage), - Net(NetworkMessage), -} - -/// contains identity and encryption keys, used in initial handshake. -/// parsed from Text websocket message -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct Handshake { - pub id: u64, - pub from: String, - pub target: String, - pub id_signature: Vec, - pub ephemeral_public_key: Vec, - pub ephemeral_public_key_signature: Vec, + /// If true, we are routing for them and have a RoutingClientConnection + /// associated with them. We can send them prompts to establish Passthroughs. + pub routing_for: bool, + pub sender: UnboundedSender, } #[derive(Clone, Debug, Serialize, Deserialize)] pub enum NetActions { + /// Received from a router of ours when they have a new pending passthrough for us. + /// We should respond (if we desire) by using them to initialize a routed connection + /// with the NodeId given. + ConnectionRequest(NodeId), + /// can only receive from trusted source, for now just ourselves locally, + /// in the future could get from remote provider QnsUpdate(QnsUpdate), QnsBatchUpdate(Vec), } +/// For now, only sent in response to a ConnectionRequest +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum NetResponses { + Attempting(NodeId), + Rejected(NodeId), +} + #[derive(Clone, Debug, Serialize, Deserialize)] pub struct QnsUpdate { pub name: String, // actual username / domain name diff --git a/src/net2/utils.rs b/src/net/utils.rs similarity index 99% rename from src/net2/utils.rs rename to src/net/utils.rs index addf5bbb..3b412ad6 100644 --- a/src/net2/utils.rs +++ b/src/net/utils.rs @@ -1,4 +1,4 @@ -use crate::net2::{types::*, MESSAGE_MAX_SIZE, TIMEOUT}; +use crate::net::{types::*, MESSAGE_MAX_SIZE, TIMEOUT}; use crate::types::*; use anyhow::{anyhow, Result}; use futures::stream::{SplitSink, SplitStream}; diff --git a/src/net2/mod.rs b/src/net2/mod.rs deleted file mode 100644 index 7894c041..00000000 --- a/src/net2/mod.rs +++ /dev/null @@ -1,1186 +0,0 @@ -use crate::net2::{types::*, utils::*}; -use crate::types::*; -use anyhow::{anyhow, Result}; -use futures::{SinkExt, StreamExt}; -use rand::seq::SliceRandom; -use ring::signature::Ed25519KeyPair; -use std::{ - collections::{HashMap, HashSet}, - sync::Arc, -}; -use tokio::net::TcpListener; -use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver}; -use tokio::task::JoinSet; -use tokio::time; -use tokio_tungstenite::{accept_async, connect_async, MaybeTlsStream, WebSocketStream}; - -mod types; -mod utils; - -// only used in connection initialization, otherwise, nacks and Responses are only used for "timeouts" -const TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5); - -// 10 MB -- TODO analyze as desired, apps can always chunk data into many messages -const MESSAGE_MAX_SIZE: u32 = 10_485_800; - -/// Entry point from the main kernel task. Runs forever, spawns listener and sender tasks. -pub async fn networking( - our: Identity, - our_ip: String, - keypair: Arc, - kernel_message_tx: MessageSender, - network_error_tx: NetworkErrorSender, - print_tx: PrintSender, - self_message_tx: MessageSender, - message_rx: MessageReceiver, -) -> Result<()> { - println!("networking!\r"); - println!("our identity: {:#?}\r", our); - // branch on whether we are a direct or indirect node - match &our.ws_routing { - None => { - // indirect node: run the indirect networking strategy - indirect_networking( - our, - our_ip, - keypair, - kernel_message_tx, - network_error_tx, - print_tx, - self_message_tx, - message_rx, - ) - .await - } - Some((ip, port)) => { - // direct node: run the direct networking strategy - if &our_ip != ip { - return Err(anyhow!( - "net: fatal error: IP address mismatch: {} != {}, update your QNS identity", - our_ip, - ip - )); - } - let tcp = match TcpListener::bind(format!("0.0.0.0:{}", port)).await { - Ok(tcp) => tcp, - Err(_e) => { - return Err(anyhow!( - "net: fatal error: can't listen on port {}, update your QNS identity or free up that port", - port, - )); - } - }; - direct_networking( - our, - our_ip, - tcp, - keypair, - kernel_message_tx, - network_error_tx, - print_tx, - self_message_tx, - message_rx, - ) - .await - } - } -} - -async fn indirect_networking( - our: Identity, - our_ip: String, - keypair: Arc, - kernel_message_tx: MessageSender, - network_error_tx: NetworkErrorSender, - print_tx: PrintSender, - self_message_tx: MessageSender, - mut message_rx: MessageReceiver, -) -> Result<()> { - println!("indirect_networking\r"); - let mut pki: OnchainPKI = HashMap::new(); - let mut peers: Peers = HashMap::new(); - // mapping from QNS namehash to username - let mut names: PKINames = HashMap::new(); - - let mut peer_connections = JoinSet::<(NodeId, Option)>::new(); - let mut active_routers = HashSet::::new(); - - // before opening up the main loop, go through our allowed routers - // and attempt to connect to all of them, saving the successfully - // connected-to ones in our router-set - connect_to_routers( - &our, - &our_ip, - &keypair, - &mut active_routers, - &pki, - &mut peers, - &mut peer_connections, - kernel_message_tx.clone(), - ) - .await; - - loop { - tokio::select! { - // 1. receive messages from kernel and send out over connections, - // making new connections through our router-set as needed - Some(km) = message_rx.recv() => { - // got a message from kernel to send out over the network - let target = &km.target.node; - // if the message is for us, it's either a protocol-level "hello" message, - // or a debugging command issued from our terminal. handle it here: - if target == &our.name { - match handle_local_message( - &our, - &our_ip, - &keypair, - km, - &mut peers, - &mut pki, - &mut peer_connections, - None, - None, - Some(&active_routers), - &mut names, - &kernel_message_tx, - &print_tx, - ) - .await { - Ok(()) => {}, - Err(e) => { - print_tx.send(Printout { - verbosity: 0, - content: format!("net: error handling local message: {}", e) - }).await?; - } - } - } - // if the message is for a peer we currently have a connection with, - // try to send it to them - else if let Some(peer) = peers.get_mut(target) { - peer.sender.send(km)?; - } - else if let Some(peer_id) = pki.get(target) { - // if the message is for a *direct* peer we don't have a connection with, - // try to establish a connection with them - // TODO: here, we can *choose* to use our routers so as not to reveal - // networking information about ourselves to the target. - if peer_id.ws_routing.is_some() { - match init_connection(&our, &our_ip, peer_id, &keypair, None, false).await { - Ok((peer_name, direct_conn)) => { - let (peer_tx, peer_rx) = unbounded_channel::(); - let peer = Arc::new(Peer { - identity: peer_id.clone(), - routing_for: false, - sender: peer_tx, - }); - peers.insert(peer_name, peer.clone()); - peer.sender.send(km)?; - peer_connections.spawn(maintain_connection( - peer, - direct_conn, - peer_rx, - kernel_message_tx.clone(), - )); - } - Err(e) => { - println!("net: error initializing connection: {}\r", e); - error_offline(km, &network_error_tx).await?; - } - } - } - // if the message is for an *indirect* peer we don't have a connection with, - // do some routing: in a randomized order, go through their listed routers - // on chain and try to get one of them to build a proxied connection to - // this node for you - else { - let sent = time::timeout(TIMEOUT, - init_connection_via_router( - &our, - &our_ip, - &keypair, - km.clone(), - peer_id, - &pki, - &names, - &mut peers, - &mut peer_connections, - kernel_message_tx.clone() - )).await; - if !sent.unwrap_or(false) { - // none of the routers worked! - println!("net: error initializing routed connection\r"); - error_offline(km, &network_error_tx).await?; - } - } - } - // peer cannot be found in PKI, throw an offline error - else { - error_offline(km, &network_error_tx).await?; - } - } - // 2. deal with active connections that die by removing the associated peer - // if the peer is one of our routers, remove them from router-set - Some(Ok((dead_peer, maybe_resend))) = peer_connections.join_next() => { - peers.remove(&dead_peer); - active_routers.remove(&dead_peer); - match maybe_resend { - None => {}, - Some(km) => { - self_message_tx.send(km).await?; - } - } - } - // 3. periodically attempt to connect to any allowed routers that we - // are not connected to - _ = time::sleep(time::Duration::from_secs(3)) => { - connect_to_routers( - &our, - &our_ip, - &keypair, - &mut active_routers, - &pki, - &mut peers, - &mut peer_connections, - kernel_message_tx.clone(), - ) - .await; - } - } - } -} - -async fn connect_to_routers( - our: &Identity, - our_ip: &str, - keypair: &Ed25519KeyPair, - active_routers: &mut HashSet, - pki: &OnchainPKI, - peers: &mut Peers, - peer_connections: &mut JoinSet<(NodeId, Option)>, - kernel_message_tx: MessageSender, -) { - for router in &our.allowed_routers { - if active_routers.contains(router) { - continue; - } - let Some(router_id) = pki.get(router) else { - continue; - }; - match init_connection(our, our_ip, router_id, keypair, None, true).await { - Ok((peer_name, direct_conn)) => { - let (peer_tx, peer_rx) = unbounded_channel::(); - let peer = Arc::new(Peer { - identity: router_id.clone(), - routing_for: false, - sender: peer_tx, - }); - println!("net: connected to router {}\r", peer_name); - peers.insert(peer_name.clone(), peer.clone()); - active_routers.insert(peer_name); - peer_connections.spawn(maintain_connection( - peer, - direct_conn, - peer_rx, - kernel_message_tx.clone(), - )); - } - Err(_e) => continue, - } - } -} - -async fn direct_networking( - our: Identity, - our_ip: String, - tcp: TcpListener, - keypair: Arc, - kernel_message_tx: MessageSender, - network_error_tx: NetworkErrorSender, - print_tx: PrintSender, - self_message_tx: MessageSender, - mut message_rx: MessageReceiver, -) -> Result<()> { - println!("direct_networking\r"); - let mut pki: OnchainPKI = HashMap::new(); - let mut peers: Peers = HashMap::new(); - // mapping from QNS namehash to username - let mut names: PKINames = HashMap::new(); - - let mut peer_connections = JoinSet::<(NodeId, Option)>::new(); - let mut forwarding_connections = JoinSet::<()>::new(); - let mut pending_passthroughs: PendingPassthroughs = HashMap::new(); - - loop { - tokio::select! { - // 1. receive messages from kernel and send out over our connections, - // making new connections as needed - Some(km) = message_rx.recv() => { - // got a message from kernel to send out over the network - let target = &km.target.node; - // if the message is for us, it's either a protocol-level "hello" message, - // or a debugging command issued from our terminal. handle it here: - if target == &our.name { - match handle_local_message( - &our, - &our_ip, - &keypair, - km, - &mut peers, - &mut pki, - &mut peer_connections, - Some(&mut pending_passthroughs), - Some(&forwarding_connections), - None, - &mut names, - &kernel_message_tx, - &print_tx, - ) - .await { - Ok(()) => {}, - Err(e) => { - print_tx.send(Printout { - verbosity: 0, - content: format!("net: error handling local message: {}", e) - }).await?; - } - } - } - // if the message is for a peer we currently have a connection with, - // try to send it to them - else if let Some(peer) = peers.get_mut(target) { - peer.sender.send(km)?; - } - else if let Some(peer_id) = pki.get(target) { - // if the message is for a *direct* peer we don't have a connection with, - // try to establish a connection with them - if peer_id.ws_routing.is_some() { - match init_connection(&our, &our_ip, peer_id, &keypair, None, false).await { - Ok((peer_name, direct_conn)) => { - let (peer_tx, peer_rx) = unbounded_channel::(); - let peer = Arc::new(Peer { - identity: peer_id.clone(), - routing_for: false, - sender: peer_tx, - }); - peers.insert(peer_name, peer.clone()); - peer.sender.send(km)?; - peer_connections.spawn(maintain_connection( - peer, - direct_conn, - peer_rx, - kernel_message_tx.clone(), - )); - } - Err(e) => { - println!("net: error initializing connection: {}\r", e); - error_offline(km, &network_error_tx).await?; - } - } - } - // if the message is for an *indirect* peer we don't have a connection with, - // do some routing: in a randomized order, go through their listed routers - // on chain and try to get one of them to build a proxied connection to - // this node for you - else { - let sent = time::timeout(TIMEOUT, - init_connection_via_router( - &our, - &our_ip, - &keypair, - km.clone(), - peer_id, - &pki, - &names, - &mut peers, - &mut peer_connections, - kernel_message_tx.clone() - )).await; - if !sent.unwrap_or(false) { - // none of the routers worked! - println!("net: error initializing routed connection\r"); - error_offline(km, &network_error_tx).await?; - } - } - } - // peer cannot be found in PKI, throw an offline error - else { - error_offline(km, &network_error_tx).await?; - } - } - // 2. receive incoming TCP connections - Ok((stream, _socket_addr)) = tcp.accept() => { - // TODO we can perform some amount of validation here - // to prevent some amount of potential DDoS attacks. - // can also block based on socket_addr - match accept_async(MaybeTlsStream::Plain(stream)).await { - Ok(websocket) => { - let (peer_id, routing_for, conn) = - match recv_connection( - &our, - &our_ip, - &pki, - &peers, - &mut pending_passthroughs, - &keypair, - websocket).await - { - Ok(res) => res, - Err(e) => { - println!("net: recv_connection failed: {e}\r"); - continue; - } - }; - // if conn is direct, add peer - // if passthrough, add to our forwarding connections joinset - match conn { - Connection::Peer(peer_conn) => { - let (peer_tx, peer_rx) = unbounded_channel::(); - let peer = Arc::new(Peer { - identity: peer_id, - routing_for, - sender: peer_tx, - }); - peers.insert(peer.identity.name.clone(), peer.clone()); - peer_connections.spawn(maintain_connection( - peer, - peer_conn, - peer_rx, - kernel_message_tx.clone(), - )); - } - Connection::Passthrough(passthrough_conn) => { - forwarding_connections.spawn(maintain_passthrough( - passthrough_conn, - )); - } - Connection::PendingPassthrough(pending_conn) => { - pending_passthroughs.insert( - (peer_id.name.clone(), pending_conn.target.clone()), - pending_conn - ); - } - } - } - // ignore connections we failed to accept...? - Err(_) => {} - } - } - // 3. deal with active connections that die by removing the associated peer - Some(Ok((dead_peer, maybe_resend))) = peer_connections.join_next() => { - peers.remove(&dead_peer); - match maybe_resend { - None => {}, - Some(km) => { - self_message_tx.send(km).await?; - } - } - } - } - } -} - -async fn init_connection_via_router( - our: &Identity, - our_ip: &str, - keypair: &Ed25519KeyPair, - km: KernelMessage, - peer_id: &Identity, - pki: &OnchainPKI, - names: &PKINames, - peers: &mut Peers, - peer_connections: &mut JoinSet<(NodeId, Option)>, - kernel_message_tx: MessageSender, -) -> bool { - println!("init_connection_via_router\r"); - let routers_shuffled = { - let mut routers = peer_id.allowed_routers.clone(); - routers.shuffle(&mut rand::thread_rng()); - routers - }; - for router_namehash in &routers_shuffled { - let Some(router_name) = names.get(router_namehash) else { - continue; - }; - let router_id = match pki.get(router_name) { - None => continue, - Some(id) => id, - }; - match init_connection(&our, &our_ip, peer_id, &keypair, Some(router_id), false).await { - Ok((peer_name, direct_conn)) => { - let (peer_tx, peer_rx) = unbounded_channel::(); - let peer = Arc::new(Peer { - identity: peer_id.clone(), - routing_for: false, - sender: peer_tx, - }); - peers.insert(peer_name, peer.clone()); - peer.sender.send(km).unwrap(); - peer_connections.spawn(maintain_connection( - peer, - direct_conn, - peer_rx, - kernel_message_tx.clone(), - )); - return true; - } - Err(_) => continue, - } - } - return false; -} - -async fn maintain_connection( - peer: Arc, - mut conn: PeerConnection, - mut peer_rx: UnboundedReceiver, - kernel_message_tx: MessageSender, - // network_error_tx: NetworkErrorSender, - // print_tx: PrintSender, -) -> (NodeId, Option) { - println!("maintain_connection\r"); - loop { - tokio::select! { - recv_result = recv_uqbar_message(&mut conn) => { - match recv_result { - Ok(km) => { - if km.source.node != peer.identity.name { - println!("net: got message with spoofed source\r"); - return (peer.identity.name.clone(), None) - } - kernel_message_tx.send(km).await.expect("net error: fatal: kernel died"); - } - Err(e) => { - println!("net: error receiving message: {}\r", e); - return (peer.identity.name.clone(), None) - } - } - }, - maybe_recv = peer_rx.recv() => { - match maybe_recv { - Some(km) => { - // TODO error handle - match send_uqbar_message(&km, &mut conn).await { - Ok(()) => continue, - Err(e) => { - println!("net: error sending message: {}\r", e); - return (peer.identity.name.clone(), Some(km)) - } - } - } - None => { - println!("net: peer disconnected\r"); - return (peer.identity.name.clone(), None) - } - } - }, - } - } -} - -/// match the streams -/// TODO optimize performance of this -async fn maintain_passthrough(mut conn: PassthroughConnection) { - println!("maintain_passthrough\r"); - loop { - tokio::select! { - maybe_recv = conn.read_stream_1.next() => { - match maybe_recv { - Some(Ok(msg)) => { - conn.write_stream_2.send(msg).await.expect("net error: fatal: kernel died"); - } - _ => { - println!("net: passthrough broke\r"); - return - } - } - }, - maybe_recv = conn.read_stream_2.next() => { - match maybe_recv { - Some(Ok(msg)) => { - conn.write_stream_1.send(msg).await.expect("net error: fatal: kernel died"); - } - _ => { - println!("net: passthrough broke\r"); - return - } - } - }, - } - } -} - -async fn recv_connection( - our: &Identity, - our_ip: &str, - pki: &OnchainPKI, - peers: &Peers, - pending_passthroughs: &mut PendingPassthroughs, - keypair: &Ed25519KeyPair, - websocket: WebSocketStream>, -) -> Result<(Identity, bool, Connection)> { - println!("recv_connection\r"); - let mut buf = vec![0u8; 65535]; - let (mut noise, our_static_key) = build_responder(); - let (mut write_stream, mut read_stream) = websocket.split(); - - // before we begin XX handshake pattern, check first message over socket - let first_message = &ws_recv(&mut read_stream).await?; - - // if the first message contains a "routing request", - // we see if the target is someone we are actively routing for, - // and create a Passthrough connection if so. - // a Noise 'e' message with have len 32 - if first_message.len() != 32 { - let (their_id, target_name) = validate_routing_request(&our.name, &first_message, pki)?; - let (id, conn) = create_passthrough( - our, - our_ip, - their_id, - target_name, - pki, - peers, - pending_passthroughs, - write_stream, - read_stream, - ) - .await?; - return Ok((id, false, conn)); - } - - // <- e - noise.read_message(first_message, &mut buf)?; - - // -> e, ee, s, es - send_uqbar_handshake( - &our, - keypair, - &our_static_key, - &mut noise, - &mut buf, - &mut write_stream, - false, - ) - .await?; - - // <- s, se - let their_handshake = recv_uqbar_handshake(&mut noise, &mut buf, &mut read_stream).await?; - - // now validate this handshake payload against the QNS PKI - let their_id = pki - .get(&their_handshake.name) - .ok_or(anyhow!("unknown QNS name"))?; - validate_handshake( - &their_handshake, - noise - .get_remote_static() - .ok_or(anyhow!("noise error: missing remote pubkey"))?, - their_id, - )?; - - // Transition the state machine into transport mode now that the handshake is complete. - let noise = noise.into_transport_mode()?; - println!("handshake complete, noise session received\r"); - - // TODO if their handshake indicates they want us to proxy - // for them (aka act as a router for them) we can choose - // whether to do so here. - Ok(( - their_id.clone(), - their_handshake.proxy_request, - Connection::Peer(PeerConnection { - noise, - buf, - write_stream, - read_stream, - }), - )) -} - -async fn recv_connection_via_router( - our: &Identity, - our_ip: &str, - their_name: &str, - pki: &OnchainPKI, - keypair: &Ed25519KeyPair, - router: &Identity, -) -> Result<(Identity, PeerConnection)> { - println!("recv_connection_via_router\r"); - let mut buf = vec![0u8; 65535]; - let (mut noise, our_static_key) = build_responder(); - - let Some((ref ip, ref port)) = router.ws_routing else { - return Err(anyhow!("router has no routing information")); - }; - let Ok(ws_url) = make_ws_url(our_ip, ip, port) else { - return Err(anyhow!("failed to parse websocket url")); - }; - let Ok(Ok((websocket, _response))) = time::timeout(TIMEOUT, connect_async(ws_url)).await else { - return Err(anyhow!("failed to connect to target")); - }; - let (mut write_stream, mut read_stream) = websocket.split(); - - // before beginning XX handshake pattern, send a routing request - let message = bincode::serialize(&RoutingRequest { - source: our.name.clone(), - signature: keypair - .sign([their_name, router.name.as_str()].concat().as_bytes()) - .as_ref() - .to_vec(), - target: their_name.to_string(), - protocol_version: 1, - })?; - ws_send(&mut write_stream, &message).await?; - - // <- e - noise.read_message(&ws_recv(&mut read_stream).await?, &mut buf)?; - - // -> e, ee, s, es - send_uqbar_handshake( - &our, - keypair, - &our_static_key, - &mut noise, - &mut buf, - &mut write_stream, - false, - ) - .await?; - - // <- s, se - let their_handshake = recv_uqbar_handshake(&mut noise, &mut buf, &mut read_stream).await?; - - // now validate this handshake payload against the QNS PKI - let their_id = pki - .get(&their_handshake.name) - .ok_or(anyhow!("unknown QNS name"))?; - validate_handshake( - &their_handshake, - noise - .get_remote_static() - .ok_or(anyhow!("noise error: missing remote pubkey"))?, - their_id, - )?; - - // Transition the state machine into transport mode now that the handshake is complete. - let noise = noise.into_transport_mode()?; - println!("handshake complete, noise session received\r"); - - Ok(( - their_id.clone(), - PeerConnection { - noise, - buf, - write_stream, - read_stream, - }, - )) -} - -async fn init_connection( - our: &Identity, - our_ip: &str, - peer_id: &Identity, - keypair: &Ed25519KeyPair, - use_router: Option<&Identity>, - proxy_request: bool, -) -> Result<(String, PeerConnection)> { - println!("init_connection\r"); - let mut buf = vec![0u8; 65535]; - let (mut noise, our_static_key) = build_initiator(); - - let (mut write_stream, mut read_stream) = match use_router { - None => { - let Some((ref ip, ref port)) = peer_id.ws_routing else { - return Err(anyhow!("target has no routing information")); - }; - let Ok(ws_url) = make_ws_url(our_ip, ip, port) else { - return Err(anyhow!("failed to parse websocket url")); - }; - let Ok(Ok((websocket, _response))) = - time::timeout(TIMEOUT, connect_async(ws_url)).await - else { - return Err(anyhow!("failed to connect to target")); - }; - websocket.split() - } - Some(router_id) => { - let Some((ref ip, ref port)) = router_id.ws_routing else { - return Err(anyhow!("router has no routing information")); - }; - let Ok(ws_url) = make_ws_url(our_ip, ip, port) else { - return Err(anyhow!("failed to parse websocket url")); - }; - let Ok(Ok((websocket, _response))) = - time::timeout(TIMEOUT, connect_async(ws_url)).await - else { - return Err(anyhow!("failed to connect to target")); - }; - websocket.split() - } - }; - - // if this is a routed request, before starting XX handshake pattern, send a - // routing request message over socket - if use_router.is_some() { - let message = bincode::serialize(&RoutingRequest { - source: our.name.clone(), - signature: keypair - .sign( - [&peer_id.name, use_router.unwrap().name.as_str()] - .concat() - .as_bytes(), - ) - .as_ref() - .to_vec(), - target: peer_id.name.clone(), - protocol_version: 1, - })?; - ws_send(&mut write_stream, &message).await?; - } - - // -> e - let len = noise.write_message(&[], &mut buf)?; - ws_send(&mut write_stream, &buf[..len]).await?; - - // <- e, ee, s, es - let their_handshake = recv_uqbar_handshake(&mut noise, &mut buf, &mut read_stream).await?; - - // now validate this handshake payload against the QNS PKI - validate_handshake( - &their_handshake, - noise - .get_remote_static() - .ok_or(anyhow!("noise error: missing remote pubkey"))?, - peer_id, - )?; - - // -> s, se - send_uqbar_handshake( - &our, - keypair, - &our_static_key, - &mut noise, - &mut buf, - &mut write_stream, - proxy_request, - ) - .await?; - - let noise = noise.into_transport_mode()?; - println!("handshake complete, noise session initiated\r"); - - Ok(( - their_handshake.name, - PeerConnection { - noise, - buf, - write_stream, - read_stream, - }, - )) -} - -/// net module only handles incoming local requests, will never return a response -async fn handle_local_message( - our: &Identity, - our_ip: &str, - keypair: &Ed25519KeyPair, - km: KernelMessage, - peers: &mut Peers, - pki: &mut OnchainPKI, - peer_connections: &mut JoinSet<(NodeId, Option)>, - pending_passthroughs: Option<&mut PendingPassthroughs>, - forwarding_connections: Option<&JoinSet<()>>, - active_routers: Option<&HashSet>, - names: &mut PKINames, - kernel_message_tx: &MessageSender, - print_tx: &PrintSender, -) -> Result<()> { - println!("handle_local_message\r"); - let ipc = match km.message { - Message::Request(request) => request.ipc, - Message::Response((response, _context)) => { - // these are received as a router, when we send ConnectionRequests - // to a node we do routing for. - match serde_json::from_slice::(&response.ipc)? { - NetResponses::Attempting(_) => { - // TODO anything here? - } - NetResponses::Rejected(to) => { - // drop from our pending map - // this will drop the socket, causing initiator to see it as failed - pending_passthroughs - .ok_or(anyhow!("got net response as non-router"))? - .remove(&(to, km.source.node)); - } - } - return Ok(()); - } - }; - - if km.source.node != our.name { - if let Ok(act) = serde_json::from_slice::(&ipc) { - match act { - NetActions::QnsBatchUpdate(_) | NetActions::QnsUpdate(_) => { - // for now, we don't get these from remote. - } - NetActions::ConnectionRequest(from) => { - // someone wants to open a passthrough with us through a router! - // if we are an indirect node, and source is one of our routers, - // respond by attempting to init a matching passthrough. - // TODO can discriminate more here.. - if our.allowed_routers.contains(&km.source.node) { - let Ok((peer_id, peer_conn)) = time::timeout( - TIMEOUT, - recv_connection_via_router( - our, - our_ip, - &from, - pki, - keypair, - &peers - .get(&km.source.node) - .ok_or(anyhow!("unknown router"))? - .identity, - ), - ) - .await? - else { - return Err(anyhow!("someone tried to connect to us but it timed out")); - }; - let (peer_tx, peer_rx) = unbounded_channel::(); - let peer = Arc::new(Peer { - identity: peer_id, - routing_for: false, - sender: peer_tx, - }); - peers.insert(peer.identity.name.clone(), peer.clone()); - peer_connections.spawn(maintain_connection( - peer, - peer_conn, - peer_rx, - kernel_message_tx.clone(), - )); - } else { - kernel_message_tx - .send(KernelMessage { - id: km.id, - source: Address { - node: our.name.clone(), - process: ProcessId::from_str("net:sys:uqbar").unwrap(), - }, - target: km.rsvp.unwrap_or(km.source), - rsvp: None, - message: Message::Response(( - Response { - inherit: false, - ipc: serde_json::to_vec(&NetResponses::Rejected(from))?, - metadata: None, - }, - None, - )), - payload: None, - signed_capabilities: None, - }) - .await?; - } - } - } - return Ok(()); - }; - // if we can't parse this to a netaction, treat it as a hello and print it - // respond to a text message with a simple "delivered" response - print_tx - .send(Printout { - verbosity: 0, - content: format!( - "\x1b[3;32m{}: {}\x1b[0m", - km.source.node, - std::str::from_utf8(&ipc).unwrap_or("!!message parse error!!") - ), - }) - .await?; - kernel_message_tx - .send(KernelMessage { - id: km.id, - source: Address { - node: our.name.clone(), - process: ProcessId::from_str("net:sys:uqbar").unwrap(), - }, - target: km.rsvp.unwrap_or(km.source), - rsvp: None, - message: Message::Response(( - Response { - inherit: false, - ipc: "delivered".as_bytes().to_vec(), - metadata: None, - }, - None, - )), - payload: None, - signed_capabilities: None, - }) - .await?; - Ok(()) - } else { - // available commands: "peers", "pki", "names", "diagnostics" - // first parse as raw string, then deserialize to NetActions object - match std::str::from_utf8(&ipc) { - Ok("peers") => { - print_tx - .send(Printout { - verbosity: 0, - content: format!("{:#?}", peers.keys()), - }) - .await?; - } - Ok("pki") => { - print_tx - .send(Printout { - verbosity: 0, - content: format!("{:#?}", pki), - }) - .await?; - } - Ok("names") => { - print_tx - .send(Printout { - verbosity: 0, - content: format!("{:#?}", names), - }) - .await?; - } - Ok("diagnostics") => { - print_tx - .send(Printout { - verbosity: 0, - content: format!("our Identity: {:#?}", our), - }) - .await?; - print_tx - .send(Printout { - verbosity: 0, - content: format!("we have connections with peers: {:#?}", peers.keys()), - }) - .await?; - print_tx - .send(Printout { - verbosity: 0, - content: format!("we have {} entries in the PKI", pki.len()), - }) - .await?; - print_tx - .send(Printout { - verbosity: 0, - content: format!( - "we have {} open peer connections", - peer_connections.len() - ), - }) - .await?; - if pending_passthroughs.is_some() { - print_tx - .send(Printout { - verbosity: 0, - content: format!( - "we have {} pending passthrough connections", - pending_passthroughs.unwrap().len() - ), - }) - .await?; - } - if forwarding_connections.is_some() { - print_tx - .send(Printout { - verbosity: 0, - content: format!( - "we have {} open passthrough connections", - forwarding_connections.unwrap().len() - ), - }) - .await?; - } - if active_routers.is_some() { - print_tx - .send(Printout { - verbosity: 0, - content: format!( - "we have {} active routers", - active_routers.unwrap().len() - ), - }) - .await?; - } - } - _ => { - let Ok(act) = serde_json::from_slice::(&ipc) else { - print_tx - .send(Printout { - verbosity: 0, - content: "net: got unknown command".into(), - }) - .await?; - return Ok(()); - }; - match act { - NetActions::ConnectionRequest(_) => { - // we shouldn't receive these from ourselves. - } - NetActions::QnsUpdate(log) => { - print_tx - .send(Printout { - verbosity: 1, - content: format!("net: got QNS update for {}", log.name), - }) - .await?; - - pki.insert( - log.name.clone(), - Identity { - name: log.name.clone(), - networking_key: log.public_key, - ws_routing: if log.ip == "0.0.0.0".to_string() || log.port == 0 { - None - } else { - Some((log.ip, log.port)) - }, - allowed_routers: log.routers, - }, - ); - names.insert(log.node, log.name); - } - NetActions::QnsBatchUpdate(log_list) => { - print_tx - .send(Printout { - verbosity: 1, - content: format!( - "net: got QNS update with {} peers", - log_list.len() - ), - }) - .await?; - for log in log_list { - pki.insert( - log.name.clone(), - Identity { - name: log.name.clone(), - networking_key: log.public_key, - ws_routing: if log.ip == "0.0.0.0".to_string() || log.port == 0 - { - None - } else { - Some((log.ip, log.port)) - }, - allowed_routers: log.routers, - }, - ); - names.insert(log.node, log.name); - } - } - } - } - } - Ok(()) - } -} diff --git a/src/net2/types.rs b/src/net2/types.rs deleted file mode 100644 index 4141e718..00000000 --- a/src/net2/types.rs +++ /dev/null @@ -1,115 +0,0 @@ -use crate::types::*; -use futures::stream::{SplitSink, SplitStream}; -use serde::{Deserialize, Serialize}; -use std::{collections::HashMap, sync::Arc}; -use tokio::net::TcpStream; -use tokio::sync::mpsc::UnboundedSender; -use tokio_tungstenite::{tungstenite, MaybeTlsStream, WebSocketStream}; - -/// Sent to a node when you want to connect directly to them. -/// Sent in the 'e, ee, s, es' and 's, se' phases of XX noise protocol pattern. -#[derive(Debug, Deserialize, Serialize)] -pub struct HandshakePayload { - pub name: NodeId, - // signature is created by their networking key, of their static key - // someone could reuse this signature, but then they will be unable - // to encrypt messages to us. - pub signature: Vec, - /// Set to true when you want them to act as a router for you, sending - /// messages from potentially many remote sources over this connection, - /// including from the router itself. - /// This is not relevant in a handshake sent from the receiver side. - pub proxy_request: bool, - pub protocol_version: u8, -} - -/// Sent to a node when you want them to connect you to an indirect node. -/// If the receiver of the request has an open connection to your target, -/// and is willing, they will send a message to the target prompting them -/// to build the other side of the connection, at which point they will -/// hold open a Passthrough for you two. -/// -/// Alternatively, if the receiver does not have an open connection but the -/// target is a direct node, they can create a Passthrough for you two if -/// they are willing to proxy for you. -/// -/// Sent in the 'e' phase of XX noise protocol pattern. -#[derive(Debug, Deserialize, Serialize)] -pub struct RoutingRequest { - pub source: NodeId, - // signature is created by their networking key, of the [target, router name].concat() - // someone could reuse this signature, and TODO need to make sure that's useless. - pub signature: Vec, - pub target: NodeId, - pub protocol_version: u8, -} - -pub enum Connection { - Peer(PeerConnection), - Passthrough(PassthroughConnection), - PendingPassthrough(PendingPassthroughConnection), -} - -pub struct PeerConnection { - pub noise: snow::TransportState, - pub buf: Vec, - pub write_stream: SplitSink>, tungstenite::Message>, - pub read_stream: SplitStream>>, -} - -pub struct PassthroughConnection { - pub write_stream_1: SplitSink>, tungstenite::Message>, - pub read_stream_1: SplitStream>>, - pub write_stream_2: SplitSink>, tungstenite::Message>, - pub read_stream_2: SplitStream>>, -} - -pub struct PendingPassthroughConnection { - pub target: NodeId, - pub write_stream: SplitSink>, tungstenite::Message>, - pub read_stream: SplitStream>>, -} - -// TODO upgrade from hashmaps -pub type Peers = HashMap>; -pub type PKINames = HashMap; // TODO maybe U256 to String -pub type OnchainPKI = HashMap; -pub type PendingPassthroughs = HashMap<(NodeId, NodeId), PendingPassthroughConnection>; - -pub struct Peer { - pub identity: Identity, - /// If true, we are routing for them and have a RoutingClientConnection - /// associated with them. We can send them prompts to establish Passthroughs. - pub routing_for: bool, - pub sender: UnboundedSender, -} - -#[derive(Clone, Debug, Serialize, Deserialize)] -pub enum NetActions { - /// Received from a router of ours when they have a new pending passthrough for us. - /// We should respond (if we desire) by using them to initialize a routed connection - /// with the NodeId given. - ConnectionRequest(NodeId), - /// can only receive from trusted source, for now just ourselves locally, - /// in the future could get from remote provider - QnsUpdate(QnsUpdate), - QnsBatchUpdate(Vec), -} - -/// For now, only sent in response to a ConnectionRequest -#[derive(Clone, Debug, Serialize, Deserialize)] -pub enum NetResponses { - Attempting(NodeId), - Rejected(NodeId), -} - -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct QnsUpdate { - pub name: String, // actual username / domain name - pub owner: String, - pub node: String, // hex namehash of node - pub public_key: String, - pub ip: String, - pub port: u16, - pub routers: Vec, -}