add tcp to register

This commit is contained in:
dr-frmr 2024-05-24 15:18:30 -06:00
parent ad39ec8fe1
commit 0da1cee88a
No known key found for this signature in database
2 changed files with 86 additions and 30 deletions

View File

@ -59,6 +59,7 @@ async fn main() {
create_home_directory(&home_directory_path).await;
let http_server_port = set_http_server_port(matches.get_one::<u16>("port")).await;
let ws_networking_port = matches.get_one::<u16>("ws-port");
let tcp_networking_port = matches.get_one::<u16>("tcp-port");
let verbose_mode = *matches
.get_one::<u8>("verbosity")
.expect("verbosity required");
@ -165,7 +166,8 @@ async fn main() {
mpsc::channel(TERMINAL_CHANNEL_CAPACITY);
let our_ip = find_public_ip().await;
let (wc_tcp_handle, flag_used) = setup_ws_networking(ws_networking_port.cloned()).await;
let (ws_tcp_handle, ws_flag_used) = setup_networking(ws_networking_port).await;
let (tcp_tcp_handle, tcp_flag_used) = setup_networking(tcp_networking_port).await;
#[cfg(feature = "simulation-mode")]
let (our, encoded_keyfile, decoded_keyfile) = simulate_node(
@ -186,7 +188,8 @@ async fn main() {
let (our, encoded_keyfile, decoded_keyfile) = serve_register_fe(
&home_directory_path,
our_ip.to_string(),
(wc_tcp_handle, flag_used),
(ws_tcp_handle, ws_flag_used),
(tcp_tcp_handle, tcp_flag_used),
http_server_port,
rpc.cloned(),
)
@ -474,22 +477,22 @@ async fn set_http_server_port(set_port: Option<&u16>) -> u16 {
}
}
/// Sets up WebSocket networking by finding an open port and creating a TCP listener.
/// Sets up networking by finding an open port and creating a TCP listener.
/// If a specific port is provided, it attempts to bind to it directly.
/// If no port is provided, it searches for the first available port between 9000 and 65535.
/// Returns a tuple containing the TcpListener and a boolean indicating if a specific port was used.
async fn setup_ws_networking(ws_networking_port: Option<u16>) -> (tokio::net::TcpListener, bool) {
match ws_networking_port {
async fn setup_networking(networking_port: Option<&u16>) -> (tokio::net::TcpListener, bool) {
match networking_port {
Some(port) => {
let listener = http::utils::find_open_port(port, port + 1)
let listener = http::utils::find_open_port(*port, port + 1)
.await
.expect("ws-port selected with flag could not be bound");
.expect("port selected with flag could not be bound");
(listener, true)
}
None => {
let listener = http::utils::find_open_port(9000, 65535)
.await
.expect("no ports found in range 9000-65535 for websocket server");
.expect("no ports found in range 9000-65535 for kinode networking");
(listener, false)
}
}
@ -616,6 +619,11 @@ fn build_command() -> Command {
.alias("--ws-port")
.value_parser(value_parser!(u16)),
)
.arg(
arg!(--"tcp-port" <PORT> "Kinode internal TCP protocol port [default: first unbound at or above 9000]")
.alias("--tcp-port")
.value_parser(value_parser!(u16)),
)
.arg(
arg!(--verbosity <VERBOSITY> "Verbosity level: higher is more verbose")
.default_value("0")
@ -684,6 +692,7 @@ async fn serve_register_fe(
home_directory_path: &str,
our_ip: String,
ws_networking: (tokio::net::TcpListener, bool),
tcp_networking: (tokio::net::TcpListener, bool),
http_server_port: u16,
maybe_rpc: Option<String>,
) -> (Identity, Vec<u8>, Keyfile) {
@ -700,6 +709,7 @@ async fn serve_register_fe(
kill_rx,
our_ip,
ws_networking,
tcp_networking,
http_server_port,
disk_keyfile,
maybe_rpc) => {

View File

@ -89,6 +89,7 @@ pub async fn register(
kill_rx: oneshot::Receiver<bool>,
ip: String,
ws_networking: (tokio::net::TcpListener, bool),
tcp_networking: (tokio::net::TcpListener, bool),
http_port: u16,
keyfile: Option<Vec<u8>>,
maybe_rpc: Option<String>,
@ -100,6 +101,8 @@ pub async fn register(
let ws_port = ws_networking.0.local_addr().unwrap().port();
let ws_flag_used = ws_networking.1;
let tcp_port = tcp_networking.0.local_addr().unwrap().port();
let tcp_flag_used = tcp_networking.1;
// This is a **temporary** identity, passed to the UI.
// If it is confirmed through a /boot or /confirm-change-network-keys,
@ -109,7 +112,10 @@ pub async fn register(
name: "".to_string(),
routing: NodeRouting::Both {
ip: ip.clone(),
ports: std::collections::BTreeMap::from([("ws".to_string(), ws_port)]),
ports: std::collections::BTreeMap::from([
("ws".to_string(), ws_port),
("tcp".to_string(), tcp_port),
]),
routers: vec![
// "default-router-1.os".into(),
// "default-router-2.os".into(),
@ -154,7 +160,8 @@ pub async fn register(
let net_keypair = warp::any().map(move || net_keypair.clone());
let tx = warp::any().map(move || tx.clone());
let ip = warp::any().map(move || ip.clone());
let ws_port = warp::any().map(move || if ws_flag_used { Some(ws_port) } else { None });
let ws_port = warp::any().map(move || (ws_port, ws_flag_used));
let tcp_port = warp::any().map(move || (tcp_port, tcp_flag_used));
let static_files = warp::path("assets").and(static_dir!("src/register-ui/build/assets/"));
@ -248,10 +255,19 @@ pub async fn register(
.and(warp::body::json())
.and(ip.clone())
.and(ws_port.clone())
.and(tcp_port.clone())
.and(tx.clone())
.and_then(move |boot_info, ip, ws_port, tx| {
.and_then(move |boot_info, ip, ws_port, tcp_port, tx| {
let import_provider = import_provider.clone();
handle_import_keyfile(boot_info, ip, ws_port, tx, kns_address, import_provider)
handle_import_keyfile(
boot_info,
ip,
ws_port,
tcp_port,
tx,
kns_address,
import_provider,
)
}),
))
.or(warp::path("login").and(
@ -260,14 +276,16 @@ pub async fn register(
.and(warp::body::json())
.and(ip)
.and(ws_port.clone())
.and(tcp_port.clone())
.and(tx.clone())
.and(keyfile.clone())
.and_then(move |boot_info, ip, ws_port, tx, keyfile| {
.and_then(move |boot_info, ip, ws_port, tcp_port, tx, keyfile| {
let login_provider = login_provider.clone();
handle_login(
boot_info,
ip,
ws_port,
tcp_port,
tx,
keyfile,
kns_address,
@ -522,7 +540,8 @@ async fn handle_boot(
async fn handle_import_keyfile(
info: ImportKeyfileInfo,
ip: String,
ws_networking_port: Option<u16>,
ws_networking_port: (u16, bool),
tcp_networking_port: (u16, bool),
sender: Arc<RegistrationSender>,
kns_address: EthAddress,
provider: Arc<Provider<PubSubFrontend>>,
@ -551,7 +570,7 @@ async fn handle_import_keyfile(
routing: if k.routers.is_empty() {
NodeRouting::Direct {
ip,
ports: std::collections::BTreeMap::from([("ws".to_string(), 9000)]),
ports: std::collections::BTreeMap::new(),
}
} else {
NodeRouting::Routers(k.routers.clone())
@ -569,7 +588,15 @@ async fn handle_import_keyfile(
}
};
if let Err(e) = assign_ws_routing(&mut our, kns_address, provider, ws_networking_port).await {
if let Err(e) = assign_direct_routing(
&mut our,
kns_address,
provider,
ws_networking_port,
tcp_networking_port,
)
.await
{
return Ok(warp::reply::with_status(
warp::reply::json(&e.to_string()),
StatusCode::INTERNAL_SERVER_ERROR,
@ -582,7 +609,8 @@ async fn handle_import_keyfile(
async fn handle_login(
info: LoginInfo,
ip: String,
ws_networking_port: Option<u16>,
ws_networking_port: (u16, bool),
tcp_networking_port: (u16, bool),
sender: Arc<RegistrationSender>,
encoded_keyfile: Option<Vec<u8>>,
kns_address: EthAddress,
@ -609,7 +637,7 @@ async fn handle_login(
routing: if k.routers.is_empty() {
NodeRouting::Direct {
ip,
ports: std::collections::BTreeMap::from([("ws".to_string(), 9000)]),
ports: std::collections::BTreeMap::new(),
}
} else {
NodeRouting::Routers(k.routers.clone())
@ -627,7 +655,15 @@ async fn handle_login(
}
};
if let Err(e) = assign_ws_routing(&mut our, kns_address, provider, ws_networking_port).await {
if let Err(e) = assign_direct_routing(
&mut our,
kns_address,
provider,
ws_networking_port,
tcp_networking_port,
)
.await
{
return Ok(warp::reply::with_status(
warp::reply::json(&e.to_string()),
StatusCode::INTERNAL_SERVER_ERROR,
@ -698,11 +734,12 @@ async fn confirm_change_network_keys(
success_response(sender, our.clone(), decoded_keyfile, encoded_keyfile).await
}
pub async fn assign_ws_routing(
pub async fn assign_direct_routing(
our: &mut Identity,
kns_address: EthAddress,
provider: Arc<Provider<PubSubFrontend>>,
ws_networking_port: Option<u16>,
ws_networking_port: (u16, bool),
tcp_networking_port: (u16, bool),
) -> anyhow::Result<()> {
let namehash = FixedBytes::<32>::from_slice(&keygen::namehash(&our.name));
let ip_call = ipCall { _0: namehash }.abi_encode();
@ -717,7 +754,7 @@ pub async fn assign_ws_routing(
return Err(anyhow::anyhow!("Failed to fetch node IP data from PKI"));
};
let Ok((ip, ws, _wt, _tcp, _udp)) = <(u128, u16, u16, u16, u16)>::abi_decode(&ip_data, false)
let Ok((ip, ws, _wt, tcp, _udp)) = <(u128, u16, u16, u16, u16)>::abi_decode(&ip_data, false)
else {
return Err(anyhow::anyhow!("Failed to decode node IP data from PKI"));
};
@ -730,21 +767,30 @@ pub async fn assign_ws_routing(
ip & 0xFF
);
if node_ip != *"0.0.0.0" || ws != 0 {
if node_ip != *"0.0.0.0" && (ws != 0 || tcp != 0) {
// direct node
if let Some(chosen_port) = ws_networking_port {
if chosen_port != ws {
let mut ports = std::collections::BTreeMap::new();
if ws != 0 {
if ws_networking_port.1 && ws != ws_networking_port.0 {
return Err(anyhow::anyhow!(
"Binary used --ws-port flag to set port to {}, but node is using port {} onchain.",
chosen_port,
ws_networking_port.0,
ws
));
}
ports.insert("ws".to_string(), ws);
}
our.routing = NodeRouting::Direct {
ip: node_ip,
ports: std::collections::BTreeMap::from([("ws".to_string(), ws)]),
};
if tcp != 0 {
if tcp_networking_port.1 && tcp != tcp_networking_port.0 {
return Err(anyhow::anyhow!(
"Binary used --tcp-port flag to set port to {}, but node is using port {} onchain.",
tcp_networking_port.0,
tcp
));
}
ports.insert("tcp".to_string(), tcp);
}
our.routing = NodeRouting::Direct { ip: node_ip, ports };
}
Ok(())
}