mirror of
https://github.com/uqbar-dao/nectar.git
synced 2024-11-26 23:27:14 +03:00
Merge branch 'develop' into dr/app-store-rewrite
This commit is contained in:
commit
34ea8455a5
16
Cargo.lock
generated
16
Cargo.lock
generated
@ -2750,7 +2750,7 @@ dependencies = [
|
||||
"http-body 1.0.0",
|
||||
"hyper 1.3.1",
|
||||
"pin-project-lite",
|
||||
"socket2 0.5.6",
|
||||
"socket2 0.5.7",
|
||||
"tokio",
|
||||
"tower",
|
||||
"tower-service",
|
||||
@ -3129,6 +3129,7 @@ dependencies = [
|
||||
"sha2",
|
||||
"sha3",
|
||||
"snow",
|
||||
"socket2 0.5.7",
|
||||
"static_dir",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
@ -3264,7 +3265,7 @@ dependencies = [
|
||||
"sha2",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tokio-tungstenite 0.20.1",
|
||||
"tokio-tungstenite 0.21.0",
|
||||
"toml",
|
||||
"tracing",
|
||||
"tracing-appender",
|
||||
@ -5099,9 +5100,8 @@ checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67"
|
||||
|
||||
[[package]]
|
||||
name = "snow"
|
||||
version = "0.9.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "850948bee068e713b8ab860fe1adc4d109676ab4c3b621fd8147f06b261f2f85"
|
||||
version = "0.9.0"
|
||||
source = "git+https://github.com/dr-frmr/snow?branch=dr/extract_cipherstates#1d4eb5f6747aa59aabb32bbbe698fb4bb7dfb9a4"
|
||||
dependencies = [
|
||||
"aes-gcm",
|
||||
"blake2",
|
||||
@ -5126,9 +5126,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "socket2"
|
||||
version = "0.5.6"
|
||||
version = "0.5.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "05ffd9c0a93b7543e062e759284fcf5f5e3b098501104bfbdde4d404db792871"
|
||||
checksum = "ce305eb0b4296696835b71df73eb912e0f1ffd2556a501fcede6e0c50349191c"
|
||||
dependencies = [
|
||||
"libc",
|
||||
"windows-sys 0.52.0",
|
||||
@ -5483,7 +5483,7 @@ dependencies = [
|
||||
"num_cpus",
|
||||
"pin-project-lite",
|
||||
"signal-hook-registry",
|
||||
"socket2 0.5.6",
|
||||
"socket2 0.5.7",
|
||||
"tokio-macros",
|
||||
"windows-sys 0.48.0",
|
||||
]
|
||||
|
@ -80,7 +80,10 @@ serde_json = "1.0"
|
||||
serde_urlencoded = "0.7"
|
||||
sha2 = "0.10"
|
||||
sha3 = "0.10.8"
|
||||
snow = { version = "0.9.5", features = ["ring-resolver"] }
|
||||
# snow = { version = "0.9.5", features = ["ring-resolver"] }
|
||||
# unfortunately need to use forked version for async use and in-place encryption
|
||||
snow = { git = "https://github.com/dr-frmr/snow", branch = "dr/extract_cipherstates", features = ["ring-resolver"] }
|
||||
socket2 = "0.5.7"
|
||||
static_dir = "0.2.0"
|
||||
thiserror = "1.0"
|
||||
tokio = { version = "1.28", features = ["fs", "macros", "rt-multi-thread", "signal", "sync"] }
|
||||
|
File diff suppressed because one or more lines are too long
@ -14,7 +14,7 @@
|
||||
<meta httpEquiv="X-UA-Compatible" content="IE=edge" />
|
||||
<meta name="viewport"
|
||||
content="width=device-width, initial-scale=1, minimum-scale=1, maximum-scale=1.00001, viewport-fit=cover" />
|
||||
<script type="module" crossorigin src="/main:app_store:sys/assets/index-Gc4HI_PE.js"></script>
|
||||
<script type="module" crossorigin src="/main:app_store:sys/assets/index--h2dsLNv.js"></script>
|
||||
<link rel="stylesheet" crossorigin href="/main:app_store:sys/assets/index-OOHWYMdt.css">
|
||||
</head>
|
||||
|
||||
|
@ -10,9 +10,10 @@ import classNames from "classnames";
|
||||
interface ActionButtonProps extends React.HTMLAttributes<HTMLButtonElement> {
|
||||
app: AppInfo;
|
||||
isIcon?: boolean;
|
||||
permitMultiButton?: boolean;
|
||||
}
|
||||
|
||||
export default function ActionButton({ app, isIcon = false, ...props }: ActionButtonProps) {
|
||||
export default function ActionButton({ app, isIcon = false, permitMultiButton = false, ...props }: ActionButtonProps) {
|
||||
const { installed, downloaded, updatable } = useMemo(() => {
|
||||
const versions = Object.entries(app?.metadata?.properties?.code_hashes || {});
|
||||
const latestHash = (versions.find(([v]) => v === app.metadata?.properties?.current_version) || [])[1];
|
||||
@ -48,6 +49,8 @@ export default function ActionButton({ app, isIcon = false, ...props }: ActionBu
|
||||
|
||||
return (
|
||||
<>
|
||||
{/* if it's got a UI and it's updatable, show both buttons if we have space (launch will otherwise push out update) */}
|
||||
{permitMultiButton && installed && updatable && launchPath && <UpdateButton app={app} {...props} isIcon={isIcon} />}
|
||||
{(installed && launchPath)
|
||||
? <LaunchButton app={app} {...props} isIcon={isIcon} launchPath={launchPath} />
|
||||
: (installed && updatable)
|
||||
|
@ -140,7 +140,11 @@ export default function AppPage() {
|
||||
)
|
||||
)}
|
||||
</div>}
|
||||
<ActionButton app={app} className={classNames("self-center bg-orange text-lg px-12")} />
|
||||
<div className={classNames("flex-center gap-2", {
|
||||
'flex-col': isMobile,
|
||||
})}>
|
||||
<ActionButton app={app} className={classNames("self-center bg-orange text-lg px-12")} permitMultiButton />
|
||||
</div>
|
||||
{app.installed && app.state?.mirroring && (
|
||||
<button type="button" onClick={goToPublish}>
|
||||
Publish
|
||||
|
@ -123,6 +123,7 @@ fn init(our: Address) {
|
||||
Ok(()) => continue,
|
||||
Err(e) => println!("{e}"),
|
||||
}
|
||||
// checks for a request from a terminal script (different process, same package)
|
||||
} else if state.our.node == source.node && state.our.package() == source.package() {
|
||||
let Ok(action) = serde_json::from_slice::<TerminalAction>(&body) else {
|
||||
println!("failed to parse action from: {}", source);
|
||||
@ -234,6 +235,8 @@ fn handle_run(our: &Address, process: &ProcessId, args: String) -> anyhow::Resul
|
||||
})?)
|
||||
.send()?;
|
||||
}
|
||||
// inherits the blob from the previous request, `_bytes_response`,
|
||||
// containing the wasm byte code of the process
|
||||
Request::new()
|
||||
.target(("our", "kernel", "distro", "sys"))
|
||||
.body(serde_json::to_vec(&kt::KernelCommand::InitializeProcess {
|
||||
|
@ -27,8 +27,6 @@ pub fn _verify_auth_token(auth_token: &str, jwt_secret: &[u8]) -> Result<String,
|
||||
return Err(jwt::Error::Format);
|
||||
};
|
||||
|
||||
println!("hello\r");
|
||||
|
||||
let claims: Result<JwtClaims, jwt::Error> = auth_token.verify_with_key(&secret);
|
||||
|
||||
match claims {
|
||||
|
@ -789,7 +789,7 @@ async fn login_with_password(
|
||||
},
|
||||
};
|
||||
|
||||
register::assign_direct_routing(
|
||||
register::assign_routing(
|
||||
&mut our,
|
||||
kns_address,
|
||||
provider,
|
||||
|
@ -1,4 +1,4 @@
|
||||
use crate::net::types::{IdentityExt, NetData, Peer, TCP_PROTOCOL, WS_PROTOCOL};
|
||||
use crate::net::types::{IdentityExt, NetData, Peer};
|
||||
use crate::net::{tcp, utils, ws};
|
||||
use lib::types::core::{Identity, KernelMessage, NodeRouting};
|
||||
use rand::prelude::SliceRandom;
|
||||
@ -7,7 +7,6 @@ use tokio::sync::mpsc;
|
||||
/// if target is a peer, queue to be routed
|
||||
/// otherwise, create peer and initiate routing
|
||||
pub async fn send_to_peer(ext: &IdentityExt, data: &NetData, km: KernelMessage) {
|
||||
// println!("send_to_peer\r");
|
||||
if let Some(peer) = data.peers.get_mut(&km.target.node) {
|
||||
peer.sender.send(km).expect("net: peer sender was dropped");
|
||||
} else {
|
||||
@ -46,23 +45,22 @@ async fn connect_to_peer(
|
||||
peer_id: Identity,
|
||||
peer_rx: mpsc::UnboundedReceiver<KernelMessage>,
|
||||
) {
|
||||
println!("connect_to_peer\r");
|
||||
if peer_id.is_direct() {
|
||||
utils::print_debug(
|
||||
&ext.print_tx,
|
||||
&format!("net: attempting to connect to {} directly", peer_id.name),
|
||||
)
|
||||
.await;
|
||||
if let Some(port) = peer_id.get_protocol_port(TCP_PROTOCOL) {
|
||||
match tcp::init_direct(&ext, &data, &peer_id, port, false, peer_rx).await {
|
||||
if let Some((_ip, port)) = peer_id.tcp_routing() {
|
||||
match tcp::init_direct(&ext, &data, &peer_id, *port, false, peer_rx).await {
|
||||
Ok(()) => return,
|
||||
Err(peer_rx) => {
|
||||
return handle_failed_connection(&ext, &data, &peer_id, peer_rx).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
if let Some(port) = peer_id.get_protocol_port(WS_PROTOCOL) {
|
||||
match ws::init_direct(&ext, &data, &peer_id, port, false, peer_rx).await {
|
||||
if let Some((_ip, port)) = peer_id.ws_routing() {
|
||||
match ws::init_direct(&ext, &data, &peer_id, *port, false, peer_rx).await {
|
||||
Ok(()) => return,
|
||||
Err(peer_rx) => {
|
||||
return handle_failed_connection(&ext, &data, &peer_id, peer_rx).await;
|
||||
@ -81,7 +79,6 @@ async fn connect_via_router(
|
||||
peer_id: &Identity,
|
||||
mut peer_rx: mpsc::UnboundedReceiver<KernelMessage>,
|
||||
) {
|
||||
println!("connect_via_router\r");
|
||||
let routers_shuffled = {
|
||||
let mut routers = match peer_id.routing {
|
||||
NodeRouting::Routers(ref routers) => routers.clone(),
|
||||
@ -103,8 +100,8 @@ async fn connect_via_router(
|
||||
None => continue,
|
||||
Some(id) => id.clone(),
|
||||
};
|
||||
if let Some(port) = router_id.get_protocol_port(TCP_PROTOCOL) {
|
||||
match tcp::init_routed(ext, data, &peer_id, &router_id, port, peer_rx).await {
|
||||
if let Some((_ip, port)) = router_id.tcp_routing() {
|
||||
match tcp::init_routed(ext, data, &peer_id, &router_id, *port, peer_rx).await {
|
||||
Ok(()) => return,
|
||||
Err(e) => {
|
||||
peer_rx = e;
|
||||
@ -112,8 +109,8 @@ async fn connect_via_router(
|
||||
}
|
||||
}
|
||||
}
|
||||
if let Some(port) = router_id.get_protocol_port(WS_PROTOCOL) {
|
||||
match ws::init_routed(ext, data, &peer_id, &router_id, port, peer_rx).await {
|
||||
if let Some((_ip, port)) = router_id.ws_routing() {
|
||||
match ws::init_routed(ext, data, &peer_id, &router_id, *port, peer_rx).await {
|
||||
Ok(()) => return,
|
||||
Err(e) => {
|
||||
peer_rx = e;
|
||||
@ -131,7 +128,6 @@ pub async fn handle_failed_connection(
|
||||
peer_id: &Identity,
|
||||
mut peer_rx: mpsc::UnboundedReceiver<KernelMessage>,
|
||||
) {
|
||||
println!("handle_failed_connection\r");
|
||||
utils::print_debug(
|
||||
&ext.print_tx,
|
||||
&format!("net: failed to connect to {}", peer_id.name),
|
||||
|
@ -1,10 +1,9 @@
|
||||
use crate::net::types::{IdentityExt, NetData, Peer, TCP_PROTOCOL, WS_PROTOCOL};
|
||||
use crate::net::types::{IdentityExt, NetData, Peer};
|
||||
use crate::net::{connect, tcp, utils, ws};
|
||||
use lib::types::core::{Identity, NodeRouting};
|
||||
use tokio::{sync::mpsc, time};
|
||||
|
||||
pub async fn maintain_routers(ext: IdentityExt, data: NetData) -> anyhow::Result<()> {
|
||||
println!("maintain_routers\r");
|
||||
let NodeRouting::Routers(ref routers) = ext.our.routing else {
|
||||
return Err(anyhow::anyhow!("net: no routers to maintain"));
|
||||
};
|
||||
@ -25,7 +24,6 @@ pub async fn maintain_routers(ext: IdentityExt, data: NetData) -> anyhow::Result
|
||||
}
|
||||
|
||||
pub async fn connect_to_router(router_id: &Identity, ext: &IdentityExt, data: &NetData) {
|
||||
println!("connect_to_router\r");
|
||||
utils::print_debug(
|
||||
&ext.print_tx,
|
||||
&format!("net: attempting to connect to router {}", router_id.name),
|
||||
@ -40,16 +38,16 @@ pub async fn connect_to_router(router_id: &Identity, ext: &IdentityExt, data: &N
|
||||
sender: peer_tx.clone(),
|
||||
},
|
||||
);
|
||||
if let Some(port) = router_id.get_protocol_port(TCP_PROTOCOL) {
|
||||
match tcp::init_direct(ext, data, &router_id, port, true, peer_rx).await {
|
||||
if let Some((_ip, port)) = router_id.tcp_routing() {
|
||||
match tcp::init_direct(ext, data, &router_id, *port, true, peer_rx).await {
|
||||
Ok(()) => return,
|
||||
Err(peer_rx) => {
|
||||
return connect::handle_failed_connection(ext, data, router_id, peer_rx).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
if let Some(port) = router_id.get_protocol_port(WS_PROTOCOL) {
|
||||
match ws::init_direct(ext, data, &router_id, port, true, peer_rx).await {
|
||||
if let Some((_ip, port)) = router_id.ws_routing() {
|
||||
match ws::init_direct(ext, data, &router_id, *port, true, peer_rx).await {
|
||||
Ok(()) => return,
|
||||
Err(peer_rx) => {
|
||||
return connect::handle_failed_connection(ext, data, router_id, peer_rx).await;
|
||||
|
@ -34,7 +34,6 @@ pub async fn networking(
|
||||
kernel_message_rx: MessageReceiver,
|
||||
_reveal_ip: bool, // only used if indirect
|
||||
) -> anyhow::Result<()> {
|
||||
println!("networking\r\n");
|
||||
let ext = IdentityExt {
|
||||
our: Arc::new(our),
|
||||
our_ip: Arc::new(our_ip),
|
||||
@ -116,7 +115,6 @@ async fn local_recv(
|
||||
mut kernel_message_rx: MessageReceiver,
|
||||
data: NetData,
|
||||
) -> anyhow::Result<()> {
|
||||
println!("local_recv\r\n");
|
||||
while let Some(km) = kernel_message_rx.recv().await {
|
||||
if km.target.node == ext.our.name {
|
||||
// handle messages sent to us
|
||||
@ -306,7 +304,6 @@ async fn handle_remote_request(
|
||||
request_body: &[u8],
|
||||
data: &NetData,
|
||||
) -> anyhow::Result<()> {
|
||||
println!("handle_remote_request\r");
|
||||
match rmp_serde::from_slice::<NetAction>(request_body) {
|
||||
Ok(NetAction::KnsBatchUpdate(_)) | Ok(NetAction::KnsUpdate(_)) => {
|
||||
// for now, we don't get these from remote, only locally.
|
||||
|
@ -21,7 +21,6 @@ pub struct PeerConnection {
|
||||
}
|
||||
|
||||
pub async fn receiver(ext: IdentityExt, data: NetData) -> anyhow::Result<()> {
|
||||
println!("tcp_receiver\r");
|
||||
let tcp_port = ext.our.get_protocol_port(TCP_PROTOCOL).unwrap();
|
||||
let tcp = match TcpListener::bind(format!("0.0.0.0:{tcp_port}")).await {
|
||||
Ok(tcp) => tcp,
|
||||
@ -83,7 +82,6 @@ pub async fn init_direct(
|
||||
proxy_request: bool,
|
||||
peer_rx: mpsc::UnboundedReceiver<KernelMessage>,
|
||||
) -> Result<(), mpsc::UnboundedReceiver<KernelMessage>> {
|
||||
println!("tcp_init_direct\r");
|
||||
match time::timeout(
|
||||
TIMEOUT,
|
||||
connect_with_handshake(ext, peer_id, port, None, proxy_request),
|
||||
@ -114,7 +112,6 @@ pub async fn init_routed(
|
||||
router_port: u16,
|
||||
peer_rx: mpsc::UnboundedReceiver<KernelMessage>,
|
||||
) -> Result<(), mpsc::UnboundedReceiver<KernelMessage>> {
|
||||
println!("tcp_init_routed\r");
|
||||
match time::timeout(
|
||||
TIMEOUT,
|
||||
connect_with_handshake(ext, peer_id, router_port, Some(router_id), false),
|
||||
@ -149,7 +146,6 @@ async fn recv_connection(
|
||||
data: NetData,
|
||||
mut stream: TcpStream,
|
||||
) -> anyhow::Result<()> {
|
||||
println!("tcp_recv_connection\r");
|
||||
// before we begin XX handshake pattern, check first message over socket
|
||||
let (len, first_message) = utils::recv_raw(&mut stream).await?;
|
||||
|
||||
@ -236,7 +232,6 @@ async fn connect_with_handshake(
|
||||
use_router: Option<&Identity>,
|
||||
proxy_request: bool,
|
||||
) -> anyhow::Result<PeerConnection> {
|
||||
println!("tcp_connect_with_handshake\r");
|
||||
let ip = match use_router {
|
||||
None => peer_id
|
||||
.get_ip()
|
||||
@ -316,7 +311,6 @@ pub async fn recv_via_router(
|
||||
peer_id: Identity,
|
||||
router_id: Identity,
|
||||
) {
|
||||
println!("tcp_recv_via_router\r");
|
||||
let Some((ip, port)) = router_id.tcp_routing() else {
|
||||
return;
|
||||
};
|
||||
@ -359,7 +353,6 @@ async fn connect_with_handshake_via_router(
|
||||
router_id: &Identity,
|
||||
mut stream: TcpStream,
|
||||
) -> anyhow::Result<PeerConnection> {
|
||||
println!("tcp_connect_with_handshake_via_router\r");
|
||||
// before beginning XX handshake pattern, send a routing request
|
||||
utils::send_raw(
|
||||
&mut stream,
|
||||
|
@ -6,7 +6,7 @@ use crate::net::{
|
||||
use lib::types::core::{KernelMessage, MessageSender, NodeId, PrintSender};
|
||||
use {
|
||||
tokio::io::{AsyncReadExt, AsyncWriteExt},
|
||||
tokio::net::TcpStream,
|
||||
tokio::net::{tcp::OwnedReadHalf, tcp::OwnedWriteHalf, TcpStream},
|
||||
tokio::sync::mpsc::UnboundedReceiver,
|
||||
};
|
||||
|
||||
@ -19,64 +19,83 @@ pub async fn maintain_connection(
|
||||
kernel_message_tx: MessageSender,
|
||||
print_tx: PrintSender,
|
||||
) {
|
||||
println!("tcp_maintain_connection\r");
|
||||
let mut last_message = std::time::Instant::now();
|
||||
loop {
|
||||
tokio::select! {
|
||||
recv_result = recv_protocol_message(&mut conn) => {
|
||||
match recv_result {
|
||||
Ok(km) => {
|
||||
if km.source.node != peer_name {
|
||||
print_loud(
|
||||
&print_tx,
|
||||
&format!(
|
||||
"net: got message with spoofed source from {peer_name}!"
|
||||
),
|
||||
).await;
|
||||
break
|
||||
} else {
|
||||
kernel_message_tx.send(km).await.expect("net: fatal: kernel receiver died");
|
||||
continue
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
print_debug(&print_tx, &format!("net: error receiving message: {e}")).await;
|
||||
break
|
||||
let sock_ref = socket2::SockRef::from(&conn.stream);
|
||||
let mut ka = socket2::TcpKeepalive::new();
|
||||
ka = ka.with_time(std::time::Duration::from_secs(30));
|
||||
ka = ka.with_interval(std::time::Duration::from_secs(30));
|
||||
sock_ref
|
||||
.set_tcp_keepalive(&ka)
|
||||
.expect("failed to set tcp keepalive");
|
||||
|
||||
let (mut read_stream, mut write_stream) = conn.stream.into_split();
|
||||
let initiator = conn.noise.is_initiator();
|
||||
let snow::CipherStates(c1, c2) = conn.noise.extract_cipherstates();
|
||||
let (mut our_cipher, mut their_cipher) = if initiator {
|
||||
// if initiator, we write with first and read with second
|
||||
(c1, c2)
|
||||
} else {
|
||||
// if responder, we read with first and write with second
|
||||
(c2, c1)
|
||||
};
|
||||
|
||||
let write_buf = &mut [0; 65536];
|
||||
let write = async move {
|
||||
while let Some(km) = peer_rx.recv().await {
|
||||
let Ok(()) =
|
||||
send_protocol_message(&km, &mut our_cipher, write_buf, &mut write_stream).await
|
||||
else {
|
||||
break;
|
||||
};
|
||||
}
|
||||
};
|
||||
|
||||
let read_buf = &mut conn.buf;
|
||||
let read_peer_name = peer_name.clone();
|
||||
let read_print_tx = print_tx.clone();
|
||||
let read = async move {
|
||||
loop {
|
||||
match recv_protocol_message(&mut their_cipher, read_buf, &mut read_stream).await {
|
||||
Ok(km) => {
|
||||
if km.source.node != read_peer_name {
|
||||
print_loud(
|
||||
&read_print_tx,
|
||||
&format!("net: got message with spoofed source from {read_peer_name}!"),
|
||||
)
|
||||
.await;
|
||||
break;
|
||||
} else {
|
||||
kernel_message_tx
|
||||
.send(km)
|
||||
.await
|
||||
.expect("net: fatal: kernel receiver died");
|
||||
}
|
||||
}
|
||||
},
|
||||
maybe_recv = peer_rx.recv() => {
|
||||
match maybe_recv {
|
||||
Some(km) => {
|
||||
match send_protocol_message(&km, &mut conn).await {
|
||||
Ok(()) => {
|
||||
last_message = std::time::Instant::now();
|
||||
continue
|
||||
}
|
||||
Err(e) => {
|
||||
print_debug(&print_tx, &format!("net: error sending message: {e}")).await;
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
None => break
|
||||
}
|
||||
},
|
||||
// if a message has not been sent or received in 2 hours, close the connection
|
||||
_ = tokio::time::sleep(std::time::Duration::from_secs(7200)) => {
|
||||
if last_message.elapsed().as_secs() > 7200 {
|
||||
break
|
||||
Err(e) => {
|
||||
print_debug(
|
||||
&read_print_tx,
|
||||
&format!("net: error receiving message: {e}"),
|
||||
)
|
||||
.await;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
tokio::select! {
|
||||
_ = write => (),
|
||||
_ = read => (),
|
||||
}
|
||||
let _ = conn.stream.shutdown().await;
|
||||
|
||||
print_debug(&print_tx, &format!("net: connection lost with {peer_name}")).await;
|
||||
peers.remove(&peer_name);
|
||||
}
|
||||
|
||||
pub async fn send_protocol_message(
|
||||
async fn send_protocol_message(
|
||||
km: &KernelMessage,
|
||||
conn: &mut PeerConnection,
|
||||
cipher: &mut snow::CipherState,
|
||||
buf: &mut [u8],
|
||||
stream: &mut OwnedWriteHalf,
|
||||
) -> anyhow::Result<()> {
|
||||
let serialized = rmp_serde::to_vec(km)?;
|
||||
if serialized.len() > MESSAGE_MAX_SIZE as usize {
|
||||
@ -84,35 +103,35 @@ pub async fn send_protocol_message(
|
||||
}
|
||||
|
||||
let outer_len = (serialized.len() as u32).to_be_bytes();
|
||||
conn.stream.write_all(&outer_len).await?;
|
||||
stream.write_all(&outer_len).await?;
|
||||
|
||||
// 65519 = 65535 - 16 (TAGLEN)
|
||||
for payload in serialized.chunks(65519) {
|
||||
let len = conn.noise.write_message(payload, &mut conn.buf)? as u16;
|
||||
conn.stream.write_all(&len.to_be_bytes()).await?;
|
||||
conn.stream.write_all(&conn.buf[..len as usize]).await?;
|
||||
let len = cipher.encrypt(payload, buf)? as u16;
|
||||
stream.write_all(&len.to_be_bytes()).await?;
|
||||
stream.write_all(&buf[..len as usize]).await?;
|
||||
}
|
||||
Ok(conn.stream.flush().await?)
|
||||
Ok(stream.flush().await?)
|
||||
}
|
||||
|
||||
/// any error in receiving a message will result in the connection being closed.
|
||||
pub async fn recv_protocol_message(conn: &mut PeerConnection) -> anyhow::Result<KernelMessage> {
|
||||
let mut outer_len = [0; 4];
|
||||
conn.stream.read_exact(&mut outer_len).await?;
|
||||
let outer_len = u32::from_be_bytes(outer_len);
|
||||
async fn recv_protocol_message(
|
||||
cipher: &mut snow::CipherState,
|
||||
buf: &mut [u8],
|
||||
stream: &mut OwnedReadHalf,
|
||||
) -> anyhow::Result<KernelMessage> {
|
||||
stream.read_exact(&mut buf[..4]).await?;
|
||||
let outer_len = u32::from_be_bytes(buf[..4].try_into().unwrap()) as usize;
|
||||
|
||||
let mut msg = vec![0; outer_len as usize];
|
||||
let mut msg = vec![0; outer_len];
|
||||
let mut ptr = 0;
|
||||
while ptr < outer_len as usize {
|
||||
while ptr < outer_len {
|
||||
let mut inner_len = [0; 2];
|
||||
conn.stream.read_exact(&mut inner_len).await?;
|
||||
stream.read_exact(&mut inner_len).await?;
|
||||
let inner_len = u16::from_be_bytes(inner_len);
|
||||
conn.stream
|
||||
.read_exact(&mut conn.buf[..inner_len as usize])
|
||||
.await?;
|
||||
let read_len = conn
|
||||
.noise
|
||||
.read_message(&conn.buf[..inner_len as usize], &mut msg[ptr..])?;
|
||||
|
||||
stream.read_exact(&mut buf[..inner_len as usize]).await?;
|
||||
let read_len = cipher.decrypt(&buf[..inner_len as usize], &mut msg[ptr..])?;
|
||||
ptr += read_len;
|
||||
}
|
||||
Ok(rmp_serde::from_slice(&msg)?)
|
||||
@ -126,7 +145,6 @@ pub async fn send_protocol_handshake(
|
||||
stream: &mut TcpStream,
|
||||
proxy_request: bool,
|
||||
) -> anyhow::Result<()> {
|
||||
println!("tcp_send_protocol_handshake\r");
|
||||
let our_hs = rmp_serde::to_vec(&HandshakePayload {
|
||||
protocol_version: 1,
|
||||
name: ext.our.name.clone(),
|
||||
@ -147,10 +165,10 @@ pub async fn recv_protocol_handshake(
|
||||
buf: &mut [u8],
|
||||
stream: &mut TcpStream,
|
||||
) -> anyhow::Result<HandshakePayload> {
|
||||
println!("tcp_recv_protocol_handshake\r");
|
||||
let mut len = [0; 2];
|
||||
stream.read_exact(&mut len).await?;
|
||||
let msg_len = u16::from_be_bytes(len);
|
||||
|
||||
let mut msg = vec![0; msg_len as usize];
|
||||
stream.read_exact(&mut msg).await?;
|
||||
|
||||
@ -158,7 +176,7 @@ pub async fn recv_protocol_handshake(
|
||||
Ok(rmp_serde::from_slice(&buf[..len])?)
|
||||
}
|
||||
|
||||
/// make sure message is less than 65536 bytes
|
||||
/// make sure raw message is less than 65536 bytes
|
||||
pub async fn send_raw(stream: &mut TcpStream, msg: &[u8]) -> anyhow::Result<()> {
|
||||
let len = (msg.len() as u16).to_be_bytes();
|
||||
stream.write_all(&len).await?;
|
||||
@ -166,7 +184,7 @@ pub async fn send_raw(stream: &mut TcpStream, msg: &[u8]) -> anyhow::Result<()>
|
||||
Ok(stream.flush().await?)
|
||||
}
|
||||
|
||||
/// make sure message is less than 65536 bytes
|
||||
/// make sure raw message is less than 65536 bytes
|
||||
pub async fn recv_raw(stream: &mut TcpStream) -> anyhow::Result<(u16, Vec<u8>)> {
|
||||
let mut len = [0; 2];
|
||||
stream.read_exact(&mut len).await?;
|
||||
|
@ -64,6 +64,15 @@ pub enum PendingStream {
|
||||
Tcp(TcpStream),
|
||||
}
|
||||
|
||||
impl PendingStream {
|
||||
pub fn is_ws(&self) -> bool {
|
||||
matches!(self, PendingStream::WebSocket(_))
|
||||
}
|
||||
pub fn is_tcp(&self) -> bool {
|
||||
matches!(self, PendingStream::Tcp(_))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Peer {
|
||||
pub identity: Identity,
|
||||
|
@ -11,7 +11,6 @@ use {
|
||||
futures::{SinkExt, StreamExt},
|
||||
ring::signature::{self},
|
||||
snow::params::NoiseParams,
|
||||
tokio::io::AsyncWriteExt,
|
||||
tokio::time,
|
||||
tokio_tungstenite::connect_async,
|
||||
};
|
||||
@ -37,7 +36,6 @@ pub async fn create_passthrough(
|
||||
pending_passthroughs: &PendingPassthroughs,
|
||||
socket_1: PendingStream,
|
||||
) -> anyhow::Result<()> {
|
||||
println!("create_passthrough\r");
|
||||
// if the target has already generated a pending passthrough for this source,
|
||||
// immediately match them
|
||||
if let Some(((_target, _from), pending_stream)) =
|
||||
@ -46,36 +44,39 @@ pub async fn create_passthrough(
|
||||
tokio::spawn(maintain_passthrough(socket_1, pending_stream));
|
||||
return Ok(());
|
||||
}
|
||||
if let Some((ip, tcp_port)) = target_id.tcp_routing() {
|
||||
// create passthrough to direct node over tcp
|
||||
let tcp_url = make_conn_url(our_ip, ip, tcp_port, TCP_PROTOCOL)?;
|
||||
let Ok(Ok(stream_2)) =
|
||||
time::timeout(TIMEOUT, tokio::net::TcpStream::connect(tcp_url.to_string())).await
|
||||
else {
|
||||
return Err(anyhow::anyhow!("failed to connect to target"));
|
||||
};
|
||||
tokio::spawn(maintain_passthrough(socket_1, PendingStream::Tcp(stream_2)));
|
||||
return Ok(());
|
||||
}
|
||||
if let Some((ip, ws_port)) = target_id.ws_routing() {
|
||||
// create passthrough to direct node over websocket
|
||||
let ws_url = make_conn_url(our_ip, ip, ws_port, WS_PROTOCOL)?;
|
||||
let Ok(Ok((socket_2, _response))) = time::timeout(TIMEOUT, connect_async(ws_url)).await
|
||||
else {
|
||||
return Err(anyhow::anyhow!("failed to connect to target"));
|
||||
};
|
||||
tokio::spawn(maintain_passthrough(
|
||||
socket_1,
|
||||
PendingStream::WebSocket(socket_2),
|
||||
));
|
||||
return Ok(());
|
||||
if socket_1.is_tcp() {
|
||||
if let Some((ip, tcp_port)) = target_id.tcp_routing() {
|
||||
// create passthrough to direct node over tcp
|
||||
let tcp_url = make_conn_url(our_ip, ip, tcp_port, TCP_PROTOCOL)?;
|
||||
let Ok(Ok(stream_2)) =
|
||||
time::timeout(TIMEOUT, tokio::net::TcpStream::connect(tcp_url.to_string())).await
|
||||
else {
|
||||
return Err(anyhow::anyhow!("failed to connect to target"));
|
||||
};
|
||||
tokio::spawn(maintain_passthrough(socket_1, PendingStream::Tcp(stream_2)));
|
||||
return Ok(());
|
||||
}
|
||||
} else if socket_1.is_ws() {
|
||||
if let Some((ip, ws_port)) = target_id.ws_routing() {
|
||||
// create passthrough to direct node over websocket
|
||||
let ws_url = make_conn_url(our_ip, ip, ws_port, WS_PROTOCOL)?;
|
||||
let Ok(Ok((socket_2, _response))) = time::timeout(TIMEOUT, connect_async(ws_url)).await
|
||||
else {
|
||||
return Err(anyhow::anyhow!("failed to connect to target"));
|
||||
};
|
||||
tokio::spawn(maintain_passthrough(
|
||||
socket_1,
|
||||
PendingStream::WebSocket(socket_2),
|
||||
));
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
// create passthrough to indirect node that we do routing for
|
||||
let target_peer = peers
|
||||
.get(&target_id.name)
|
||||
.ok_or(anyhow::anyhow!("can't route to that indirect node"))?;
|
||||
.ok_or(anyhow::anyhow!("can't route to that node"))?;
|
||||
if !target_peer.routing_for {
|
||||
return Err(anyhow::anyhow!("we don't route for that indirect node"));
|
||||
return Err(anyhow::anyhow!("we don't route for that node"));
|
||||
}
|
||||
// send their net:distro:sys process a message, notifying it to create a *matching*
|
||||
// passthrough request, which we can pair with this pending one.
|
||||
@ -109,13 +110,11 @@ pub async fn create_passthrough(
|
||||
|
||||
/// cross the streams -- spawn on own task
|
||||
pub async fn maintain_passthrough(socket_1: PendingStream, socket_2: PendingStream) {
|
||||
println!("maintain_passthrough\r");
|
||||
use tokio::io::copy;
|
||||
// copy from ws_socket to tcp_socket and vice versa
|
||||
// do not use bidirectional because if one side closes,
|
||||
// we want to close the entire passthrough
|
||||
match (socket_1, socket_2) {
|
||||
(PendingStream::Tcp(socket_1), PendingStream::Tcp(socket_2)) => {
|
||||
// do not use bidirectional because if one side closes,
|
||||
// we want to close the entire passthrough
|
||||
use tokio::io::copy;
|
||||
let (mut r1, mut w1) = tokio::io::split(socket_1);
|
||||
let (mut r2, mut w2) = tokio::io::split(socket_2);
|
||||
tokio::select! {
|
||||
@ -123,44 +122,6 @@ pub async fn maintain_passthrough(socket_1: PendingStream, socket_2: PendingStre
|
||||
_ = copy(&mut r2, &mut w1) => {},
|
||||
}
|
||||
}
|
||||
(PendingStream::WebSocket(mut ws_socket), PendingStream::Tcp(mut tcp_socket))
|
||||
| (PendingStream::Tcp(mut tcp_socket), PendingStream::WebSocket(mut ws_socket)) => {
|
||||
let mut last_message = std::time::Instant::now();
|
||||
loop {
|
||||
tokio::select! {
|
||||
maybe_recv = ws_socket.next() => {
|
||||
match maybe_recv {
|
||||
Some(Ok(tokio_tungstenite::tungstenite::Message::Binary(bin))) => {
|
||||
let Ok(()) = tcp_socket.write_all(&bin).await else {
|
||||
break
|
||||
};
|
||||
last_message = std::time::Instant::now();
|
||||
}
|
||||
_ => break,
|
||||
}
|
||||
},
|
||||
maybe_recv = crate::net::tcp::utils::recv_raw(&mut tcp_socket) => {
|
||||
match maybe_recv {
|
||||
Ok((_len, bin)) => {
|
||||
let Ok(()) = ws_socket.send(tokio_tungstenite::tungstenite::Message::Binary(bin)).await else {
|
||||
break
|
||||
};
|
||||
last_message = std::time::Instant::now();
|
||||
}
|
||||
_ => break,
|
||||
}
|
||||
},
|
||||
// if a message has not been sent or received in 2-4 hours, close the connection
|
||||
_ = tokio::time::sleep(std::time::Duration::from_secs(7200)) => {
|
||||
if last_message.elapsed().as_secs() > 7200 {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
let _ = ws_socket.close(None).await;
|
||||
let _ = tcp_socket.shutdown().await;
|
||||
}
|
||||
(PendingStream::WebSocket(mut socket_1), PendingStream::WebSocket(mut socket_2)) => {
|
||||
let mut last_message = std::time::Instant::now();
|
||||
loop {
|
||||
@ -198,6 +159,10 @@ pub async fn maintain_passthrough(socket_1: PendingStream, socket_2: PendingStre
|
||||
let _ = socket_1.close(None).await;
|
||||
let _ = socket_2.close(None).await;
|
||||
}
|
||||
_ => {
|
||||
// these foolish combinations must never occur
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -237,7 +202,6 @@ pub fn validate_routing_request(
|
||||
buf: &[u8],
|
||||
pki: &OnchainPKI,
|
||||
) -> anyhow::Result<(Identity, Identity)> {
|
||||
println!("validate_routing_request\r");
|
||||
let routing_request: RoutingRequest = rmp_serde::from_slice(buf)?;
|
||||
let from_id = pki
|
||||
.get(&routing_request.source)
|
||||
@ -288,6 +252,7 @@ pub fn build_responder() -> (snow::HandshakeState, Vec<u8>) {
|
||||
(
|
||||
builder
|
||||
.local_private_key(&keypair.private)
|
||||
.unwrap()
|
||||
.build_responder()
|
||||
.expect("net: couldn't build responder?"),
|
||||
keypair.public,
|
||||
@ -302,6 +267,7 @@ pub fn build_initiator() -> (snow::HandshakeState, Vec<u8>) {
|
||||
(
|
||||
builder
|
||||
.local_private_key(&keypair.private)
|
||||
.unwrap()
|
||||
.build_initiator()
|
||||
.expect("net: couldn't build initiator?"),
|
||||
keypair.public,
|
||||
|
@ -27,7 +27,6 @@ pub struct PeerConnection {
|
||||
pub type WebSocket = WebSocketStream<MaybeTlsStream<tokio::net::TcpStream>>;
|
||||
|
||||
pub async fn receiver(ext: IdentityExt, data: NetData) -> Result<()> {
|
||||
println!("receiver\r");
|
||||
let ws_port = ext.our.get_protocol_port(WS_PROTOCOL).unwrap();
|
||||
let ws = match TcpListener::bind(format!("0.0.0.0:{ws_port}")).await {
|
||||
Ok(ws) => ws,
|
||||
@ -96,7 +95,6 @@ pub async fn init_direct(
|
||||
proxy_request: bool,
|
||||
peer_rx: mpsc::UnboundedReceiver<KernelMessage>,
|
||||
) -> Result<(), mpsc::UnboundedReceiver<KernelMessage>> {
|
||||
println!("init_direct\r");
|
||||
match time::timeout(
|
||||
TIMEOUT,
|
||||
connect_with_handshake(ext, peer_id, port, None, proxy_request),
|
||||
@ -127,7 +125,6 @@ pub async fn init_routed(
|
||||
router_port: u16,
|
||||
peer_rx: mpsc::UnboundedReceiver<KernelMessage>,
|
||||
) -> Result<(), mpsc::UnboundedReceiver<KernelMessage>> {
|
||||
println!("init_routed\r");
|
||||
match time::timeout(
|
||||
TIMEOUT,
|
||||
connect_with_handshake(ext, peer_id, router_port, Some(router_id), false),
|
||||
@ -165,7 +162,6 @@ pub async fn recv_via_router(
|
||||
peer_id: Identity,
|
||||
router_id: Identity,
|
||||
) {
|
||||
println!("recv_via_router\r");
|
||||
let Some((ip, port)) = router_id.ws_routing() else {
|
||||
return;
|
||||
};
|
||||
@ -207,7 +203,6 @@ async fn recv_connection(
|
||||
data: NetData,
|
||||
mut socket: WebSocket,
|
||||
) -> anyhow::Result<()> {
|
||||
println!("recv_connection\r");
|
||||
// before we begin XX handshake pattern, check first message over socket
|
||||
let first_message = &utils::recv(&mut socket).await?;
|
||||
|
||||
@ -294,7 +289,6 @@ async fn connect_with_handshake(
|
||||
use_router: Option<&Identity>,
|
||||
proxy_request: bool,
|
||||
) -> anyhow::Result<PeerConnection> {
|
||||
println!("connect_with_handshake\r");
|
||||
let mut buf = vec![0u8; 65535];
|
||||
let (mut noise, our_static_key) = build_initiator();
|
||||
|
||||
@ -376,7 +370,6 @@ async fn connect_with_handshake_via_router(
|
||||
router_id: &Identity,
|
||||
mut socket: WebSocketStream<MaybeTlsStream<TcpStream>>,
|
||||
) -> anyhow::Result<PeerConnection> {
|
||||
println!("connect_with_handshake_via_router\r");
|
||||
// before beginning XX handshake pattern, send a routing request
|
||||
socket
|
||||
.send(tungstenite::Message::binary(rmp_serde::to_vec(
|
||||
|
@ -10,6 +10,9 @@ use {
|
||||
tokio_tungstenite::tungstenite,
|
||||
};
|
||||
|
||||
type WsWriteHalf = futures::stream::SplitSink<WebSocket, tungstenite::Message>;
|
||||
type WsReadHalf = futures::stream::SplitStream<WebSocket>;
|
||||
|
||||
/// should always be spawned on its own task
|
||||
pub async fn maintain_connection(
|
||||
peer_name: NodeId,
|
||||
@ -19,85 +22,101 @@ pub async fn maintain_connection(
|
||||
kernel_message_tx: MessageSender,
|
||||
print_tx: PrintSender,
|
||||
) {
|
||||
println!("maintain_connection\r");
|
||||
let mut last_message = std::time::Instant::now();
|
||||
loop {
|
||||
tokio::select! {
|
||||
recv_result = recv_protocol_message(&mut conn) => {
|
||||
match recv_result {
|
||||
Ok(km) => {
|
||||
if km.source.node != peer_name {
|
||||
print_loud(
|
||||
&print_tx,
|
||||
&format!(
|
||||
"net: got message with spoofed source from {peer_name}!"
|
||||
),
|
||||
).await;
|
||||
break
|
||||
} else {
|
||||
kernel_message_tx.send(km).await.expect("net error: fatal: kernel receiver died");
|
||||
last_message = std::time::Instant::now();
|
||||
continue
|
||||
}
|
||||
let (mut write_stream, mut read_stream) = conn.socket.split();
|
||||
let initiator = conn.noise.is_initiator();
|
||||
let snow::CipherStates(c1, c2) = conn.noise.extract_cipherstates();
|
||||
let (mut our_cipher, mut their_cipher) = if initiator {
|
||||
// if initiator, we write with first and read with second
|
||||
(c1, c2)
|
||||
} else {
|
||||
// if responder, we read with first and write with second
|
||||
(c2, c1)
|
||||
};
|
||||
|
||||
}
|
||||
Err(_) => break
|
||||
}
|
||||
},
|
||||
maybe_recv = peer_rx.recv() => {
|
||||
match maybe_recv {
|
||||
Some(km) => {
|
||||
match send_protocol_message(&km, &mut conn).await {
|
||||
Ok(()) => {
|
||||
last_message = std::time::Instant::now();
|
||||
continue
|
||||
}
|
||||
Err(e) => {
|
||||
if e.to_string() == "message too large" {
|
||||
// this will result in a Timeout if the message
|
||||
// requested a response, otherwise nothing. so,
|
||||
// we should always print something to terminal
|
||||
print_loud(
|
||||
&print_tx,
|
||||
&format!(
|
||||
"net: tried to send too-large message, limit is {:.2}mb",
|
||||
MESSAGE_MAX_SIZE as f64 / 1_048_576.0
|
||||
),
|
||||
).await;
|
||||
}
|
||||
break
|
||||
}
|
||||
let write_buf = &mut [0; 65536];
|
||||
let write_print_tx = print_tx.clone();
|
||||
let write = async move {
|
||||
loop {
|
||||
tokio::select! {
|
||||
Some(km) = peer_rx.recv() => {
|
||||
if let Err(e) =
|
||||
send_protocol_message(&km, &mut our_cipher, write_buf, &mut write_stream).await
|
||||
{
|
||||
if e.to_string() == "message too large" {
|
||||
// this will result in a Timeout if the message
|
||||
// requested a response, otherwise nothing. so,
|
||||
// we should always print something to terminal
|
||||
print_loud(
|
||||
&write_print_tx,
|
||||
&format!(
|
||||
"net: tried to send too-large message, limit is {:.2}mb",
|
||||
MESSAGE_MAX_SIZE as f64 / 1_048_576.0
|
||||
),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
break;
|
||||
}
|
||||
None => break
|
||||
}
|
||||
},
|
||||
// keepalive ping -- can adjust time based on testing
|
||||
_ = tokio::time::sleep(std::time::Duration::from_secs(30)) => {
|
||||
match conn.socket.send(tungstenite::Message::Ping(vec![])).await {
|
||||
Ok(()) => continue,
|
||||
Err(_) => break,
|
||||
}
|
||||
}
|
||||
// if a message has not been sent or received in 2 hours, close the connection
|
||||
_ = tokio::time::sleep(std::time::Duration::from_secs(7200)) => {
|
||||
if last_message.elapsed().as_secs() > 7200 {
|
||||
break
|
||||
// keepalive ping -- note that we don't look for pongs
|
||||
// just to close if the connection is truly dead
|
||||
_ = tokio::time::sleep(std::time::Duration::from_secs(30)) => {
|
||||
match write_stream.send(tungstenite::Message::Ping(vec![])).await {
|
||||
Ok(()) => continue,
|
||||
Err(_) => break,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
let close_msg = match conn.socket.close(None).await {
|
||||
Ok(()) => format!("net: connection with {peer_name} closed"),
|
||||
Err(e) => format!("net: connection with {peer_name} closed: {e}"),
|
||||
};
|
||||
print_debug(&print_tx, &close_msg).await;
|
||||
|
||||
let read_buf = &mut conn.buf;
|
||||
let read_peer_name = peer_name.clone();
|
||||
let read_print_tx = print_tx.clone();
|
||||
let read = async move {
|
||||
loop {
|
||||
match recv_protocol_message(&mut their_cipher, read_buf, &mut read_stream).await {
|
||||
Ok(km) => {
|
||||
if km.source.node != read_peer_name {
|
||||
print_loud(
|
||||
&read_print_tx,
|
||||
&format!("net: got message with spoofed source from {read_peer_name}!"),
|
||||
)
|
||||
.await;
|
||||
break;
|
||||
} else {
|
||||
kernel_message_tx
|
||||
.send(km)
|
||||
.await
|
||||
.expect("net: fatal: kernel receiver died");
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
print_debug(
|
||||
&read_print_tx,
|
||||
&format!("net: error receiving message: {e}"),
|
||||
)
|
||||
.await;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
tokio::select! {
|
||||
_ = write => (),
|
||||
_ = read => (),
|
||||
}
|
||||
|
||||
print_debug(&print_tx, &format!("net: connection lost with {peer_name}")).await;
|
||||
peers.remove(&peer_name);
|
||||
}
|
||||
|
||||
async fn send_protocol_message(
|
||||
km: &KernelMessage,
|
||||
conn: &mut PeerConnection,
|
||||
cipher: &mut snow::CipherState,
|
||||
buf: &mut [u8],
|
||||
stream: &mut WsWriteHalf,
|
||||
) -> anyhow::Result<()> {
|
||||
let serialized = rmp_serde::to_vec(km)?;
|
||||
if serialized.len() > MESSAGE_MAX_SIZE as usize {
|
||||
@ -109,25 +128,27 @@ async fn send_protocol_message(
|
||||
|
||||
// 65519 = 65535 - 16 (TAGLEN)
|
||||
for payload in with_length_prefix.chunks(65519) {
|
||||
let len = conn.noise.write_message(payload, &mut conn.buf)?;
|
||||
conn.socket
|
||||
.feed(tungstenite::Message::binary(&conn.buf[..len]))
|
||||
let len = cipher.encrypt(payload, buf)?;
|
||||
stream
|
||||
.feed(tungstenite::Message::binary(&buf[..len]))
|
||||
.await?;
|
||||
}
|
||||
conn.socket.flush().await?;
|
||||
stream.flush().await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// any error in receiving a message will result in the connection being closed.
|
||||
async fn recv_protocol_message(conn: &mut PeerConnection) -> anyhow::Result<KernelMessage> {
|
||||
let outer_len = conn
|
||||
.noise
|
||||
.read_message(&recv(&mut conn.socket).await?, &mut conn.buf)?;
|
||||
async fn recv_protocol_message(
|
||||
cipher: &mut snow::CipherState,
|
||||
buf: &mut [u8],
|
||||
stream: &mut WsReadHalf,
|
||||
) -> anyhow::Result<KernelMessage> {
|
||||
let outer_len = cipher.decrypt(&recv_read_only(stream).await?, buf)?;
|
||||
|
||||
if outer_len < 4 {
|
||||
return Err(anyhow::anyhow!("protocol message too small!"));
|
||||
}
|
||||
let length_bytes = [conn.buf[0], conn.buf[1], conn.buf[2], conn.buf[3]];
|
||||
let length_bytes = [buf[0], buf[1], buf[2], buf[3]];
|
||||
let msg_len = u32::from_be_bytes(length_bytes);
|
||||
if msg_len > MESSAGE_MAX_SIZE {
|
||||
return Err(anyhow::anyhow!("message too large"));
|
||||
@ -135,13 +156,11 @@ async fn recv_protocol_message(conn: &mut PeerConnection) -> anyhow::Result<Kern
|
||||
|
||||
// bad
|
||||
let mut msg = Vec::with_capacity(msg_len as usize);
|
||||
msg.extend_from_slice(&conn.buf[4..outer_len]);
|
||||
msg.extend_from_slice(&buf[4..outer_len]);
|
||||
|
||||
while msg.len() < msg_len as usize {
|
||||
let len = conn
|
||||
.noise
|
||||
.read_message(&recv(&mut conn.socket).await?, &mut conn.buf)?;
|
||||
msg.extend_from_slice(&conn.buf[..len]);
|
||||
let len = cipher.decrypt(&recv_read_only(stream).await?, buf)?;
|
||||
msg.extend_from_slice(&buf[..len]);
|
||||
}
|
||||
|
||||
Ok(rmp_serde::from_slice(&msg)?)
|
||||
@ -180,7 +199,9 @@ pub async fn recv_protocol_handshake(
|
||||
}
|
||||
|
||||
/// Receive a byte array from a read stream. If this returns an error,
|
||||
/// we should close the connection. Will automatically respond to 'PING' messages with a 'PONG'.
|
||||
/// we should close the connection.
|
||||
///
|
||||
/// Will automatically respond to 'PING' messages with a 'PONG'.
|
||||
pub async fn recv(socket: &mut WebSocket) -> anyhow::Result<Vec<u8>> {
|
||||
loop {
|
||||
match socket.next().await {
|
||||
@ -194,3 +215,16 @@ pub async fn recv(socket: &mut WebSocket) -> anyhow::Result<Vec<u8>> {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Receive a byte array from a read stream. If this returns an error,
|
||||
/// we should close the connection.
|
||||
pub async fn recv_read_only(socket: &mut WsReadHalf) -> anyhow::Result<Vec<u8>> {
|
||||
loop {
|
||||
match socket.next().await {
|
||||
Some(Ok(tungstenite::Message::Ping(_))) => continue,
|
||||
Some(Ok(tungstenite::Message::Pong(_))) => continue,
|
||||
Some(Ok(tungstenite::Message::Binary(bin))) => return Ok(bin),
|
||||
_ => return Err(anyhow::anyhow!("websocket closed")),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
File diff suppressed because one or more lines are too long
135
kinode/src/register-ui/build/assets/index-KlEDwcBC.js
Normal file
135
kinode/src/register-ui/build/assets/index-KlEDwcBC.js
Normal file
File diff suppressed because one or more lines are too long
@ -11,7 +11,7 @@
|
||||
<meta httpEquiv="X-UA-Compatible" content="IE=edge" />
|
||||
<meta name="viewport"
|
||||
content="width=device-width, initial-scale=1, minimum-scale=1, maximum-scale=1.00001, viewport-fit=cover" />
|
||||
<script type="module" crossorigin src="/assets/index-DvcjXN70.js"></script>
|
||||
<script type="module" crossorigin src="/assets/index-DexQTHZb.js"></script>
|
||||
<link rel="stylesheet" crossorigin href="/assets/index-B00cPdAQ.css">
|
||||
</head>
|
||||
|
||||
|
@ -5,6 +5,8 @@
|
||||
"proxy": "http://localhost:8080",
|
||||
"dependencies": {
|
||||
"@babel/plugin-proposal-private-property-in-object": "^7.21.11",
|
||||
"@ensdomains/eth-ens-namehash": "^2.0.15",
|
||||
"@esbuild-plugins/node-globals-polyfill": "^0.2.3",
|
||||
"@ethersproject/hash": "^5.7.0",
|
||||
"@typechain/ethers-v5": "^11.1.1",
|
||||
"@types/node": "^16.18.50",
|
||||
@ -25,11 +27,11 @@
|
||||
"buffer": "^6.0.3",
|
||||
"classnames": "^2.5.1",
|
||||
"eslint-config-react-app": "^7.0.1",
|
||||
"eth-ens-namehash": "^2.0.8",
|
||||
"ethers": "^5.7.2",
|
||||
"idna-uts46-hx": "^6.0.4",
|
||||
"is-valid-domain": "^0.1.6",
|
||||
"jazzicon": "^1.5.0",
|
||||
"punycode": "^2.3.1",
|
||||
"react": "^18.2.0",
|
||||
"react-dom": "^18.2.0",
|
||||
"react-icons": "^5.0.1",
|
||||
@ -76,4 +78,4 @@
|
||||
"inline-source-cli": "^2.0.0"
|
||||
},
|
||||
"type": "module"
|
||||
}
|
||||
}
|
||||
|
@ -2,7 +2,7 @@ import React, { useEffect, useRef } from "react";
|
||||
import { hooks } from "../connectors/metamask";
|
||||
import { NameWrapper, ENSRegistry } from "../abis/types";
|
||||
import isValidDomain from 'is-valid-domain'
|
||||
import { hash } from 'eth-ens-namehash'
|
||||
import { hash } from '@ensdomains/eth-ens-namehash';
|
||||
import { toAscii } from 'idna-uts46-hx'
|
||||
|
||||
const {
|
||||
|
@ -1,7 +1,7 @@
|
||||
import React, { useEffect, useRef } from "react";
|
||||
import { DotOsRegistrar } from "../abis/types";
|
||||
import isValidDomain from "is-valid-domain";
|
||||
import { hash } from "eth-ens-namehash";
|
||||
import hash from "@ensdomains/eth-ens-namehash";
|
||||
import { toAscii } from "idna-uts46-hx";
|
||||
|
||||
type ClaimOsNameProps = {
|
||||
@ -46,7 +46,7 @@ function EnterKnsName({
|
||||
if (index === -1) validities.push(NAME_LENGTH);
|
||||
} else if (index !== -1) validities.splice(index, 1);
|
||||
|
||||
let normalized: string;
|
||||
let normalized = ''
|
||||
index = validities.indexOf(NAME_INVALID_PUNY);
|
||||
try {
|
||||
normalized = toAscii(name + ".os");
|
||||
@ -56,18 +56,21 @@ function EnterKnsName({
|
||||
}
|
||||
|
||||
// only check if name is valid punycode
|
||||
if (normalized! !== undefined) {
|
||||
if (normalized && normalized !== '.os') {
|
||||
index = validities.indexOf(NAME_URL);
|
||||
if (name !== "" && !isValidDomain(normalized)) {
|
||||
if (index === -1) validities.push(NAME_URL);
|
||||
} else if (index !== -1) validities.splice(index, 1);
|
||||
|
||||
index = validities.indexOf(NAME_CLAIMED);
|
||||
if (validities.length === 0 || index !== -1) {
|
||||
|
||||
if (validities.length === 0 || index !== -1 && normalized.length > 2) {
|
||||
try {
|
||||
await dotOs?.ownerOf(hash(normalized));
|
||||
if (index === -1) validities.push(NAME_CLAIMED);
|
||||
const namehash = hash.hash(normalized)
|
||||
const owner = await dotOs?.ownerOf(namehash);
|
||||
if (owner && index === -1) validities.push(NAME_CLAIMED);
|
||||
} catch (e) {
|
||||
console.error({ e })
|
||||
if (index !== -1) validities.splice(index, 1);
|
||||
}
|
||||
}
|
||||
|
2
kinode/src/register-ui/src/declarations.d.ts
vendored
2
kinode/src/register-ui/src/declarations.d.ts
vendored
@ -1,4 +1,4 @@
|
||||
declare module 'eth-ens-namehash' {
|
||||
declare module '@ensdomains/eth-ens-namehash' {
|
||||
export function hash(name: string): string;
|
||||
export function normalize(name: string): string;
|
||||
}
|
||||
|
@ -8,6 +8,9 @@ import '@unocss/reset/tailwind.css'
|
||||
import 'uno.css'
|
||||
import './index.css';
|
||||
|
||||
import { Buffer } from 'buffer';
|
||||
window.Buffer = Buffer;
|
||||
|
||||
const connectors: [MetaMask, Web3ReactHooks][] = [
|
||||
[metaMask, metaMaskHooks],
|
||||
]
|
||||
|
@ -148,7 +148,7 @@ function Login({
|
||||
|
||||
// Login or confirm new keys
|
||||
const result = await fetch(
|
||||
reset ? "/api/confirm-change-network-keys" : "login",
|
||||
reset ? "confirm-change-network-keys" : "login",
|
||||
{
|
||||
method: "POST",
|
||||
headers: { "Content-Type": "application/json" },
|
||||
|
@ -9,7 +9,7 @@ import KinodeHeader from "../components/KnsHeader";
|
||||
import { NetworkingInfo, PageProps } from "../lib/types";
|
||||
import { ipToNumber } from "../utils/ipToNumber";
|
||||
import { getNetworkName, setChain } from "../utils/chain";
|
||||
import { hash } from "eth-ens-namehash";
|
||||
import { hash } from "@ensdomains/eth-ens-namehash";
|
||||
import DirectCheckbox from "../components/DirectCheckbox";
|
||||
import { MAINNET_OPT_HEX, OPTIMISM_OPT_HEX } from "../constants/chainId";
|
||||
import { KinodeTitle } from "../components/KinodeTitle";
|
||||
|
@ -9,7 +9,7 @@ import { hooks } from "../connectors/metamask";
|
||||
import { useNavigate } from "react-router-dom";
|
||||
import { namehash } from "ethers/lib/utils";
|
||||
import { toAscii } from "idna-uts46-hx";
|
||||
import { hash } from "eth-ens-namehash";
|
||||
import { hash } from "@ensdomains/eth-ens-namehash";
|
||||
import isValidDomain from "is-valid-domain";
|
||||
import Loader from "../components/Loader";
|
||||
import KinodeHeader from "../components/KnsHeader";
|
||||
|
@ -2,9 +2,13 @@ import { defineConfig } from 'vite'
|
||||
import react from '@vitejs/plugin-react'
|
||||
import UnoCSS from '@unocss/vite'
|
||||
import { presetUno, presetWind, presetIcons, transformerDirectives } from 'unocss'
|
||||
import { NodeGlobalsPolyfillPlugin } from '@esbuild-plugins/node-globals-polyfill';
|
||||
|
||||
export default defineConfig({
|
||||
plugins: [
|
||||
NodeGlobalsPolyfillPlugin({
|
||||
buffer: true
|
||||
}),
|
||||
UnoCSS({
|
||||
presets: [presetUno(), presetWind(), presetIcons()],
|
||||
shortcuts: [
|
||||
|
@ -1267,6 +1267,16 @@
|
||||
"@babel/helper-validator-identifier" "^7.24.5"
|
||||
to-fast-properties "^2.0.0"
|
||||
|
||||
"@ensdomains/eth-ens-namehash@^2.0.15":
|
||||
version "2.0.15"
|
||||
resolved "https://registry.yarnpkg.com/@ensdomains/eth-ens-namehash/-/eth-ens-namehash-2.0.15.tgz#5e5f2f24ba802aff8bc19edd822c9a11200cdf4a"
|
||||
integrity sha512-JRDFP6+Hczb1E0/HhIg0PONgBYasfGfDheujmfxaZaAv/NAH4jE6Kf48WbqfRZdxt4IZI3jl3Ri7sZ1nP09lgw==
|
||||
|
||||
"@esbuild-plugins/node-globals-polyfill@^0.2.3":
|
||||
version "0.2.3"
|
||||
resolved "https://registry.yarnpkg.com/@esbuild-plugins/node-globals-polyfill/-/node-globals-polyfill-0.2.3.tgz#0e4497a2b53c9e9485e149bc92ddb228438d6bcf"
|
||||
integrity sha512-r3MIryXDeXDOZh7ih1l/yE9ZLORCd5e8vWg02azWRGj5SPTuoh69A2AIyn0Z31V/kHBfZ4HgWJ+OK3GTTwLmnw==
|
||||
|
||||
"@esbuild/aix-ppc64@0.20.2":
|
||||
version "0.20.2"
|
||||
resolved "https://registry.yarnpkg.com/@esbuild/aix-ppc64/-/aix-ppc64-0.20.2.tgz#a70f4ac11c6a1dfc18b8bbb13284155d933b9537"
|
||||
@ -4830,14 +4840,6 @@ esutils@^2.0.2:
|
||||
resolved "https://registry.npmjs.org/esutils/-/esutils-2.0.3.tgz"
|
||||
integrity sha512-kVscqXk4OCp68SZ0dkgEKVi6/8ij300KBWTJq32P/dYeWTSwK41WyTxalN1eRmA5Z9UU/LX9D7FWSmV9SAYx6g==
|
||||
|
||||
eth-ens-namehash@^2.0.8:
|
||||
version "2.0.8"
|
||||
resolved "https://registry.npmjs.org/eth-ens-namehash/-/eth-ens-namehash-2.0.8.tgz"
|
||||
integrity sha512-VWEI1+KJfz4Km//dadyvBBoBeSQ0MHTXPvr8UIXiLW6IanxvAV+DmlZAijZwAyggqGUfwQBeHf7tc9wzc1piSw==
|
||||
dependencies:
|
||||
idna-uts46-hx "^2.3.1"
|
||||
js-sha3 "^0.5.7"
|
||||
|
||||
ethers@^5.7.2:
|
||||
version "5.7.2"
|
||||
resolved "https://registry.npmjs.org/ethers/-/ethers-5.7.2.tgz"
|
||||
@ -5311,13 +5313,6 @@ idb-keyval@^6.2.1:
|
||||
resolved "https://registry.npmjs.org/idb-keyval/-/idb-keyval-6.2.1.tgz"
|
||||
integrity sha512-8Sb3veuYCyrZL+VBt9LJfZjLUPWVvqn8tG28VqYNFCo43KHcKuq+b4EiXGeuaLAQWL2YmyDgMp2aSpH9JHsEQg==
|
||||
|
||||
idna-uts46-hx@^2.3.1:
|
||||
version "2.3.1"
|
||||
resolved "https://registry.npmjs.org/idna-uts46-hx/-/idna-uts46-hx-2.3.1.tgz"
|
||||
integrity sha512-PWoF9Keq6laYdIRwwCdhTPl60xRqAloYNMQLiyUnG42VjT53oW07BXIRM+NK7eQjzXjAk2gUvX9caRxlnF9TAA==
|
||||
dependencies:
|
||||
punycode "2.1.0"
|
||||
|
||||
idna-uts46-hx@^6.0.4:
|
||||
version "6.0.4"
|
||||
resolved "https://registry.yarnpkg.com/idna-uts46-hx/-/idna-uts46-hx-6.0.4.tgz#25f9f8af628bd4150b36340e99a1f9532c815905"
|
||||
@ -5686,11 +5681,6 @@ js-sha3@0.8.0, js-sha3@^0.8.0:
|
||||
resolved "https://registry.npmjs.org/js-sha3/-/js-sha3-0.8.0.tgz"
|
||||
integrity sha512-gF1cRrHhIzNfToc802P800N8PpXS+evLLXfsVpowqmAFR9uwbi89WvXg2QspOmXL8QL86J4T1EpFu+yUkwJY3Q==
|
||||
|
||||
js-sha3@^0.5.7:
|
||||
version "0.5.7"
|
||||
resolved "https://registry.npmjs.org/js-sha3/-/js-sha3-0.5.7.tgz"
|
||||
integrity sha512-GII20kjaPX0zJ8wzkTbNDYMY7msuZcTWk8S5UOh6806Jq/wz1J8/bnr8uGU0DAUmYDjj2Mr4X1cW8v/GLYnR+g==
|
||||
|
||||
"js-tokens@^3.0.0 || ^4.0.0", js-tokens@^4.0.0:
|
||||
version "4.0.0"
|
||||
resolved "https://registry.npmjs.org/js-tokens/-/js-tokens-4.0.0.tgz"
|
||||
@ -6603,11 +6593,6 @@ proxy-compare@2.5.1:
|
||||
resolved "https://registry.npmjs.org/proxy-compare/-/proxy-compare-2.5.1.tgz"
|
||||
integrity sha512-oyfc0Tx87Cpwva5ZXezSp5V9vht1c7dZBhvuV/y3ctkgMVUmiAGDVeeB0dKhGSyT0v1ZTEQYpe/RXlBVBNuCLA==
|
||||
|
||||
punycode@2.1.0:
|
||||
version "2.1.0"
|
||||
resolved "https://registry.npmjs.org/punycode/-/punycode-2.1.0.tgz"
|
||||
integrity sha512-Yxz2kRwT90aPiWEMHVYnEf4+rhwF1tBmmZ4KepCP+Wkium9JxtWnUm1nqGwpiAHr/tnTSeHqr3wb++jgSkXjhA==
|
||||
|
||||
punycode@^2.1.1, punycode@^2.3.1:
|
||||
version "2.3.1"
|
||||
resolved "https://registry.yarnpkg.com/punycode/-/punycode-2.3.1.tgz#027422e2faec0b25e1549c3e1bd8309b9133b6e5"
|
||||
|
@ -34,9 +34,8 @@ sol! {
|
||||
bytes32 _node,
|
||||
address _sender
|
||||
) public view virtual returns (bool authed);
|
||||
|
||||
function key(bytes32) external view returns (bytes32);
|
||||
function nodes(bytes32) external view returns (address, uint96);
|
||||
|
||||
function ip(bytes32) external view returns (uint128, uint16, uint16, uint16, uint16);
|
||||
}
|
||||
|
||||
@ -549,7 +548,7 @@ async fn handle_import_keyfile(
|
||||
}
|
||||
};
|
||||
|
||||
if let Err(e) = assign_direct_routing(
|
||||
if let Err(e) = assign_routing(
|
||||
&mut our,
|
||||
kns_address,
|
||||
provider,
|
||||
@ -616,7 +615,7 @@ async fn handle_login(
|
||||
}
|
||||
};
|
||||
|
||||
if let Err(e) = assign_direct_routing(
|
||||
if let Err(e) = assign_routing(
|
||||
&mut our,
|
||||
kns_address,
|
||||
provider,
|
||||
@ -692,10 +691,15 @@ async fn confirm_change_network_keys(
|
||||
&decoded_keyfile.file_key,
|
||||
);
|
||||
|
||||
success_response(sender, our.clone(), decoded_keyfile, encoded_keyfile).await
|
||||
our.networking_key = format!(
|
||||
"0x{}",
|
||||
hex::encode(decoded_keyfile.networking_keypair.public_key().as_ref())
|
||||
);
|
||||
|
||||
success_response(sender, our, decoded_keyfile, encoded_keyfile).await
|
||||
}
|
||||
|
||||
pub async fn assign_direct_routing(
|
||||
pub async fn assign_routing(
|
||||
our: &mut Identity,
|
||||
kns_address: EthAddress,
|
||||
provider: Arc<Provider<PubSubFrontend>>,
|
||||
@ -704,6 +708,7 @@ pub async fn assign_direct_routing(
|
||||
) -> anyhow::Result<()> {
|
||||
let namehash = FixedBytes::<32>::from_slice(&keygen::namehash(&our.name));
|
||||
let ip_call = ipCall { _0: namehash }.abi_encode();
|
||||
let key_call = keyCall { _0: namehash }.abi_encode();
|
||||
let tx_input = TransactionInput::new(Bytes::from(ip_call));
|
||||
let tx = TransactionRequest {
|
||||
to: Some(kns_address),
|
||||
@ -720,6 +725,23 @@ pub async fn assign_direct_routing(
|
||||
return Err(anyhow::anyhow!("Failed to decode node IP data from PKI"));
|
||||
};
|
||||
|
||||
let key_tx_input = TransactionInput::new(Bytes::from(key_call));
|
||||
let key_tx = TransactionRequest {
|
||||
to: Some(kns_address),
|
||||
input: key_tx_input,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let Ok(public_key) = provider.call(key_tx, None).await else {
|
||||
return Err(anyhow::anyhow!("Failed to fetch node key from PKI"));
|
||||
};
|
||||
|
||||
if format!("0x{}", hex::encode(&public_key)) != our.networking_key {
|
||||
return Err(anyhow::anyhow!(
|
||||
"Networking key from PKI does not match our saved networking key"
|
||||
));
|
||||
}
|
||||
|
||||
let node_ip = format!(
|
||||
"{}.{}.{}.{}",
|
||||
(ip >> 24) & 0xFF,
|
||||
@ -752,6 +774,8 @@ pub async fn assign_direct_routing(
|
||||
ports.insert("tcp".to_string(), tcp);
|
||||
}
|
||||
our.routing = NodeRouting::Direct { ip: node_ip, ports };
|
||||
} else {
|
||||
// indirect node
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
@ -1030,32 +1030,27 @@ impl Identity {
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
pub fn get_protocol_port(&self, protocol: &str) -> Option<u16> {
|
||||
pub fn get_protocol_port(&self, protocol: &str) -> Option<&u16> {
|
||||
match &self.routing {
|
||||
NodeRouting::Routers(_) => None,
|
||||
NodeRouting::Direct { ports, .. } => ports.get(protocol).cloned(),
|
||||
NodeRouting::Both { ports, .. } => ports.get(protocol).cloned(),
|
||||
NodeRouting::Direct { ports, .. } | NodeRouting::Both { ports, .. } => {
|
||||
ports.get(protocol)
|
||||
}
|
||||
}
|
||||
}
|
||||
pub fn get_ip(&self) -> Option<&str> {
|
||||
match &self.routing {
|
||||
NodeRouting::Routers(_) => None,
|
||||
NodeRouting::Direct { ip, .. } => Some(ip),
|
||||
NodeRouting::Both { ip, .. } => Some(ip),
|
||||
NodeRouting::Direct { ip, .. } | NodeRouting::Both { ip, .. } => Some(ip),
|
||||
}
|
||||
}
|
||||
pub fn ws_routing(&self) -> Option<(&str, &u16)> {
|
||||
match &self.routing {
|
||||
NodeRouting::Routers(_) => None,
|
||||
NodeRouting::Direct { ip, ports } => {
|
||||
if let Some(port) = ports.get("ws") {
|
||||
Some((ip, port))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
NodeRouting::Both { ip, ports, .. } => {
|
||||
if let Some(port) = ports.get("ws") {
|
||||
NodeRouting::Direct { ip, ports } | NodeRouting::Both { ip, ports, .. } => {
|
||||
if let Some(port) = ports.get("ws")
|
||||
&& *port != 0
|
||||
{
|
||||
Some((ip, port))
|
||||
} else {
|
||||
None
|
||||
@ -1066,15 +1061,10 @@ impl Identity {
|
||||
pub fn tcp_routing(&self) -> Option<(&str, &u16)> {
|
||||
match &self.routing {
|
||||
NodeRouting::Routers(_) => None,
|
||||
NodeRouting::Direct { ip, ports } => {
|
||||
if let Some(port) = ports.get("tcp") {
|
||||
Some((ip, port))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
NodeRouting::Both { ip, ports, .. } => {
|
||||
if let Some(port) = ports.get("tcp") {
|
||||
NodeRouting::Direct { ip, ports } | NodeRouting::Both { ip, ports, .. } => {
|
||||
if let Some(port) = ports.get("tcp")
|
||||
&& *port != 0
|
||||
{
|
||||
Some((ip, port))
|
||||
} else {
|
||||
None
|
||||
@ -1084,9 +1074,8 @@ impl Identity {
|
||||
}
|
||||
pub fn routers(&self) -> Option<&Vec<NodeId>> {
|
||||
match &self.routing {
|
||||
NodeRouting::Routers(routers) => Some(routers),
|
||||
NodeRouting::Routers(routers) | NodeRouting::Both { routers, .. } => Some(routers),
|
||||
NodeRouting::Direct { .. } => None,
|
||||
NodeRouting::Both { routers, .. } => Some(routers),
|
||||
}
|
||||
}
|
||||
pub fn both_to_direct(&mut self) {
|
||||
@ -1889,10 +1878,7 @@ pub struct KnsUpdate {
|
||||
}
|
||||
|
||||
impl KnsUpdate {
|
||||
pub fn get_protocol_port(&self, protocol: &str) -> u16 {
|
||||
match self.ports.get(protocol) {
|
||||
Some(port) => *port,
|
||||
None => 0,
|
||||
}
|
||||
pub fn get_protocol_port(&self, protocol: &str) -> Option<&u16> {
|
||||
self.ports.get(protocol)
|
||||
}
|
||||
}
|
||||
|
@ -1,3 +1,5 @@
|
||||
#![feature(let_chains)]
|
||||
|
||||
pub mod core;
|
||||
pub mod eth;
|
||||
mod http;
|
||||
|
Loading…
Reference in New Issue
Block a user