Merge branch 'main' into hf/add-kv-process

This commit is contained in:
hosted-fornet 2023-10-02 22:29:07 -07:00
commit 1cc5287234
5 changed files with 188 additions and 30 deletions

154
build.rs Normal file
View File

@ -0,0 +1,154 @@
use std::process::Command;
use std::{fs, io};
fn run_command(cmd: &mut Command) -> io::Result<()> {
let status = cmd.status()?;
if status.success() {
Ok(())
} else {
Err(io::Error::new(io::ErrorKind::Other, "Command failed"))
}
}
fn main() {
if std::env::var("SKIP_BUILD_SCRIPT").is_ok() {
println!("Skipping build script");
return;
}
let pwd = std::env::current_dir().unwrap();
// create target.wasm (compiled .wit) & world
run_command(Command::new("wasm-tools").args(&[
"component",
"wit",
&format!("{}/wit/", pwd.display()),
"-o",
"target.wasm",
"--wasm",
]))
.unwrap();
run_command(Command::new("touch").args(&[&format!("{}/world", pwd.display())])).unwrap();
// Build wasm32-wasi apps.
const WASI_APPS: [&str; 8] = [
"apps_home",
"chess",
"http_bindings",
"http_proxy",
"orgs",
"qns_indexer",
"rpc",
"terminal",
];
for name in WASI_APPS {
// only execute if one of the modules has source code changes
println!("cargo:rerun-if-changed=modules/{}/src", name);
// copy in the wit files
run_command(
Command::new("rm").args(&["-rf", &format!("{}/modules/{}/wit", pwd.display(), name)]),
)
.unwrap();
run_command(Command::new("cp").args(&[
"-r",
"wit",
&format!("{}/modules/{}", pwd.display(), name),
]))
.unwrap();
fs::create_dir_all(&format!(
"{}/modules/{}/target/bindings/{}",
pwd.display(),
name,
name
))
.unwrap();
run_command(Command::new("cp").args(&[
"target.wasm",
&format!(
"{}/modules/{}/target/bindings/{}/",
pwd.display(),
name,
name
),
]))
.unwrap();
run_command(Command::new("cp").args(&[
"world",
&format!(
"{}/modules/{}/target/bindings/{}/",
pwd.display(),
name,
name
),
]))
.unwrap();
fs::create_dir_all(&format!(
"{}/modules/{}/target/wasm32-unknown-unknown/release",
pwd.display(),
name
))
.unwrap();
// build the module
run_command(Command::new("cargo").args(&[
"build",
"--release",
"--no-default-features",
&format!(
"--manifest-path={}/modules/{}/Cargo.toml",
pwd.display(),
name
),
"--target",
"wasm32-wasi",
]))
.unwrap();
// adapt module to component with adaptor
run_command(Command::new("wasm-tools").args(&[
"component",
"new",
&format!(
"{}/modules/{}/target/wasm32-wasi/release/{}.wasm",
pwd.display(),
name,
name
),
"-o",
&format!(
"{}/modules/{}/target/wasm32-wasi/release/{}_adapted.wasm",
pwd.display(),
name,
name
),
"--adapt",
&format!("{}/wasi_snapshot_preview1.wasm", pwd.display()),
]))
.unwrap();
// put wit into component & place where boot sequence expects to find it
run_command(Command::new("wasm-tools").args(&[
"component",
"embed",
"wit",
"--world",
"uq-process",
&format!(
"{}/modules/{}/target/wasm32-wasi/release/{}_adapted.wasm",
pwd.display(),
name,
name
),
"-o",
&format!(
"{}/modules/{}/target/wasm32-unknown-unknown/release/{}.wasm",
pwd.display(),
name,
name
),
]))
.unwrap();
}
}

View File

