diff --git a/src/net2/mod.rs b/src/net2/mod.rs index 6faf7523..c7d2deb2 100644 --- a/src/net2/mod.rs +++ b/src/net2/mod.rs @@ -2,6 +2,7 @@ use crate::types::*; use anyhow::{anyhow, Result}; use futures::stream::{SplitSink, SplitStream}; use futures::{SinkExt, StreamExt}; +use rand::seq::SliceRandom; use ring::signature::{self, Ed25519KeyPair}; use serde::{Deserialize, Serialize}; use snow::params::NoiseParams; @@ -45,17 +46,43 @@ struct QnsUpdate { #[derive(Debug, Deserialize, Serialize)] struct HandshakePayload { pub name: NodeId, + // signature is created by their networking key, of their static key + // someone could reuse this signature, but then they will be unable + // to encrypt messages to us. pub signature: Vec, pub protocol_version: u8, } -struct OpenConnection { +#[derive(Debug, Deserialize, Serialize)] +struct RoutingRequest { + pub name: NodeId, + // signature is created by their networking key, of the [target, router name].concat() + // someone could reuse this signature, and TODO need to find a way + // to make that useless in this routing request case. + pub signature: Vec, + pub target: NodeId, + pub protocol_version: u8, +} + +enum Connection { + Peer(PeerConnection), + Passthrough(PassthroughConnection), +} + +struct PeerConnection { pub noise: snow::TransportState, pub buf: Vec, pub write_stream: SplitSink>, tungstenite::Message>, pub read_stream: SplitStream>>, } +struct PassthroughConnection { + pub write_stream_1: SplitSink>, tungstenite::Message>, + pub read_stream_1: SplitStream>>, + pub write_stream_2: SplitSink>, tungstenite::Message>, + pub read_stream_2: SplitStream>>, +} + type Peers = HashMap>; type PKINames = HashMap; // TODO maybe U256 to String type OnchainPKI = HashMap; @@ -136,13 +163,12 @@ async fn direct_networking( // mapping from QNS namehash to username let mut names: PKINames = HashMap::new(); - let mut active_connections = JoinSet::<(String, Option)>::new(); + let mut peer_connections = JoinSet::<(NodeId, Option)>::new(); + let mut forwarding_connections = JoinSet::::new(); - // 1. receive messages from kernel and send out over our connections - // 2. receive incoming TCP connections - // 3. deal with active connections that die by removing the associated peer loop { tokio::select! { + // 1. receive messages from kernel and send out over our connections Some(km) = message_rx.recv() => { // got a message from kernel to send out over the network let target = &km.target.node; @@ -177,8 +203,8 @@ async fn direct_networking( // if the message is for a *direct* peer we don't have a connection with, // try to establish a connection with them if peer_id.ws_routing.is_some() { - match init_connection(&our, &our_ip, peer_id, &keypair).await { - Ok((peer_name, conn)) => { + match init_connection(&our, &our_ip, peer_id, &keypair, None).await { + Ok((peer_name, direct_conn)) => { let (peer_tx, peer_rx) = unbounded_channel::(); let peer = Arc::new(Peer { identity: peer_id.clone(), @@ -186,23 +212,40 @@ async fn direct_networking( }); peers.insert(peer_name, peer.clone()); peer.sender.send(km)?; - active_connections.spawn(maintain_connection( + peer_connections.spawn(maintain_connection( peer, - conn, + direct_conn, peer_rx, kernel_message_tx.clone(), )); } Err(e) => { - println!("net: error initializing connection: {}", e); + println!("net: error initializing connection: {}\r", e); error_offline(km, &network_error_tx).await?; } } } // if the message is for an *indirect* peer we don't have a connection with, - // do some routing and shit + // do some routing: in a randomized order, go through their listed routers + // on chain and try to get one of them to build a proxied connection to + // this node for you else { - todo!() + let sent = init_connection_via_router( + &our, + &our_ip, + &keypair, + km.clone(), + peer_id, + &pki, + &mut peers, + &mut peer_connections, + kernel_message_tx.clone() + ).await; + if !sent { + // none of the routers worked! + println!("net: error initializing routed connection\r"); + error_offline(km, &network_error_tx).await?; + } } } // peer cannot be found in PKI, throw an offline error @@ -210,32 +253,45 @@ async fn direct_networking( error_offline(km, &network_error_tx).await?; } } + // 2. receive incoming TCP connections Ok((stream, _socket_addr)) = tcp.accept() => { // TODO we can perform some amount of validation here // to prevent some amount of potential DDoS attacks. // can also block based on socket_addr match accept_async(MaybeTlsStream::Plain(stream)).await { Ok(websocket) => { - let (peer_name, conn) = recv_connection(&our, &pki, &keypair, websocket).await?; + let (peer_id, conn) = recv_connection(&our, &our_ip, &pki, &keypair, websocket).await?; let (peer_tx, peer_rx) = unbounded_channel::(); let peer = Arc::new(Peer { - identity: pki.get(&peer_name).ok_or(anyhow!("jej"))?.clone(), + identity: peer_id, sender: peer_tx, }); - peers.insert(peer_name, peer.clone()); - println!("received incoming connection\r"); - active_connections.spawn(maintain_connection( - peer, - conn, - peer_rx, - kernel_message_tx.clone(), - )); + // if conn is direct, add peer + // if passthrough, add to our forwarding connections joinset + match conn { + Connection::Peer(peer_conn) => { + peers.insert(peer.identity.name.clone(), peer.clone()); + peer_connections.spawn(maintain_connection( + peer, + peer_conn, + peer_rx, + kernel_message_tx.clone(), + )); + } + Connection::Passthrough(passthrough_conn) => { + forwarding_connections.spawn(maintain_passthrough( + peer, + passthrough_conn, + )); + } + } } // ignore connections we failed to accept...? Err(_) => {} } } - Some(Ok((dead_peer, maybe_resend))) = active_connections.join_next() => { + // 3. deal with active connections that die by removing the associated peer + Some(Ok((dead_peer, maybe_resend))) = peer_connections.join_next() => { peers.remove(&dead_peer); match maybe_resend { None => {}, @@ -248,14 +304,58 @@ async fn direct_networking( } } +async fn init_connection_via_router( + our: &Identity, + our_ip: &str, + keypair: &Ed25519KeyPair, + km: KernelMessage, + peer_id: &Identity, + pki: &OnchainPKI, + peers: &mut Peers, + peer_connections: &mut JoinSet<(NodeId, Option)>, + kernel_message_tx: MessageSender, +) -> bool { + let routers_shuffled = { + let mut routers = peer_id.allowed_routers.clone(); + routers.shuffle(&mut rand::thread_rng()); + routers + }; + for router in routers_shuffled { + let router_id = match pki.get(&router) { + None => continue, + Some(id) => id, + }; + match init_connection(&our, &our_ip, peer_id, &keypair, Some(router_id)).await { + Ok((peer_name, direct_conn)) => { + let (peer_tx, peer_rx) = unbounded_channel::(); + let peer = Arc::new(Peer { + identity: peer_id.clone(), + sender: peer_tx, + }); + peers.insert(peer_name, peer.clone()); + peer.sender.send(km).unwrap(); + peer_connections.spawn(maintain_connection( + peer, + direct_conn, + peer_rx, + kernel_message_tx.clone(), + )); + return true; + } + Err(_) => continue, + } + } + return false; +} + async fn maintain_connection( peer: Arc, - mut conn: OpenConnection, + mut conn: PeerConnection, mut peer_rx: UnboundedReceiver, kernel_message_tx: MessageSender, // network_error_tx: NetworkErrorSender, // print_tx: PrintSender, -) -> (String, Option) { +) -> (NodeId, Option) { println!("maintain_connection\r"); loop { tokio::select! { @@ -296,19 +396,69 @@ async fn maintain_connection( } } +/// match the streams +/// TODO optimize performance of this +async fn maintain_passthrough(peer: Arc, mut conn: PassthroughConnection) -> NodeId { + println!("maintain_passthrough\r"); + loop { + tokio::select! { + maybe_recv = conn.read_stream_1.next() => { + match maybe_recv { + Some(Ok(msg)) => { + conn.write_stream_2.send(msg).await.expect("net error: fatal: kernel died"); + } + _ => { + println!("net: passthrough broke\r"); + return peer.identity.name.clone() + } + } + }, + maybe_recv = conn.read_stream_2.next() => { + match maybe_recv { + Some(Ok(msg)) => { + conn.write_stream_1.send(msg).await.expect("net error: fatal: kernel died"); + } + _ => { + println!("net: passthrough broke\r"); + return peer.identity.name.clone() + } + } + }, + } + } +} + async fn recv_connection( our: &Identity, + our_ip: &str, pki: &OnchainPKI, keypair: &Ed25519KeyPair, websocket: WebSocketStream>, -) -> Result<(String, OpenConnection)> { +) -> Result<(Identity, Connection)> { println!("recv_connection\r"); let mut buf = vec![0u8; 65535]; let (mut noise, our_static_key) = build_responder(); let (mut write_stream, mut read_stream) = websocket.split(); // <- e - noise.read_message(&ws_recv(&mut read_stream).await?, &mut buf)?; + let len = noise.read_message(&ws_recv(&mut read_stream).await?, &mut buf)?; + + // if the first message contains a "routing request", + // we evaluate whether we want to perform routing for them + if len != 0 { + let (their_id, target_name) = validate_routing_request(&our.name, &buf, pki)?; + // TODO evaluate whether we want to perform routing for them! + // if we do, produce this thing: + return create_passthrough( + our_ip, + their_id, + target_name, + pki, + write_stream, + read_stream, + ) + .await; + } // -> e, ee, s, es send_uqbar_handshake( @@ -325,13 +475,15 @@ async fn recv_connection( let their_handshake = recv_uqbar_handshake(&mut noise, &mut buf, &mut read_stream).await?; // now validate this handshake payload against the QNS PKI + let their_id = pki + .get(&their_handshake.name) + .ok_or(anyhow!("unknown QNS name"))?; validate_handshake( &their_handshake, noise .get_remote_static() .ok_or(anyhow!("noise error: missing remote pubkey"))?, - pki.get(&their_handshake.name) - .ok_or(anyhow!("unknown QNS name"))?, + their_id, )?; // Transition the state machine into transport mode now that the handshake is complete. @@ -339,13 +491,13 @@ async fn recv_connection( println!("handshake complete, noise session received\r"); Ok(( - their_handshake.name, - OpenConnection { + their_id.clone(), + Connection::Peer(PeerConnection { noise, buf, write_stream, read_stream, - }, + }), )) } @@ -354,24 +506,56 @@ async fn init_connection( our_ip: &str, peer_id: &Identity, keypair: &Ed25519KeyPair, -) -> Result<(String, OpenConnection)> { + use_router: Option<&Identity>, +) -> Result<(String, PeerConnection)> { println!("init_connection\r"); let mut buf = vec![0u8; 65535]; let (mut noise, our_static_key) = build_initiator(); - let Some((ref ip, ref port)) = peer_id.ws_routing else { - return Err(anyhow!("target has no routing information")); + let (mut write_stream, mut read_stream) = match use_router { + None => { + let Some((ref ip, ref port)) = peer_id.ws_routing else { + return Err(anyhow!("target has no routing information")); + }; + let Ok(ws_url) = make_ws_url(our_ip, ip, port) else { + return Err(anyhow!("failed to parse websocket url")); + }; + let Ok(Ok((websocket, _response))) = timeout(TIMEOUT, connect_async(ws_url)).await else { + return Err(anyhow!("failed to connect to target")); + }; + websocket.split() + } + Some(router_id) => { + let Some((ref ip, ref port)) = router_id.ws_routing else { + return Err(anyhow!("router has no routing information")); + }; + let Ok(ws_url) = make_ws_url(our_ip, ip, port) else { + return Err(anyhow!("failed to parse websocket url")); + }; + let Ok(Ok((websocket, _response))) = timeout(TIMEOUT, connect_async(ws_url)).await else { + return Err(anyhow!("failed to connect to target")); + }; + websocket.split() + } }; - let Ok(ws_url) = make_ws_url(our_ip, ip, port) else { - return Err(anyhow!("failed to parse websocket url")); - }; - let Ok(Ok((websocket, _response))) = timeout(TIMEOUT, connect_async(ws_url)).await else { - return Err(anyhow!("failed to connect to target")); - }; - let (mut write_stream, mut read_stream) = websocket.split(); // -> e - let len = noise.write_message(&[], &mut buf)?; + let message = match use_router { + None => vec![], + Some(router_id) => { + let routing_request = RoutingRequest { + name: our.name.clone(), + signature: keypair + .sign([&peer_id.name, router_id.name.as_str()].concat().as_bytes()) + .as_ref() + .to_vec(), + target: peer_id.name.clone(), + protocol_version: 1, + }; + bincode::serialize(&routing_request)? + } + }; + let len = noise.write_message(&message, &mut buf)?; ws_send(&mut write_stream, &buf[..len]).await?; // <- e, ee, s, es @@ -402,7 +586,7 @@ async fn init_connection( Ok(( their_handshake.name, - OpenConnection { + PeerConnection { noise, buf, write_stream, @@ -411,6 +595,58 @@ async fn init_connection( )) } +async fn create_passthrough( + our_ip: &str, + from_id: Identity, + to_name: NodeId, + pki: &OnchainPKI, + write_stream_1: SplitSink>, tungstenite::Message>, + read_stream_1: SplitStream>>, +) -> Result<(Identity, Connection)> { + let to_id = pki.get(&to_name).ok_or(anyhow!("unknown QNS name"))?; + let Some((ref ip, ref port)) = to_id.ws_routing else { + return Err(anyhow!("target has no routing information")); + }; + let Ok(ws_url) = make_ws_url(our_ip, ip, port) else { + return Err(anyhow!("failed to parse websocket url")); + }; + let Ok(Ok((websocket, _response))) = timeout(TIMEOUT, connect_async(ws_url)).await else { + return Err(anyhow!("failed to connect to target")); + }; + let (write_stream_2, read_stream_2) = websocket.split(); + + Ok(( + from_id, + Connection::Passthrough(PassthroughConnection { + write_stream_1, + read_stream_1, + write_stream_2, + read_stream_2, + }), + )) +} + +fn validate_routing_request( + our_name: &str, + buf: &[u8], + pki: &OnchainPKI, +) -> Result<(Identity, NodeId)> { + println!("validate_routing_request\r"); + let routing_request: RoutingRequest = bincode::deserialize(buf)?; + let their_id = pki + .get(&routing_request.name) + .ok_or(anyhow!("unknown QNS name"))?; + let their_networking_key = signature::UnparsedPublicKey::new( + &signature::ED25519, + hex::decode(&strip_0x(&their_id.networking_key))?, + ); + their_networking_key.verify( + &[&routing_request.target, our_name].concat().as_bytes(), + &routing_request.signature, + )?; + Ok((their_id.clone(), routing_request.target)) +} + fn validate_handshake( handshake: &HandshakePayload, their_static_key: &[u8], @@ -429,7 +665,7 @@ fn validate_handshake( Ok(()) } -async fn send_uqbar_message(km: &KernelMessage, conn: &mut OpenConnection) -> Result<()> { +async fn send_uqbar_message(km: &KernelMessage, conn: &mut PeerConnection) -> Result<()> { let serialized = bincode::serialize(km)?; if serialized.len() > MESSAGE_MAX_SIZE as usize { return Err(anyhow!("uqbar message too large")); @@ -446,7 +682,7 @@ async fn send_uqbar_message(km: &KernelMessage, conn: &mut OpenConnection) -> Re Ok(()) } -async fn recv_uqbar_message(conn: &mut OpenConnection) -> Result { +async fn recv_uqbar_message(conn: &mut PeerConnection) -> Result { let outer_len = conn .noise .read_message(&ws_recv(&mut conn.read_stream).await?, &mut conn.buf)?;