Merge branch 'develop' into wg/app-store-http

This commit is contained in:
dr-frmr 2024-01-25 22:52:15 -03:00
commit 64e0b905b5
No known key found for this signature in database
6 changed files with 56 additions and 11 deletions

View File

@ -5,7 +5,7 @@ version = "0.5.1"
edition = "2021" edition = "2021"
description = "A general-purpose sovereign cloud computing platform" description = "A general-purpose sovereign cloud computing platform"
homepage = "https://kinode.org" homepage = "https://kinode.org"
repository = "https://github.com/uqbar-dao/kinode" repository = "https://github.com/kinode-dao/kinode"
license = "Apache-2.0" license = "Apache-2.0"
[build-dependencies] [build-dependencies]

View File

@ -118,6 +118,7 @@ async fn handle_request(
) -> Result<(), EthError> { ) -> Result<(), EthError> {
match action { match action {
EthAction::SubscribeLogs { sub_id, filter } => { EthAction::SubscribeLogs { sub_id, filter } => {
let sub_id = (target.process.clone(), sub_id);
if connections.ws_provider_subscriptions.contains_key(&sub_id) { if connections.ws_provider_subscriptions.contains_key(&sub_id) {
return Err(EthError::SubscriptionIdCollision); return Err(EthError::SubscriptionIdCollision);
} }
@ -133,6 +134,7 @@ async fn handle_request(
Ok(()) Ok(())
} }
EthAction::UnsubscribeLogs(sub_id) => { EthAction::UnsubscribeLogs(sub_id) => {
let sub_id = (target.process.clone(), sub_id);
let handle = connections let handle = connections
.ws_provider_subscriptions .ws_provider_subscriptions
.remove(&sub_id) .remove(&sub_id)

View File

@ -1,3 +1,4 @@
use crate::types::ProcessId;
use ethers::prelude::Provider; use ethers::prelude::Provider;
use ethers::types::{Filter, Log}; use ethers::types::{Filter, Log};
use ethers_providers::Ws; use ethers_providers::Ws;
@ -48,5 +49,5 @@ pub enum EthSubEvent {
/// Primary state object of the `eth` module /// Primary state object of the `eth` module
pub struct RpcConnections { pub struct RpcConnections {
pub provider: Provider<Ws>, pub provider: Provider<Ws>,
pub ws_provider_subscriptions: HashMap<u64, JoinHandle<Result<(), EthError>>>, pub ws_provider_subscriptions: HashMap<(ProcessId, u64), JoinHandle<Result<(), EthError>>>,
} }

View File

@ -10,7 +10,7 @@ use std::sync::Arc;
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use wasmtime::component::*; use wasmtime::component::*;
use wasmtime::{Engine, Store}; use wasmtime::{Engine, Store};
use wasmtime_wasi::preview2::{Table, WasiCtx, WasiCtxBuilder, WasiView}; use wasmtime_wasi::preview2::{pipe::MemoryOutputPipe, Table, WasiCtx, WasiCtxBuilder, WasiView};
bindgen!({ bindgen!({
path: "wit", path: "wit",
@ -53,6 +53,8 @@ impl WasiView for ProcessWasi {
} }
} }
const STACK_TRACE_SIZE: usize = 5000;
pub async fn send_and_await_response( pub async fn send_and_await_response(
process: &mut ProcessWasi, process: &mut ProcessWasi,
source: Option<t::Address>, source: Option<t::Address>,
@ -492,7 +494,8 @@ pub async fn make_process_loop(
Process::add_to_linker(&mut linker, |state: &mut ProcessWasi| state).unwrap(); Process::add_to_linker(&mut linker, |state: &mut ProcessWasi| state).unwrap();
let table = Table::new(); let table = Table::new();
let wasi = WasiCtxBuilder::new().build(); let wasi_stderr = MemoryOutputPipe::new(STACK_TRACE_SIZE);
let wasi = WasiCtxBuilder::new().stderr(wasi_stderr.clone()).build();
wasmtime_wasi::preview2::command::add_to_linker(&mut linker).unwrap(); wasmtime_wasi::preview2::command::add_to_linker(&mut linker).unwrap();
@ -548,14 +551,15 @@ pub async fn make_process_loop(
.await; .await;
false false
} }
Err(e) => { Err(_) => {
let stderr = wasi_stderr.contents().into();
let stderr = String::from_utf8(stderr)?;
let _ = send_to_terminal let _ = send_to_terminal
.send(t::Printout { .send(t::Printout {
verbosity: 0, verbosity: 0,
content: format!( content: format!(
"\x1b[38;5;196mprocess {} ended with error: {}\x1b[0m", "\x1b[38;5;196mprocess {} ended with error:\x1b[0m\n{}",
metadata.our.process, metadata.our.process, stderr,
format!("{:?}", e).lines().last().unwrap(),
), ),
}) })
.await; .await;

View File

@ -95,7 +95,7 @@ async fn serve_register_fe(
async fn main() { async fn main() {
let app = Command::new("kinode") let app = Command::new("kinode")
.version(VERSION) .version(VERSION)
.author("Uqbar DAO: https://github.com/uqbar-dao") .author("Kinode DAO: https://github.com/kinode-dao")
.about("A General Purpose Sovereign Cloud Computing Platform") .about("A General Purpose Sovereign Cloud Computing Platform")
.arg(arg!([home] "Path to home directory").required(true)) .arg(arg!([home] "Path to home directory").required(true))
.arg( .arg(
@ -450,6 +450,7 @@ async fn main() {
our.name.clone(), our.name.clone(),
kernel_message_sender.clone(), kernel_message_sender.clone(),
net_message_receiver, net_message_receiver,
print_sender.clone(),
network_error_sender, network_error_sender,
)); ));
tasks.spawn(state::state_sender( tasks.spawn(state::state_sender(

View File

@ -15,6 +15,7 @@ pub async fn mock_client(
node_identity: types::NodeId, node_identity: types::NodeId,
send_to_loop: Sender, send_to_loop: Sender,
mut recv_from_loop: Receiver, mut recv_from_loop: Receiver,
print_tx: types::PrintSender,
_network_error_sender: types::NetworkErrorSender, _network_error_sender: types::NetworkErrorSender,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let url = format!("ws://127.0.0.1:{}", port); let url = format!("ws://127.0.0.1:{}", port);
@ -39,8 +40,44 @@ pub async fn mock_client(
// Deserialize and forward the message to the loop // Deserialize and forward the message to the loop
// println!("{}:mock: incoming {}\r", node_identity, message); // println!("{}:mock: incoming {}\r", node_identity, message);
if let Binary(ref bin) = message { if let Binary(ref bin) = message {
let kernel_message: types::KernelMessage = rmp_serde::from_slice(bin)?; let km: types::KernelMessage = rmp_serde::from_slice(bin)?;
send_to_loop.send(kernel_message).await?; if km.target.process == "net:distro:sys" {
if let types::Message::Request(types::Request { ref body, .. }) = km.message {
print_tx
.send(types::Printout {
verbosity: 0,
content: format!(
"\x1b[3;32m{}: {}\x1b[0m",
km.source.node,
std::str::from_utf8(body).unwrap_or("!!message parse error!!")
),
})
.await?;
send_to_loop
.send(types::KernelMessage {
id: km.id,
source: types::Address {
node: node_identity.clone(),
process: types::ProcessId::new(Some("net"), "distro", "sys"),
},
target: km.rsvp.as_ref().unwrap_or(&km.source).clone(),
rsvp: None,
message: types::Message::Response((
types::Response {
inherit: false,
body: "delivered".as_bytes().to_vec(),
metadata: None,
capabilities: vec![],
},
None,
)),
lazy_load_blob: None,
})
.await?;
}
} else {
send_to_loop.send(km).await?;
}
} }
}, },
} }