From e83ae0e888c218515b1d0ee0d060025aed9efe55 Mon Sep 17 00:00:00 2001 From: dr-frmr Date: Mon, 2 Oct 2023 23:58:20 -0400 Subject: [PATCH 1/2] assorted hotfixes --- build.rs | 154 +++++++++++++++++++++++++++++++++ modules/qns_indexer/src/lib.rs | 6 +- src/eth_rpc.rs | 24 ++--- src/main.rs | 32 +++---- src/net/mod.rs | 2 +- 5 files changed, 188 insertions(+), 30 deletions(-) create mode 100644 build.rs diff --git a/build.rs b/build.rs new file mode 100644 index 00000000..a3b3f614 --- /dev/null +++ b/build.rs @@ -0,0 +1,154 @@ +use std::process::Command; +use std::{fs, io}; + +fn run_command(cmd: &mut Command) -> io::Result<()> { + let status = cmd.status()?; + if status.success() { + Ok(()) + } else { + Err(io::Error::new(io::ErrorKind::Other, "Command failed")) + } +} + +fn main() { + if std::env::var("SKIP_BUILD_SCRIPT").is_ok() { + println!("Skipping build script"); + return; + } + + let pwd = std::env::current_dir().unwrap(); + + // create target.wasm (compiled .wit) & world + run_command(Command::new("wasm-tools").args(&[ + "component", + "wit", + &format!("{}/wit/", pwd.display()), + "-o", + "target.wasm", + "--wasm", + ])) + .unwrap(); + run_command(Command::new("touch").args(&[&format!("{}/world", pwd.display())])).unwrap(); + + // Build wasm32-wasi apps. + const WASI_APPS: [&str; 8] = [ + "apps_home", + "chess", + "http_bindings", + "http_proxy", + "orgs", + "qns_indexer", + "rpc", + "terminal", + ]; + for name in WASI_APPS { + // only execute if one of the modules has source code changes + println!("cargo:rerun-if-changed=modules/{}/src", name); + // copy in the wit files + run_command( + Command::new("rm").args(&["-rf", &format!("{}/modules/{}/wit", pwd.display(), name)]), + ) + .unwrap(); + run_command(Command::new("cp").args(&[ + "-r", + "wit", + &format!("{}/modules/{}", pwd.display(), name), + ])) + .unwrap(); + + fs::create_dir_all(&format!( + "{}/modules/{}/target/bindings/{}", + pwd.display(), + name, + name + )) + .unwrap(); + run_command(Command::new("cp").args(&[ + "target.wasm", + &format!( + "{}/modules/{}/target/bindings/{}/", + pwd.display(), + name, + name + ), + ])) + .unwrap(); + run_command(Command::new("cp").args(&[ + "world", + &format!( + "{}/modules/{}/target/bindings/{}/", + pwd.display(), + name, + name + ), + ])) + .unwrap(); + + fs::create_dir_all(&format!( + "{}/modules/{}/target/wasm32-unknown-unknown/release", + pwd.display(), + name + )) + .unwrap(); + + // build the module + run_command(Command::new("cargo").args(&[ + "build", + "--release", + "--no-default-features", + &format!( + "--manifest-path={}/modules/{}/Cargo.toml", + pwd.display(), + name + ), + "--target", + "wasm32-wasi", + ])) + .unwrap(); + + // adapt module to component with adaptor + run_command(Command::new("wasm-tools").args(&[ + "component", + "new", + &format!( + "{}/modules/{}/target/wasm32-wasi/release/{}.wasm", + pwd.display(), + name, + name + ), + "-o", + &format!( + "{}/modules/{}/target/wasm32-wasi/release/{}_adapted.wasm", + pwd.display(), + name, + name + ), + "--adapt", + &format!("{}/wasi_snapshot_preview1.wasm", pwd.display()), + ])) + .unwrap(); + + // put wit into component & place where boot sequence expects to find it + run_command(Command::new("wasm-tools").args(&[ + "component", + "embed", + "wit", + "--world", + "uq-process", + &format!( + "{}/modules/{}/target/wasm32-wasi/release/{}_adapted.wasm", + pwd.display(), + name, + name + ), + "-o", + &format!( + "{}/modules/{}/target/wasm32-unknown-unknown/release/{}.wasm", + pwd.display(), + name, + name + ), + ])) + .unwrap(); + } +} \ No newline at end of file diff --git a/modules/qns_indexer/src/lib.rs b/modules/qns_indexer/src/lib.rs index 5eede377..1443f860 100644 --- a/modules/qns_indexer/src/lib.rs +++ b/modules/qns_indexer/src/lib.rs @@ -77,8 +77,6 @@ fn subscribe_to_qns(from_block: u64) -> String { impl UqProcess for Component { fn init(our: Address) { - bindings::print_to_terminal(0, "qns_indexer: start"); - let mut state: State = State { names: HashMap::new(), nodes: HashMap::new(), @@ -101,6 +99,8 @@ impl UqProcess for Component { }, } + bindings::print_to_terminal(0, &format!("qns_indexer: starting at block {}", state.block)); + // shove all state into net::net for (_, ipc) in state.nodes.iter() { send_request( @@ -218,6 +218,7 @@ impl UqProcess for Component { match msg { // Probably more message types later...maybe not... AllActions::EventSubscription(e) => { + state.block = hex_to_u64(&e.blockNumber).unwrap(); match decode_hex(&e.topics[0].clone()) { NodeRegistered::SIGNATURE_HASH => { // bindings::print_to_terminal(0, format!("qns_indexer: got NodeRegistered event: {:?}", e).as_str()); @@ -230,7 +231,6 @@ impl UqProcess for Component { // bindings::print_to_terminal(0, format!("qns_indexer: NAME: {:?}", name.to_string()).as_str()); state.names.insert(node.to_string(), name); - state.block = hex_to_u64(&e.blockNumber).unwrap(); } WsChanged::SIGNATURE_HASH => { // bindings::print_to_terminal(0, format!("qns_indexer: got WsChanged event: {:?}", e).as_str()); diff --git a/src/eth_rpc.rs b/src/eth_rpc.rs index 6801a786..dfb5b740 100644 --- a/src/eth_rpc.rs +++ b/src/eth_rpc.rs @@ -52,11 +52,7 @@ pub async fn eth_rpc( mut recv_in_client: MessageReceiver, print_tx: PrintSender, ) -> Result<()> { - // TODO maybe don't need to do Arc Mutex - let subscriptions = Arc::new(Mutex::new(HashMap::< - u64, - tokio::task::JoinHandle>, - >::new())); + let mut subscriptions = HashMap::>>::new(); while let Some(message) = recv_in_client.recv().await { let our = our.clone(); @@ -198,8 +194,8 @@ pub async fn eth_rpc( // TODO grab and print error let _ = print_tx .send(Printout { - verbosity: 1, - content: format!("eth_rpc: connection retrying"), + verbosity: 0, + content: format!("eth_rpc: connection failed, retrying in 5s"), }) .await; tokio::time::sleep(std::time::Duration::from_secs(5)).await; @@ -211,12 +207,18 @@ pub async fn eth_rpc( .await { Err(e) => { + let _ = print_tx + .send(Printout { + verbosity: 0, + content: format!("eth_rpc: subscription error: {:?}", e), + }) + .await; continue; } Ok(mut stream) => { let _ = print_tx .send(Printout { - verbosity: 1, + verbosity: 0, content: format!("eth_rpc: connection established"), }) .await; @@ -232,7 +234,7 @@ pub async fn eth_rpc( target: target.clone(), rsvp: None, message: Message::Request(Request { - inherit: false, // TODO what + inherit: false, expects_response: None, ipc: Some(json!({ "EventSubscription": serde_json::to_value(event.clone()).unwrap() @@ -257,7 +259,7 @@ pub async fn eth_rpc( }; } }); - subscriptions.lock().await.insert(id, handle); + subscriptions.insert(id, handle); } EthRpcAction::Unsubscribe(sub_id) => { let _ = print_tx @@ -267,7 +269,7 @@ pub async fn eth_rpc( }) .await; - if let Some(handle) = subscriptions.lock().await.remove(&sub_id) { + if let Some(handle) = subscriptions.remove(&sub_id) { handle.abort(); } else { let _ = print_tx diff --git a/src/main.rs b/src/main.rs index 7c329e0b..a4b2c458 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,7 +1,7 @@ use anyhow::Result; use dotenv; use ethers::prelude::{abigen, namehash, Address as EthAddress, Provider, U256}; -use ethers_providers::Ws; +use ethers_providers::{Middleware, Ws}; use ring::pkcs8::Document; use ring::signature::{self, KeyPair}; use std::env; @@ -56,7 +56,7 @@ async fn main() { } // read PKI from websocket endpoint served by public RPC // if you get rate-limited or something, pass in your own RPC as a boot argument - let mut rpc_url = "wss://eth-sepolia.public.blastapi.io".to_string(); + let mut rpc_url = "wss://ethereum-sepolia.publicnode.com".to_string(); for (i, arg) in args.iter().enumerate() { if arg == "--rpc" { @@ -162,15 +162,15 @@ async fn main() { let (fs_kill_confirm_send, fs_kill_confirm_recv) = oneshot::channel::<()>(); println!("finding public IP address..."); - let our_ip = { + let our_ip: std::net::Ipv4Addr = { if let Ok(Some(ip)) = timeout(std::time::Duration::from_secs(5), public_ip::addr_v4()).await { - ip.to_string() + ip } else { println!( "\x1b[38;5;196mfailed to find public IPv4 address: booting as a routed node\x1b[0m" ); - "localhost".into() + std::net::Ipv4Addr::LOCALHOST } }; @@ -202,7 +202,7 @@ async fn main() { "Click here to log in to your node.", ); println!("(http://localhost:{}/login)", http_server_port); - if our_ip != "localhost" { + if our_ip != std::net::Ipv4Addr::LOCALHOST { println!( "(if on a remote machine: http://{}:{}/login)", our_ip, http_server_port @@ -236,11 +236,15 @@ async fn main() { let Ok(ws_rpc) = Provider::::connect(rpc_url.clone()).await else { panic!("rpc: couldn't connect to blockchain wss endpoint"); }; + let Ok(_) = ws_rpc.get_block_number().await else { + panic!("error: RPC endpoint not responding, try setting one with --rpc flag"); + }; let qns_address: EthAddress = QNS_SEPOLIA_ADDRESS.parse().unwrap(); let contract = QNSRegistry::new(qns_address, ws_rpc.into()); let node_id: U256 = namehash(&username).as_bytes().into(); - let onchain_id = contract.ws(node_id).call().await.unwrap(); // TODO unwrap - + let Ok(onchain_id) = contract.ws(node_id).call().await else { + panic!("error: RPC endpoint failed to fetch our node_id"); + }; // double check that routers match on-chain information let namehashed_routers: Vec<[u8; 32]> = routers .clone() @@ -256,10 +260,8 @@ async fn main() { // double check that keys match on-chain information if onchain_id.routers != namehashed_routers || onchain_id.public_key != networking_keypair.public_key().as_ref() - // || (onchain_id.ip_and_port > 0 && onchain_id.ip_and_port != combineIpAndPort( - // our_ip.clone(), - // http_server_port, - // )) + || (onchain_id.ip != 0 + && onchain_id.ip != >::into(our_ip)) { panic!("CRITICAL: your routing information does not match on-chain records"); } @@ -299,7 +301,7 @@ async fn main() { "Click here to register your node.", ); println!("(http://localhost:{})", http_server_port); - if our_ip != "localhost" { + if our_ip != std::net::Ipv4Addr::LOCALHOST { println!( "(if on a remote machine: http://{}:{})", our_ip, http_server_port @@ -308,7 +310,7 @@ async fn main() { let (tx, mut rx) = mpsc::channel::<(Identity, String, Document, Vec)>(1); let (mut our, password, serialized_networking_keypair, jwt_secret_bytes) = tokio::select! { - _ = register::register(tx, kill_rx, our_ip.clone(), http_server_port, http_server_port) + _ = register::register(tx, kill_rx, our_ip.to_string(), http_server_port, http_server_port) => panic!("registration failed"), (our, password, serialized_networking_keypair, jwt_secret_bytes) = async { while let Some(fin) = rx.recv().await { @@ -415,7 +417,7 @@ async fn main() { )); tasks.spawn(net::networking( our.clone(), - our_ip, + our_ip.to_string(), networking_keypair_arc.clone(), kernel_message_sender.clone(), network_error_sender, diff --git a/src/net/mod.rs b/src/net/mod.rs index 77f4e4d7..be202e00 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -641,7 +641,7 @@ async fn receive_incoming_connections( ) { let tcp = TcpListener::bind(format!("0.0.0.0:{}", port)) .await - .expect(format!("fatal error: can't listen on port {port}").as_str()); + .expect(format!("net: fatal error: can't listen on port {port}. change port onchain or free up this port!").as_str()); while let Ok((stream, _socket_addr)) = tcp.accept().await { // TODO we can perform some amount of validation here From d8303be9adeb71ec3877e2f2421d425c99da356e Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Tue, 3 Oct 2023 03:58:48 +0000 Subject: [PATCH 2/2] Format Rust code using rustfmt --- build.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.rs b/build.rs index a3b3f614..c9b48390 100644 --- a/build.rs +++ b/build.rs @@ -151,4 +151,4 @@ fn main() { ])) .unwrap(); } -} \ No newline at end of file +}