diff --git a/src/main.rs b/src/main.rs index c65a7e0f..e57ff2b6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -35,6 +35,11 @@ const CAP_CHANNEL_CAPACITY: usize = 1_000; const VERSION: &str = env!("CARGO_PKG_VERSION"); +/// This can and should be an environment variable / setting. It configures networking +/// such that indirect nodes always use routers, even when target is a direct node, +/// such that only their routers can ever see their physical networking details. +const REVEAL_IP: bool = true; + #[tokio::main] async fn main() { // For use with https://github.com/tokio-rs/console @@ -263,6 +268,7 @@ async fn main() { print_sender.clone(), net_message_sender, net_message_receiver, + REVEAL_IP, )); tasks.spawn(filesystem::fs_sender( our.name.clone(), @@ -320,10 +326,10 @@ async fn main() { "\x1b[38;5;196muh oh, a kernel process crashed: {}\x1b[0m", e ) - // TODO restart the task + // TODO restart the task? } else { format!("what does this mean???") - // TODO restart the task + // TODO restart the task? } } quit = terminal::terminal( diff --git a/src/net/mod.rs b/src/net/mod.rs index f4c718b5..36563344 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -20,7 +20,8 @@ mod utils; // only used in connection initialization, otherwise, nacks and Responses are only used for "timeouts" const TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5); -// 10 MB -- TODO analyze as desired, apps can always chunk data into many messages +/// 10 MB -- TODO analyze as desired, apps can always chunk data into many messages +/// note that this only applies to cross-network messages, not local ones. const MESSAGE_MAX_SIZE: u32 = 10_485_800; /// Entry point from the main kernel task. Runs forever, spawns listener and sender tasks. @@ -33,6 +34,7 @@ pub async fn networking( print_tx: PrintSender, self_message_tx: MessageSender, message_rx: MessageReceiver, + reveal_ip: bool, ) -> Result<()> { println!("networking!\r"); println!("our identity: {:#?}\r", our); @@ -49,6 +51,7 @@ pub async fn networking( print_tx, self_message_tx, message_rx, + reveal_ip, ) .await } @@ -95,6 +98,7 @@ async fn indirect_networking( print_tx: PrintSender, self_message_tx: MessageSender, mut message_rx: MessageReceiver, + reveal_ip: bool, ) -> Result<()> { println!("indirect_networking\r"); let mut pki: OnchainPKI = HashMap::new(); @@ -105,21 +109,6 @@ async fn indirect_networking( let mut peer_connections = JoinSet::<(NodeId, Option)>::new(); let mut active_routers = HashSet::::new(); - // before opening up the main loop, go through our allowed routers - // and attempt to connect to all of them, saving the successfully - // connected-to ones in our router-set - connect_to_routers( - &our, - &our_ip, - &keypair, - &mut active_routers, - &pki, - &mut peers, - &mut peer_connections, - kernel_message_tx.clone(), - ) - .await; - loop { tokio::select! { // 1. receive messages from kernel and send out over connections, @@ -163,9 +152,9 @@ async fn indirect_networking( else if let Some(peer_id) = pki.get(target) { // if the message is for a *direct* peer we don't have a connection with, // try to establish a connection with them - // TODO: here, we can *choose* to use our routers so as not to reveal + // here, we can *choose* to use our routers so as not to reveal // networking information about ourselves to the target. - if peer_id.ws_routing.is_some() { + if peer_id.ws_routing.is_some() && reveal_ip { match init_connection(&our, &our_ip, peer_id, &keypair, None, false).await { Ok((peer_name, direct_conn)) => { let (peer_tx, peer_rx) = unbounded_channel::(); @@ -181,6 +170,7 @@ async fn indirect_networking( direct_conn, peer_rx, kernel_message_tx.clone(), + print_tx.clone(), )); } Err(e) => { @@ -190,6 +180,7 @@ async fn indirect_networking( } } // if the message is for an *indirect* peer we don't have a connection with, + // or we want to protect our node's physical networking details from non-routers, // do some routing: in a randomized order, go through their listed routers // on chain and try to get one of them to build a proxied connection to // this node for you @@ -205,7 +196,8 @@ async fn indirect_networking( &names, &mut peers, &mut peer_connections, - kernel_message_tx.clone() + kernel_message_tx.clone(), + print_tx.clone(), )).await; if !sent.unwrap_or(false) { // none of the routers worked! @@ -234,62 +226,43 @@ async fn indirect_networking( // 3. periodically attempt to connect to any allowed routers that we // are not connected to _ = time::sleep(time::Duration::from_secs(3)) => { - connect_to_routers( - &our, - &our_ip, - &keypair, - &mut active_routers, - &pki, - &mut peers, - &mut peer_connections, - kernel_message_tx.clone(), - ) - .await; + if active_routers.len() == our.allowed_routers.len() { + continue; + } + for router in &our.allowed_routers { + if active_routers.contains(router) { + continue; + } + let Some(router_id) = pki.get(router) else { + continue; + }; + match init_connection(&our, &our_ip, router_id, &keypair, None, true).await { + Ok((peer_name, direct_conn)) => { + let (peer_tx, peer_rx) = unbounded_channel::(); + let peer = Arc::new(Peer { + identity: router_id.clone(), + routing_for: false, + sender: peer_tx, + }); + println!("net: connected to router {}\r", peer_name); + peers.insert(peer_name.clone(), peer.clone()); + active_routers.insert(peer_name); + peer_connections.spawn(maintain_connection( + peer, + direct_conn, + peer_rx, + kernel_message_tx.clone(), + print_tx.clone(), + )); + } + Err(_e) => continue, + } + } } } } } -async fn connect_to_routers( - our: &Identity, - our_ip: &str, - keypair: &Ed25519KeyPair, - active_routers: &mut HashSet, - pki: &OnchainPKI, - peers: &mut Peers, - peer_connections: &mut JoinSet<(NodeId, Option)>, - kernel_message_tx: MessageSender, -) { - for router in &our.allowed_routers { - if active_routers.contains(router) { - continue; - } - let Some(router_id) = pki.get(router) else { - continue; - }; - match init_connection(our, our_ip, router_id, keypair, None, true).await { - Ok((peer_name, direct_conn)) => { - let (peer_tx, peer_rx) = unbounded_channel::(); - let peer = Arc::new(Peer { - identity: router_id.clone(), - routing_for: false, - sender: peer_tx, - }); - println!("net: connected to router {}\r", peer_name); - peers.insert(peer_name.clone(), peer.clone()); - active_routers.insert(peer_name); - peer_connections.spawn(maintain_connection( - peer, - direct_conn, - peer_rx, - kernel_message_tx.clone(), - )); - } - Err(_e) => continue, - } - } -} - async fn direct_networking( our: Identity, our_ip: String, @@ -370,6 +343,7 @@ async fn direct_networking( direct_conn, peer_rx, kernel_message_tx.clone(), + print_tx.clone(), )); } Err(e) => { @@ -394,7 +368,8 @@ async fn direct_networking( &names, &mut peers, &mut peer_connections, - kernel_message_tx.clone() + kernel_message_tx.clone(), + print_tx.clone(), )).await; if !sent.unwrap_or(false) { // none of the routers worked! @@ -431,8 +406,11 @@ async fn direct_networking( continue; } }; - // if conn is direct, add peer - // if passthrough, add to our forwarding connections joinset + // TODO if their handshake indicates they want us to proxy + // for them (aka act as a router for them) we can choose + // whether to do so here! + // if conn is direct, add peer. if passthrough, add to our + // forwarding connections joinset match conn { Connection::Peer(peer_conn) => { let (peer_tx, peer_rx) = unbounded_channel::(); @@ -447,6 +425,7 @@ async fn direct_networking( peer_conn, peer_rx, kernel_message_tx.clone(), + print_tx.clone(), )); } Connection::Passthrough(passthrough_conn) => { @@ -491,6 +470,7 @@ async fn init_connection_via_router( peers: &mut Peers, peer_connections: &mut JoinSet<(NodeId, Option)>, kernel_message_tx: MessageSender, + print_tx: PrintSender, ) -> bool { println!("init_connection_via_router\r"); let routers_shuffled = { @@ -521,6 +501,7 @@ async fn init_connection_via_router( direct_conn, peer_rx, kernel_message_tx.clone(), + print_tx.clone(), )); return true; } @@ -535,8 +516,7 @@ async fn maintain_connection( mut conn: PeerConnection, mut peer_rx: UnboundedReceiver, kernel_message_tx: MessageSender, - // network_error_tx: NetworkErrorSender, - // print_tx: PrintSender, + print_tx: PrintSender, ) -> (NodeId, Option) { println!("maintain_connection\r"); loop { @@ -559,11 +539,22 @@ async fn maintain_connection( maybe_recv = peer_rx.recv() => { match maybe_recv { Some(km) => { - // TODO error handle match send_uqbar_message(&km, &mut conn).await { Ok(()) => continue, Err(e) => { - println!("net: error sending message: {}\r", e); + if e.to_string() == "message too large" { + // this will result in a Timeout if the message + // requested a response, otherwise nothing. so, + // we should always print something to terminal + let _ = print_tx.send(Printout { + verbosity: 0, + content: format!( + "net: tried to send too-large message, limit is {:.2}mb", + MESSAGE_MAX_SIZE as f64 / 1_048_576.0 + ), + }).await; + return (peer.identity.name.clone(), None) + } return (peer.identity.name.clone(), Some(km)) } } @@ -578,8 +569,7 @@ async fn maintain_connection( } } -/// match the streams -/// TODO optimize performance of this +/// cross the streams async fn maintain_passthrough(mut conn: PassthroughConnection) { println!("maintain_passthrough\r"); loop { @@ -682,9 +672,6 @@ async fn recv_connection( let noise = noise.into_transport_mode()?; println!("handshake complete, noise session received\r"); - // TODO if their handshake indicates they want us to proxy - // for them (aka act as a router for them) we can choose - // whether to do so here. Ok(( their_id.clone(), their_handshake.proxy_request, @@ -929,7 +916,6 @@ async fn handle_local_message( // someone wants to open a passthrough with us through a router! // if we are an indirect node, and source is one of our routers, // respond by attempting to init a matching passthrough. - // TODO can discriminate more here.. if our.allowed_routers.contains(&km.source.node) { let Ok((peer_id, peer_conn)) = time::timeout( TIMEOUT, @@ -961,6 +947,7 @@ async fn handle_local_message( peer_conn, peer_rx, kernel_message_tx.clone(), + print_tx.clone(), )); } else { kernel_message_tx diff --git a/src/net/types.rs b/src/net/types.rs index 4141e718..6a7737c1 100644 --- a/src/net/types.rs +++ b/src/net/types.rs @@ -72,7 +72,7 @@ pub struct PendingPassthroughConnection { // TODO upgrade from hashmaps pub type Peers = HashMap>; -pub type PKINames = HashMap; // TODO maybe U256 to String +pub type PKINames = HashMap; pub type OnchainPKI = HashMap; pub type PendingPassthroughs = HashMap<(NodeId, NodeId), PendingPassthroughConnection>; diff --git a/src/net/utils.rs b/src/net/utils.rs index 3b412ad6..25c69f52 100644 --- a/src/net/utils.rs +++ b/src/net/utils.rs @@ -151,7 +151,7 @@ pub fn validate_handshake( pub async fn send_uqbar_message(km: &KernelMessage, conn: &mut PeerConnection) -> Result<()> { let serialized = bincode::serialize(km)?; if serialized.len() > MESSAGE_MAX_SIZE as usize { - return Err(anyhow!("uqbar message too large")); + return Err(anyhow!("message too large")); } let len = (serialized.len() as u32).to_be_bytes();