diff --git a/Cargo.toml b/Cargo.toml index 25a50976..0ed001db 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,7 +5,7 @@ version = "0.5.1" edition = "2021" description = "A general-purpose sovereign cloud computing platform" homepage = "https://kinode.org" -repository = "https://github.com/uqbar-dao/kinode" +repository = "https://github.com/kinode-dao/kinode" license = "Apache-2.0" [build-dependencies] diff --git a/src/eth/provider.rs b/src/eth/provider.rs index 130a5c00..87ec2c04 100644 --- a/src/eth/provider.rs +++ b/src/eth/provider.rs @@ -118,6 +118,7 @@ async fn handle_request( ) -> Result<(), EthError> { match action { EthAction::SubscribeLogs { sub_id, filter } => { + let sub_id = (target.process.clone(), sub_id); if connections.ws_provider_subscriptions.contains_key(&sub_id) { return Err(EthError::SubscriptionIdCollision); } @@ -133,6 +134,7 @@ async fn handle_request( Ok(()) } EthAction::UnsubscribeLogs(sub_id) => { + let sub_id = (target.process.clone(), sub_id); let handle = connections .ws_provider_subscriptions .remove(&sub_id) diff --git a/src/eth/types.rs b/src/eth/types.rs index e1959ef1..6c2fc702 100644 --- a/src/eth/types.rs +++ b/src/eth/types.rs @@ -1,3 +1,4 @@ +use crate::types::ProcessId; use ethers::prelude::Provider; use ethers::types::{Filter, Log}; use ethers_providers::Ws; @@ -48,5 +49,5 @@ pub enum EthSubEvent { /// Primary state object of the `eth` module pub struct RpcConnections { pub provider: Provider, - pub ws_provider_subscriptions: HashMap>>, + pub ws_provider_subscriptions: HashMap<(ProcessId, u64), JoinHandle>>, } diff --git a/src/kernel/process.rs b/src/kernel/process.rs index 935d3d8e..fd3ab443 100644 --- a/src/kernel/process.rs +++ b/src/kernel/process.rs @@ -10,7 +10,7 @@ use std::sync::Arc; use tokio::task::JoinHandle; use wasmtime::component::*; use wasmtime::{Engine, Store}; -use wasmtime_wasi::preview2::{Table, WasiCtx, WasiCtxBuilder, WasiView}; +use wasmtime_wasi::preview2::{pipe::MemoryOutputPipe, Table, WasiCtx, WasiCtxBuilder, WasiView}; bindgen!({ path: "wit", @@ -53,6 +53,8 @@ impl WasiView for ProcessWasi { } } +const STACK_TRACE_SIZE: usize = 5000; + pub async fn send_and_await_response( process: &mut ProcessWasi, source: Option, @@ -492,7 +494,8 @@ pub async fn make_process_loop( Process::add_to_linker(&mut linker, |state: &mut ProcessWasi| state).unwrap(); let table = Table::new(); - let wasi = WasiCtxBuilder::new().build(); + let wasi_stderr = MemoryOutputPipe::new(STACK_TRACE_SIZE); + let wasi = WasiCtxBuilder::new().stderr(wasi_stderr.clone()).build(); wasmtime_wasi::preview2::command::add_to_linker(&mut linker).unwrap(); @@ -548,14 +551,15 @@ pub async fn make_process_loop( .await; false } - Err(e) => { + Err(_) => { + let stderr = wasi_stderr.contents().into(); + let stderr = String::from_utf8(stderr)?; let _ = send_to_terminal .send(t::Printout { verbosity: 0, content: format!( - "\x1b[38;5;196mprocess {} ended with error: {}\x1b[0m", - metadata.our.process, - format!("{:?}", e).lines().last().unwrap(), + "\x1b[38;5;196mprocess {} ended with error:\x1b[0m\n{}", + metadata.our.process, stderr, ), }) .await; diff --git a/src/main.rs b/src/main.rs index 1a37aa9f..34381ddd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -95,7 +95,7 @@ async fn serve_register_fe( async fn main() { let app = Command::new("kinode") .version(VERSION) - .author("Uqbar DAO: https://github.com/uqbar-dao") + .author("Kinode DAO: https://github.com/kinode-dao") .about("A General Purpose Sovereign Cloud Computing Platform") .arg(arg!([home] "Path to home directory").required(true)) .arg( @@ -450,6 +450,7 @@ async fn main() { our.name.clone(), kernel_message_sender.clone(), net_message_receiver, + print_sender.clone(), network_error_sender, )); tasks.spawn(state::state_sender( diff --git a/src/net/mock.rs b/src/net/mock.rs index 82a4bbde..44e16459 100644 --- a/src/net/mock.rs +++ b/src/net/mock.rs @@ -15,6 +15,7 @@ pub async fn mock_client( node_identity: types::NodeId, send_to_loop: Sender, mut recv_from_loop: Receiver, + print_tx: types::PrintSender, _network_error_sender: types::NetworkErrorSender, ) -> anyhow::Result<()> { let url = format!("ws://127.0.0.1:{}", port); @@ -39,8 +40,44 @@ pub async fn mock_client( // Deserialize and forward the message to the loop // println!("{}:mock: incoming {}\r", node_identity, message); if let Binary(ref bin) = message { - let kernel_message: types::KernelMessage = rmp_serde::from_slice(bin)?; - send_to_loop.send(kernel_message).await?; + let km: types::KernelMessage = rmp_serde::from_slice(bin)?; + if km.target.process == "net:distro:sys" { + if let types::Message::Request(types::Request { ref body, .. }) = km.message { + print_tx + .send(types::Printout { + verbosity: 0, + content: format!( + "\x1b[3;32m{}: {}\x1b[0m", + km.source.node, + std::str::from_utf8(body).unwrap_or("!!message parse error!!") + ), + }) + .await?; + send_to_loop + .send(types::KernelMessage { + id: km.id, + source: types::Address { + node: node_identity.clone(), + process: types::ProcessId::new(Some("net"), "distro", "sys"), + }, + target: km.rsvp.as_ref().unwrap_or(&km.source).clone(), + rsvp: None, + message: types::Message::Response(( + types::Response { + inherit: false, + body: "delivered".as_bytes().to_vec(), + metadata: None, + capabilities: vec![], + }, + None, + )), + lazy_load_blob: None, + }) + .await?; + } + } else { + send_to_loop.send(km).await?; + } } }, }