@ -77,8 +77,6 @@ fn subscribe_to_qns(from_block: u64) -> String {
impl UqProcess for Component {
fn init(our: Address) {
bindings::print_to_terminal(0, "qns_indexer: start");
let mut state: State = State {
names: HashMap::new(),
nodes: HashMap::new(),
@ -101,6 +99,8 @@ impl UqProcess for Component {
},
}
bindings::print_to_terminal(0, &format!("qns_indexer: starting at block {}", state.block));
// shove all state into net::net
for (_, ipc) in state.nodes.iter() {
send_request(
@ -218,6 +218,7 @@ impl UqProcess for Component {
match msg {
// Probably more message types later...maybe not...
AllActions::EventSubscription(e) => {
state.block = hex_to_u64(&e.blockNumber).unwrap();
match decode_hex(&e.topics[0].clone()) {
NodeRegistered::SIGNATURE_HASH => {
// bindings::print_to_terminal(0, format!("qns_indexer: got NodeRegistered event: {:?}", e).as_str());
@ -230,7 +231,6 @@ impl UqProcess for Component {
// bindings::print_to_terminal(0, format!("qns_indexer: NAME: {:?}", name.to_string()).as_str());
state.names.insert(node.to_string(), name);
state.block = hex_to_u64(&e.blockNumber).unwrap();
}
WsChanged::SIGNATURE_HASH => {
// bindings::print_to_terminal(0, format!("qns_indexer: got WsChanged event: {:?}", e).as_str());

View File

@ -52,11 +52,7 @@ pub async fn eth_rpc(
mut recv_in_client: MessageReceiver,
print_tx: PrintSender,
) -> Result<()> {
// TODO maybe don't need to do Arc Mutex
let subscriptions = Arc::new(Mutex::new(HashMap::<
u64,
tokio::task::JoinHandle<Result<(), EthRpcError>>,
>::new()));
let mut subscriptions = HashMap::<u64, tokio::task::JoinHandle<Result<(), EthRpcError>>>::new();
while let Some(message) = recv_in_client.recv().await {
let our = our.clone();
@ -198,8 +194,8 @@ pub async fn eth_rpc(
// TODO grab and print error
let _ = print_tx
.send(Printout {
verbosity: 1,
content: format!("eth_rpc: connection retrying"),
verbosity: 0,
content: format!("eth_rpc: connection failed, retrying in 5s"),
})
.await;
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
@ -211,12 +207,18 @@ pub async fn eth_rpc(
.await
{
Err(e) => {
let _ = print_tx
.send(Printout {
verbosity: 0,
content: format!("eth_rpc: subscription error: {:?}", e),
})
.await;
continue;
}
Ok(mut stream) => {
let _ = print_tx
.send(Printout {
verbosity: 1,
verbosity: 0,
content: format!("eth_rpc: connection established"),
})
.await;
@ -232,7 +234,7 @@ pub async fn eth_rpc(
target: target.clone(),
rsvp: None,
message: Message::Request(Request {
inherit: false, // TODO what
inherit: false,
expects_response: None,
ipc: Some(json!({
"EventSubscription": serde_json::to_value(event.clone()).unwrap()
@ -257,7 +259,7 @@ pub async fn eth_rpc(
};
}
});
subscriptions.lock().await.insert(id, handle);
subscriptions.insert(id, handle);
}
EthRpcAction::Unsubscribe(sub_id) => {
let _ = print_tx
@ -267,7 +269,7 @@ pub async fn eth_rpc(
})
.await;
if let Some(handle) = subscriptions.lock().await.remove(&sub_id) {
if let Some(handle) = subscriptions.remove(&sub_id) {
handle.abort();
} else {
let _ = print_tx

View File

@ -1,7 +1,7 @@
use anyhow::Result;
use dotenv;
use ethers::prelude::{abigen, namehash, Address as EthAddress, Provider, U256};
use ethers_providers::Ws;
use ethers_providers::{Middleware, Ws};
use ring::pkcs8::Document;
use ring::signature::{self, KeyPair};
use std::env;
@ -56,7 +56,7 @@ async fn main() {
}
// read PKI from websocket endpoint served by public RPC
// if you get rate-limited or something, pass in your own RPC as a boot argument
let mut rpc_url = "wss://eth-sepolia.public.blastapi.io".to_string();
let mut rpc_url = "wss://ethereum-sepolia.publicnode.com".to_string();
for (i, arg) in args.iter().enumerate() {
if arg == "--rpc" {
@ -162,15 +162,15 @@ async fn main() {
let (fs_kill_confirm_send, fs_kill_confirm_recv) = oneshot::channel::<()>();
println!("finding public IP address...");
let our_ip = {
let our_ip: std::net::Ipv4Addr = {
if let Ok(Some(ip)) = timeout(std::time::Duration::from_secs(5), public_ip::addr_v4()).await
{
ip.to_string()
ip
} else {
println!(
"\x1b[38;5;196mfailed to find public IPv4 address: booting as a routed node\x1b[0m"
);
"localhost".into()
std::net::Ipv4Addr::LOCALHOST
}
};
@ -202,7 +202,7 @@ async fn main() {
"Click here to log in to your node.",
);
println!("(http://localhost:{}/login)", http_server_port);
if our_ip != "localhost" {
if our_ip != std::net::Ipv4Addr::LOCALHOST {
println!(
"(if on a remote machine: http://{}:{}/login)",
our_ip, http_server_port
@ -236,11 +236,15 @@ async fn main() {
let Ok(ws_rpc) = Provider::<Ws>::connect(rpc_url.clone()).await else {
panic!("rpc: couldn't connect to blockchain wss endpoint");
};
let Ok(_) = ws_rpc.get_block_number().await else {
panic!("error: RPC endpoint not responding, try setting one with --rpc flag");
};
let qns_address: EthAddress = QNS_SEPOLIA_ADDRESS.parse().unwrap();
let contract = QNSRegistry::new(qns_address, ws_rpc.into());
let node_id: U256 = namehash(&username).as_bytes().into();
let onchain_id = contract.ws(node_id).call().await.unwrap(); // TODO unwrap
let Ok(onchain_id) = contract.ws(node_id).call().await else {
panic!("error: RPC endpoint failed to fetch our node_id");
};
// double check that routers match on-chain information
let namehashed_routers: Vec<[u8; 32]> = routers
.clone()
@ -256,10 +260,8 @@ async fn main() {
// double check that keys match on-chain information
if onchain_id.routers != namehashed_routers
|| onchain_id.public_key != networking_keypair.public_key().as_ref()
// || (onchain_id.ip_and_port > 0 && onchain_id.ip_and_port != combineIpAndPort(
// our_ip.clone(),
// http_server_port,
// ))
|| (onchain_id.ip != 0
&& onchain_id.ip != <std::net::Ipv4Addr as Into<u32>>::into(our_ip))
{
panic!("CRITICAL: your routing information does not match on-chain records");
}
@ -299,7 +301,7 @@ async fn main() {
"Click here to register your node.",
);
println!("(http://localhost:{})", http_server_port);
if our_ip != "localhost" {
if our_ip != std::net::Ipv4Addr::LOCALHOST {
println!(
"(if on a remote machine: http://{}:{})",
our_ip, http_server_port
@ -308,7 +310,7 @@ async fn main() {
let (tx, mut rx) = mpsc::channel::<(Identity, String, Document, Vec<u8>)>(1);
let (mut our, password, serialized_networking_keypair, jwt_secret_bytes) = tokio::select! {
_ = register::register(tx, kill_rx, our_ip.clone(), http_server_port, http_server_port)
_ = register::register(tx, kill_rx, our_ip.to_string(), http_server_port, http_server_port)
=> panic!("registration failed"),
(our, password, serialized_networking_keypair, jwt_secret_bytes) = async {
while let Some(fin) = rx.recv().await {
@ -415,7 +417,7 @@ async fn main() {
));
tasks.spawn(net::networking(
our.clone(),
our_ip,
our_ip.to_string(),
networking_keypair_arc.clone(),
kernel_message_sender.clone(),
network_error_sender,

View File

@ -641,7 +641,7 @@ async fn receive_incoming_connections(
) {
let tcp = TcpListener::bind(format!("0.0.0.0:{}", port))
.await
.expect(format!("fatal error: can't listen on port {port}").as_str());
.expect(format!("net: fatal error: can't listen on port {port}. change port onchain or free up this port!").as_str());
while let Ok((stream, _socket_addr)) = tcp.accept().await {
// TODO we can perform some amount of validation here