replace old net with net2

This commit is contained in:
dr-frmr 2023-10-31 15:43:19 -04:00
parent 878be1b51e
commit 4094be9293
No known key found for this signature in database
7 changed files with 1134 additions and 2766 deletions

View File

@ -14,7 +14,7 @@ mod http_client;
mod http_server;
mod kernel;
mod keygen;
mod net2;
mod net;
mod register;
mod terminal;
mod types;
@ -255,7 +255,7 @@ async fn main() {
vfs_message_sender,
encryptor_sender,
));
tasks.spawn(net2::networking(
tasks.spawn(net::networking(
our.clone(),
our_ip.to_string(),
networking_keypair_arc.clone(),

View File

@ -1,599 +0,0 @@
use crate::net::*;
use chacha20poly1305::{
aead::{Aead, AeadCore, KeyInit, OsRng},
XChaCha20Poly1305, XNonce,
};
use elliptic_curve::ecdh::SharedSecret;
use futures::{SinkExt, StreamExt};
use ring::signature::Ed25519KeyPair;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tokio::task::JoinHandle;
use tokio_tungstenite::tungstenite;
pub async fn build_connection(
our: Identity,
keypair: Arc<Ed25519KeyPair>,
pki: OnchainPKI,
keys: PeerKeys,
peers: Peers,
websocket: WebSocket,
kernel_message_tx: MessageSender,
net_message_tx: MessageSender,
network_error_tx: NetworkErrorSender,
with: Option<String>,
) -> (
UnboundedSender<(NetworkMessage, Option<ErrorShuttle>)>,
JoinHandle<Option<String>>,
) {
// println!("building new connection\r");
let (message_tx, message_rx) = unbounded_channel::<(NetworkMessage, Option<ErrorShuttle>)>();
let handle = tokio::spawn(maintain_connection(
our,
with,
keypair,
pki,
keys,
peers,
websocket,
message_tx.clone(),
message_rx,
kernel_message_tx,
net_message_tx,
network_error_tx,
));
return (message_tx, handle);
}
/// Keeps a connection alive and handles sending and receiving of NetworkMessages through it.
/// TODO add a keepalive PING/PONG system
/// TODO kill this after a certain amount of inactivity
pub async fn maintain_connection(
our: Identity,
with: Option<String>,
keypair: Arc<Ed25519KeyPair>,
pki: OnchainPKI,
keys: PeerKeys,
peers: Peers,
websocket: WebSocket,
message_tx: UnboundedSender<(NetworkMessage, Option<ErrorShuttle>)>,
mut message_rx: UnboundedReceiver<(NetworkMessage, Option<ErrorShuttle>)>,
kernel_message_tx: MessageSender,
net_message_tx: MessageSender,
network_error_tx: NetworkErrorSender,
) -> Option<String> {
// let conn_id: u64 = rand::random();
// println!("maintaining connection {conn_id}\r");
// accept messages on the websocket in one task, and send messages in another
let (mut write_stream, mut read_stream) = websocket.split();
let (forwarding_ack_tx, mut forwarding_ack_rx) = unbounded_channel::<MessageResult>();
// manage outstanding ACKs from messages sent over the connection
// TODO replace with more performant data structure
let ack_map = Arc::new(RwLock::new(HashMap::<u64, ErrorShuttle>::new()));
let sender_ack_map = ack_map.clone();
let last_pong = Arc::new(RwLock::new(tokio::time::Instant::now()));
let ping_last_pong = last_pong.clone();
let ping_tx = message_tx.clone();
// Ping/Pong keepalive task
let ping_task = tokio::spawn(async move {
loop {
tokio::time::sleep(tokio::time::Duration::from_secs(30)).await;
if ping_last_pong.read().await.elapsed() > tokio::time::Duration::from_secs(60) {
break;
}
if let Err(_) = ping_tx.send((NetworkMessage::Ping, None)) {
// Failed to send Ping, kill the connection
break;
}
}
});
let forwarder_message_tx = message_tx.clone();
let ack_forwarder = tokio::spawn(async move {
while let Some(result) = forwarding_ack_rx.recv().await {
match result {
Ok(NetworkMessage::Ack(id)) => {
// println!("net: got forwarding ack for message {}\r", id);
forwarder_message_tx
.send((NetworkMessage::Ack(id), None))
.unwrap();
}
Ok(NetworkMessage::Nack(id)) => {
// println!("net: got forwarding nack for message {}\r", id);
forwarder_message_tx
.send((NetworkMessage::Nack(id), None))
.unwrap();
}
Ok(NetworkMessage::HandshakeAck(handshake)) => {
// println!(
// "net: got forwarding handshakeAck for message {}\r",
// handshake.id
// );
forwarder_message_tx
.send((NetworkMessage::HandshakeAck(handshake), None))
.unwrap();
}
Err((message_id, _e)) => {
// println!("net: got forwarding error from ack_rx: {:?}\r", e);
// what do we do here?
forwarder_message_tx
.send((NetworkMessage::Nack(message_id), None))
.unwrap();
}
_ => {
// println!("net: weird none ack\r");
}
}
}
});
// receive messages from over the websocket and route them to the correct peer handler,
// or create it, if necessary.
let ws_receiver = tokio::spawn(async move {
while let Some(incoming) = read_stream.next().await {
let Ok(tungstenite::Message::Binary(bin)) = incoming else {
if let Ok(tungstenite::Message::Ping(_)) = incoming {
// let _ = write_stream.send(tungstenite::Message::Pong(vec![])).await;
}
continue;
};
// TODO use a language-netural serialization format here!
let Ok(net_message) = bincode::deserialize::<NetworkMessage>(&bin) else {
// just kill the connection if we get a non-Uqbar message
break;
};
match net_message {
NetworkMessage::Pong => {
*last_pong.write().await = tokio::time::Instant::now();
continue;
}
NetworkMessage::Ping => {
// respond with a Pong
let _ = message_tx.send((NetworkMessage::Pong, None));
continue;
}
NetworkMessage::Ack(id) => {
let Some(result_tx) = ack_map.write().await.remove(&id) else {
// println!("conn {conn_id}: got unexpected Ack {id}\r");
continue;
};
// println!("conn {conn_id}: got Ack {id}\r");
let _ = result_tx.send(Ok(net_message));
continue;
}
NetworkMessage::Nack(id) => {
let Some(result_tx) = ack_map.write().await.remove(&id) else {
// println!("net: got unexpected Nack\r");
continue;
};
let _ = result_tx.send(Ok(net_message));
continue;
}
NetworkMessage::Msg {
ref id,
ref from,
ref to,
ref contents,
} => {
// println!("conn {conn_id}: handling msg {id}\r");
// if the message is *directed to us*, try to handle with the
// matching peer handler "decrypter".
//
if to == &our.name {
// if we have the peer, send the message to them.
if let Some(peer) = peers.read().await.get(from) {
let _ = peer
.decrypter
.send((contents.to_owned(), forwarding_ack_tx.clone()));
continue;
}
// if we don't have the peer, see if we have the keys to create them.
// if we don't have their keys, throw a nack.
if let Some((peer_id, secret)) = keys.read().await.get(from) {
let new_peer = create_new_peer(
our.clone(),
peer_id.clone(),
peers.clone(),
keys.clone(),
secret.clone(),
message_tx.clone(),
kernel_message_tx.clone(),
net_message_tx.clone(),
network_error_tx.clone(),
);
let _ = new_peer
.decrypter
.send((contents.to_owned(), forwarding_ack_tx.clone()));
peers.write().await.insert(peer_id.name.clone(), new_peer);
} else {
// println!("net: nacking message {id}\r");
message_tx.send((NetworkMessage::Nack(*id), None)).unwrap();
}
} else {
// if the message is *directed to someone else*, try to handle
// with the matching peer handler "sender".
//
if let Some(peer) = peers.read().await.get(to) {
let id = *id;
let to = to.clone();
match peer.sender.send((
PeerMessage::Net(net_message),
Some(forwarding_ack_tx.clone()),
)) {
Ok(_) => {}
Err(_) => {
peers.write().await.remove(&to);
message_tx.send((NetworkMessage::Nack(id), None)).unwrap();
}
}
} else {
// if we don't have the peer, throw a nack.
// println!("net: nacking message with id {id}\r");
message_tx.send((NetworkMessage::Nack(*id), None)).unwrap();
}
}
}
NetworkMessage::Handshake(ref handshake) => {
// when we get a handshake, if we are the target,
// 1. verify it against the PKI
// 2. send a response handshakeAck
// 3. create a Peer and save, replacing old one if it existed
// as long as we are the target, we also get to kill this connection
// if the handshake is invalid, since it must be directly "to" us.
if handshake.target == our.name {
let Some(peer_id) = pki.read().await.get(&handshake.from).cloned() else {
// println!(
// "net: failed handshake with unknown node {}\r",
// handshake.from
// );
message_tx
.send((NetworkMessage::Nack(handshake.id), None))
.unwrap();
break;
};
let their_ephemeral_pk = match validate_handshake(&handshake, &peer_id) {
Ok(pk) => pk,
Err(e) => {
println!("net: invalid handshake from {}: {}\r", handshake.from, e);
message_tx
.send((NetworkMessage::Nack(handshake.id), None))
.unwrap();
break;
}
};
let (secret, handshake) = make_secret_and_handshake(
&our,
keypair.clone(),
&handshake.from,
Some(handshake.id),
);
message_tx
.send((NetworkMessage::HandshakeAck(handshake), None))
.unwrap();
let secret = Arc::new(secret.diffie_hellman(&their_ephemeral_pk));
// save the handshake to our Keys map
keys.write()
.await
.insert(peer_id.name.clone(), (peer_id.clone(), secret.clone()));
let new_peer = create_new_peer(
our.clone(),
peer_id.clone(),
peers.clone(),
keys.clone(),
secret,
message_tx.clone(),
kernel_message_tx.clone(),
net_message_tx.clone(),
network_error_tx.clone(),
);
// we might be replacing an old peer, so we need to remove it first
// we can't rely on the hashmap for this, because the dropped peer
// will trigger a drop of the sender, which will kill the peer_handler
peers.write().await.remove(&peer_id.name);
peers.write().await.insert(peer_id.name.clone(), new_peer);
} else {
// if we are NOT the target,
// try to send it to the matching peer handler "sender"
if let Some(peer) = peers.read().await.get(&handshake.target) {
let _ = peer.sender.send((
PeerMessage::Net(net_message),
Some(forwarding_ack_tx.clone()),
));
} else {
// if we don't have the peer, throw a nack.
// println!("net: nacking handshake with id {}\r", handshake.id);
message_tx
.send((NetworkMessage::Nack(handshake.id), None))
.unwrap();
}
}
}
NetworkMessage::HandshakeAck(ref handshake) => {
let Some(result_tx) = ack_map.write().await.remove(&handshake.id) else {
continue;
};
let _ = result_tx.send(Ok(net_message));
}
}
}
});
tokio::select! {
_ = ws_receiver => {
// println!("ws_receiver died\r");
},
_ = ack_forwarder => {
// println!("ack_forwarder died\r");
},
_ = ping_task => {
// println!("ping_task died\r");
},
// receive messages we would like to send to peers along this connection
// and send them to the websocket
_ = async {
while let Some((message, result_tx)) = message_rx.recv().await {
// TODO use a language-netural serialization format here!
if let Ok(bytes) = bincode::serialize::<NetworkMessage>(&message) {
match &message {
NetworkMessage::Msg { id, .. } => {
// println!("conn {conn_id}: piping msg {id}\r");
sender_ack_map.write().await.insert(*id, result_tx.unwrap());
}
NetworkMessage::Handshake(h) => {
sender_ack_map.write().await.insert(h.id, result_tx.unwrap());
}
_ => {}
}
match write_stream.send(tungstenite::Message::Binary(bytes)).await {
Ok(()) => {}
Err(e) => {
// println!("net: send error: {:?}\r", e);
let id = match &message {
NetworkMessage::Msg { id, .. } => id,
NetworkMessage::Handshake(h) => &h.id,
_ => continue,
};
let Some(result_tx) = sender_ack_map.write().await.remove(&id) else {
continue;
};
// TODO learn how to handle other non-fatal websocket errors.
match e {
tungstenite::error::Error::Capacity(_)
| tungstenite::Error::Io(_) => {
let _ = result_tx.send(Err((*id, SendErrorKind::Timeout)));
}
_ => {
let _ = result_tx.send(Ok(NetworkMessage::Nack(*id)));
}
}
}
}
}
}
} => {
// println!("ws_sender died\r");
},
};
return with;
}
/// After a successful handshake, use information to spawn a new `peer_handler` task
/// and save a `Peer` in our peers mapping. Returns a sender to use for sending messages
/// to this peer, which will also be saved in its Peer struct.
pub fn create_new_peer(
our: Identity,
new_peer_id: Identity,
peers: Peers,
keys: PeerKeys,
secret: Arc<SharedSecret<Secp256k1>>,
conn_sender: UnboundedSender<(NetworkMessage, Option<ErrorShuttle>)>,
kernel_message_tx: MessageSender,
net_message_tx: MessageSender,
network_error_tx: NetworkErrorSender,
) -> Peer {
let (message_tx, message_rx) = unbounded_channel::<(PeerMessage, Option<ErrorShuttle>)>();
let (decrypter_tx, decrypter_rx) = unbounded_channel::<(Vec<u8>, ErrorShuttle)>();
let peer_id_name = new_peer_id.name.clone();
let peer_conn_sender = conn_sender.clone();
tokio::spawn(async move {
match peer_handler(
our,
peer_id_name.clone(),
secret,
message_rx,
decrypter_rx,
peer_conn_sender,
kernel_message_tx,
network_error_tx,
)
.await
{
None => {
// println!("net: dropping peer handler but not deleting\r");
}
Some(km) => {
// println!("net: ok actually deleting peer+keys now and retrying send\r");
peers.write().await.remove(&peer_id_name);
keys.write().await.remove(&peer_id_name);
let _ = net_message_tx.send(km).await;
}
}
});
return Peer {
identity: new_peer_id,
sender: message_tx,
decrypter: decrypter_tx,
socket_tx: conn_sender,
};
}
/// 1. take in messages from a specific peer, decrypt them, and send to kernel
/// 2. take in messages targeted at specific peer and either:
/// - encrypt them, and send to proper connection
/// - forward them untouched along the connection
async fn peer_handler(
our: Identity,
who: String,
secret: Arc<SharedSecret<Secp256k1>>,
mut message_rx: UnboundedReceiver<(PeerMessage, Option<ErrorShuttle>)>,
mut decrypter_rx: UnboundedReceiver<(Vec<u8>, ErrorShuttle)>,
socket_tx: UnboundedSender<(NetworkMessage, Option<ErrorShuttle>)>,
kernel_message_tx: MessageSender,
network_error_tx: NetworkErrorSender,
) -> Option<KernelMessage> {
// println!("peer_handler\r");
let mut key = [0u8; 32];
secret
.extract::<sha2::Sha256>(None)
.expand(&[], &mut key)
.unwrap();
let cipher = XChaCha20Poly1305::new(generic_array::GenericArray::from_slice(&key));
let (ack_tx, mut ack_rx) = unbounded_channel::<MessageResult>();
// TODO use a more efficient data structure
let ack_map = Arc::new(RwLock::new(HashMap::<u64, KernelMessage>::new()));
let recv_ack_map = ack_map.clone();
tokio::select! {
//
// take in messages from a specific peer, decrypt them, and send to kernel
//
_ = async {
while let Some((encrypted_bytes, result_tx)) = decrypter_rx.recv().await {
let nonce = XNonce::from_slice(&encrypted_bytes[..24]);
if let Ok(decrypted) = cipher.decrypt(&nonce, &encrypted_bytes[24..]) {
if let Ok(message) = bincode::deserialize::<KernelMessage>(&decrypted) {
if message.source.node == who {
// println!("net: got peer message {}, acking\r", message.id);
let _ = result_tx.send(Ok(NetworkMessage::Ack(message.id)));
let _ = kernel_message_tx.send(message).await;
continue;
}
println!("net: got message 'from' wrong person! cheater/liar!\r");
break;
}
println!("net: failed to deserialize message from {}\r", who);
break;
}
println!("net: failed to decrypt message from {}, could be spoofer\r", who);
break;
}
} => {
// println!("net: lost peer {who}\r");
return None
}
//
// take in messages targeted at specific peer and either:
// - encrypt them, and send to proper connection
// - forward them untouched along the connection
//
_ = async {
// if we get a result_tx, rather than track it here, let a different
// part of the code handle whatever comes back from the socket.
while let Some((message, maybe_result_tx)) = message_rx.recv().await {
// if message is raw, we should encrypt.
// otherwise, simply send
match message {
PeerMessage::Raw(message) => {
let id = message.id;
if let Ok(bytes) = bincode::serialize::<KernelMessage>(&message) {
// generating a random nonce for each message.
// this isn't really as secure as we could get: should
// add a counter and then throw away the key when we hit a
// certain # of messages. TODO.
let nonce = XChaCha20Poly1305::generate_nonce(&mut OsRng);
if let Ok(encrypted) = cipher.encrypt(&nonce, bytes.as_ref()) {
if maybe_result_tx.is_none() {
ack_map.write().await.insert(id, message);
}
match socket_tx.send((
NetworkMessage::Msg {
from: our.name.clone(),
to: who.clone(),
id: id,
contents: [nonce.to_vec(), encrypted].concat(),
},
Some(maybe_result_tx.unwrap_or(ack_tx.clone())),
)) {
Ok(()) => tokio::task::yield_now().await,
Err(tokio::sync::mpsc::error::SendError((_, result_tx))) => {
// println!("net: lost socket with {who}\r");
let _ = result_tx.unwrap().send(Ok(NetworkMessage::Nack(id)));
},
}
}
}
}
PeerMessage::Net(net_message) => {
match socket_tx.send((net_message, Some(maybe_result_tx.unwrap_or(ack_tx.clone())))) {
Ok(()) => continue,
Err(tokio::sync::mpsc::error::SendError((net_message, result_tx))) => {
// println!("net: lost *forwarding* socket with {who}\r");
let id = match net_message {
NetworkMessage::Msg { id, .. } => id,
NetworkMessage::Handshake(h) => h.id,
_ => continue,
};
let _ = result_tx.unwrap().send(Ok(NetworkMessage::Nack(id)));
break;
},
}
}
}
}
} => return None,
//
// receive acks and nacks from our socket
// throw away acks, but kill this peer and retry the send on nacks.
//
maybe_km = async {
while let Some(result) = ack_rx.recv().await {
match result {
Ok(NetworkMessage::Ack(id)) => {
// println!("net: got ack for message {}\r", id);
recv_ack_map.write().await.remove(&id);
continue;
}
Ok(NetworkMessage::Nack(id)) => {
// println!("net: got nack for message {}\r", id);
let Some(km) = recv_ack_map.write().await.remove(&id) else {
continue;
};
// when we get a Nack, **delete this peer** and try to send the message again!
return Some(km)
}
Err((message_id, e)) => {
// println!("net: got error from ack_rx: {:?}\r", e);
// in practice this is always a timeout in current implementation
let Some(km) = recv_ack_map.write().await.remove(&message_id) else {
continue;
};
let _ = network_error_tx
.send(WrappedSendError {
id: km.id,
source: km.source,
error: SendError {
kind: e,
target: km.target,
message: km.message,
payload: km.payload,
},
})
.await;
return None
}
_ => {
// println!("net: weird none ack\r");
return None
}
}
}
return None;
} => {
// println!("net: exiting peer due to nackage\r");
return maybe_km
},
}
}

File diff suppressed because it is too large Load Diff

View File

@ -1,71 +1,108 @@
use crate::types::*;
use anyhow::Result;
use elliptic_curve::ecdh::SharedSecret;
use ethers::prelude::k256::Secp256k1;
use futures::stream::{SplitSink, SplitStream};
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, sync::Arc};
use tokio::net::TcpStream;
use tokio::sync::{mpsc, RwLock};
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
use tokio::sync::mpsc::UnboundedSender;
use tokio_tungstenite::{tungstenite, MaybeTlsStream, WebSocketStream};
pub type PeerKeys = Arc<RwLock<HashMap<String, (Identity, Arc<SharedSecret<Secp256k1>>)>>>;
pub type Peers = Arc<RwLock<HashMap<String, Peer>>>;
pub type WebSocket = WebSocketStream<MaybeTlsStream<TcpStream>>;
pub type MessageResult = Result<NetworkMessage, (u64, SendErrorKind)>;
pub type ErrorShuttle = mpsc::UnboundedSender<MessageResult>;
/// Sent to a node when you want to connect directly to them.
/// Sent in the 'e, ee, s, es' and 's, se' phases of XX noise protocol pattern.
#[derive(Debug, Deserialize, Serialize)]
pub struct HandshakePayload {
pub name: NodeId,
// signature is created by their networking key, of their static key
// someone could reuse this signature, but then they will be unable
// to encrypt messages to us.
pub signature: Vec<u8>,
/// Set to true when you want them to act as a router for you, sending
/// messages from potentially many remote sources over this connection,
/// including from the router itself.
/// This is not relevant in a handshake sent from the receiver side.
pub proxy_request: bool,
pub protocol_version: u8,
}
/// Sent to a node when you want them to connect you to an indirect node.
/// If the receiver of the request has an open connection to your target,
/// and is willing, they will send a message to the target prompting them
/// to build the other side of the connection, at which point they will
/// hold open a Passthrough for you two.
///
/// Alternatively, if the receiver does not have an open connection but the
/// target is a direct node, they can create a Passthrough for you two if
/// they are willing to proxy for you.
///
/// Sent in the 'e' phase of XX noise protocol pattern.
#[derive(Debug, Deserialize, Serialize)]
pub struct RoutingRequest {
pub source: NodeId,
// signature is created by their networking key, of the [target, router name].concat()
// someone could reuse this signature, and TODO need to make sure that's useless.
pub signature: Vec<u8>,
pub target: NodeId,
pub protocol_version: u8,
}
pub enum Connection {
Peer(PeerConnection),
Passthrough(PassthroughConnection),
PendingPassthrough(PendingPassthroughConnection),
}
pub struct PeerConnection {
pub noise: snow::TransportState,
pub buf: Vec<u8>,
pub write_stream: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::Message>,
pub read_stream: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
}
pub struct PassthroughConnection {
pub write_stream_1: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::Message>,
pub read_stream_1: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
pub write_stream_2: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::Message>,
pub read_stream_2: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
}
pub struct PendingPassthroughConnection {
pub target: NodeId,
pub write_stream: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::Message>,
pub read_stream: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
}
// TODO upgrade from hashmaps
pub type Peers = HashMap<String, Arc<Peer>>;
pub type PKINames = HashMap<String, NodeId>; // TODO maybe U256 to String
pub type OnchainPKI = HashMap<String, Identity>;
pub type PendingPassthroughs = HashMap<(NodeId, NodeId), PendingPassthroughConnection>;
/// stored in mapping by their username
pub struct Peer {
pub identity: Identity,
// send messages here to have them encrypted and sent across an active connection
pub sender: mpsc::UnboundedSender<(PeerMessage, Option<ErrorShuttle>)>,
// send encrypted messages from this peer here to have them decrypted and sent to kernel
pub decrypter: mpsc::UnboundedSender<(Vec<u8>, ErrorShuttle)>,
pub socket_tx: mpsc::UnboundedSender<(NetworkMessage, Option<ErrorShuttle>)>,
}
/// parsed from Binary websocket message
/// TODO add a version number somewhere in the serialized format!!
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum NetworkMessage {
Ack(u64),
Nack(u64),
Msg {
id: u64,
from: String,
to: String,
contents: Vec<u8>,
},
Handshake(Handshake),
HandshakeAck(Handshake),
// only used in implementation, not part of protocol
Ping,
Pong,
}
pub enum PeerMessage {
Raw(KernelMessage),
Net(NetworkMessage),
}
/// contains identity and encryption keys, used in initial handshake.
/// parsed from Text websocket message
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Handshake {
pub id: u64,
pub from: String,
pub target: String,
pub id_signature: Vec<u8>,
pub ephemeral_public_key: Vec<u8>,
pub ephemeral_public_key_signature: Vec<u8>,
/// If true, we are routing for them and have a RoutingClientConnection
/// associated with them. We can send them prompts to establish Passthroughs.
pub routing_for: bool,
pub sender: UnboundedSender<KernelMessage>,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum NetActions {
/// Received from a router of ours when they have a new pending passthrough for us.
/// We should respond (if we desire) by using them to initialize a routed connection
/// with the NodeId given.
ConnectionRequest(NodeId),
/// can only receive from trusted source, for now just ourselves locally,
/// in the future could get from remote provider
QnsUpdate(QnsUpdate),
QnsBatchUpdate(Vec<QnsUpdate>),
}
/// For now, only sent in response to a ConnectionRequest
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum NetResponses {
Attempting(NodeId),
Rejected(NodeId),
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct QnsUpdate {
pub name: String, // actual username / domain name

View File

@ -1,4 +1,4 @@
use crate::net2::{types::*, MESSAGE_MAX_SIZE, TIMEOUT};
use crate::net::{types::*, MESSAGE_MAX_SIZE, TIMEOUT};
use crate::types::*;
use anyhow::{anyhow, Result};
use futures::stream::{SplitSink, SplitStream};

File diff suppressed because it is too large Load Diff

View File

@ -1,115 +0,0 @@
use crate::types::*;
use futures::stream::{SplitSink, SplitStream};
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, sync::Arc};
use tokio::net::TcpStream;
use tokio::sync::mpsc::UnboundedSender;
use tokio_tungstenite::{tungstenite, MaybeTlsStream, WebSocketStream};
/// Sent to a node when you want to connect directly to them.
/// Sent in the 'e, ee, s, es' and 's, se' phases of XX noise protocol pattern.
#[derive(Debug, Deserialize, Serialize)]
pub struct HandshakePayload {
pub name: NodeId,
// signature is created by their networking key, of their static key
// someone could reuse this signature, but then they will be unable
// to encrypt messages to us.
pub signature: Vec<u8>,
/// Set to true when you want them to act as a router for you, sending
/// messages from potentially many remote sources over this connection,
/// including from the router itself.
/// This is not relevant in a handshake sent from the receiver side.
pub proxy_request: bool,
pub protocol_version: u8,
}
/// Sent to a node when you want them to connect you to an indirect node.
/// If the receiver of the request has an open connection to your target,
/// and is willing, they will send a message to the target prompting them
/// to build the other side of the connection, at which point they will
/// hold open a Passthrough for you two.
///
/// Alternatively, if the receiver does not have an open connection but the
/// target is a direct node, they can create a Passthrough for you two if
/// they are willing to proxy for you.
///
/// Sent in the 'e' phase of XX noise protocol pattern.
#[derive(Debug, Deserialize, Serialize)]
pub struct RoutingRequest {
pub source: NodeId,
// signature is created by their networking key, of the [target, router name].concat()
// someone could reuse this signature, and TODO need to make sure that's useless.
pub signature: Vec<u8>,
pub target: NodeId,
pub protocol_version: u8,
}
pub enum Connection {
Peer(PeerConnection),
Passthrough(PassthroughConnection),
PendingPassthrough(PendingPassthroughConnection),
}
pub struct PeerConnection {
pub noise: snow::TransportState,
pub buf: Vec<u8>,
pub write_stream: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::Message>,
pub read_stream: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
}
pub struct PassthroughConnection {
pub write_stream_1: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::Message>,
pub read_stream_1: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
pub write_stream_2: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::Message>,
pub read_stream_2: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
}
pub struct PendingPassthroughConnection {
pub target: NodeId,
pub write_stream: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::Message>,
pub read_stream: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
}
// TODO upgrade from hashmaps
pub type Peers = HashMap<String, Arc<Peer>>;
pub type PKINames = HashMap<String, NodeId>; // TODO maybe U256 to String
pub type OnchainPKI = HashMap<String, Identity>;
pub type PendingPassthroughs = HashMap<(NodeId, NodeId), PendingPassthroughConnection>;
pub struct Peer {
pub identity: Identity,
/// If true, we are routing for them and have a RoutingClientConnection
/// associated with them. We can send them prompts to establish Passthroughs.
pub routing_for: bool,
pub sender: UnboundedSender<KernelMessage>,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum NetActions {
/// Received from a router of ours when they have a new pending passthrough for us.
/// We should respond (if we desire) by using them to initialize a routed connection
/// with the NodeId given.
ConnectionRequest(NodeId),
/// can only receive from trusted source, for now just ourselves locally,
/// in the future could get from remote provider
QnsUpdate(QnsUpdate),
QnsBatchUpdate(Vec<QnsUpdate>),
}
/// For now, only sent in response to a ConnectionRequest
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum NetResponses {
Attempting(NodeId),
Rejected(NodeId),
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct QnsUpdate {
pub name: String, // actual username / domain name
pub owner: String,
pub node: String, // hex namehash of node
pub public_key: String,
pub ip: String,
pub port: u16,
pub routers: Vec<String>,
}