This commit is contained in:
dr-frmr 2023-10-30 22:00:20 -04:00
parent f04b72582c
commit e25a610fd1
No known key found for this signature in database

View File

@ -2,6 +2,7 @@ use crate::types::*;
use anyhow::{anyhow, Result};
use futures::stream::{SplitSink, SplitStream};
use futures::{SinkExt, StreamExt};
use rand::seq::SliceRandom;
use ring::signature::{self, Ed25519KeyPair};
use serde::{Deserialize, Serialize};
use snow::params::NoiseParams;
@ -45,17 +46,43 @@ struct QnsUpdate {
#[derive(Debug, Deserialize, Serialize)]
struct HandshakePayload {
pub name: NodeId,
// signature is created by their networking key, of their static key
// someone could reuse this signature, but then they will be unable
// to encrypt messages to us.
pub signature: Vec<u8>,
pub protocol_version: u8,
}
struct OpenConnection {
#[derive(Debug, Deserialize, Serialize)]
struct RoutingRequest {
pub name: NodeId,
// signature is created by their networking key, of the [target, router name].concat()
// someone could reuse this signature, and TODO need to find a way
// to make that useless in this routing request case.
pub signature: Vec<u8>,
pub target: NodeId,
pub protocol_version: u8,
}
enum Connection {
Peer(PeerConnection),
Passthrough(PassthroughConnection),
}
struct PeerConnection {
pub noise: snow::TransportState,
pub buf: Vec<u8>,
pub write_stream: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::Message>,
pub read_stream: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
}
struct PassthroughConnection {
pub write_stream_1: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::Message>,
pub read_stream_1: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
pub write_stream_2: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::Message>,
pub read_stream_2: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
}
type Peers = HashMap<String, Arc<Peer>>;
type PKINames = HashMap<String, NodeId>; // TODO maybe U256 to String
type OnchainPKI = HashMap<String, Identity>;
@ -136,13 +163,12 @@ async fn direct_networking(
// mapping from QNS namehash to username
let mut names: PKINames = HashMap::new();
let mut active_connections = JoinSet::<(String, Option<KernelMessage>)>::new();
let mut peer_connections = JoinSet::<(NodeId, Option<KernelMessage>)>::new();
let mut forwarding_connections = JoinSet::<NodeId>::new();
// 1. receive messages from kernel and send out over our connections
// 2. receive incoming TCP connections
// 3. deal with active connections that die by removing the associated peer
loop {
tokio::select! {
// 1. receive messages from kernel and send out over our connections
Some(km) = message_rx.recv() => {
// got a message from kernel to send out over the network
let target = &km.target.node;
@ -177,8 +203,8 @@ async fn direct_networking(
// if the message is for a *direct* peer we don't have a connection with,
// try to establish a connection with them
if peer_id.ws_routing.is_some() {
match init_connection(&our, &our_ip, peer_id, &keypair).await {
Ok((peer_name, conn)) => {
match init_connection(&our, &our_ip, peer_id, &keypair, None).await {
Ok((peer_name, direct_conn)) => {
let (peer_tx, peer_rx) = unbounded_channel::<KernelMessage>();
let peer = Arc::new(Peer {
identity: peer_id.clone(),
@ -186,23 +212,40 @@ async fn direct_networking(
});
peers.insert(peer_name, peer.clone());
peer.sender.send(km)?;
active_connections.spawn(maintain_connection(
peer_connections.spawn(maintain_connection(
peer,
conn,
direct_conn,
peer_rx,
kernel_message_tx.clone(),
));
}
Err(e) => {
println!("net: error initializing connection: {}", e);
println!("net: error initializing connection: {}\r", e);
error_offline(km, &network_error_tx).await?;
}
}
}
// if the message is for an *indirect* peer we don't have a connection with,
// do some routing and shit
// do some routing: in a randomized order, go through their listed routers
// on chain and try to get one of them to build a proxied connection to
// this node for you
else {
todo!()
let sent = init_connection_via_router(
&our,
&our_ip,
&keypair,
km.clone(),
peer_id,
&pki,
&mut peers,
&mut peer_connections,
kernel_message_tx.clone()
).await;
if !sent {
// none of the routers worked!
println!("net: error initializing routed connection\r");
error_offline(km, &network_error_tx).await?;
}
}
}
// peer cannot be found in PKI, throw an offline error
@ -210,32 +253,45 @@ async fn direct_networking(
error_offline(km, &network_error_tx).await?;
}
}
// 2. receive incoming TCP connections
Ok((stream, _socket_addr)) = tcp.accept() => {
// TODO we can perform some amount of validation here
// to prevent some amount of potential DDoS attacks.
// can also block based on socket_addr
match accept_async(MaybeTlsStream::Plain(stream)).await {
Ok(websocket) => {
let (peer_name, conn) = recv_connection(&our, &pki, &keypair, websocket).await?;
let (peer_id, conn) = recv_connection(&our, &our_ip, &pki, &keypair, websocket).await?;
let (peer_tx, peer_rx) = unbounded_channel::<KernelMessage>();
let peer = Arc::new(Peer {
identity: pki.get(&peer_name).ok_or(anyhow!("jej"))?.clone(),
identity: peer_id,
sender: peer_tx,
});
peers.insert(peer_name, peer.clone());
println!("received incoming connection\r");
active_connections.spawn(maintain_connection(
peer,
conn,
peer_rx,
kernel_message_tx.clone(),
));
// if conn is direct, add peer
// if passthrough, add to our forwarding connections joinset
match conn {
Connection::Peer(peer_conn) => {
peers.insert(peer.identity.name.clone(), peer.clone());
peer_connections.spawn(maintain_connection(
peer,
peer_conn,
peer_rx,
kernel_message_tx.clone(),
));
}
Connection::Passthrough(passthrough_conn) => {
forwarding_connections.spawn(maintain_passthrough(
peer,
passthrough_conn,
));
}
}
}
// ignore connections we failed to accept...?
Err(_) => {}
}
}
Some(Ok((dead_peer, maybe_resend))) = active_connections.join_next() => {
// 3. deal with active connections that die by removing the associated peer
Some(Ok((dead_peer, maybe_resend))) = peer_connections.join_next() => {
peers.remove(&dead_peer);
match maybe_resend {
None => {},
@ -248,14 +304,58 @@ async fn direct_networking(
}
}
async fn init_connection_via_router(
our: &Identity,
our_ip: &str,
keypair: &Ed25519KeyPair,
km: KernelMessage,
peer_id: &Identity,
pki: &OnchainPKI,
peers: &mut Peers,
peer_connections: &mut JoinSet<(NodeId, Option<KernelMessage>)>,
kernel_message_tx: MessageSender,
) -> bool {
let routers_shuffled = {
let mut routers = peer_id.allowed_routers.clone();
routers.shuffle(&mut rand::thread_rng());
routers
};
for router in routers_shuffled {
let router_id = match pki.get(&router) {
None => continue,
Some(id) => id,
};
match init_connection(&our, &our_ip, peer_id, &keypair, Some(router_id)).await {
Ok((peer_name, direct_conn)) => {
let (peer_tx, peer_rx) = unbounded_channel::<KernelMessage>();
let peer = Arc::new(Peer {
identity: peer_id.clone(),
sender: peer_tx,
});
peers.insert(peer_name, peer.clone());
peer.sender.send(km).unwrap();
peer_connections.spawn(maintain_connection(
peer,
direct_conn,
peer_rx,
kernel_message_tx.clone(),
));
return true;
}
Err(_) => continue,
}
}
return false;
}
async fn maintain_connection(
peer: Arc<Peer>,
mut conn: OpenConnection,
mut conn: PeerConnection,
mut peer_rx: UnboundedReceiver<KernelMessage>,
kernel_message_tx: MessageSender,
// network_error_tx: NetworkErrorSender,
// print_tx: PrintSender,
) -> (String, Option<KernelMessage>) {
) -> (NodeId, Option<KernelMessage>) {
println!("maintain_connection\r");
loop {
tokio::select! {
@ -296,19 +396,69 @@ async fn maintain_connection(
}
}
/// match the streams
/// TODO optimize performance of this
async fn maintain_passthrough(peer: Arc<Peer>, mut conn: PassthroughConnection) -> NodeId {
println!("maintain_passthrough\r");
loop {
tokio::select! {
maybe_recv = conn.read_stream_1.next() => {
match maybe_recv {
Some(Ok(msg)) => {
conn.write_stream_2.send(msg).await.expect("net error: fatal: kernel died");
}
_ => {
println!("net: passthrough broke\r");
return peer.identity.name.clone()
}
}
},
maybe_recv = conn.read_stream_2.next() => {
match maybe_recv {
Some(Ok(msg)) => {
conn.write_stream_1.send(msg).await.expect("net error: fatal: kernel died");
}
_ => {
println!("net: passthrough broke\r");
return peer.identity.name.clone()
}
}
},
}
}
}
async fn recv_connection(
our: &Identity,
our_ip: &str,
pki: &OnchainPKI,
keypair: &Ed25519KeyPair,
websocket: WebSocketStream<MaybeTlsStream<tokio::net::TcpStream>>,
) -> Result<(String, OpenConnection)> {
) -> Result<(Identity, Connection)> {
println!("recv_connection\r");
let mut buf = vec![0u8; 65535];
let (mut noise, our_static_key) = build_responder();
let (mut write_stream, mut read_stream) = websocket.split();
// <- e
noise.read_message(&ws_recv(&mut read_stream).await?, &mut buf)?;
let len = noise.read_message(&ws_recv(&mut read_stream).await?, &mut buf)?;
// if the first message contains a "routing request",
// we evaluate whether we want to perform routing for them
if len != 0 {
let (their_id, target_name) = validate_routing_request(&our.name, &buf, pki)?;
// TODO evaluate whether we want to perform routing for them!
// if we do, produce this thing:
return create_passthrough(
our_ip,
their_id,
target_name,
pki,
write_stream,
read_stream,
)
.await;
}
// -> e, ee, s, es
send_uqbar_handshake(
@ -325,13 +475,15 @@ async fn recv_connection(
let their_handshake = recv_uqbar_handshake(&mut noise, &mut buf, &mut read_stream).await?;
// now validate this handshake payload against the QNS PKI
let their_id = pki
.get(&their_handshake.name)
.ok_or(anyhow!("unknown QNS name"))?;
validate_handshake(
&their_handshake,
noise
.get_remote_static()
.ok_or(anyhow!("noise error: missing remote pubkey"))?,
pki.get(&their_handshake.name)
.ok_or(anyhow!("unknown QNS name"))?,
their_id,
)?;
// Transition the state machine into transport mode now that the handshake is complete.
@ -339,13 +491,13 @@ async fn recv_connection(
println!("handshake complete, noise session received\r");
Ok((
their_handshake.name,
OpenConnection {
their_id.clone(),
Connection::Peer(PeerConnection {
noise,
buf,
write_stream,
read_stream,
},
}),
))
}
@ -354,24 +506,56 @@ async fn init_connection(
our_ip: &str,
peer_id: &Identity,
keypair: &Ed25519KeyPair,
) -> Result<(String, OpenConnection)> {
use_router: Option<&Identity>,
) -> Result<(String, PeerConnection)> {
println!("init_connection\r");
let mut buf = vec![0u8; 65535];
let (mut noise, our_static_key) = build_initiator();
let Some((ref ip, ref port)) = peer_id.ws_routing else {
return Err(anyhow!("target has no routing information"));
let (mut write_stream, mut read_stream) = match use_router {
None => {
let Some((ref ip, ref port)) = peer_id.ws_routing else {
return Err(anyhow!("target has no routing information"));
};
let Ok(ws_url) = make_ws_url(our_ip, ip, port) else {
return Err(anyhow!("failed to parse websocket url"));
};
let Ok(Ok((websocket, _response))) = timeout(TIMEOUT, connect_async(ws_url)).await else {
return Err(anyhow!("failed to connect to target"));
};
websocket.split()
}
Some(router_id) => {
let Some((ref ip, ref port)) = router_id.ws_routing else {
return Err(anyhow!("router has no routing information"));
};
let Ok(ws_url) = make_ws_url(our_ip, ip, port) else {
return Err(anyhow!("failed to parse websocket url"));
};
let Ok(Ok((websocket, _response))) = timeout(TIMEOUT, connect_async(ws_url)).await else {
return Err(anyhow!("failed to connect to target"));
};
websocket.split()
}
};
let Ok(ws_url) = make_ws_url(our_ip, ip, port) else {
return Err(anyhow!("failed to parse websocket url"));
};
let Ok(Ok((websocket, _response))) = timeout(TIMEOUT, connect_async(ws_url)).await else {
return Err(anyhow!("failed to connect to target"));
};
let (mut write_stream, mut read_stream) = websocket.split();
// -> e
let len = noise.write_message(&[], &mut buf)?;
let message = match use_router {
None => vec![],
Some(router_id) => {
let routing_request = RoutingRequest {
name: our.name.clone(),
signature: keypair
.sign([&peer_id.name, router_id.name.as_str()].concat().as_bytes())
.as_ref()
.to_vec(),
target: peer_id.name.clone(),
protocol_version: 1,
};
bincode::serialize(&routing_request)?
}
};
let len = noise.write_message(&message, &mut buf)?;
ws_send(&mut write_stream, &buf[..len]).await?;
// <- e, ee, s, es
@ -402,7 +586,7 @@ async fn init_connection(
Ok((
their_handshake.name,
OpenConnection {
PeerConnection {
noise,
buf,
write_stream,
@ -411,6 +595,58 @@ async fn init_connection(
))
}
async fn create_passthrough(
our_ip: &str,
from_id: Identity,
to_name: NodeId,
pki: &OnchainPKI,
write_stream_1: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::Message>,
read_stream_1: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
) -> Result<(Identity, Connection)> {
let to_id = pki.get(&to_name).ok_or(anyhow!("unknown QNS name"))?;
let Some((ref ip, ref port)) = to_id.ws_routing else {
return Err(anyhow!("target has no routing information"));
};
let Ok(ws_url) = make_ws_url(our_ip, ip, port) else {
return Err(anyhow!("failed to parse websocket url"));
};
let Ok(Ok((websocket, _response))) = timeout(TIMEOUT, connect_async(ws_url)).await else {
return Err(anyhow!("failed to connect to target"));
};
let (write_stream_2, read_stream_2) = websocket.split();
Ok((
from_id,
Connection::Passthrough(PassthroughConnection {
write_stream_1,
read_stream_1,
write_stream_2,
read_stream_2,
}),
))
}
fn validate_routing_request(
our_name: &str,
buf: &[u8],
pki: &OnchainPKI,
) -> Result<(Identity, NodeId)> {
println!("validate_routing_request\r");
let routing_request: RoutingRequest = bincode::deserialize(buf)?;
let their_id = pki
.get(&routing_request.name)
.ok_or(anyhow!("unknown QNS name"))?;
let their_networking_key = signature::UnparsedPublicKey::new(
&signature::ED25519,
hex::decode(&strip_0x(&their_id.networking_key))?,
);
their_networking_key.verify(
&[&routing_request.target, our_name].concat().as_bytes(),
&routing_request.signature,
)?;
Ok((their_id.clone(), routing_request.target))
}
fn validate_handshake(
handshake: &HandshakePayload,
their_static_key: &[u8],
@ -429,7 +665,7 @@ fn validate_handshake(
Ok(())
}
async fn send_uqbar_message(km: &KernelMessage, conn: &mut OpenConnection) -> Result<()> {
async fn send_uqbar_message(km: &KernelMessage, conn: &mut PeerConnection) -> Result<()> {
let serialized = bincode::serialize(km)?;
if serialized.len() > MESSAGE_MAX_SIZE as usize {
return Err(anyhow!("uqbar message too large"));
@ -446,7 +682,7 @@ async fn send_uqbar_message(km: &KernelMessage, conn: &mut OpenConnection) -> Re
Ok(())
}
async fn recv_uqbar_message(conn: &mut OpenConnection) -> Result<KernelMessage> {
async fn recv_uqbar_message(conn: &mut PeerConnection) -> Result<KernelMessage> {
let outer_len = conn
.noise
.read_message(&ws_recv(&mut conn.read_stream).await?, &mut conn.buf)?;