WIP: ws protocol "finished", tpc stubbed out, generic aspects finished

This commit is contained in:
dr-frmr 2024-05-23 00:47:27 -06:00
parent 8ec3c0a492
commit adc75b9923
No known key found for this signature in database
10 changed files with 442 additions and 462 deletions

View File

@ -276,7 +276,7 @@ async fn main() {
kernel_message_receiver,
network_error_receiver,
kernel_debug_message_receiver,
net_message_sender.clone(),
net_message_sender,
home_directory_path.clone(),
runtime_extensions,
// from saved eth provider config, filter for node identities which will be
@ -301,7 +301,6 @@ async fn main() {
kernel_message_sender.clone(),
network_error_sender,
print_sender.clone(),
net_message_sender,
net_message_receiver,
*matches.get_one::<bool>("reveal-ip").unwrap_or(&true),
));

View File

@ -3,13 +3,14 @@ use crate::net::{
types::{IdentityExt, NetData, Peer, TCP_PROTOCOL, WS_PROTOCOL},
utils, ws,
};
use lib::types::core::{Identity, KernelMessage, NodeRouting};
use lib::types::core::{Identity, KernelMessage, NodeId, NodeRouting};
use rand::prelude::SliceRandom;
use tokio::sync::mpsc;
/// if target is a peer, queue to be routed
/// otherwise, create peer and initiate routing
pub async fn send_to_peer(ext: &IdentityExt, data: &NetData, km: KernelMessage) {
println!("send_to_peer\r");
if let Some(peer) = data.peers.get_mut(&km.target.node) {
peer.sender.send(km).expect("net: peer sender was dropped");
} else {
@ -22,15 +23,15 @@ pub async fn send_to_peer(ext: &IdentityExt, data: &NetData, km: KernelMessage)
routing_for: false,
sender: peer_tx.clone(),
};
data.peers.insert(km.target.node.clone(), new_peer.clone());
// send message to be routed
peer_tx.send(km).unwrap();
data.peers.insert(km.target.node.clone(), new_peer);
tokio::spawn(connect_to_peer(
ext.clone(),
data.clone(),
new_peer,
km.target.node.clone(),
peer_rx,
));
// send message to be routed
peer_tx.send(km).unwrap();
}
}
@ -40,12 +41,14 @@ pub async fn send_to_peer(ext: &IdentityExt, data: &NetData, km: KernelMessage)
///
/// if we fail to connect, remove the peer from the map
/// and return an offline error for each message in the receiver
pub async fn connect_to_peer(
async fn connect_to_peer(
ext: IdentityExt,
data: NetData,
peer: Peer,
peer_name: NodeId,
peer_rx: mpsc::UnboundedReceiver<KernelMessage>,
) {
println!("connect_to_peer\r");
let peer = data.peers.get_mut(&peer_name).unwrap();
if peer.identity.is_direct() {
utils::print_debug(
&ext.print_tx,
@ -59,7 +62,7 @@ pub async fn connect_to_peer(
match tcp::init_direct(&ext, &data, &peer.identity, port, false, peer_rx).await {
Ok(()) => return,
Err(peer_rx) => {
return handle_failed_connection(ext, data, &peer.identity, peer_rx).await;
return handle_failed_connection(&ext, &data, &peer.identity, peer_rx).await;
}
}
}
@ -67,22 +70,23 @@ pub async fn connect_to_peer(
match ws::init_direct(&ext, &data, &peer.identity, port, false, peer_rx).await {
Ok(()) => return,
Err(peer_rx) => {
return handle_failed_connection(ext, data, &peer.identity, peer_rx).await;
return handle_failed_connection(&ext, &data, &peer.identity, peer_rx).await;
}
}
}
} else {
connect_via_router(ext, data, peer, peer_rx).await;
connect_via_router(&ext, &data, &peer, peer_rx).await;
}
}
/// loop through the peer's routers, attempting to connect
async fn connect_via_router(
ext: IdentityExt,
data: NetData,
peer: Peer,
ext: &IdentityExt,
data: &NetData,
peer: &Peer,
mut peer_rx: mpsc::UnboundedReceiver<KernelMessage>,
) {
println!("connect_via_router\r");
let routers_shuffled = {
let mut routers = match peer.identity.routing {
NodeRouting::Routers(ref routers) => routers.clone(),
@ -101,7 +105,7 @@ async fn connect_via_router(
Some(id) => id.clone(),
};
if let Some(port) = router_id.get_protocol_port(TCP_PROTOCOL) {
match tcp::init_routed(&ext, &data, &peer.identity, &router_id, port, peer_rx).await {
match tcp::init_routed(ext, data, &peer.identity, &router_id, port, peer_rx).await {
Ok(()) => return,
Err(e) => {
peer_rx = e;
@ -110,7 +114,7 @@ async fn connect_via_router(
}
}
if let Some(port) = router_id.get_protocol_port(WS_PROTOCOL) {
match ws::init_routed(&ext, &data, &peer.identity, &router_id, port, peer_rx).await {
match ws::init_routed(ext, data, &peer.identity, &router_id, port, peer_rx).await {
Ok(()) => return,
Err(e) => {
peer_rx = e;
@ -123,11 +127,12 @@ async fn connect_via_router(
}
pub async fn handle_failed_connection(
ext: IdentityExt,
data: NetData,
ext: &IdentityExt,
data: &NetData,
peer_id: &Identity,
mut peer_rx: mpsc::UnboundedReceiver<KernelMessage>,
) {
println!("handle_failed_connection\r");
utils::print_debug(
&ext.print_tx,
&format!("net: failed to connect to {}", peer_id.name),

View File

@ -7,15 +7,12 @@ use lib::types::core::{Identity, NodeRouting};
use tokio::{sync::mpsc, time};
pub async fn maintain_routers(ext: IdentityExt, data: NetData) -> anyhow::Result<()> {
println!("maintain_routers\r");
let NodeRouting::Routers(ref routers) = ext.our.routing else {
return Err(anyhow::anyhow!("net: no routers to maintain"));
};
loop {
for router_namehash in routers {
let Some(router_name) = data.names.get(router_namehash) else {
// namehash does not map to a known node name
continue;
};
for router_name in routers {
if data.peers.contains_key(router_name.as_str()) {
// already connected to this router
continue;
@ -24,13 +21,14 @@ pub async fn maintain_routers(ext: IdentityExt, data: NetData) -> anyhow::Result
// router does not exist in PKI that we know of
continue;
};
connect_to_router(&router_id, ext.clone(), data.clone()).await;
connect_to_router(&router_id, &ext, &data).await;
}
time::sleep(time::Duration::from_secs(4)).await;
}
}
pub async fn connect_to_router(router_id: &Identity, ext: IdentityExt, data: NetData) {
pub async fn connect_to_router(router_id: &Identity, ext: &IdentityExt, data: &NetData) {
println!("connect_to_router\r");
utils::print_debug(
&ext.print_tx,
&format!("net: attempting to connect to router {}", router_id.name),
@ -46,7 +44,7 @@ pub async fn connect_to_router(router_id: &Identity, ext: IdentityExt, data: Net
},
);
if let Some(port) = router_id.get_protocol_port(TCP_PROTOCOL) {
match tcp::init_direct(&ext, &data, &router_id, port, false, peer_rx).await {
match tcp::init_direct(ext, data, &router_id, port, false, peer_rx).await {
Ok(()) => return,
Err(peer_rx) => {
return connect::handle_failed_connection(ext, data, router_id, peer_rx).await;
@ -54,7 +52,7 @@ pub async fn connect_to_router(router_id: &Identity, ext: IdentityExt, data: Net
}
}
if let Some(port) = router_id.get_protocol_port(WS_PROTOCOL) {
match ws::init_direct(&ext, &data, &router_id, port, false, peer_rx).await {
match ws::init_direct(ext, data, &router_id, port, false, peer_rx).await {
Ok(()) => return,
Err(peer_rx) => {
return connect::handle_failed_connection(ext, data, router_id, peer_rx).await;

View File

@ -2,13 +2,16 @@ use lib::types::core::{
Address, Identity, KernelMessage, MessageReceiver, MessageSender, NetAction, NetResponse,
NetworkErrorSender, NodeRouting, PrintSender, ProcessId,
};
use types::{IdentityExt, NetData, OnchainPKI, PKINames, Peers};
use types::{
IdentityExt, NetData, OnchainPKI, PKINames, Peers, PendingPassthroughs, TCP_PROTOCOL,
WS_PROTOCOL,
};
use {dashmap::DashMap, ring::signature::Ed25519KeyPair, std::sync::Arc, tokio::task::JoinSet};
mod connect;
mod indirect;
mod tcp;
pub mod types;
mod types;
mod utils;
mod ws;
@ -28,10 +31,10 @@ pub async fn networking(
kernel_message_tx: MessageSender,
network_error_tx: NetworkErrorSender,
print_tx: PrintSender,
self_message_tx: MessageSender,
kernel_message_rx: MessageReceiver,
reveal_ip: bool, // only used if indirect
_reveal_ip: bool, // only used if indirect
) -> anyhow::Result<()> {
println!("networking\r\n");
let ext = IdentityExt {
our: Arc::new(our),
our_ip: Arc::new(our_ip),
@ -39,8 +42,7 @@ pub async fn networking(
kernel_message_tx,
network_error_tx,
print_tx,
self_message_tx,
reveal_ip,
_reveal_ip,
};
// start by initializing the structs where we'll store PKI in memory
// and store a mapping of peers we have an active route for
@ -50,8 +52,15 @@ pub async fn networking(
// this allows us to act as a translator for others, and translate
// our own router namehashes if we are indirect.
let names: PKINames = Arc::new(DashMap::new());
// only used by routers
let pending_passthroughs: PendingPassthroughs = Arc::new(DashMap::new());
let net_data = NetData { pki, peers, names };
let net_data = NetData {
pki,
peers,
names,
pending_passthroughs,
};
let mut tasks = JoinSet::<anyhow::Result<()>>::new();
@ -70,10 +79,15 @@ pub async fn networking(
));
}
utils::print_loud(&ext.print_tx, "going online as a direct node").await;
if ports.contains_key("ws") {
if !ports.contains_key(WS_PROTOCOL) && !ports.contains_key(TCP_PROTOCOL) {
return Err(anyhow::anyhow!(
"net: fatal error: need at least one networking protocol"
));
}
if ports.contains_key(WS_PROTOCOL) {
tasks.spawn(ws::receiver(ext.clone(), net_data.clone()));
}
if ports.contains_key("tcp") {
if ports.contains_key(TCP_PROTOCOL) {
tasks.spawn(tcp::receiver(ext.clone(), net_data.clone()));
}
}
@ -102,10 +116,11 @@ async fn local_recv(
mut kernel_message_rx: MessageReceiver,
data: NetData,
) -> anyhow::Result<()> {
println!("local_recv\r\n");
while let Some(km) = kernel_message_rx.recv().await {
if km.target.node == ext.our.name {
// handle messages sent to us
handle_message(&ext, &km, &data).await;
handle_message(&ext, km, &data).await;
} else {
connect::send_to_peer(&ext, &data, km).await;
}
@ -113,11 +128,11 @@ async fn local_recv(
Err(anyhow::anyhow!("net: kernel message channel was dropped"))
}
async fn handle_message(ext: &IdentityExt, km: &KernelMessage, data: &NetData) {
async fn handle_message(ext: &IdentityExt, km: KernelMessage, data: &NetData) {
match &km.message {
lib::core::Message::Request(request) => handle_request(ext, km, &request.body, data).await,
lib::core::Message::Request(request) => handle_request(ext, &km, &request.body, data).await,
lib::core::Message::Response((response, _context)) => {
handle_response(ext, km, &response.body, data).await
handle_response(&km, &response.body, data).await
}
}
}
@ -182,7 +197,10 @@ async fn handle_local_request(
crate::KNS_ADDRESS
));
printout.push_str(&format!("our Identity: {:#?}\r\n", ext.our));
printout.push_str("we have connections with peers:\r\n");
printout.push_str(&format!(
"we have connections with {} peers:\r\n",
data.peers.len()
));
for peer in data.peers.iter() {
printout.push_str(&format!(
" {}, routing_for={}\r\n",
@ -193,6 +211,16 @@ async fn handle_local_request(
"we have {} entries in the PKI\r\n",
data.pki.len()
));
if !data.pending_passthroughs.is_empty() {
printout.push_str(&format!(
"we have {} pending passthroughs:\r\n",
data.pending_passthroughs.len()
));
for p in data.pending_passthroughs.iter() {
printout.push_str(&format!(" {} -> {}\r\n", p.key().0, p.key().1));
}
}
(NetResponse::Diagnostics(printout), None)
}
NetAction::Sign => (
@ -283,14 +311,34 @@ async fn handle_remote_request(
// someone wants to open a passthrough with us through a router.
// if we are an indirect node, and source is one of our routers,
// respond by attempting to init a matching passthrough.
let allowed_routers = match ext.our.routing {
NodeRouting::Routers(ref routers) => routers,
let allowed_routers = match &ext.our.routing {
NodeRouting::Routers(routers) => routers,
_ => return,
};
if allowed_routers.contains(&km.source.node) {
// connect back to that router and open a passthrough via them
todo!();
if !allowed_routers.contains(&km.source.node) {
return;
}
let Some(router_id) = data.pki.get(&km.source.node) else {
return;
};
let Some(peer_id) = data.pki.get(&from) else {
return;
};
// pick a protocol to connect to router with
// spawn a task that has a timeout here to not block the loop
let ext = ext.clone();
let data = data.clone();
let peer_id = peer_id.clone();
let router_id = router_id.clone();
tokio::spawn(async move {
tokio::time::timeout(std::time::Duration::from_secs(5), async {
if router_id.tcp_routing().is_some() {
tcp::recv_via_router(ext, data, peer_id, router_id).await;
} else if router_id.ws_routing().is_some() {
ws::recv_via_router(ext, data, peer_id, router_id).await;
}
})
});
}
_ => {
// if we can't parse this to a NetAction, treat it as a hello and print it,
@ -309,26 +357,16 @@ async fn handle_remote_request(
// Responses are received as a router, when we send ConnectionRequests
// to a node we do routing for.
async fn handle_response(
ext: &IdentityExt,
km: &KernelMessage,
response_body: &[u8],
data: &NetData,
) {
async fn handle_response(km: &KernelMessage, response_body: &[u8], data: &NetData) {
match rmp_serde::from_slice::<lib::core::NetResponse>(response_body) {
Ok(lib::core::NetResponse::Accepted(_)) => {
// TODO anything here?
}
Ok(lib::core::NetResponse::Rejected(to)) => {
// drop from our pending map
// this will drop the socket, causing initiator to see it as failed
todo!();
// pending_passthroughs
// .ok_or(anyhow!("got net response as non-router"))?
// .remove(&(to, km.source.node));
data.pending_passthroughs
.remove(&(to, km.source.node.to_owned()));
}
_ => {
// ignore
// ignore any other response, for now
}
}
}

View File

@ -1,6 +1,9 @@
use crate::net::{
types::{IdentityExt, NetData, TCP_PROTOCOL},
utils as net_utils,
utils::{
build_responder, print_debug, print_loud, validate_handshake, validate_routing_request,
validate_signature,
},
};
use lib::types::core::{Identity, KernelMessage, NodeId, NodeRouting};
use {
@ -10,11 +13,7 @@ use {
ring::signature::Ed25519KeyPair,
std::{collections::HashMap, sync::Arc},
tokio::net::{TcpListener, TcpStream},
tokio::task::JoinSet,
tokio::{sync::mpsc, time},
tokio_tungstenite::{
accept_async, connect_async, tungstenite, MaybeTlsStream, WebSocketStream,
},
};
mod utils;
@ -22,7 +21,7 @@ mod utils;
/// only used in connection initialization
pub const TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);
pub async fn receiver(ext: IdentityExt, net_data: NetData) -> anyhow::Result<()> {
pub async fn receiver(ext: IdentityExt, data: NetData) -> anyhow::Result<()> {
let tcp_port = ext.our.get_protocol_port(TCP_PROTOCOL).unwrap();
let tcp = match TcpListener::bind(format!("0.0.0.0:{tcp_port}")).await {
Ok(tcp) => tcp,
@ -33,39 +32,37 @@ pub async fn receiver(ext: IdentityExt, net_data: NetData) -> anyhow::Result<()>
}
};
net_utils::print_debug(&ext.print_tx, &format!("net: listening on port {tcp_port}")).await;
print_debug(&ext.print_tx, &format!("net: listening on port {tcp_port}")).await;
loop {
match tcp.accept().await {
Err(e) => {
net_utils::print_debug(
print_debug(
&ext.print_tx,
&format!("net: error accepting TCP connection: {e}"),
)
.await;
}
Ok((stream, socket_addr)) => {
net_utils::print_debug(
print_debug(
&ext.print_tx,
&format!("net: got TCP connection from {socket_addr}"),
)
.await;
let ext = ext.clone();
let net_data = net_data.clone();
let data = data.clone();
tokio::spawn(async move {
match time::timeout(TIMEOUT, recv_connection(ext.clone(), net_data, stream))
.await
{
match time::timeout(TIMEOUT, recv_connection(ext.clone(), data, stream)).await {
Ok(Ok(())) => return,
Ok(Err(e)) => {
net_utils::print_debug(
print_debug(
&ext.print_tx,
&format!("net: error receiving TCP connection: {e}"),
)
.await
}
Err(_e) => {
net_utils::print_debug(
print_debug(
&ext.print_tx,
&format!("net: TCP connection from {socket_addr} timed out"),
)
@ -100,25 +97,19 @@ pub async fn init_routed(
todo!()
}
pub async fn recv_via_router(
ext: IdentityExt,
data: NetData,
peer_id: Identity,
router_id: Identity,
) {
todo!()
}
async fn recv_connection(
ext: IdentityExt,
net_data: NetData,
data: NetData,
mut stream: TcpStream,
) -> anyhow::Result<()> {
// before we begin XX handshake pattern, check first message over socket
let first_message = &utils::recv(&mut stream).await?;
let mut buf = vec![0u8; 65535];
let (mut noise, our_static_key) = net_utils::build_responder();
let (mut read_stream, mut write_stream) = stream.split();
// if the first message contains a "routing request",
// we see if the target is someone we are actively routing for,
// and create a Passthrough connection if so.
// a Noise 'e' message with have len 32
if first_message.len() != 32 {
todo!();
}
Ok(())
todo!()
}

View File

@ -6,7 +6,9 @@ use {
ring::signature::Ed25519KeyPair,
serde::{Deserialize, Serialize},
std::sync::Arc,
tokio::net::TcpStream,
tokio::sync::mpsc::UnboundedSender,
tokio_tungstenite::{MaybeTlsStream, WebSocketStream},
};
pub const WS_PROTOCOL: &str = "ws";
@ -55,6 +57,12 @@ pub struct RoutingRequest {
pub type Peers = Arc<DashMap<String, Peer>>;
pub type PKINames = Arc<DashMap<String, NodeId>>;
pub type OnchainPKI = Arc<DashMap<String, Identity>>;
/// (from, target) -> from's socket
pub type PendingPassthroughs = Arc<DashMap<(NodeId, NodeId), PendingStream>>;
pub enum PendingStream {
WebSocket(WebSocketStream<MaybeTlsStream<TcpStream>>),
Tcp(TcpStream),
}
#[derive(Clone)]
pub struct Peer {
@ -74,8 +82,7 @@ pub struct IdentityExt {
pub kernel_message_tx: MessageSender,
pub network_error_tx: NetworkErrorSender,
pub print_tx: PrintSender,
pub self_message_tx: MessageSender,
pub reveal_ip: bool,
pub _reveal_ip: bool, // TODO use
}
#[derive(Clone)]
@ -83,4 +90,5 @@ pub struct NetData {
pub pki: OnchainPKI,
pub peers: Peers,
pub names: PKINames,
pub pending_passthroughs: PendingPassthroughs,
}

View File

@ -1,4 +1,4 @@
use crate::net::types::{HandshakePayload, OnchainPKI, PKINames, RoutingRequest};
use crate::net::types::{HandshakePayload, OnchainPKI, PKINames, PendingStream, RoutingRequest};
use lib::types::core::{
Address, Identity, KernelMessage, KnsUpdate, Message, MessageSender, NetworkErrorSender,
NodeRouting, PrintSender, Printout, ProcessId, Response, SendError, SendErrorKind,
@ -13,6 +13,47 @@ lazy_static::lazy_static! {
.expect("net: couldn't build noise params?");
}
/// cross the streams -- spawn on own task
pub async fn maintain_passthrough(socket_1: PendingStream, socket_2: PendingStream) {
use tokio::io::copy;
// copy from ws_socket to tcp_socket and vice versa
// do not use bidirectional because if one side closes,
// we want to close the entire passthrough
match (socket_1, socket_2) {
(PendingStream::Tcp(socket_1), PendingStream::Tcp(socket_2)) => {
let (mut r1, mut w1) = tokio::io::split(socket_1);
let (mut r2, mut w2) = tokio::io::split(socket_2);
let c1 = copy(&mut r1, &mut w2);
let c2 = copy(&mut r2, &mut w1);
tokio::select! {
_ = c1 => return,
_ = c2 => return,
}
}
(PendingStream::WebSocket(mut ws_socket), PendingStream::Tcp(tcp_socket))
| (PendingStream::Tcp(tcp_socket), PendingStream::WebSocket(mut ws_socket)) => {
let (mut r1, mut w1) = tokio::io::split(ws_socket.get_mut());
let (mut r2, mut w2) = tokio::io::split(tcp_socket);
let c1 = copy(&mut r1, &mut w2);
let c2 = copy(&mut r2, &mut w1);
tokio::select! {
_ = c1 => return,
_ = c2 => return,
}
}
(PendingStream::WebSocket(mut socket_1), PendingStream::WebSocket(mut socket_2)) => {
let (mut r1, mut w1) = tokio::io::split(socket_1.get_mut());
let (mut r2, mut w2) = tokio::io::split(socket_2.get_mut());
let c1 = copy(&mut r1, &mut w2);
let c2 = copy(&mut r2, &mut w1);
tokio::select! {
_ = c1 => return,
_ = c2 => return,
}
}
}
}
pub fn ingest_log(log: KnsUpdate, pki: &OnchainPKI, names: &PKINames) {
pki.insert(
log.name.clone(),

View File

@ -1,24 +1,15 @@
use crate::net::{
types::{IdentityExt, NetData, OnchainPKI, PKINames, Peer, Peers, RoutingRequest, WS_PROTOCOL},
types::{IdentityExt, NetData, Peer, RoutingRequest, WS_PROTOCOL},
utils::{
build_initiator, build_responder, error_offline, make_conn_url, print_debug,
validate_handshake, validate_routing_request, validate_signature,
build_initiator, build_responder, make_conn_url, print_debug, validate_handshake,
validate_routing_request,
},
};
use lib::types::core::{
Address, Identity, KernelMessage, LazyLoadBlob, Message, MessageReceiver, MessageSender,
NetAction, NetResponse, NetworkErrorSender, NodeId, NodeRouting, PrintSender, Printout,
ProcessId,
};
use lib::types::core::{Identity, KernelMessage};
use {
anyhow::{anyhow, Result},
dashmap::DashMap,
futures::{SinkExt, StreamExt},
rand::seq::SliceRandom,
ring::signature::Ed25519KeyPair,
std::{collections::HashMap, sync::Arc},
futures::SinkExt,
tokio::net::{TcpListener, TcpStream},
tokio::task::JoinSet,
tokio::{sync::mpsc, time},
tokio_tungstenite::{
accept_async, connect_async, tungstenite, MaybeTlsStream, WebSocketStream,
@ -40,12 +31,10 @@ pub struct PeerConnection {
pub socket: WebSocketStream<MaybeTlsStream<TcpStream>>,
}
/// (from, target) -> from's socket
pub type PendingPassthroughs =
Arc<DashMap<(NodeId, NodeId), WebSocketStream<MaybeTlsStream<TcpStream>>>>;
pub type WebSocket = WebSocketStream<MaybeTlsStream<tokio::net::TcpStream>>;
pub async fn receiver(ext: IdentityExt, net_data: NetData) -> Result<()> {
let pending_passthroughs: PendingPassthroughs = Arc::new(DashMap::new());
println!("receiver\r");
let ws_port = ext.our.get_protocol_port(WS_PROTOCOL).unwrap();
let ws = match TcpListener::bind(format!("0.0.0.0:{ws_port}")).await {
Ok(ws) => ws,
@ -68,38 +57,34 @@ pub async fn receiver(ext: IdentityExt, net_data: NetData) -> Result<()> {
.await;
}
Ok((stream, socket_addr)) => {
print_debug(
&ext.print_tx,
&format!("net: got WS connection from {socket_addr}"),
)
.await;
let ext = ext.clone();
let net_data = net_data.clone();
let pending_passthroughs = pending_passthroughs.clone();
tokio::spawn(async move {
let Ok(Ok(websocket)) =
time::timeout(TIMEOUT, accept_async(MaybeTlsStream::Plain(stream))).await
else {
return;
};
match time::timeout(
TIMEOUT,
recv_connection(ext.clone(), net_data, pending_passthroughs, websocket),
print_debug(
&ext.print_tx,
&format!("net: got WS connection from {socket_addr}"),
)
.await
.await;
match time::timeout(TIMEOUT, recv_connection(ext.clone(), net_data, websocket))
.await
{
Ok(Ok(())) => return,
Ok(Err(e)) => {
print_debug(
&ext.print_tx,
&format!("net: error receiving TCP connection: {e}"),
&format!("net: error receiving WS connection: {e}"),
)
.await
}
Err(_e) => {
print_debug(
&ext.print_tx,
&format!("net: TCP connection from {socket_addr} timed out"),
&format!("net: WS connection from {socket_addr} timed out"),
)
.await
}
@ -118,7 +103,27 @@ pub async fn init_direct(
proxy_request: bool,
peer_rx: mpsc::UnboundedReceiver<KernelMessage>,
) -> Result<(), mpsc::UnboundedReceiver<KernelMessage>> {
todo!()
println!("init_direct\r");
match time::timeout(
TIMEOUT,
connect_with_handshake(ext, peer_id, port, None, proxy_request),
)
.await
{
Ok(Ok(connection)) => {
// maintain direct connection
tokio::spawn(utils::maintain_connection(
peer_id.name.clone(),
data.peers.clone(),
connection,
peer_rx,
ext.kernel_message_tx.clone(),
ext.print_tx.clone(),
));
Ok(())
}
_ => return Err(peer_rx),
}
}
pub async fn init_routed(
@ -126,171 +131,163 @@ pub async fn init_routed(
data: &NetData,
peer_id: &Identity,
router_id: &Identity,
port: u16,
router_port: u16,
peer_rx: mpsc::UnboundedReceiver<KernelMessage>,
) -> Result<(), mpsc::UnboundedReceiver<KernelMessage>> {
todo!()
}
async fn establish_new_peer_connection(
our: Identity,
our_ip: String,
keypair: Arc<Ed25519KeyPair>,
km: KernelMessage,
pki: OnchainPKI,
names: PKINames,
peers: Peers,
reveal_ip: bool,
kernel_message_tx: MessageSender,
network_error_tx: NetworkErrorSender,
print_tx: PrintSender,
) -> (NodeId, Result<()>) {
if let Some(peer_id) = pki.get(&km.target.node) {
// if the message is for a *direct* peer we don't have a connection with,
// try to establish a connection with them
// here, we can *choose* to use our routers so as not to reveal
// networking information about ourselves to the target.
if peer_id.is_direct() && reveal_ip {
print_debug(
&print_tx,
&format!("net: attempting to connect to {} directly", peer_id.name),
)
.await;
match time::timeout(
TIMEOUT,
init_connection(&our, &our_ip, &peer_id, &keypair, None, false),
)
.await
{
Ok(Ok(direct_conn)) => {
todo!();
// utils::save_new_peer(
// &peer_id,
// false,
// peers,
// direct_conn,
// Some(km),
// &kernel_message_tx,
// &print_tx,
// )
// .await;
(peer_id.name.clone(), Ok(()))
}
_ => {
error_offline(km, &network_error_tx).await;
(
peer_id.name.clone(),
Err(anyhow!("failed to connect to peer")),
)
}
}
println!("init_routed\r");
match time::timeout(
TIMEOUT,
connect_with_handshake(ext, peer_id, router_port, Some(router_id), false),
)
.await
{
Ok(Ok(connection)) => {
// maintain direct connection
tokio::spawn(utils::maintain_connection(
peer_id.name.clone(),
data.peers.clone(),
connection,
peer_rx,
ext.kernel_message_tx.clone(),
ext.print_tx.clone(),
));
Ok(())
}
// if the message is for an *indirect* peer we don't have a connection with,
// or we want to protect our node's physical networking details from non-routers,
// do some routing: in a randomized order, go through their listed routers
// on chain and try to get one of them to build a proxied connection to
// this node for you
else {
print_debug(
&print_tx,
&format!("net: attempting to connect to {} via router", peer_id.name),
)
.await;
let sent = time::timeout(
TIMEOUT,
init_connection_via_router(
&our,
&our_ip,
&keypair,
km.clone(),
&peer_id,
&pki,
&names,
peers,
kernel_message_tx.clone(),
print_tx.clone(),
),
)
.await;
if sent.unwrap_or(false) {
(peer_id.name.clone(), Ok(()))
} else {
// none of the routers worked!
error_offline(km, &network_error_tx).await;
(
peer_id.name.clone(),
Err(anyhow!("failed to connect to peer")),
)
}
}
}
// peer cannot be found in PKI, throw an offline error
else {
let peer_name = km.target.node.clone();
error_offline(km, &network_error_tx).await;
(peer_name, Err(anyhow!("failed to connect to peer")))
_ => return Err(peer_rx),
}
}
async fn init_connection_via_router(
our: &Identity,
our_ip: &str,
keypair: &Ed25519KeyPair,
km: KernelMessage,
peer_id: &Identity,
pki: &OnchainPKI,
names: &PKINames,
peers: Peers,
kernel_message_tx: MessageSender,
print_tx: PrintSender,
) -> bool {
let routers_shuffled = {
let mut routers = match our.routing {
NodeRouting::Routers(ref routers) => routers.clone(),
_ => vec![],
};
routers.shuffle(&mut rand::thread_rng());
routers
/// one of our routers has a pending passthrough for us: connect to them
/// and set up a connection that will be maintained like a normal one
pub async fn recv_via_router(
ext: IdentityExt,
data: NetData,
peer_id: Identity,
router_id: Identity,
) {
println!("recv_via_router\r");
let Some((ip, port)) = router_id.ws_routing() else {
return;
};
for router_namehash in &routers_shuffled {
let Some(router_name) = names.get(router_namehash) else {
continue;
};
let router_id = match pki.get(router_name.as_str()) {
None => continue,
Some(id) => id,
};
match init_connection(our, our_ip, peer_id, keypair, Some(&router_id), false).await {
Ok(direct_conn) => {
todo!();
// utils::save_new_peer(
// peer_id,
// false,
// peers,
// direct_conn,
// Some(km),
// &kernel_message_tx,
// &print_tx,
// )
// .await;
return true;
}
Err(_) => continue,
let Ok(ws_url) = make_conn_url(&ext.our_ip, ip, port, WS_PROTOCOL) else {
return;
};
let Ok((socket, _response)) = connect_async(ws_url).await else {
return;
};
match connect_with_handshake_via_router(&ext, &peer_id, &router_id, socket).await {
Ok(connection) => {
let (peer_tx, peer_rx) = mpsc::unbounded_channel();
data.peers.insert(
peer_id.name.clone(),
Peer {
identity: peer_id.clone(),
routing_for: false,
sender: peer_tx,
},
);
// maintain direct connection
tokio::spawn(utils::maintain_connection(
peer_id.name,
data.peers.clone(),
connection,
peer_rx,
ext.kernel_message_tx,
ext.print_tx,
));
}
_ => return,
}
false
}
async fn connect_with_handshake(
ext: &IdentityExt,
peer_id: &Identity,
port: u16,
use_router: Option<&Identity>,
proxy_request: bool,
) -> anyhow::Result<PeerConnection> {
println!("connect_with_handshake\r");
let mut buf = vec![0u8; 65535];
let (mut noise, our_static_key) = build_initiator();
let ip = match use_router {
None => peer_id
.get_ip()
.ok_or(anyhow!("target has no IP address"))?,
Some(router_id) => router_id
.get_ip()
.ok_or(anyhow!("router has no IP address"))?,
};
let ws_url = make_conn_url(&ext.our_ip, ip, &port, WS_PROTOCOL)?;
let Ok((mut socket, _response)) = connect_async(ws_url).await else {
return Err(anyhow!("failed to connect to target"));
};
// if this is a routed request, before starting XX handshake pattern, send a
// routing request message over socket
if use_router.is_some() {
let req = rmp_serde::to_vec(&RoutingRequest {
protocol_version: 1,
source: ext.our.name.clone(),
signature: ext
.keypair
.sign(
[&peer_id.name, use_router.unwrap().name.as_str()]
.concat()
.as_bytes(),
)
.as_ref()
.to_vec(),
target: peer_id.name.clone(),
})?;
socket.send(tungstenite::Message::binary(req)).await?;
}
// -> e
let len = noise.write_message(&[], &mut buf)?;
socket
.send(tungstenite::Message::binary(&buf[..len]))
.await?;
// <- e, ee, s, es
let their_handshake = utils::recv_protocol_handshake(&mut noise, &mut buf, &mut socket).await?;
// now validate this handshake payload against the KNS PKI
validate_handshake(
&their_handshake,
noise
.get_remote_static()
.ok_or(anyhow!("noise error: missing remote pubkey"))?,
peer_id,
)?;
// -> s, se
utils::send_protocol_handshake(
&ext,
&our_static_key,
&mut noise,
&mut buf,
&mut socket,
proxy_request,
)
.await?;
Ok(PeerConnection {
noise: noise.into_transport_mode()?,
buf,
socket,
})
}
async fn recv_connection(
ext: IdentityExt,
data: NetData,
mut pending_passthroughs: PendingPassthroughs,
mut socket: WebSocketStream<MaybeTlsStream<tokio::net::TcpStream>>,
mut socket: WebSocket,
) -> anyhow::Result<()> {
let (mut noise, our_static_key) = build_responder();
println!("recv_connection\r");
// before we begin XX handshake pattern, check first message over socket
let first_message = &utils::ws_recv(&mut socket).await?;
let first_message = &utils::recv(&mut socket).await?;
// if the first message contains a "routing request",
// we see if the target is someone we are actively routing for,
@ -305,12 +302,13 @@ async fn recv_connection(
from_id,
target_id,
&data.peers,
&mut pending_passthroughs,
&data.pending_passthroughs,
socket,
)
.await;
}
let (mut noise, our_static_key) = build_responder();
let mut buf = vec![0u8; 65535];
// <- e
@ -318,8 +316,7 @@ async fn recv_connection(
// -> e, ee, s, es
utils::send_protocol_handshake(
&ext.our,
&ext.keypair,
&ext,
&our_static_key,
&mut noise,
&mut buf,
@ -368,45 +365,39 @@ async fn recv_connection(
Ok(())
}
async fn recv_connection_via_router(
our: &Identity,
our_ip: &str,
their_name: &str,
pki: &OnchainPKI,
keypair: &Ed25519KeyPair,
router: &Identity,
) -> Result<(Identity, PeerConnection)> {
let mut buf = vec![0u8; 65535];
let (mut noise, our_static_key) = build_responder();
let Some((ip, port)) = router.ws_routing() else {
return Err(anyhow!("router has no routing information"));
};
let Ok(ws_url) = make_conn_url(our_ip, ip, port, "ws") else {
return Err(anyhow!("failed to parse websocket url"));
};
let Ok(Ok((mut socket, _response))) = time::timeout(TIMEOUT, connect_async(ws_url)).await
else {
return Err(anyhow!("failed to connect to target"));
};
async fn connect_with_handshake_via_router(
ext: &IdentityExt,
peer_id: &Identity,
router_id: &Identity,
mut socket: WebSocketStream<MaybeTlsStream<TcpStream>>,
) -> anyhow::Result<PeerConnection> {
println!("connect_with_handshake_via_router\r");
// before beginning XX handshake pattern, send a routing request
let req = rmp_serde::to_vec(&RoutingRequest {
protocol_version: 1,
source: our.name.clone(),
signature: keypair
.sign([their_name, router.name.as_str()].concat().as_bytes())
source: ext.our.name.clone(),
signature: ext
.keypair
.sign(
[peer_id.name.as_str(), router_id.name.as_str()]
.concat()
.as_bytes(),
)
.as_ref()
.to_vec(),
target: their_name.to_string(),
target: peer_id.name.to_string(),
})?;
socket.send(tungstenite::Message::binary(req)).await?;
let mut buf = vec![0u8; 65535];
let (mut noise, our_static_key) = build_responder();
// <- e
noise.read_message(&utils::ws_recv(&mut socket).await?, &mut buf)?;
noise.read_message(&utils::recv(&mut socket).await?, &mut buf)?;
// -> e, ee, s, es
utils::send_protocol_handshake(
our,
keypair,
ext,
&our_static_key,
&mut noise,
&mut buf,
@ -419,101 +410,14 @@ async fn recv_connection_via_router(
let their_handshake = utils::recv_protocol_handshake(&mut noise, &mut buf, &mut socket).await?;
// now validate this handshake payload against the KNS PKI
let their_id = pki
.get(&their_handshake.name)
.ok_or(anyhow!("unknown KNS name"))?;
validate_handshake(
&their_handshake,
noise
.get_remote_static()
.ok_or(anyhow!("noise error: missing remote pubkey"))?,
&their_id,
&peer_id,
)?;
Ok((
their_id.clone(),
PeerConnection {
noise: noise.into_transport_mode()?,
buf,
socket,
},
))
}
async fn init_connection(
our: &Identity,
our_ip: &str,
peer_id: &Identity,
keypair: &Ed25519KeyPair,
use_router: Option<&Identity>,
proxy_request: bool,
) -> Result<PeerConnection> {
let mut buf = vec![0u8; 65535];
let (mut noise, our_static_key) = build_initiator();
let (ref ip, ref port) = match use_router {
None => peer_id
.ws_routing()
.ok_or(anyhow!("target has no routing information"))?,
Some(router_id) => router_id
.ws_routing()
.ok_or(anyhow!("target has no routing information"))?,
};
let ws_url = make_conn_url(our_ip, ip, port, "ws")?;
let Ok(Ok((mut socket, _response))) = time::timeout(TIMEOUT, connect_async(ws_url)).await
else {
return Err(anyhow!("failed to connect to target"));
};
// if this is a routed request, before starting XX handshake pattern, send a
// routing request message over socket
if use_router.is_some() {
let req = rmp_serde::to_vec(&RoutingRequest {
protocol_version: 1,
source: our.name.clone(),
signature: keypair
.sign(
[&peer_id.name, use_router.unwrap().name.as_str()]
.concat()
.as_bytes(),
)
.as_ref()
.to_vec(),
target: peer_id.name.clone(),
})?;
socket.send(tungstenite::Message::binary(req)).await?;
}
// -> e
let len = noise.write_message(&[], &mut buf)?;
socket
.send(tungstenite::Message::binary(&buf[..len]))
.await?;
// <- e, ee, s, es
let their_handshake = utils::recv_protocol_handshake(&mut noise, &mut buf, &mut socket).await?;
// now validate this handshake payload against the KNS PKI
validate_handshake(
&their_handshake,
noise
.get_remote_static()
.ok_or(anyhow!("noise error: missing remote pubkey"))?,
peer_id,
)?;
// -> s, se
utils::send_protocol_handshake(
our,
keypair,
&our_static_key,
&mut noise,
&mut buf,
&mut socket,
proxy_request,
)
.await?;
Ok(PeerConnection {
noise: noise.into_transport_mode()?,
buf,

View File

@ -1,7 +1,9 @@
use crate::net::{
types::{HandshakePayload, Peers},
utils::{make_conn_url, print_debug, print_loud},
ws::{PeerConnection, PendingPassthroughs, MESSAGE_MAX_SIZE, TIMEOUT},
types::{
HandshakePayload, IdentityExt, Peers, PendingPassthroughs, PendingStream, WS_PROTOCOL,
},
utils::{maintain_passthrough, make_conn_url, print_debug, print_loud},
ws::{PeerConnection, WebSocket, MESSAGE_MAX_SIZE, TIMEOUT},
};
use lib::core::{
Address, Identity, KernelMessage, Message, MessageSender, NetAction, NodeId, PrintSender,
@ -9,10 +11,9 @@ use lib::core::{
};
use {
futures::{SinkExt, StreamExt},
ring::signature::Ed25519KeyPair,
tokio::sync::mpsc::UnboundedReceiver,
tokio::time,
tokio_tungstenite::{connect_async, tungstenite, MaybeTlsStream, WebSocketStream},
tokio_tungstenite::{connect_async, tungstenite},
};
/// should always be spawned on its own task
@ -24,6 +25,7 @@ pub async fn maintain_connection(
kernel_message_tx: MessageSender,
print_tx: PrintSender,
) {
println!("maintain_connection\r");
let mut last_message = std::time::Instant::now();
loop {
tokio::select! {
@ -99,75 +101,40 @@ pub async fn maintain_connection(
peers.remove(&peer_name);
}
/// cross the streams
pub async fn maintain_passthrough(
mut socket_1: WebSocketStream<MaybeTlsStream<tokio::net::TcpStream>>,
mut socket_2: WebSocketStream<MaybeTlsStream<tokio::net::TcpStream>>,
) {
let mut last_message = std::time::Instant::now();
loop {
tokio::select! {
maybe_recv = socket_1.next() => {
match maybe_recv {
Some(Ok(msg)) => {
let Ok(()) = socket_2.send(msg).await else {
break
};
last_message = std::time::Instant::now();
}
_ => break,
}
},
maybe_recv = socket_2.next() => {
match maybe_recv {
Some(Ok(msg)) => {
let Ok(()) = socket_1.send(msg).await else {
break
};
last_message = std::time::Instant::now();
}
_ => break,
}
},
// if a message has not been sent or received in ~2 hours, close the connection
_ = tokio::time::sleep(std::time::Duration::from_secs(7200)) => {
if last_message.elapsed().as_secs() > 7200 {
break
}
}
}
}
let _ = socket_1.close(None).await;
let _ = socket_2.close(None).await;
}
pub async fn create_passthrough(
our: &Identity,
our_ip: &str,
from_id: Identity,
target_id: Identity,
peers: &Peers,
pending_passthroughs: &mut PendingPassthroughs,
socket_1: WebSocketStream<MaybeTlsStream<tokio::net::TcpStream>>,
pending_passthroughs: &PendingPassthroughs,
socket_1: WebSocket,
) -> anyhow::Result<()> {
println!("create_passthrough\r");
// if the target has already generated a pending passthrough for this source,
// immediately match them
if let Some(((_to, _from), socket_2)) =
if let Some(((_target, _from), pending_stream)) =
pending_passthroughs.remove(&(target_id.name.clone(), from_id.name.clone()))
{
tokio::spawn(maintain_passthrough(socket_1, socket_2));
tokio::spawn(maintain_passthrough(
PendingStream::WebSocket(socket_1),
pending_stream,
));
return Ok(());
}
if let Some((ip, ws_port)) = target_id.ws_routing() {
// create passthrough to direct node
// TODO this won't ever happen currently since we validate
// passthrough requests as being to a node we route for
let ws_url = make_conn_url(our_ip, ip, ws_port, "ws")?;
let ws_url = make_conn_url(our_ip, ip, ws_port, WS_PROTOCOL)?;
let Ok(Ok((socket_2, _response))) = time::timeout(TIMEOUT, connect_async(ws_url)).await
else {
return Err(anyhow::anyhow!("failed to connect to target"));
};
tokio::spawn(maintain_passthrough(socket_1, socket_2));
tokio::spawn(maintain_passthrough(
PendingStream::WebSocket(socket_1),
PendingStream::WebSocket(socket_2),
));
return Ok(());
}
// create passthrough to indirect node that we do routing for
@ -200,8 +167,14 @@ pub async fn create_passthrough(
}),
lazy_load_blob: None,
})?;
pending_passthroughs.insert((from_id.name, target_id.name), socket_1);
// we'll remove this either if the above message gets a negative response,
// or if the target node connects to us with a matching passthrough.
// TODO it is currently possible to have dangling passthroughs in the map
// if the target is "connected" to us but nonresponsive.
pending_passthroughs.insert(
(from_id.name, target_id.name),
PendingStream::WebSocket(socket_1),
);
Ok(())
}
@ -232,7 +205,7 @@ pub async fn send_protocol_message(
pub async fn recv_protocol_message(conn: &mut PeerConnection) -> anyhow::Result<KernelMessage> {
let outer_len = conn
.noise
.read_message(&ws_recv(&mut conn.socket).await?, &mut conn.buf)?;
.read_message(&recv(&mut conn.socket).await?, &mut conn.buf)?;
if outer_len < 4 {
return Err(anyhow::anyhow!("protocol message too small!"));
@ -249,7 +222,7 @@ pub async fn recv_protocol_message(conn: &mut PeerConnection) -> anyhow::Result<
while msg.len() < msg_len as usize {
let len = conn
.noise
.read_message(&ws_recv(&mut conn.socket).await?, &mut conn.buf)?;
.read_message(&recv(&mut conn.socket).await?, &mut conn.buf)?;
msg.extend_from_slice(&conn.buf[..len]);
}
@ -257,18 +230,17 @@ pub async fn recv_protocol_message(conn: &mut PeerConnection) -> anyhow::Result<
}
pub async fn send_protocol_handshake(
our: &Identity,
keypair: &Ed25519KeyPair,
ext: &IdentityExt,
noise_static_key: &[u8],
noise: &mut snow::HandshakeState,
buf: &mut [u8],
socket: &mut WebSocketStream<MaybeTlsStream<tokio::net::TcpStream>>,
socket: &mut WebSocket,
proxy_request: bool,
) -> anyhow::Result<()> {
let our_hs = rmp_serde::to_vec(&HandshakePayload {
protocol_version: 1,
name: our.name.clone(),
signature: keypair.sign(noise_static_key).as_ref().to_vec(),
name: ext.our.name.clone(),
signature: ext.keypair.sign(noise_static_key).as_ref().to_vec(),
proxy_request,
})
.expect("failed to serialize handshake payload");
@ -283,17 +255,15 @@ pub async fn send_protocol_handshake(
pub async fn recv_protocol_handshake(
noise: &mut snow::HandshakeState,
buf: &mut [u8],
socket: &mut WebSocketStream<MaybeTlsStream<tokio::net::TcpStream>>,
socket: &mut WebSocket,
) -> anyhow::Result<HandshakePayload> {
let len = noise.read_message(&ws_recv(socket).await?, buf)?;
let len = noise.read_message(&recv(socket).await?, buf)?;
Ok(rmp_serde::from_slice(&buf[..len])?)
}
/// Receive a byte array from a read stream. If this returns an error,
/// we should close the connection. Will automatically respond to 'PING' messages with a 'PONG'.
pub async fn ws_recv(
socket: &mut WebSocketStream<MaybeTlsStream<tokio::net::TcpStream>>,
) -> anyhow::Result<Vec<u8>> {
pub async fn recv(socket: &mut WebSocket) -> anyhow::Result<Vec<u8>> {
loop {
match socket.next().await {
Some(Ok(tungstenite::Message::Ping(_))) => {

View File

@ -1037,6 +1037,13 @@ impl Identity {
NodeRouting::Both { ports, .. } => ports.get(protocol).cloned(),
}
}
pub fn get_ip(&self) -> Option<&str> {
match &self.routing {
NodeRouting::Routers(_) => None,
NodeRouting::Direct { ip, .. } => Some(ip),
NodeRouting::Both { ip, .. } => Some(ip),
}
}
pub fn ws_routing(&self) -> Option<(&str, &u16)> {
match &self.routing {
NodeRouting::Routers(_) => None,
@ -1056,6 +1063,25 @@ impl Identity {
}
}
}
pub fn tcp_routing(&self) -> Option<(&str, &u16)> {
match &self.routing {
NodeRouting::Routers(_) => None,
NodeRouting::Direct { ip, ports } => {
if let Some(port) = ports.get("tcp") {
Some((ip, port))
} else {
None
}
}
NodeRouting::Both { ip, ports, .. } => {
if let Some(port) = ports.get("tcp") {
Some((ip, port))
} else {
None
}
}
}
}
pub fn routers(&self) -> Option<&Vec<NodeId>> {
match &self.routing {
NodeRouting::Routers(routers) => Some(routers),