This commit is contained in:
dr-frmr 2023-10-31 16:27:41 -04:00
parent d1634a681b
commit b4ffb58831
No known key found for this signature in database
4 changed files with 80 additions and 87 deletions

View File

@ -35,6 +35,11 @@ const CAP_CHANNEL_CAPACITY: usize = 1_000;
const VERSION: &str = env!("CARGO_PKG_VERSION");
/// This can and should be an environment variable / setting. It configures networking
/// such that indirect nodes always use routers, even when target is a direct node,
/// such that only their routers can ever see their physical networking details.
const REVEAL_IP: bool = true;
#[tokio::main]
async fn main() {
// For use with https://github.com/tokio-rs/console
@ -263,6 +268,7 @@ async fn main() {
print_sender.clone(),
net_message_sender,
net_message_receiver,
REVEAL_IP,
));
tasks.spawn(filesystem::fs_sender(
our.name.clone(),
@ -320,10 +326,10 @@ async fn main() {
"\x1b[38;5;196muh oh, a kernel process crashed: {}\x1b[0m",
e
)
// TODO restart the task
// TODO restart the task?
} else {
format!("what does this mean???")
// TODO restart the task
// TODO restart the task?
}
}
quit = terminal::terminal(

View File

@ -20,7 +20,8 @@ mod utils;
// only used in connection initialization, otherwise, nacks and Responses are only used for "timeouts"
const TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);
// 10 MB -- TODO analyze as desired, apps can always chunk data into many messages
/// 10 MB -- TODO analyze as desired, apps can always chunk data into many messages
/// note that this only applies to cross-network messages, not local ones.
const MESSAGE_MAX_SIZE: u32 = 10_485_800;
/// Entry point from the main kernel task. Runs forever, spawns listener and sender tasks.
@ -33,6 +34,7 @@ pub async fn networking(
print_tx: PrintSender,
self_message_tx: MessageSender,
message_rx: MessageReceiver,
reveal_ip: bool,
) -> Result<()> {
println!("networking!\r");
println!("our identity: {:#?}\r", our);
@ -49,6 +51,7 @@ pub async fn networking(
print_tx,
self_message_tx,
message_rx,
reveal_ip,
)
.await
}
@ -95,6 +98,7 @@ async fn indirect_networking(
print_tx: PrintSender,
self_message_tx: MessageSender,
mut message_rx: MessageReceiver,
reveal_ip: bool,
) -> Result<()> {
println!("indirect_networking\r");
let mut pki: OnchainPKI = HashMap::new();
@ -105,21 +109,6 @@ async fn indirect_networking(
let mut peer_connections = JoinSet::<(NodeId, Option<KernelMessage>)>::new();
let mut active_routers = HashSet::<NodeId>::new();
// before opening up the main loop, go through our allowed routers
// and attempt to connect to all of them, saving the successfully
// connected-to ones in our router-set
connect_to_routers(
&our,
&our_ip,
&keypair,
&mut active_routers,
&pki,
&mut peers,
&mut peer_connections,
kernel_message_tx.clone(),
)
.await;
loop {
tokio::select! {
// 1. receive messages from kernel and send out over connections,
@ -163,9 +152,9 @@ async fn indirect_networking(
else if let Some(peer_id) = pki.get(target) {
// if the message is for a *direct* peer we don't have a connection with,
// try to establish a connection with them
// TODO: here, we can *choose* to use our routers so as not to reveal
// here, we can *choose* to use our routers so as not to reveal
// networking information about ourselves to the target.
if peer_id.ws_routing.is_some() {
if peer_id.ws_routing.is_some() && reveal_ip {
match init_connection(&our, &our_ip, peer_id, &keypair, None, false).await {
Ok((peer_name, direct_conn)) => {
let (peer_tx, peer_rx) = unbounded_channel::<KernelMessage>();
@ -181,6 +170,7 @@ async fn indirect_networking(
direct_conn,
peer_rx,
kernel_message_tx.clone(),
print_tx.clone(),
));
}
Err(e) => {
@ -190,6 +180,7 @@ async fn indirect_networking(
}
}
// if the message is for an *indirect* peer we don't have a connection with,
// or we want to protect our node's physical networking details from non-routers,
// do some routing: in a randomized order, go through their listed routers
// on chain and try to get one of them to build a proxied connection to
// this node for you
@ -205,7 +196,8 @@ async fn indirect_networking(
&names,
&mut peers,
&mut peer_connections,
kernel_message_tx.clone()
kernel_message_tx.clone(),
print_tx.clone(),
)).await;
if !sent.unwrap_or(false) {
// none of the routers worked!
@ -234,62 +226,43 @@ async fn indirect_networking(
// 3. periodically attempt to connect to any allowed routers that we
// are not connected to
_ = time::sleep(time::Duration::from_secs(3)) => {
connect_to_routers(
&our,
&our_ip,
&keypair,
&mut active_routers,
&pki,
&mut peers,
&mut peer_connections,
kernel_message_tx.clone(),
)
.await;
if active_routers.len() == our.allowed_routers.len() {
continue;
}
for router in &our.allowed_routers {
if active_routers.contains(router) {
continue;
}
let Some(router_id) = pki.get(router) else {
continue;
};
match init_connection(&our, &our_ip, router_id, &keypair, None, true).await {
Ok((peer_name, direct_conn)) => {
let (peer_tx, peer_rx) = unbounded_channel::<KernelMessage>();
let peer = Arc::new(Peer {
identity: router_id.clone(),
routing_for: false,
sender: peer_tx,
});
println!("net: connected to router {}\r", peer_name);
peers.insert(peer_name.clone(), peer.clone());
active_routers.insert(peer_name);
peer_connections.spawn(maintain_connection(
peer,
direct_conn,
peer_rx,
kernel_message_tx.clone(),
print_tx.clone(),
));
}
Err(_e) => continue,
}
}
}
}
}
}
async fn connect_to_routers(
our: &Identity,
our_ip: &str,
keypair: &Ed25519KeyPair,
active_routers: &mut HashSet<NodeId>,
pki: &OnchainPKI,
peers: &mut Peers,
peer_connections: &mut JoinSet<(NodeId, Option<KernelMessage>)>,
kernel_message_tx: MessageSender,
) {
for router in &our.allowed_routers {
if active_routers.contains(router) {
continue;
}
let Some(router_id) = pki.get(router) else {
continue;
};
match init_connection(our, our_ip, router_id, keypair, None, true).await {
Ok((peer_name, direct_conn)) => {
let (peer_tx, peer_rx) = unbounded_channel::<KernelMessage>();
let peer = Arc::new(Peer {
identity: router_id.clone(),
routing_for: false,
sender: peer_tx,
});
println!("net: connected to router {}\r", peer_name);
peers.insert(peer_name.clone(), peer.clone());
active_routers.insert(peer_name);
peer_connections.spawn(maintain_connection(
peer,
direct_conn,
peer_rx,
kernel_message_tx.clone(),
));
}
Err(_e) => continue,
}
}
}
async fn direct_networking(
our: Identity,
our_ip: String,
@ -370,6 +343,7 @@ async fn direct_networking(
direct_conn,
peer_rx,
kernel_message_tx.clone(),
print_tx.clone(),
));
}
Err(e) => {
@ -394,7 +368,8 @@ async fn direct_networking(
&names,
&mut peers,
&mut peer_connections,
kernel_message_tx.clone()
kernel_message_tx.clone(),
print_tx.clone(),
)).await;
if !sent.unwrap_or(false) {
// none of the routers worked!
@ -431,8 +406,11 @@ async fn direct_networking(
continue;
}
};
// if conn is direct, add peer
// if passthrough, add to our forwarding connections joinset
// TODO if their handshake indicates they want us to proxy
// for them (aka act as a router for them) we can choose
// whether to do so here!
// if conn is direct, add peer. if passthrough, add to our
// forwarding connections joinset
match conn {
Connection::Peer(peer_conn) => {
let (peer_tx, peer_rx) = unbounded_channel::<KernelMessage>();
@ -447,6 +425,7 @@ async fn direct_networking(
peer_conn,
peer_rx,
kernel_message_tx.clone(),
print_tx.clone(),
));
}
Connection::Passthrough(passthrough_conn) => {
@ -491,6 +470,7 @@ async fn init_connection_via_router(
peers: &mut Peers,
peer_connections: &mut JoinSet<(NodeId, Option<KernelMessage>)>,
kernel_message_tx: MessageSender,
print_tx: PrintSender,
) -> bool {
println!("init_connection_via_router\r");
let routers_shuffled = {
@ -521,6 +501,7 @@ async fn init_connection_via_router(
direct_conn,
peer_rx,
kernel_message_tx.clone(),
print_tx.clone(),
));
return true;
}
@ -535,8 +516,7 @@ async fn maintain_connection(
mut conn: PeerConnection,
mut peer_rx: UnboundedReceiver<KernelMessage>,
kernel_message_tx: MessageSender,
// network_error_tx: NetworkErrorSender,
// print_tx: PrintSender,
print_tx: PrintSender,
) -> (NodeId, Option<KernelMessage>) {
println!("maintain_connection\r");
loop {
@ -559,11 +539,22 @@ async fn maintain_connection(
maybe_recv = peer_rx.recv() => {
match maybe_recv {
Some(km) => {
// TODO error handle
match send_uqbar_message(&km, &mut conn).await {
Ok(()) => continue,
Err(e) => {
println!("net: error sending message: {}\r", e);
if e.to_string() == "message too large" {
// this will result in a Timeout if the message
// requested a response, otherwise nothing. so,
// we should always print something to terminal
let _ = print_tx.send(Printout {
verbosity: 0,
content: format!(
"net: tried to send too-large message, limit is {:.2}mb",
MESSAGE_MAX_SIZE as f64 / 1_048_576.0
),
}).await;
return (peer.identity.name.clone(), None)
}
return (peer.identity.name.clone(), Some(km))
}
}
@ -578,8 +569,7 @@ async fn maintain_connection(
}
}
/// match the streams
/// TODO optimize performance of this
/// cross the streams
async fn maintain_passthrough(mut conn: PassthroughConnection) {
println!("maintain_passthrough\r");
loop {
@ -682,9 +672,6 @@ async fn recv_connection(
let noise = noise.into_transport_mode()?;
println!("handshake complete, noise session received\r");
// TODO if their handshake indicates they want us to proxy
// for them (aka act as a router for them) we can choose
// whether to do so here.
Ok((
their_id.clone(),
their_handshake.proxy_request,
@ -929,7 +916,6 @@ async fn handle_local_message(
// someone wants to open a passthrough with us through a router!
// if we are an indirect node, and source is one of our routers,
// respond by attempting to init a matching passthrough.
// TODO can discriminate more here..
if our.allowed_routers.contains(&km.source.node) {
let Ok((peer_id, peer_conn)) = time::timeout(
TIMEOUT,
@ -961,6 +947,7 @@ async fn handle_local_message(
peer_conn,
peer_rx,
kernel_message_tx.clone(),
print_tx.clone(),
));
} else {
kernel_message_tx

View File

@ -72,7 +72,7 @@ pub struct PendingPassthroughConnection {
// TODO upgrade from hashmaps
pub type Peers = HashMap<String, Arc<Peer>>;
pub type PKINames = HashMap<String, NodeId>; // TODO maybe U256 to String
pub type PKINames = HashMap<String, NodeId>;
pub type OnchainPKI = HashMap<String, Identity>;
pub type PendingPassthroughs = HashMap<(NodeId, NodeId), PendingPassthroughConnection>;

View File

@ -151,7 +151,7 @@ pub fn validate_handshake(
pub async fn send_uqbar_message(km: &KernelMessage, conn: &mut PeerConnection) -> Result<()> {
let serialized = bincode::serialize(km)?;
if serialized.len() > MESSAGE_MAX_SIZE as usize {
return Err(anyhow!("uqbar message too large"));
return Err(anyhow!("message too large"));
}
let len = (serialized.len() as u32).to_be_bytes();