nice! establishing new connections is now asynchronized

This commit is contained in:
dr-frmr 2023-11-08 19:03:38 -05:00
parent c7e37fd10d
commit d2fc275cfd
No known key found for this signature in database
5 changed files with 320 additions and 226 deletions

14
Cargo.lock generated
View File

@ -1052,6 +1052,19 @@ dependencies = [
"syn 1.0.109", "syn 1.0.109",
] ]
[[package]]
name = "dashmap"
version = "5.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856"
dependencies = [
"cfg-if",
"hashbrown 0.14.0",
"lock_api",
"once_cell",
"parking_lot_core",
]
[[package]] [[package]]
name = "data-encoding" name = "data-encoding"
version = "2.4.0" version = "2.4.0"
@ -4995,6 +5008,7 @@ dependencies = [
"chacha20poly1305 0.10.1", "chacha20poly1305 0.10.1",
"chrono", "chrono",
"crossterm", "crossterm",
"dashmap",
"digest 0.10.7", "digest 0.10.7",
"dotenv", "dotenv",
"elliptic-curve", "elliptic-curve",

View File

@ -24,6 +24,7 @@ cap-std = "2.0.0"
chacha20poly1305 = "0.10.1" chacha20poly1305 = "0.10.1"
chrono = "0.4.31" chrono = "0.4.31"
crossterm = { version = "0.26.1", features = ["event-stream", "bracketed-paste"] } crossterm = { version = "0.26.1", features = ["event-stream", "bracketed-paste"] }
dashmap = "5.5.3"
digest = "0.10" digest = "0.10"
dotenv = "0.15.0" dotenv = "0.15.0"
elliptic-curve = { version = "0.13.5", features = ["ecdh"] } elliptic-curve = { version = "0.13.5", features = ["ecdh"] }

View File

@ -1,14 +1,13 @@
use crate::net::{types::*, utils::*}; use crate::net::{types::*, utils::*};
use crate::types::*; use crate::types::*;
use anyhow::{anyhow, Result}; use anyhow::{anyhow, Result};
use dashmap::DashMap;
use futures::{SinkExt, StreamExt}; use futures::{SinkExt, StreamExt};
use rand::seq::SliceRandom; use rand::seq::SliceRandom;
use ring::signature::Ed25519KeyPair; use ring::signature::Ed25519KeyPair;
use std::{ use std::{collections::HashMap, sync::Arc};
collections::{HashMap, HashSet},
sync::Arc,
};
use tokio::net::TcpListener; use tokio::net::TcpListener;
use tokio::sync::Mutex;
use tokio::task::JoinSet; use tokio::task::JoinSet;
use tokio::time; use tokio::time;
use tokio_tungstenite::{ use tokio_tungstenite::{
@ -95,18 +94,19 @@ async fn indirect_networking(
kernel_message_tx: MessageSender, kernel_message_tx: MessageSender,
network_error_tx: NetworkErrorSender, network_error_tx: NetworkErrorSender,
print_tx: PrintSender, print_tx: PrintSender,
self_message_tx: MessageSender, _self_message_tx: MessageSender,
mut message_rx: MessageReceiver, mut message_rx: MessageReceiver,
reveal_ip: bool, reveal_ip: bool,
) -> Result<()> { ) -> Result<()> {
print_debug(&print_tx, "net: starting as indirect").await; print_debug(&print_tx, "net: starting as indirect").await;
let mut pki: OnchainPKI = HashMap::new(); let pki: OnchainPKI = Arc::new(DashMap::new());
let mut peers: Peers = HashMap::new(); let peers: Peers = Arc::new(DashMap::new());
// mapping from QNS namehash to username // mapping from QNS namehash to username
let mut names: PKINames = HashMap::new(); let names: PKINames = Arc::new(DashMap::new());
let mut peer_connections = JoinSet::<(NodeId, Option<KernelMessage>)>::new(); let peer_connections = Arc::new(Mutex::new(JoinSet::<()>::new()));
// indirect-specific structure // track peers that we're already in the midst of establishing a connection with
let mut active_routers = HashSet::<NodeId>::new(); let mut pending_connections = JoinSet::<(NodeId, Result<()>)>::new();
let mut peer_message_queues = HashMap::<NodeId, Vec<KernelMessage>>::new();
loop { loop {
tokio::select! { tokio::select! {
@ -123,23 +123,23 @@ async fn indirect_networking(
&our_ip, &our_ip,
&keypair, &keypair,
km, km,
&mut peers, peers.clone(),
&mut pki, pki.clone(),
&mut peer_connections, peer_connections.clone(),
None, None,
None, None,
Some(&active_routers), names.clone(),
&mut names,
&kernel_message_tx, &kernel_message_tx,
&print_tx, &print_tx,
) )
.await { .await {
Ok(()) => {}, Ok(()) => continue,
Err(e) => { Err(e) => {
print_tx.send(Printout { print_tx.send(Printout {
verbosity: 0, verbosity: 0,
content: format!("net: error handling local message: {e}") content: format!("net: error handling local message: {e}")
}).await?; }).await?;
continue
} }
} }
} }
@ -147,117 +147,117 @@ async fn indirect_networking(
// try to send it to them // try to send it to them
else if let Some(peer) = peers.get_mut(target) { else if let Some(peer) = peers.get_mut(target) {
peer.sender.send(km)?; peer.sender.send(km)?;
continue
} }
else if let Some(peer_id) = pki.get(target) { // if we cannot send it to an existing peer-connection, need to spawn
// if the message is for a *direct* peer we don't have a connection with, // a task that will attempt to establish such a connection.
// try to establish a connection with them // if such a task already exists for that peer, we should queue the message
// here, we can *choose* to use our routers so as not to reveal // to be sent once that task completes. otherwise, it will duplicate connections.
// networking information about ourselves to the target. pending_connections.spawn(establish_new_peer_connection(
if peer_id.ws_routing.is_some() && reveal_ip { our.clone(),
print_debug(&print_tx, &format!("net: attempting to connect to {} directly", peer_id.name)).await; our_ip.clone(),
match init_connection(&our, &our_ip, peer_id, &keypair, None, false).await { keypair.clone(),
Ok(direct_conn) => { km,
save_new_peer( pki.clone(),
peer_id, names.clone(),
false, peers.clone(),
&mut peers, peer_connections.clone(),
&mut peer_connections, reveal_ip,
direct_conn, kernel_message_tx.clone(),
Some(km), network_error_tx.clone(),
&kernel_message_tx, print_tx.clone()
&print_tx, ));
).await?; }
// 2. recover the result of a pending connection and flush any message
// queue that's built up since it was spawned
Some(Ok((peer_name, result))) = pending_connections.join_next() => {
match result {
Ok(()) => {
// if we have a message queue for this peer, send it out
if let Some(queue) = peer_message_queues.remove(&peer_name) {
for km in queue {
peers.get_mut(&peer_name).unwrap().sender.send(km)?;
} }
Err(_) => { }
}
Err(_e) => {
// TODO decide if this is good behavior, but throw
// offline error for each message in this peer's queue
if let Some(queue) = peer_message_queues.remove(&peer_name) {
for km in queue {
error_offline(km, &network_error_tx).await?; error_offline(km, &network_error_tx).await?;
} }
} }
} }
// if the message is for an *indirect* peer we don't have a connection with,
// or we want to protect our node's physical networking details from non-routers,
// do some routing: in a randomized order, go through their listed routers
// on chain and try to get one of them to build a proxied connection to
// this node for you
else {
print_debug(&print_tx, &format!("net: attempting to connect to {} via router", peer_id.name)).await;
let sent = time::timeout(TIMEOUT,
init_connection_via_router(
&our,
&our_ip,
&keypair,
km.clone(),
peer_id,
&pki,
&names,
&mut peers,
&mut peer_connections,
kernel_message_tx.clone(),
print_tx.clone(),
)).await;
if !sent.unwrap_or(false) {
// none of the routers worked!
error_offline(km, &network_error_tx).await?;
}
}
}
// peer cannot be found in PKI, throw an offline error
else {
error_offline(km, &network_error_tx).await?;
}
}
// 2. deal with active connections that die by removing the associated peer
// if the peer is one of our routers, remove them from router-set
Some(Ok((dead_peer, maybe_resend))) = peer_connections.join_next() => {
print_debug(&print_tx, &format!("net: connection with {dead_peer} died")).await;
peers.remove(&dead_peer);
active_routers.remove(&dead_peer);
match maybe_resend {
None => {},
Some(km) => {
self_message_tx.send(km).await?;
}
} }
} }
// 3. periodically attempt to connect to any allowed routers that we // 3. periodically attempt to connect to any allowed routers that we
// are not connected to // are not connected to -- TODO do some exponential backoff if a router
_ = time::sleep(time::Duration::from_secs(3)) => { // is not responding.
if active_routers.len() == our.allowed_routers.len() { _ = time::sleep(time::Duration::from_secs(5)) => {
continue; tokio::spawn(connect_to_routers(
} our.clone(),
for router in &our.allowed_routers { our_ip.clone(),
if active_routers.contains(router) { keypair.clone(),
continue; pki.clone(),
} peers.clone(),
let Some(router_id) = pki.get(router) else { peer_connections.clone(),
continue; kernel_message_tx.clone(),
}; print_tx.clone()
print_debug(&print_tx, "net: attempting to connect to router").await; ));
match init_connection(&our, &our_ip, router_id, &keypair, None, true).await {
Ok(direct_conn) => {
print_tx.send(Printout {
verbosity: 0,
content: format!("now connected to router {}", router_id.name),
}).await?;
active_routers.insert(router_id.name.clone());
save_new_peer(
router_id,
false,
&mut peers,
&mut peer_connections,
direct_conn,
None,
&kernel_message_tx,
&print_tx,
).await?;
}
Err(_e) => continue,
}
}
} }
} }
} }
} }
async fn connect_to_routers(
our: Identity,
our_ip: String,
keypair: Arc<Ed25519KeyPair>,
pki: OnchainPKI,
peers: Peers,
peer_connections: Arc<Mutex<JoinSet<()>>>,
kernel_message_tx: MessageSender,
print_tx: PrintSender,
) -> Result<()> {
for router in &our.allowed_routers {
if peers.contains_key(router) {
continue;
}
let Some(router_id) = pki.get(router) else {
continue;
};
print_debug(
&print_tx,
&format!("net: attempting to connect to router {router}"),
)
.await;
match init_connection(&our, &our_ip, &router_id, &keypair, None, true).await {
Ok(direct_conn) => {
print_tx
.send(Printout {
verbosity: 0,
content: format!("connected to router {}", router_id.name),
})
.await?;
save_new_peer(
&router_id,
false,
peers.clone(),
peer_connections.clone(),
direct_conn,
None,
&kernel_message_tx,
&print_tx,
)
.await;
}
Err(_e) => continue,
}
}
Ok(())
}
async fn direct_networking( async fn direct_networking(
our: Identity, our: Identity,
our_ip: String, our_ip: String,
@ -266,18 +266,21 @@ async fn direct_networking(
kernel_message_tx: MessageSender, kernel_message_tx: MessageSender,
network_error_tx: NetworkErrorSender, network_error_tx: NetworkErrorSender,
print_tx: PrintSender, print_tx: PrintSender,
self_message_tx: MessageSender, _self_message_tx: MessageSender,
mut message_rx: MessageReceiver, mut message_rx: MessageReceiver,
) -> Result<()> { ) -> Result<()> {
print_debug(&print_tx, "net: starting as direct").await; print_debug(&print_tx, "net: starting as direct").await;
let mut pki: OnchainPKI = HashMap::new(); let pki: OnchainPKI = Arc::new(DashMap::new());
let mut peers: Peers = HashMap::new(); let peers: Peers = Arc::new(DashMap::new());
// mapping from QNS namehash to username // mapping from QNS namehash to username
let mut names: PKINames = HashMap::new(); let names: PKINames = Arc::new(DashMap::new());
let mut peer_connections = JoinSet::<(NodeId, Option<KernelMessage>)>::new(); let peer_connections = Arc::new(Mutex::new(JoinSet::<()>::new()));
// direct-specific structures // direct-specific structures
let mut forwarding_connections = JoinSet::<()>::new(); let mut forwarding_connections = JoinSet::<()>::new();
let mut pending_passthroughs: PendingPassthroughs = HashMap::new(); let mut pending_passthroughs: PendingPassthroughs = HashMap::new();
// track peers that we're already in the midst of establishing a connection with
let mut pending_connections = JoinSet::<(NodeId, Result<()>)>::new();
let mut peer_message_queues = HashMap::<NodeId, Vec<KernelMessage>>::new();
loop { loop {
tokio::select! { tokio::select! {
@ -285,95 +288,83 @@ async fn direct_networking(
// making new connections as needed // making new connections as needed
Some(km) = message_rx.recv() => { Some(km) = message_rx.recv() => {
// got a message from kernel to send out over the network // got a message from kernel to send out over the network
let target = &km.target.node;
// if the message is for us, it's either a protocol-level "hello" message, // if the message is for us, it's either a protocol-level "hello" message,
// or a debugging command issued from our terminal. handle it here: // or a debugging command issued from our terminal. handle it here:
if target == &our.name { if km.target.node == our.name {
match handle_local_message( match handle_local_message(
&our, &our,
&our_ip, &our_ip,
&keypair, &keypair,
km, km,
&mut peers, peers.clone(),
&mut pki, pki.clone(),
&mut peer_connections, peer_connections.clone(),
Some(&mut pending_passthroughs), Some(&mut pending_passthroughs),
Some(&forwarding_connections), Some(&forwarding_connections),
None, names.clone(),
&mut names,
&kernel_message_tx, &kernel_message_tx,
&print_tx, &print_tx,
) )
.await { .await {
Ok(()) => {}, Ok(()) => continue,
Err(e) => { Err(e) => {
print_tx.send(Printout { print_tx.send(Printout {
verbosity: 0, verbosity: 0,
content: format!("net: error handling local message: {}", e) content: format!("net: error handling local message: {}", e)
}).await?; }).await?;
continue;
} }
} }
} }
// if the message is for a peer we currently have a connection with, // if the message is for a peer we currently have a connection with,
// try to send it to them // try to send it to them
else if let Some(peer) = peers.get_mut(target) { else if let Some(peer) = peers.get_mut(&km.target.node) {
peer.sender.send(km)?; peer.sender.send(km)?;
continue
} }
else if let Some(peer_id) = pki.get(target) { // if we cannot send it to an existing peer-connection, need to spawn
// if the message is for a *direct* peer we don't have a connection with, // a task that will attempt to establish such a connection.
// try to establish a connection with them // if such a task already exists for that peer, we should queue the message
if peer_id.ws_routing.is_some() { // to be sent once that task completes. otherwise, it will duplicate connections.
print_debug(&print_tx, &format!("net: attempting to connect to {} directly", peer_id.name)).await; pending_connections.spawn(establish_new_peer_connection(
match init_connection(&our, &our_ip, peer_id, &keypair, None, false).await { our.clone(),
Ok(direct_conn) => { our_ip.clone(),
save_new_peer( keypair.clone(),
peer_id, km,
false, pki.clone(),
&mut peers, names.clone(),
&mut peer_connections, peers.clone(),
direct_conn, peer_connections.clone(),
Some(km), true,
&kernel_message_tx, kernel_message_tx.clone(),
&print_tx, network_error_tx.clone(),
).await?; print_tx.clone()
));
}
// 2. recover the result of a pending connection and flush any message
// queue that's built up since it was spawned
Some(Ok((peer_name, result))) = pending_connections.join_next() => {
match result {
Ok(()) => {
// if we have a message queue for this peer, send it out
if let Some(queue) = peer_message_queues.remove(&peer_name) {
for km in queue {
peers.get_mut(&peer_name).unwrap().sender.send(km)?;
} }
Err(_) => { }
}
Err(_e) => {
// TODO decide if this is good behavior, but throw
// offline error for each message in this peer's queue
if let Some(queue) = peer_message_queues.remove(&peer_name) {
for km in queue {
error_offline(km, &network_error_tx).await?; error_offline(km, &network_error_tx).await?;
} }
} }
} }
// if the message is for an *indirect* peer we don't have a connection with,
// do some routing: in a randomized order, go through their listed routers
// on chain and try to get one of them to build a proxied connection to
// this node for you
else {
print_debug(&print_tx, &format!("net: attempting to connect to {} via router", peer_id.name)).await;
let sent = time::timeout(TIMEOUT,
init_connection_via_router(
&our,
&our_ip,
&keypair,
km.clone(),
peer_id,
&pki,
&names,
&mut peers,
&mut peer_connections,
kernel_message_tx.clone(),
print_tx.clone(),
)).await;
if !sent.unwrap_or(false) {
// none of the routers worked!
error_offline(km, &network_error_tx).await?;
}
}
}
// peer cannot be found in PKI, throw an offline error
else {
error_offline(km, &network_error_tx).await?;
} }
} }
// 2. receive incoming TCP connections // 3. receive incoming TCP connections
Ok((stream, _socket_addr)) = tcp.accept() => { Ok((stream, _socket_addr)) = tcp.accept() => {
// TODO we can perform some amount of validation here // TODO we can perform some amount of validation here
// to prevent some amount of potential DDoS attacks. // to prevent some amount of potential DDoS attacks.
@ -410,13 +401,13 @@ async fn direct_networking(
save_new_peer( save_new_peer(
&peer_id, &peer_id,
routing_for, routing_for,
&mut peers, peers.clone(),
&mut peer_connections, peer_connections.clone(),
peer_conn, peer_conn,
None, None,
&kernel_message_tx, &kernel_message_tx,
&print_tx &print_tx
).await?; ).await;
} }
Connection::Passthrough(passthrough_conn) => { Connection::Passthrough(passthrough_conn) => {
forwarding_connections.spawn(maintain_passthrough( forwarding_connections.spawn(maintain_passthrough(
@ -435,18 +426,98 @@ async fn direct_networking(
Err(_) => {} Err(_) => {}
} }
} }
// 3. deal with active connections that die by removing the associated peer }
Some(Ok((dead_peer, maybe_resend))) = peer_connections.join_next() => { }
print_debug(&print_tx, &format!("net: connection with {dead_peer} died")).await; }
peers.remove(&dead_peer);
match maybe_resend { async fn establish_new_peer_connection(
None => {}, our: Identity,
Some(km) => { our_ip: String,
self_message_tx.send(km).await?; keypair: Arc<Ed25519KeyPair>,
} km: KernelMessage,
pki: OnchainPKI,
names: PKINames,
peers: Peers,
peer_connections: Arc<Mutex<JoinSet<()>>>,
reveal_ip: bool,
kernel_message_tx: MessageSender,
network_error_tx: NetworkErrorSender,
print_tx: PrintSender,
) -> (NodeId, Result<()>) {
if let Some(peer_id) = pki.get(&km.target.node) {
// if the message is for a *direct* peer we don't have a connection with,
// try to establish a connection with them
// here, we can *choose* to use our routers so as not to reveal
// networking information about ourselves to the target.
if peer_id.ws_routing.is_some() && reveal_ip {
print_debug(
&print_tx,
&format!("net: attempting to connect to {} directly", peer_id.name),
)
.await;
match init_connection(&our, &our_ip, &peer_id, &keypair, None, false).await {
Ok(direct_conn) => {
save_new_peer(
&peer_id,
false,
peers,
peer_connections,
direct_conn,
Some(km),
&kernel_message_tx,
&print_tx,
)
.await;
(peer_id.name.clone(), Ok(()))
}
Err(_) => {
let _ = error_offline(km, &network_error_tx).await;
(peer_id.name.clone(), Err(anyhow!("failed to connect to peer")))
} }
} }
} }
// if the message is for an *indirect* peer we don't have a connection with,
// or we want to protect our node's physical networking details from non-routers,
// do some routing: in a randomized order, go through their listed routers
// on chain and try to get one of them to build a proxied connection to
// this node for you
else {
print_debug(
&print_tx,
&format!("net: attempting to connect to {} via router", peer_id.name),
)
.await;
let sent = time::timeout(
TIMEOUT,
init_connection_via_router(
&our,
&our_ip,
&keypair,
km.clone(),
&peer_id,
&pki,
&names,
peers,
peer_connections,
kernel_message_tx.clone(),
print_tx.clone(),
),
)
.await;
if sent.unwrap_or(false) {
(peer_id.name.clone(), Ok(()))
} else {
// none of the routers worked!
let _ = error_offline(km, &network_error_tx).await;
(peer_id.name.clone(), Err(anyhow!("failed to connect to peer")))
}
}
}
// peer cannot be found in PKI, throw an offline error
else {
let peer_name = km.target.node.clone();
let _ = error_offline(km, &network_error_tx).await;
(peer_name, Err(anyhow!("failed to connect to peer")))
} }
} }
@ -458,8 +529,8 @@ async fn init_connection_via_router(
peer_id: &Identity, peer_id: &Identity,
pki: &OnchainPKI, pki: &OnchainPKI,
names: &PKINames, names: &PKINames,
peers: &mut Peers, peers: Peers,
peer_connections: &mut JoinSet<(NodeId, Option<KernelMessage>)>, peer_connections: Arc<Mutex<JoinSet<()>>>,
kernel_message_tx: MessageSender, kernel_message_tx: MessageSender,
print_tx: PrintSender, print_tx: PrintSender,
) -> bool { ) -> bool {
@ -472,13 +543,13 @@ async fn init_connection_via_router(
let Some(router_name) = names.get(router_namehash) else { let Some(router_name) = names.get(router_namehash) else {
continue; continue;
}; };
let router_id = match pki.get(router_name) { let router_id = match pki.get(router_name.as_str()) {
None => continue, None => continue,
Some(id) => id, Some(id) => id,
}; };
match init_connection(&our, &our_ip, peer_id, &keypair, Some(router_id), false).await { match init_connection(&our, &our_ip, peer_id, &keypair, Some(&router_id), false).await {
Ok(direct_conn) => { Ok(direct_conn) => {
return save_new_peer( save_new_peer(
peer_id, peer_id,
false, false,
peers, peers,
@ -488,8 +559,8 @@ async fn init_connection_via_router(
&kernel_message_tx, &kernel_message_tx,
&print_tx, &print_tx,
) )
.await .await;
.is_ok() return true;
} }
Err(_) => continue, Err(_) => continue,
} }
@ -562,7 +633,7 @@ async fn recv_connection(
noise noise
.get_remote_static() .get_remote_static()
.ok_or(anyhow!("noise error: missing remote pubkey"))?, .ok_or(anyhow!("noise error: missing remote pubkey"))?,
their_id, &their_id,
)?; )?;
Ok(( Ok((
@ -641,7 +712,7 @@ async fn recv_connection_via_router(
noise noise
.get_remote_static() .get_remote_static()
.ok_or(anyhow!("noise error: missing remote pubkey"))?, .ok_or(anyhow!("noise error: missing remote pubkey"))?,
their_id, &their_id,
)?; )?;
Ok(( Ok((
@ -746,13 +817,12 @@ async fn handle_local_message(
our_ip: &str, our_ip: &str,
keypair: &Ed25519KeyPair, keypair: &Ed25519KeyPair,
km: KernelMessage, km: KernelMessage,
peers: &mut Peers, peers: Peers,
pki: &mut OnchainPKI, pki: OnchainPKI,
peer_connections: &mut JoinSet<(NodeId, Option<KernelMessage>)>, peer_connections: Arc<Mutex<JoinSet<()>>>,
pending_passthroughs: Option<&mut PendingPassthroughs>, pending_passthroughs: Option<&mut PendingPassthroughs>,
forwarding_connections: Option<&JoinSet<()>>, forwarding_connections: Option<&JoinSet<()>>,
active_routers: Option<&HashSet<NodeId>>, names: PKINames,
names: &mut PKINames,
kernel_message_tx: &MessageSender, kernel_message_tx: &MessageSender,
print_tx: &PrintSender, print_tx: &PrintSender,
) -> Result<()> { ) -> Result<()> {
@ -801,7 +871,7 @@ async fn handle_local_message(
let (peer_id, peer_conn) = time::timeout( let (peer_id, peer_conn) = time::timeout(
TIMEOUT, TIMEOUT,
recv_connection_via_router( recv_connection_via_router(
our, our_ip, &from, pki, keypair, &router_id, our, our_ip, &from, &pki, keypair, &router_id,
), ),
) )
.await??; .await??;
@ -815,7 +885,7 @@ async fn handle_local_message(
&kernel_message_tx, &kernel_message_tx,
&print_tx, &print_tx,
) )
.await?; .await;
Ok(NetResponses::Accepted(from.clone())) Ok(NetResponses::Accepted(from.clone()))
} else { } else {
Ok(NetResponses::Rejected(from.clone())) Ok(NetResponses::Rejected(from.clone()))
@ -887,7 +957,13 @@ async fn handle_local_message(
let mut printout = String::new(); let mut printout = String::new();
match std::str::from_utf8(&ipc) { match std::str::from_utf8(&ipc) {
Ok("peers") => { Ok("peers") => {
printout.push_str(&format!("{:#?}", peers.keys())); printout.push_str(&format!(
"{:#?}",
peers
.iter()
.map(|p| p.identity.name.clone())
.collect::<Vec<_>>()
));
} }
Ok("pki") => { Ok("pki") => {
printout.push_str(&format!("{:#?}", pki)); printout.push_str(&format!("{:#?}", pki));
@ -898,7 +974,7 @@ async fn handle_local_message(
Ok("diagnostics") => { Ok("diagnostics") => {
printout.push_str(&format!("our Identity: {:#?}\r\n", our)); printout.push_str(&format!("our Identity: {:#?}\r\n", our));
printout.push_str(&format!("we have connections with peers:\r\n")); printout.push_str(&format!("we have connections with peers:\r\n"));
for peer in peers.values() { for peer in peers.iter() {
printout.push_str(&format!( printout.push_str(&format!(
" {}, routing_for={}\r\n", " {}, routing_for={}\r\n",
peer.identity.name, peer.routing_for, peer.identity.name, peer.routing_for,
@ -907,7 +983,7 @@ async fn handle_local_message(
printout.push_str(&format!("we have {} entries in the PKI\r\n", pki.len())); printout.push_str(&format!("we have {} entries in the PKI\r\n", pki.len()));
printout.push_str(&format!( printout.push_str(&format!(
"we have {} open peer connections\r\n", "we have {} open peer connections\r\n",
peer_connections.len() peer_connections.lock().await.len()
)); ));
if pending_passthroughs.is_some() { if pending_passthroughs.is_some() {
printout.push_str(&format!( printout.push_str(&format!(
@ -921,12 +997,6 @@ async fn handle_local_message(
forwarding_connections.unwrap().len() forwarding_connections.unwrap().len()
)); ));
} }
if active_routers.is_some() {
printout.push_str(&format!(
"we have {} active routers\r\n",
active_routers.unwrap().len()
));
}
} }
_ => { _ => {
match rmp_serde::from_slice::<NetActions>(&ipc)? { match rmp_serde::from_slice::<NetActions>(&ipc)? {

View File

@ -1,7 +1,9 @@
use crate::types::*; use crate::types::*;
use dashmap::DashMap;
use futures::stream::{SplitSink, SplitStream}; use futures::stream::{SplitSink, SplitStream};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Arc;
use tokio::net::TcpStream; use tokio::net::TcpStream;
use tokio::sync::mpsc::UnboundedSender; use tokio::sync::mpsc::UnboundedSender;
use tokio_tungstenite::{tungstenite, MaybeTlsStream, WebSocketStream}; use tokio_tungstenite::{tungstenite, MaybeTlsStream, WebSocketStream};
@ -70,10 +72,9 @@ pub struct PendingPassthroughConnection {
pub read_stream: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>, pub read_stream: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
} }
// TODO upgrade from hashmaps pub type Peers = Arc<DashMap<String, Peer>>;
pub type Peers = HashMap<String, Peer>; pub type PKINames = Arc<DashMap<String, NodeId>>;
pub type PKINames = HashMap<String, NodeId>; pub type OnchainPKI = Arc<DashMap<String, Identity>>;
pub type OnchainPKI = HashMap<String, Identity>;
pub type PendingPassthroughs = HashMap<(NodeId, NodeId), PendingPassthroughConnection>; pub type PendingPassthroughs = HashMap<(NodeId, NodeId), PendingPassthroughConnection>;
#[derive(Clone)] #[derive(Clone)]

View File

@ -5,8 +5,10 @@ use futures::stream::{SplitSink, SplitStream};
use futures::{SinkExt, StreamExt}; use futures::{SinkExt, StreamExt};
use ring::signature::{self, Ed25519KeyPair}; use ring::signature::{self, Ed25519KeyPair};
use snow::params::NoiseParams; use snow::params::NoiseParams;
use std::sync::Arc;
use tokio::net::TcpStream; use tokio::net::TcpStream;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver}; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
use tokio::sync::Mutex;
use tokio::task::JoinSet; use tokio::task::JoinSet;
use tokio::time::timeout; use tokio::time::timeout;
use tokio_tungstenite::{connect_async, tungstenite, MaybeTlsStream, WebSocketStream}; use tokio_tungstenite::{connect_async, tungstenite, MaybeTlsStream, WebSocketStream};
@ -20,13 +22,13 @@ lazy_static::lazy_static! {
pub async fn save_new_peer( pub async fn save_new_peer(
identity: &Identity, identity: &Identity,
routing_for: bool, routing_for: bool,
peers: &mut Peers, peers: Peers,
peer_connections: &mut JoinSet<(String, Option<KernelMessage>)>, peer_connections: Arc<Mutex<JoinSet<()>>>,
conn: PeerConnection, conn: PeerConnection,
km: Option<KernelMessage>, km: Option<KernelMessage>,
kernel_message_tx: &MessageSender, kernel_message_tx: &MessageSender,
print_tx: &PrintSender, print_tx: &PrintSender,
) -> Result<()> { ) {
print_debug( print_debug(
&print_tx, &print_tx,
&format!("net: saving new peer {}", identity.name), &format!("net: saving new peer {}", identity.name),
@ -34,7 +36,7 @@ pub async fn save_new_peer(
.await; .await;
let (peer_tx, peer_rx) = unbounded_channel::<KernelMessage>(); let (peer_tx, peer_rx) = unbounded_channel::<KernelMessage>();
if km.is_some() { if km.is_some() {
peer_tx.send(km.unwrap())? peer_tx.send(km.unwrap()).unwrap()
} }
let peer = Peer { let peer = Peer {
identity: identity.clone(), identity: identity.clone(),
@ -42,24 +44,25 @@ pub async fn save_new_peer(
sender: peer_tx, sender: peer_tx,
}; };
peers.insert(identity.name.clone(), peer.clone()); peers.insert(identity.name.clone(), peer.clone());
peer_connections.spawn(maintain_connection( peer_connections.lock().await.spawn(maintain_connection(
peer, peer,
peers,
conn, conn,
peer_rx, peer_rx,
kernel_message_tx.clone(), kernel_message_tx.clone(),
print_tx.clone(), print_tx.clone(),
)); ));
Ok(())
} }
/// should always be spawned on its own task /// should always be spawned on its own task
pub async fn maintain_connection( pub async fn maintain_connection(
peer: Peer, peer: Peer,
peers: Peers,
mut conn: PeerConnection, mut conn: PeerConnection,
mut peer_rx: UnboundedReceiver<KernelMessage>, mut peer_rx: UnboundedReceiver<KernelMessage>,
kernel_message_tx: MessageSender, kernel_message_tx: MessageSender,
print_tx: PrintSender, print_tx: PrintSender,
) -> (NodeId, Option<KernelMessage>) { ) {
let peer_name = peer.identity.name; let peer_name = peer.identity.name;
let mut last_message = std::time::Instant::now(); let mut last_message = std::time::Instant::now();
loop { loop {
@ -128,7 +131,10 @@ pub async fn maintain_connection(
} }
let mut conn = conn.write_stream.reunite(conn.read_stream).unwrap(); let mut conn = conn.write_stream.reunite(conn.read_stream).unwrap();
let _ = conn.close(None).await; let _ = conn.close(None).await;
return (peer_name, None);
print_debug(&print_tx, &format!("net: connection with {peer_name} died")).await;
peers.remove(&peer_name);
return;
} }
/// cross the streams /// cross the streams
@ -199,6 +205,8 @@ pub async fn create_passthrough(
let target_peer = peers let target_peer = peers
.get(&to_name) .get(&to_name)
.ok_or(anyhow!("can't route to that indirect node"))?; .ok_or(anyhow!("can't route to that indirect node"))?;
// TODO: if we're not router for an indirect node, we should be able to
// *use one of their routers* to create a doubly-indirect passthrough.
if !target_peer.routing_for { if !target_peer.routing_for {
return Err(anyhow!("we don't route for that indirect node")); return Err(anyhow!("we don't route for that indirect node"));
} }
@ -460,7 +468,7 @@ fn strip_0x(s: &str) -> String {
pub async fn print_debug(print_tx: &PrintSender, content: &str) { pub async fn print_debug(print_tx: &PrintSender, content: &str) {
let _ = print_tx let _ = print_tx
.send(Printout { .send(Printout {
verbosity: 0, verbosity: 1,
content: content.into(), content: content.into(),
}) })
.await; .await;