tcp networking implemented

This commit is contained in:
dr-frmr 2024-05-24 01:53:20 -06:00
parent d37888cee8
commit ad39ec8fe1
No known key found for this signature in database
8 changed files with 735 additions and 242 deletions

View File

@ -1,8 +1,5 @@
use crate::net::{
tcp,
types::{IdentityExt, NetData, Peer, TCP_PROTOCOL, WS_PROTOCOL},
utils, ws,
};
use crate::net::types::{IdentityExt, NetData, Peer, TCP_PROTOCOL, WS_PROTOCOL};
use crate::net::{tcp, utils, ws};
use lib::types::core::{Identity, KernelMessage, NodeRouting};
use rand::prelude::SliceRandom;
use tokio::sync::mpsc;

View File

@ -1,8 +1,5 @@
use crate::net::{
connect, tcp,
types::{IdentityExt, NetData, Peer, TCP_PROTOCOL, WS_PROTOCOL},
utils, ws,
};
use crate::net::types::{IdentityExt, NetData, Peer, TCP_PROTOCOL, WS_PROTOCOL};
use crate::net::{connect, tcp, utils, ws};
use lib::types::core::{Identity, NodeRouting};
use tokio::{sync::mpsc, time};

View File

@ -1,27 +1,28 @@
use crate::net::{
types::{IdentityExt, NetData, TCP_PROTOCOL},
types::{IdentityExt, NetData, Peer, PendingStream, RoutingRequest, TCP_PROTOCOL},
utils::{
build_responder, print_debug, print_loud, validate_handshake, validate_routing_request,
validate_signature,
build_initiator, build_responder, create_passthrough, make_conn_url, print_debug,
validate_handshake, validate_routing_request, TIMEOUT,
},
};
use lib::types::core::{Identity, KernelMessage, NodeId, NodeRouting};
use lib::types::core::{Identity, KernelMessage};
use {
dashmap::DashMap,
futures::{SinkExt, StreamExt},
rand::seq::SliceRandom,
ring::signature::Ed25519KeyPair,
std::{collections::HashMap, sync::Arc},
anyhow::anyhow,
tokio::io::AsyncWriteExt,
tokio::net::{TcpListener, TcpStream},
tokio::{sync::mpsc, time},
};
mod utils;
pub mod utils;
/// only used in connection initialization
pub const TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);
pub struct PeerConnection {
pub noise: snow::TransportState,
pub buf: Vec<u8>,
pub stream: TcpStream,
}
pub async fn receiver(ext: IdentityExt, data: NetData) -> anyhow::Result<()> {
println!("tcp_receiver\r");
let tcp_port = ext.our.get_protocol_port(TCP_PROTOCOL).unwrap();
let tcp = match TcpListener::bind(format!("0.0.0.0:{tcp_port}")).await {
Ok(tcp) => tcp,
@ -83,7 +84,27 @@ pub async fn init_direct(
proxy_request: bool,
peer_rx: mpsc::UnboundedReceiver<KernelMessage>,
) -> Result<(), mpsc::UnboundedReceiver<KernelMessage>> {
todo!()
println!("tcp_init_direct\r");
match time::timeout(
TIMEOUT,
connect_with_handshake(ext, peer_id, port, None, proxy_request),
)
.await
{
Ok(Ok(connection)) => {
// maintain direct connection
tokio::spawn(utils::maintain_connection(
peer_id.name.clone(),
data.peers.clone(),
connection,
peer_rx,
ext.kernel_message_tx.clone(),
ext.print_tx.clone(),
));
Ok(())
}
_ => return Err(peer_rx),
}
}
pub async fn init_routed(
@ -91,10 +112,201 @@ pub async fn init_routed(
data: &NetData,
peer_id: &Identity,
router_id: &Identity,
port: u16,
router_port: u16,
peer_rx: mpsc::UnboundedReceiver<KernelMessage>,
) -> Result<(), mpsc::UnboundedReceiver<KernelMessage>> {
todo!()
println!("tcp_init_routed\r");
match time::timeout(
TIMEOUT,
connect_with_handshake(ext, peer_id, router_port, Some(router_id), false),
)
.await
{
Ok(Ok(connection)) => {
// maintain direct connection
tokio::spawn(utils::maintain_connection(
peer_id.name.clone(),
data.peers.clone(),
connection,
peer_rx,
ext.kernel_message_tx.clone(),
ext.print_tx.clone(),
));
Ok(())
}
Ok(Err(e)) => {
print_debug(&ext.print_tx, &format!("net: error getting routed: {e}")).await;
Err(peer_rx)
}
Err(_) => {
print_debug(&ext.print_tx, "net: timed out while getting routed").await;
Err(peer_rx)
}
}
}
async fn recv_connection(
ext: IdentityExt,
data: NetData,
mut stream: TcpStream,
) -> anyhow::Result<()> {
println!("tcp_recv_connection\r");
// before we begin XX handshake pattern, check first message over socket
let first_message = &utils::recv(&mut stream).await?;
// if the first message contains a "routing request",
// we see if the target is someone we are actively routing for,
// and create a Passthrough connection if so.
// a Noise 'e' message with have len 32
if first_message.len() != 32 {
let (from_id, target_id) =
validate_routing_request(&ext.our.name, first_message, &data.pki)?;
return create_passthrough(
&ext.our,
&ext.our_ip,
from_id,
target_id,
&data.peers,
&data.pending_passthroughs,
PendingStream::Tcp(stream),
)
.await;
}
let (mut noise, our_static_key) = build_responder();
let mut buf = vec![0u8; 65535];
// <- e
noise.read_message(first_message, &mut buf)?;
// -> e, ee, s, es
utils::send_protocol_handshake(
&ext,
&our_static_key,
&mut noise,
&mut buf,
&mut stream,
false,
)
.await?;
// <- s, se
let their_handshake = utils::recv_protocol_handshake(&mut noise, &mut buf, &mut stream).await?;
// now validate this handshake payload against the KNS PKI
let their_id = data
.pki
.get(&their_handshake.name)
.ok_or(anyhow!("unknown KNS name"))?;
validate_handshake(
&their_handshake,
noise
.get_remote_static()
.ok_or(anyhow!("noise error: missing remote pubkey"))?,
&their_id,
)?;
let (peer_tx, peer_rx) = mpsc::unbounded_channel();
data.peers.insert(
their_id.name.clone(),
Peer {
identity: their_id.clone(),
routing_for: their_handshake.proxy_request,
sender: peer_tx,
},
);
tokio::spawn(utils::maintain_connection(
their_handshake.name,
data.peers,
PeerConnection {
noise: noise.into_transport_mode()?,
buf,
stream,
},
peer_rx,
ext.kernel_message_tx,
ext.print_tx,
));
Ok(())
}
async fn connect_with_handshake(
ext: &IdentityExt,
peer_id: &Identity,
port: u16,
use_router: Option<&Identity>,
proxy_request: bool,
) -> anyhow::Result<PeerConnection> {
println!("tcp_connect_with_handshake\r");
let mut buf = vec![0u8; 65535];
let (mut noise, our_static_key) = build_initiator();
let ip = match use_router {
None => peer_id
.get_ip()
.ok_or(anyhow!("target has no IP address"))?,
Some(router_id) => router_id
.get_ip()
.ok_or(anyhow!("router has no IP address"))?,
};
let tcp_url = make_conn_url(&ext.our_ip, ip, &port, TCP_PROTOCOL)?;
let Ok(mut stream) = tokio::net::TcpStream::connect(tcp_url.to_string()).await else {
return Err(anyhow!("failed to connect to {tcp_url}"));
};
// if this is a routed request, before starting XX handshake pattern, send a
// routing request message over socket
if use_router.is_some() {
stream
.write_all(&rmp_serde::to_vec(&RoutingRequest {
protocol_version: 1,
source: ext.our.name.clone(),
signature: ext
.keypair
.sign(
[&peer_id.name, use_router.unwrap().name.as_str()]
.concat()
.as_bytes(),
)
.as_ref()
.to_vec(),
target: peer_id.name.clone(),
})?)
.await?;
}
// -> e
let len = noise.write_message(&[], &mut buf)?;
stream.write_all(&buf[..len]).await?;
// <- e, ee, s, es
let their_handshake = utils::recv_protocol_handshake(&mut noise, &mut buf, &mut stream).await?;
// now validate this handshake payload against the KNS PKI
validate_handshake(
&their_handshake,
noise
.get_remote_static()
.ok_or(anyhow!("noise error: missing remote pubkey"))?,
peer_id,
)?;
// -> s, se
utils::send_protocol_handshake(
&ext,
&our_static_key,
&mut noise,
&mut buf,
&mut stream,
proxy_request,
)
.await?;
Ok(PeerConnection {
noise: noise.into_transport_mode()?,
buf,
stream,
})
}
pub async fn recv_via_router(
@ -103,13 +315,100 @@ pub async fn recv_via_router(
peer_id: Identity,
router_id: Identity,
) {
todo!()
println!("tcp_recv_via_router\r");
let Some((ip, port)) = router_id.ws_routing() else {
return;
};
let Ok(tcp_url) = make_conn_url(&ext.our_ip, ip, port, TCP_PROTOCOL) else {
return;
};
let Ok(stream) = tokio::net::TcpStream::connect(tcp_url.to_string()).await else {
return;
};
match connect_with_handshake_via_router(&ext, &peer_id, &router_id, stream).await {
Ok(connection) => {
let (peer_tx, peer_rx) = mpsc::unbounded_channel();
data.peers.insert(
peer_id.name.clone(),
Peer {
identity: peer_id.clone(),
routing_for: false,
sender: peer_tx,
},
);
// maintain direct connection
tokio::spawn(utils::maintain_connection(
peer_id.name,
data.peers.clone(),
connection,
peer_rx,
ext.kernel_message_tx,
ext.print_tx,
));
}
Err(e) => {
print_debug(&ext.print_tx, &format!("net: error getting routed: {e}")).await;
}
}
}
async fn recv_connection(
ext: IdentityExt,
data: NetData,
async fn connect_with_handshake_via_router(
ext: &IdentityExt,
peer_id: &Identity,
router_id: &Identity,
mut stream: TcpStream,
) -> anyhow::Result<()> {
todo!()
) -> anyhow::Result<PeerConnection> {
println!("tcp_connect_with_handshake_via_router\r");
// before beginning XX handshake pattern, send a routing request
stream
.write_all(&rmp_serde::to_vec(&RoutingRequest {
protocol_version: 1,
source: ext.our.name.clone(),
signature: ext
.keypair
.sign(
[peer_id.name.as_str(), router_id.name.as_str()]
.concat()
.as_bytes(),
)
.as_ref()
.to_vec(),
target: peer_id.name.to_string(),
})?)
.await?;
let mut buf = vec![0u8; 65535];
let (mut noise, our_static_key) = build_responder();
// <- e
noise.read_message(&utils::recv(&mut stream).await?, &mut buf)?;
// -> e, ee, s, es
utils::send_protocol_handshake(
ext,
&our_static_key,
&mut noise,
&mut buf,
&mut stream,
false,
)
.await?;
// <- s, se
let their_handshake = utils::recv_protocol_handshake(&mut noise, &mut buf, &mut stream).await?;
// now validate this handshake payload against the KNS PKI
validate_handshake(
&their_handshake,
noise
.get_remote_static()
.ok_or(anyhow!("noise error: missing remote pubkey"))?,
&peer_id,
)?;
Ok(PeerConnection {
noise: noise.into_transport_mode()?,
buf,
stream,
})
}

View File

@ -1,19 +1,174 @@
use tokio::io::AsyncReadExt;
use tokio::net::TcpStream;
use crate::net::{
tcp::PeerConnection,
types::{HandshakePayload, IdentityExt, Peers},
utils::print_loud,
};
use lib::types::core::{KernelMessage, MessageSender, NodeId, PrintSender};
use {
tokio::io::{AsyncReadExt, AsyncWriteExt},
tokio::net::TcpStream,
tokio::sync::mpsc::UnboundedReceiver,
};
pub async fn recv(stream: &mut TcpStream) -> anyhow::Result<Vec<u8>> {
let mut buf = vec![0; 128];
let mut data = Vec::new();
/// should always be spawned on its own task
pub async fn maintain_connection(
peer_name: NodeId,
peers: Peers,
mut conn: PeerConnection,
mut peer_rx: UnboundedReceiver<KernelMessage>,
kernel_message_tx: MessageSender,
print_tx: PrintSender,
) {
println!("tcp_maintain_connection\r");
let mut last_message = std::time::Instant::now();
loop {
match stream.read(&mut buf).await {
Ok(0) => return Ok(data),
Ok(n) => {
data.extend_from_slice(&buf[..n]);
if n < buf.len() {
return Ok(data);
tokio::select! {
recv_result = recv_protocol_message(&mut conn) => {
match recv_result {
Ok(km) => {
if km.source.node != peer_name {
print_loud(
&print_tx,
&format!(
"net: got message with spoofed source from {peer_name}!"
),
).await;
break
} else {
kernel_message_tx.send(km).await.expect("net error: fatal: kernel receiver died");
continue
}
}
Err(_) => break
}
},
maybe_recv = peer_rx.recv() => {
match maybe_recv {
Some(km) => {
match send_protocol_message(&km, &mut conn).await {
Ok(()) => {
last_message = std::time::Instant::now();
continue
}
Err(_e) => break,
}
}
None => break
}
},
// if a message has not been sent or received in 2 hours, close the connection
_ = tokio::time::sleep(std::time::Duration::from_secs(7200)) => {
if last_message.elapsed().as_secs() > 7200 {
break
}
}
Err(e) => return Err(anyhow::anyhow!("net: error reading from stream: {}", e)),
}
}
let _ = conn.stream.shutdown().await;
peers.remove(&peer_name);
}
pub async fn send_protocol_message(
km: &KernelMessage,
conn: &mut PeerConnection,
) -> anyhow::Result<()> {
let serialized = rmp_serde::to_vec(km)?;
let len = (serialized.len() as u32).to_be_bytes();
let with_length_prefix = [len.to_vec(), serialized].concat();
// 65519 = 65535 - 16 (TAGLEN)
for payload in with_length_prefix.chunks(65519) {
let len = conn.noise.write_message(payload, &mut conn.buf)?;
conn.stream.write(&conn.buf[..len]).await?;
}
conn.stream.flush().await?;
Ok(())
}
/// any error in receiving a message will result in the connection being closed.
pub async fn recv_protocol_message(conn: &mut PeerConnection) -> anyhow::Result<KernelMessage> {
let mut len = [0u8; 4];
conn.stream.read_exact(&mut len).await?;
let outer_len = conn.noise.read_message(&len, &mut conn.buf)?;
if outer_len < 4 {
return Err(anyhow::anyhow!("protocol message too small!"));
}
let length_bytes = [conn.buf[0], conn.buf[1], conn.buf[2], conn.buf[3]];
let msg_len = u32::from_be_bytes(length_bytes);
let mut msg = Vec::with_capacity(msg_len as usize);
while msg.len() < msg_len as usize {
let ptr = msg.len();
conn.stream.read(&mut conn.buf).await?;
conn.noise.read_message(&conn.buf, &mut msg[ptr..])?;
}
Ok(rmp_serde::from_slice(&msg)?)
}
pub async fn send_protocol_handshake(
ext: &IdentityExt,
noise_static_key: &[u8],
noise: &mut snow::HandshakeState,
buf: &mut [u8],
stream: &mut TcpStream,
proxy_request: bool,
) -> anyhow::Result<()> {
let our_hs = rmp_serde::to_vec(&HandshakePayload {
protocol_version: 1,
name: ext.our.name.clone(),
signature: ext.keypair.sign(noise_static_key).as_ref().to_vec(),
proxy_request,
})
.expect("failed to serialize handshake payload");
let len = (our_hs.len() as u32).to_be_bytes();
let with_length_prefix = [len.to_vec(), our_hs].concat();
// 65519 = 65535 - 16 (TAGLEN)
for payload in with_length_prefix.chunks(65519) {
let len = noise.write_message(payload, buf)?;
stream.write(&buf[..len]).await?;
}
stream.flush().await?;
Ok(())
}
pub async fn recv_protocol_handshake(
noise: &mut snow::HandshakeState,
buf: &mut [u8],
stream: &mut TcpStream,
) -> anyhow::Result<HandshakePayload> {
let mut len = [0u8; 4];
stream.read_exact(&mut len).await?;
let outer_len = noise.read_message(&len, buf)?;
if outer_len < 4 {
return Err(anyhow::anyhow!("protocol message too small!"));
}
let length_bytes = [buf[0], buf[1], buf[2], buf[3]];
let msg_len = u32::from_be_bytes(length_bytes);
let mut msg = Vec::with_capacity(msg_len as usize);
while msg.len() < msg_len as usize {
let ptr = msg.len();
stream.read(buf).await?;
noise.read_message(&buf, &mut msg[ptr..])?;
}
Ok(rmp_serde::from_slice(&msg)?)
}
pub async fn recv(stream: &mut TcpStream) -> anyhow::Result<Vec<u8>> {
let mut len = [0u8; 4];
stream.read_exact(&mut len).await?;
let msg_len = u32::from_be_bytes(len);
let mut msg = Vec::with_capacity(msg_len as usize);
stream.read_exact(&mut msg).await?;
Ok(msg)
}

View File

@ -1,8 +1,8 @@
use lib::types::core::{
Identity, KernelMessage, MessageSender, NetworkErrorSender, NodeId, PrintSender,
};
use {
dashmap::DashMap,
lib::types::core::{
Identity, KernelMessage, MessageSender, NetworkErrorSender, NodeId, PrintSender,
},
ring::signature::Ed25519KeyPair,
serde::{Deserialize, Serialize},
std::sync::Arc,

View File

@ -1,16 +1,20 @@
use crate::net::types::{HandshakePayload, OnchainPKI, PKINames, PendingStream, RoutingRequest};
use crate::net::ws::WebSocket;
use futures::{
stream::{SplitSink, SplitStream},
SinkExt, StreamExt,
use crate::net::types::{
HandshakePayload, OnchainPKI, PKINames, Peers, PendingPassthroughs, PendingStream,
RoutingRequest, TCP_PROTOCOL, WS_PROTOCOL,
};
use lib::types::core::{
Address, Identity, KernelMessage, KnsUpdate, Message, MessageSender, NetworkErrorSender,
NodeRouting, PrintSender, Printout, ProcessId, Response, SendError, SendErrorKind,
WrappedSendError,
Address, Identity, KernelMessage, KnsUpdate, Message, MessageSender, NetAction,
NetworkErrorSender, NodeRouting, PrintSender, Printout, ProcessId, Request, Response,
SendError, SendErrorKind, WrappedSendError,
};
use {
futures::{SinkExt, StreamExt},
ring::signature::{self},
snow::params::NoiseParams,
tokio::io::AsyncWriteExt,
tokio::time,
tokio_tungstenite::connect_async,
};
use ring::signature::{self};
use snow::params::NoiseParams;
lazy_static::lazy_static! {
pub static ref PARAMS: NoiseParams = "Noise_XX_25519_ChaChaPoly_BLAKE2s"
@ -18,6 +22,87 @@ lazy_static::lazy_static! {
.expect("net: couldn't build noise params?");
}
pub const TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);
pub async fn create_passthrough(
our: &Identity,
our_ip: &str,
from_id: Identity,
target_id: Identity,
peers: &Peers,
pending_passthroughs: &PendingPassthroughs,
socket_1: PendingStream,
) -> anyhow::Result<()> {
println!("create_passthrough\r");
// if the target has already generated a pending passthrough for this source,
// immediately match them
if let Some(((_target, _from), pending_stream)) =
pending_passthroughs.remove(&(target_id.name.clone(), from_id.name.clone()))
{
tokio::spawn(maintain_passthrough(socket_1, pending_stream));
return Ok(());
}
if let Some((ip, tcp_port)) = target_id.tcp_routing() {
// create passthrough to direct node
let tcp_url = make_conn_url(our_ip, ip, tcp_port, TCP_PROTOCOL)?;
let Ok(Ok(stream_2)) =
time::timeout(TIMEOUT, tokio::net::TcpStream::connect(tcp_url.to_string())).await
else {
return Err(anyhow::anyhow!("failed to connect to target"));
};
tokio::spawn(maintain_passthrough(socket_1, PendingStream::Tcp(stream_2)));
return Ok(());
}
if let Some((ip, ws_port)) = target_id.ws_routing() {
// create passthrough to direct node
let ws_url = make_conn_url(our_ip, ip, ws_port, WS_PROTOCOL)?;
let Ok(Ok((socket_2, _response))) = time::timeout(TIMEOUT, connect_async(ws_url)).await
else {
return Err(anyhow::anyhow!("failed to connect to target"));
};
tokio::spawn(maintain_passthrough(
socket_1,
PendingStream::WebSocket(socket_2),
));
return Ok(());
}
// create passthrough to indirect node that we do routing for
let target_peer = peers
.get(&target_id.name)
.ok_or(anyhow::anyhow!("can't route to that indirect node"))?;
if !target_peer.routing_for {
return Err(anyhow::anyhow!("we don't route for that indirect node"));
}
// send their net:distro:sys process a message, notifying it to create a *matching*
// passthrough request, which we can pair with this pending one.
target_peer.sender.send(KernelMessage {
id: rand::random(),
source: Address {
node: our.name.clone(),
process: ProcessId::new(Some("net"), "distro", "sys"),
},
target: Address {
node: target_id.name.clone(),
process: ProcessId::new(Some("net"), "distro", "sys"),
},
rsvp: None,
message: Message::Request(Request {
inherit: false,
expects_response: Some(5),
body: rmp_serde::to_vec(&NetAction::ConnectionRequest(from_id.name.clone()))?,
metadata: None,
capabilities: vec![],
}),
lazy_load_blob: None,
})?;
// we'll remove this either if the above message gets a negative response,
// or if the target node connects to us with a matching passthrough.
// TODO it is currently possible to have dangling passthroughs in the map
// if the target is "connected" to us but nonresponsive.
pending_passthroughs.insert((from_id.name, target_id.name), socket_1);
Ok(())
}
/// cross the streams -- spawn on own task
pub async fn maintain_passthrough(socket_1: PendingStream, socket_2: PendingStream) {
println!("maintain_passthrough\r");
@ -34,9 +119,43 @@ pub async fn maintain_passthrough(socket_1: PendingStream, socket_2: PendingStre
_ = copy(&mut r2, &mut w1) => {},
}
}
(PendingStream::WebSocket(mut ws_socket), PendingStream::Tcp(tcp_socket))
| (PendingStream::Tcp(tcp_socket), PendingStream::WebSocket(mut ws_socket)) => {
todo!();
(PendingStream::WebSocket(mut ws_socket), PendingStream::Tcp(mut tcp_socket))
| (PendingStream::Tcp(mut tcp_socket), PendingStream::WebSocket(mut ws_socket)) => {
let mut last_message = std::time::Instant::now();
loop {
tokio::select! {
maybe_recv = ws_socket.next() => {
match maybe_recv {
Some(Ok(tokio_tungstenite::tungstenite::Message::Binary(bin))) => {
let Ok(()) = tcp_socket.write_all(&bin).await else {
break
};
last_message = std::time::Instant::now();
}
_ => break,
}
},
maybe_recv = crate::net::tcp::utils::recv(&mut tcp_socket) => {
match maybe_recv {
Ok(bin) => {
let Ok(()) = ws_socket.send(tokio_tungstenite::tungstenite::Message::Binary(bin)).await else {
break
};
last_message = std::time::Instant::now();
}
_ => break,
}
},
// if a message has not been sent or received in 2-4 hours, close the connection
_ = tokio::time::sleep(std::time::Duration::from_secs(7200)) => {
if last_message.elapsed().as_secs() > 7200 {
break
}
}
}
}
let _ = ws_socket.close(None).await;
let _ = tcp_socket.shutdown().await;
}
(PendingStream::WebSocket(mut socket_1), PendingStream::WebSocket(mut socket_2)) => {
let mut last_message = std::time::Instant::now();
@ -194,8 +313,17 @@ pub fn make_conn_url(
// if we have the same public IP as target, route locally,
// otherwise they will appear offline due to loopback stuff
let ip = if our_ip == ip { "localhost" } else { ip };
let url = url::Url::parse(&format!("{}://{}:{}", protocol, ip, port))?;
Ok(url)
match protocol {
TCP_PROTOCOL => {
let url = url::Url::parse(&format!("{}:{}", ip, port))?;
Ok(url)
}
WS_PROTOCOL => {
let url = url::Url::parse(&format!("{}://{}:{}", protocol, ip, port))?;
Ok(url)
}
_ => Err(anyhow::anyhow!("unknown protocol")),
}
}
pub async fn error_offline(km: KernelMessage, network_error_tx: &NetworkErrorSender) {

View File

@ -1,8 +1,8 @@
use crate::net::{
types::{IdentityExt, NetData, Peer, RoutingRequest, WS_PROTOCOL},
types::{IdentityExt, NetData, Peer, PendingStream, RoutingRequest, WS_PROTOCOL},
utils::{
build_initiator, build_responder, make_conn_url, print_debug, validate_handshake,
validate_routing_request,
build_initiator, build_responder, create_passthrough, make_conn_url, print_debug,
validate_handshake, validate_routing_request, TIMEOUT,
},
};
use lib::types::core::{Identity, KernelMessage};
@ -18,9 +18,6 @@ use {
mod utils;
/// only used in connection initialization, otherwise, nacks and Responses are only used for "timeouts"
pub const TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);
/// 10 MB -- TODO analyze as desired, apps can always chunk data into many messages
/// note that this only applies to cross-network messages, not local ones.
pub const MESSAGE_MAX_SIZE: u32 = 10_485_800;
@ -33,7 +30,7 @@ pub struct PeerConnection {
pub type WebSocket = WebSocketStream<MaybeTlsStream<tokio::net::TcpStream>>;
pub async fn receiver(ext: IdentityExt, net_data: NetData) -> Result<()> {
pub async fn receiver(ext: IdentityExt, data: NetData) -> Result<()> {
println!("receiver\r");
let ws_port = ext.our.get_protocol_port(WS_PROTOCOL).unwrap();
let ws = match TcpListener::bind(format!("0.0.0.0:{ws_port}")).await {
@ -58,7 +55,7 @@ pub async fn receiver(ext: IdentityExt, net_data: NetData) -> Result<()> {
}
Ok((stream, socket_addr)) => {
let ext = ext.clone();
let net_data = net_data.clone();
let data = data.clone();
tokio::spawn(async move {
let Ok(Ok(websocket)) =
time::timeout(TIMEOUT, accept_async(MaybeTlsStream::Plain(stream))).await
@ -70,7 +67,7 @@ pub async fn receiver(ext: IdentityExt, net_data: NetData) -> Result<()> {
&format!("net: got WS connection from {socket_addr}"),
)
.await;
match time::timeout(TIMEOUT, recv_connection(ext.clone(), net_data, websocket))
match time::timeout(TIMEOUT, recv_connection(ext.clone(), data, websocket))
.await
{
Ok(Ok(())) => return,
@ -209,6 +206,91 @@ pub async fn recv_via_router(
}
}
async fn recv_connection(
ext: IdentityExt,
data: NetData,
mut socket: WebSocket,
) -> anyhow::Result<()> {
println!("recv_connection\r");
// before we begin XX handshake pattern, check first message over socket
let first_message = &utils::recv(&mut socket).await?;
// if the first message contains a "routing request",
// we see if the target is someone we are actively routing for,
// and create a Passthrough connection if so.
// a Noise 'e' message with have len 32
if first_message.len() != 32 {
let (from_id, target_id) =
validate_routing_request(&ext.our.name, first_message, &data.pki)?;
return create_passthrough(
&ext.our,
&ext.our_ip,
from_id,
target_id,
&data.peers,
&data.pending_passthroughs,
PendingStream::WebSocket(socket),
)
.await;
}
let (mut noise, our_static_key) = build_responder();
let mut buf = vec![0u8; 65535];
// <- e
noise.read_message(first_message, &mut buf)?;
// -> e, ee, s, es
utils::send_protocol_handshake(
&ext,
&our_static_key,
&mut noise,
&mut buf,
&mut socket,
false,
)
.await?;
// <- s, se
let their_handshake = utils::recv_protocol_handshake(&mut noise, &mut buf, &mut socket).await?;
// now validate this handshake payload against the KNS PKI
let their_id = data
.pki
.get(&their_handshake.name)
.ok_or(anyhow!("unknown KNS name"))?;
validate_handshake(
&their_handshake,
noise
.get_remote_static()
.ok_or(anyhow!("noise error: missing remote pubkey"))?,
&their_id,
)?;
let (peer_tx, peer_rx) = mpsc::unbounded_channel();
data.peers.insert(
their_id.name.clone(),
Peer {
identity: their_id.clone(),
routing_for: their_handshake.proxy_request,
sender: peer_tx,
},
);
tokio::spawn(utils::maintain_connection(
their_handshake.name,
data.peers,
PeerConnection {
noise: noise.into_transport_mode()?,
buf,
socket,
},
peer_rx,
ext.kernel_message_tx,
ext.print_tx,
));
Ok(())
}
async fn connect_with_handshake(
ext: &IdentityExt,
peer_id: &Identity,
@ -292,91 +374,6 @@ async fn connect_with_handshake(
})
}
async fn recv_connection(
ext: IdentityExt,
data: NetData,
mut socket: WebSocket,
) -> anyhow::Result<()> {
println!("recv_connection\r");
// before we begin XX handshake pattern, check first message over socket
let first_message = &utils::recv(&mut socket).await?;
// if the first message contains a "routing request",
// we see if the target is someone we are actively routing for,
// and create a Passthrough connection if so.
// a Noise 'e' message with have len 32
if first_message.len() != 32 {
let (from_id, target_id) =
validate_routing_request(&ext.our.name, first_message, &data.pki)?;
return utils::create_passthrough(
&ext.our,
&ext.our_ip,
from_id,
target_id,
&data.peers,
&data.pending_passthroughs,
socket,
)
.await;
}
let (mut noise, our_static_key) = build_responder();
let mut buf = vec![0u8; 65535];
// <- e
noise.read_message(first_message, &mut buf)?;
// -> e, ee, s, es
utils::send_protocol_handshake(
&ext,
&our_static_key,
&mut noise,
&mut buf,
&mut socket,
false,
)
.await?;
// <- s, se
let their_handshake = utils::recv_protocol_handshake(&mut noise, &mut buf, &mut socket).await?;
// now validate this handshake payload against the KNS PKI
let their_id = data
.pki
.get(&their_handshake.name)
.ok_or(anyhow!("unknown KNS name"))?;
validate_handshake(
&their_handshake,
noise
.get_remote_static()
.ok_or(anyhow!("noise error: missing remote pubkey"))?,
&their_id,
)?;
let (peer_tx, peer_rx) = mpsc::unbounded_channel();
data.peers.insert(
their_id.name.clone(),
Peer {
identity: their_id.clone(),
routing_for: their_handshake.proxy_request,
sender: peer_tx,
},
);
tokio::spawn(utils::maintain_connection(
their_handshake.name,
data.peers,
PeerConnection {
noise: noise.into_transport_mode()?,
buf,
socket,
},
peer_rx,
ext.kernel_message_tx,
ext.print_tx,
));
Ok(())
}
async fn connect_with_handshake_via_router(
ext: &IdentityExt,
peer_id: &Identity,

View File

@ -1,19 +1,13 @@
use crate::net::{
types::{
HandshakePayload, IdentityExt, Peers, PendingPassthroughs, PendingStream, WS_PROTOCOL,
},
utils::{maintain_passthrough, make_conn_url, print_debug, print_loud},
ws::{PeerConnection, WebSocket, MESSAGE_MAX_SIZE, TIMEOUT},
};
use lib::core::{
Address, Identity, KernelMessage, Message, MessageSender, NetAction, NodeId, PrintSender,
ProcessId, Request,
types::{HandshakePayload, IdentityExt, Peers},
utils::{print_debug, print_loud},
ws::{PeerConnection, WebSocket, MESSAGE_MAX_SIZE},
};
use lib::core::{KernelMessage, MessageSender, NodeId, PrintSender};
use {
futures::{SinkExt, StreamExt},
tokio::sync::mpsc::UnboundedReceiver,
tokio::time,
tokio_tungstenite::{connect_async, tungstenite},
tokio_tungstenite::tungstenite,
};
/// should always be spawned on its own task
@ -101,81 +95,7 @@ pub async fn maintain_connection(
peers.remove(&peer_name);
}
pub async fn create_passthrough(
our: &Identity,
our_ip: &str,
from_id: Identity,
target_id: Identity,
peers: &Peers,
pending_passthroughs: &PendingPassthroughs,
socket_1: WebSocket,
) -> anyhow::Result<()> {
println!("create_passthrough\r");
// if the target has already generated a pending passthrough for this source,
// immediately match them
if let Some(((_target, _from), pending_stream)) =
pending_passthroughs.remove(&(target_id.name.clone(), from_id.name.clone()))
{
tokio::spawn(maintain_passthrough(
PendingStream::WebSocket(socket_1),
pending_stream,
));
return Ok(());
}
if let Some((ip, ws_port)) = target_id.ws_routing() {
// create passthrough to direct node
let ws_url = make_conn_url(our_ip, ip, ws_port, WS_PROTOCOL)?;
let Ok(Ok((socket_2, _response))) = time::timeout(TIMEOUT, connect_async(ws_url)).await
else {
return Err(anyhow::anyhow!("failed to connect to target"));
};
tokio::spawn(maintain_passthrough(
PendingStream::WebSocket(socket_1),
PendingStream::WebSocket(socket_2),
));
return Ok(());
}
// create passthrough to indirect node that we do routing for
let target_peer = peers
.get(&target_id.name)
.ok_or(anyhow::anyhow!("can't route to that indirect node"))?;
if !target_peer.routing_for {
return Err(anyhow::anyhow!("we don't route for that indirect node"));
}
// send their net:distro:sys process a message, notifying it to create a *matching*
// passthrough request, which we can pair with this pending one.
target_peer.sender.send(KernelMessage {
id: rand::random(),
source: Address {
node: our.name.clone(),
process: ProcessId::new(Some("net"), "distro", "sys"),
},
target: Address {
node: target_id.name.clone(),
process: ProcessId::new(Some("net"), "distro", "sys"),
},
rsvp: None,
message: Message::Request(Request {
inherit: false,
expects_response: Some(5),
body: rmp_serde::to_vec(&NetAction::ConnectionRequest(from_id.name.clone()))?,
metadata: None,
capabilities: vec![],
}),
lazy_load_blob: None,
})?;
// we'll remove this either if the above message gets a negative response,
// or if the target node connects to us with a matching passthrough.
// TODO it is currently possible to have dangling passthroughs in the map
// if the target is "connected" to us but nonresponsive.
pending_passthroughs.insert(
(from_id.name, target_id.name),
PendingStream::WebSocket(socket_1),
);
Ok(())
}
pub async fn send_protocol_message(
async fn send_protocol_message(
km: &KernelMessage,
conn: &mut PeerConnection,
) -> anyhow::Result<()> {
@ -199,7 +119,7 @@ pub async fn send_protocol_message(
}
/// any error in receiving a message will result in the connection being closed.
pub async fn recv_protocol_message(conn: &mut PeerConnection) -> anyhow::Result<KernelMessage> {
async fn recv_protocol_message(conn: &mut PeerConnection) -> anyhow::Result<KernelMessage> {
let outer_len = conn
.noise
.read_message(&recv(&mut conn.socket).await?, &mut conn.buf)?;