mirror of
https://github.com/uqbar-dao/nectar.git
synced 2024-12-02 08:02:23 +03:00
add a bunch more debug prints and a debug command to kernel
This commit is contained in:
parent
2ee9b1a412
commit
5b12cc254a
@ -6,8 +6,8 @@ use serde_json::json;
|
||||
use std::collections::HashMap;
|
||||
use std::string::FromUtf8Error;
|
||||
use uqbar_process_lib::{
|
||||
await_message, get_typed_state, http, println, receive, set_state, Address, Message, Payload,
|
||||
Request, Response,
|
||||
await_message, get_typed_state, http, print_to_terminal, println, receive, set_state, Address,
|
||||
Message, Payload, Request, Response,
|
||||
};
|
||||
|
||||
wit_bindgen::generate!({
|
||||
@ -308,6 +308,7 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> {
|
||||
}
|
||||
|
||||
if send {
|
||||
print_to_terminal(1, &format!("qns_indexer: sending ID to net: {:?}", node));
|
||||
Request::new()
|
||||
.target((&our.node, "net", "sys", "uqbar"))
|
||||
.try_ipc(NetActions::QnsUpdate(node.clone()))?
|
||||
|
@ -14,7 +14,7 @@ pub async fn http_client(
|
||||
our_name: String,
|
||||
send_to_loop: MessageSender,
|
||||
mut recv_in_client: MessageReceiver,
|
||||
_print_tx: PrintSender,
|
||||
print_tx: PrintSender,
|
||||
) -> Result<()> {
|
||||
let client = reqwest::Client::new();
|
||||
let our_name = Arc::new(our_name);
|
||||
@ -42,6 +42,7 @@ pub async fn http_client(
|
||||
payload,
|
||||
client.clone(),
|
||||
send_to_loop.clone(),
|
||||
print_tx.clone(),
|
||||
));
|
||||
}
|
||||
Err(anyhow::anyhow!("http_client: loop died"))
|
||||
@ -56,6 +57,7 @@ async fn handle_message(
|
||||
body: Option<Payload>,
|
||||
client: reqwest::Client,
|
||||
send_to_loop: MessageSender,
|
||||
print_tx: PrintSender,
|
||||
) {
|
||||
let req: OutgoingHttpRequest = match serde_json::from_slice(&json) {
|
||||
Ok(req) => req,
|
||||
@ -88,6 +90,13 @@ async fn handle_message(
|
||||
return;
|
||||
};
|
||||
|
||||
let _ = print_tx
|
||||
.send(Printout {
|
||||
verbosity: 1,
|
||||
content: format!("http_client: building {req_method} request to {}", req.url),
|
||||
})
|
||||
.await;
|
||||
|
||||
let mut request_builder = client.request(req_method, req.url);
|
||||
|
||||
if let Some(version) = req.version {
|
||||
@ -136,6 +145,12 @@ async fn handle_message(
|
||||
|
||||
match client.execute(request).await {
|
||||
Ok(response) => {
|
||||
let _ = print_tx
|
||||
.send(Printout {
|
||||
verbosity: 1,
|
||||
content: format!("http_client: executed request, got response"),
|
||||
})
|
||||
.await;
|
||||
let _ = send_to_loop
|
||||
.send(KernelMessage {
|
||||
id,
|
||||
@ -168,6 +183,12 @@ async fn handle_message(
|
||||
.await;
|
||||
}
|
||||
Err(e) => {
|
||||
let _ = print_tx
|
||||
.send(Printout {
|
||||
verbosity: 1,
|
||||
content: format!("http_client: executed request but got error"),
|
||||
})
|
||||
.await;
|
||||
make_error_message(
|
||||
our,
|
||||
id,
|
||||
|
@ -276,10 +276,12 @@ async fn ws_handler(
|
||||
print_tx: PrintSender,
|
||||
) -> Result<impl warp::Reply, warp::Rejection> {
|
||||
let original_path = normalize_path(path.as_str());
|
||||
let _ = print_tx.send(Printout {
|
||||
verbosity: 1,
|
||||
content: format!("got ws request for {original_path}"),
|
||||
});
|
||||
let _ = print_tx
|
||||
.send(Printout {
|
||||
verbosity: 1,
|
||||
content: format!("http_server: got ws request for {original_path}"),
|
||||
})
|
||||
.await;
|
||||
|
||||
let serialized_headers = serialize_headers(&headers);
|
||||
let ws_path_bindings = ws_path_bindings.read().await;
|
||||
@ -294,7 +296,7 @@ async fn ws_handler(
|
||||
.send(Printout {
|
||||
verbosity: 1,
|
||||
content: format!(
|
||||
"got request for path {original_path} bound by subdomain {subdomain}"
|
||||
"http_server: ws request for {original_path} bound by subdomain {subdomain}"
|
||||
),
|
||||
})
|
||||
.await;
|
||||
@ -361,7 +363,7 @@ async fn http_handler(
|
||||
let _ = print_tx
|
||||
.send(Printout {
|
||||
verbosity: 1,
|
||||
content: format!("got request for path {original_path}"),
|
||||
content: format!("http_server: got request for path {original_path}"),
|
||||
})
|
||||
.await;
|
||||
let id: u64 = rand::random();
|
||||
@ -389,7 +391,9 @@ async fn http_handler(
|
||||
let _ = print_tx
|
||||
.send(Printout {
|
||||
verbosity: 1,
|
||||
content: format!("redirecting request from {socket_addr:?} to login page"),
|
||||
content: format!(
|
||||
"http_server: redirecting request from {socket_addr:?} to login page"
|
||||
),
|
||||
})
|
||||
.await;
|
||||
return Ok(warp::http::Response::builder()
|
||||
@ -411,7 +415,7 @@ async fn http_handler(
|
||||
.send(Printout {
|
||||
verbosity: 1,
|
||||
content: format!(
|
||||
"got request for path {original_path} bound by subdomain {subdomain}"
|
||||
"http_server: request for {original_path} bound by subdomain {subdomain}"
|
||||
),
|
||||
})
|
||||
.await;
|
||||
@ -454,7 +458,7 @@ async fn http_handler(
|
||||
// we extract message from base64 encoded bytes in data
|
||||
// and send it to the correct app.
|
||||
let message = if bound_path.app == "rpc:sys:uqbar" {
|
||||
match handle_rpc_message(our, id, body).await {
|
||||
match handle_rpc_message(our, id, body, print_tx).await {
|
||||
Ok(message) => message,
|
||||
Err(e) => {
|
||||
return Ok(warp::reply::with_status(vec![], e).into_response());
|
||||
@ -560,6 +564,7 @@ async fn handle_rpc_message(
|
||||
our: Arc<String>,
|
||||
id: u64,
|
||||
body: warp::hyper::body::Bytes,
|
||||
print_tx: PrintSender,
|
||||
) -> Result<KernelMessage, StatusCode> {
|
||||
let Ok(rpc_message) = serde_json::from_slice::<RpcMessage>(&body) else {
|
||||
return Err(StatusCode::BAD_REQUEST);
|
||||
@ -569,6 +574,13 @@ async fn handle_rpc_message(
|
||||
return Err(StatusCode::BAD_REQUEST);
|
||||
};
|
||||
|
||||
let _ = print_tx
|
||||
.send(Printout {
|
||||
verbosity: 2,
|
||||
content: format!("http_server: passing on RPC message to {target_process}"),
|
||||
})
|
||||
.await;
|
||||
|
||||
let payload: Option<Payload> = match rpc_message.data {
|
||||
None => None,
|
||||
Some(b64_bytes) => match base64::decode(b64_bytes) {
|
||||
@ -619,17 +631,18 @@ async fn maintain_websocket(
|
||||
print_tx: PrintSender,
|
||||
) {
|
||||
let (mut write_stream, mut read_stream) = ws.split();
|
||||
let _ = print_tx
|
||||
.send(Printout {
|
||||
verbosity: 1,
|
||||
content: format!("got new client websocket connection"),
|
||||
})
|
||||
.await;
|
||||
|
||||
let channel_id: u32 = rand::random();
|
||||
let (ws_sender, mut ws_receiver) = tokio::sync::mpsc::channel(100);
|
||||
ws_senders.insert(channel_id, (app.clone(), ws_sender));
|
||||
|
||||
let _ = print_tx
|
||||
.send(Printout {
|
||||
verbosity: 1,
|
||||
content: format!("http_server: new websocket connection to {app} with id {channel_id}"),
|
||||
})
|
||||
.await;
|
||||
|
||||
let _ = send_to_loop
|
||||
.send(KernelMessage {
|
||||
id: rand::random(),
|
||||
@ -654,10 +667,6 @@ async fn maintain_websocket(
|
||||
})
|
||||
.await;
|
||||
|
||||
let _ = print_tx.send(Printout {
|
||||
verbosity: 1,
|
||||
content: format!("websocket channel {channel_id} opened"),
|
||||
});
|
||||
loop {
|
||||
tokio::select! {
|
||||
read = read_stream.next() => {
|
||||
@ -707,6 +716,12 @@ async fn maintain_websocket(
|
||||
}
|
||||
}
|
||||
}
|
||||
let _ = print_tx
|
||||
.send(Printout {
|
||||
verbosity: 1,
|
||||
content: format!("http_server: websocket connection {channel_id} closed"),
|
||||
})
|
||||
.await;
|
||||
let stream = write_stream.reunite(read_stream).unwrap();
|
||||
let _ = stream.close().await;
|
||||
}
|
||||
|
@ -381,7 +381,7 @@ async fn handle_kernel_request(
|
||||
None => {
|
||||
let _ = send_to_terminal
|
||||
.send(t::Printout {
|
||||
verbosity: 2,
|
||||
verbosity: 0,
|
||||
content: format!("kernel: no such process {:?} to kill", process_id),
|
||||
})
|
||||
.await;
|
||||
@ -424,6 +424,48 @@ async fn handle_kernel_request(
|
||||
.await
|
||||
.expect("event loop: fatal: sender died");
|
||||
}
|
||||
t::KernelCommand::Debug(kind) => match kind {
|
||||
t::KernelPrint::ProcessMap => {
|
||||
let _ = send_to_terminal
|
||||
.send(t::Printout {
|
||||
verbosity: 0,
|
||||
content: format!("kernel process map:\r\n{:?}", process_map),
|
||||
})
|
||||
.await;
|
||||
}
|
||||
t::KernelPrint::Process(process_id) => {
|
||||
let Some(proc) = process_map.get(&process_id) else {
|
||||
let _ = send_to_terminal
|
||||
.send(t::Printout {
|
||||
verbosity: 0,
|
||||
content: format!("kernel: no such running process {}", process_id),
|
||||
})
|
||||
.await;
|
||||
return;
|
||||
};
|
||||
let _ = send_to_terminal
|
||||
.send(t::Printout {
|
||||
verbosity: 0,
|
||||
content: format!("kernel process info:\r\n{proc:?}",),
|
||||
})
|
||||
.await;
|
||||
}
|
||||
t::KernelPrint::HasCap { on, cap } => {
|
||||
let _ = send_to_terminal
|
||||
.send(t::Printout {
|
||||
verbosity: 0,
|
||||
content: format!(
|
||||
"process {} has cap:\r\n{}",
|
||||
on,
|
||||
process_map
|
||||
.get(&on)
|
||||
.map(|p| p.capabilities.contains(&cap))
|
||||
.unwrap_or(false)
|
||||
),
|
||||
})
|
||||
.await;
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -460,7 +460,7 @@ pub async fn make_process_loop(
|
||||
Ok(()) => {
|
||||
let _ = send_to_terminal
|
||||
.send(t::Printout {
|
||||
verbosity: 2,
|
||||
verbosity: 1,
|
||||
content: format!("process {} returned without error", metadata.our.process,),
|
||||
})
|
||||
.await;
|
||||
|
@ -1,5 +1,6 @@
|
||||
use crate::kernel::process;
|
||||
use crate::kernel::process::uqbar::process::standard as wit;
|
||||
use crate::kernel::process::StandardHost;
|
||||
use crate::types as t;
|
||||
use crate::types::STATE_PROCESS_ID;
|
||||
use crate::KERNEL_PROCESS_ID;
|
||||
@ -8,7 +9,15 @@ use anyhow::Result;
|
||||
use ring::signature::{self, KeyPair};
|
||||
use std::collections::HashSet;
|
||||
|
||||
use crate::kernel::process::StandardHost;
|
||||
async fn print_debug(proc: &process::ProcessState, content: &str) {
|
||||
let _ = proc
|
||||
.send_to_terminal
|
||||
.send(t::Printout {
|
||||
verbosity: 2,
|
||||
content: format!("{}: {}", proc.metadata.our.process, content),
|
||||
})
|
||||
.await;
|
||||
}
|
||||
|
||||
///
|
||||
/// create the process API. this is where the functions that a process can use live.
|
||||
@ -19,15 +28,11 @@ impl StandardHost for process::ProcessWasi {
|
||||
// system utils:
|
||||
//
|
||||
async fn print_to_terminal(&mut self, verbosity: u8, content: String) -> Result<()> {
|
||||
match self
|
||||
.process
|
||||
self.process
|
||||
.send_to_terminal
|
||||
.send(t::Printout { verbosity, content })
|
||||
.await
|
||||
{
|
||||
Ok(()) => Ok(()),
|
||||
Err(e) => Err(anyhow::anyhow!("fatal: couldn't send to terminal: {:?}", e)),
|
||||
}
|
||||
.map_err(|e| anyhow::anyhow!("fatal: couldn't send to terminal: {e:?}"))
|
||||
}
|
||||
|
||||
async fn get_eth_block(&mut self) -> Result<u64> {
|
||||
@ -39,9 +44,10 @@ impl StandardHost for process::ProcessWasi {
|
||||
// process management:
|
||||
//
|
||||
|
||||
/// TODO critical: move to kernel logic to enable persistence of choice made here
|
||||
/// TODO critical: move to kernel logic to enable persistence of choice made here
|
||||
async fn set_on_exit(&mut self, on_exit: wit::OnExit) -> Result<()> {
|
||||
self.process.metadata.on_exit = t::OnExit::de_wit(on_exit);
|
||||
print_debug(&self.process, "set new on-exit behavior").await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@ -126,6 +132,7 @@ impl StandardHost for process::ProcessWasi {
|
||||
)),
|
||||
};
|
||||
self.process.last_payload = old_last_payload;
|
||||
print_debug(&self.process, "persisted state").await;
|
||||
return res;
|
||||
}
|
||||
|
||||
@ -165,6 +172,7 @@ impl StandardHost for process::ProcessWasi {
|
||||
)),
|
||||
};
|
||||
self.process.last_payload = old_last_payload;
|
||||
print_debug(&self.process, "cleared persisted state").await;
|
||||
return res;
|
||||
}
|
||||
|
||||
@ -357,6 +365,7 @@ impl StandardHost for process::ProcessWasi {
|
||||
.await
|
||||
.unwrap();
|
||||
let _ = rx.await.unwrap();
|
||||
print_debug(&self.process, "spawned a new process").await;
|
||||
Ok(Ok(new_process_id.en_wit().to_owned()))
|
||||
}
|
||||
|
||||
|
@ -124,7 +124,7 @@ pub async fn maintain_connection(
|
||||
let mut conn = conn.write_stream.reunite(conn.read_stream).unwrap();
|
||||
let _ = conn.close(None).await;
|
||||
|
||||
print_debug(&print_tx, &format!("net: connection with {peer_name} died")).await;
|
||||
print_debug(&print_tx, &format!("net: connection with {peer_name} closed")).await;
|
||||
peers.remove(&peer_name);
|
||||
}
|
||||
|
||||
|
@ -66,6 +66,10 @@ pub async fn timer_service(
|
||||
send_response(&our, km.id, km.rsvp.unwrap_or(km.source), &kernel_message_sender).await;
|
||||
continue
|
||||
}
|
||||
let _ = print_tx.send(Printout {
|
||||
verbosity: 1,
|
||||
content: format!("set timer to pop in {}ms", timer_millis),
|
||||
}).await;
|
||||
if !timer_map.contains(pop_time) {
|
||||
timer_tasks.spawn(async move {
|
||||
tokio::time::sleep(std::time::Duration::from_millis(timer_millis - 1)).await;
|
||||
|
12
src/types.rs
12
src/types.rs
@ -825,6 +825,18 @@ pub enum KernelCommand {
|
||||
/// RUNTIME ONLY: notify the kernel that the runtime is shutting down and it
|
||||
/// should gracefully stop and persist the running processes.
|
||||
Shutdown,
|
||||
/// Ask kernel to produce debugging information
|
||||
Debug(KernelPrint),
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub enum KernelPrint {
|
||||
ProcessMap,
|
||||
Process(ProcessId),
|
||||
HasCap {
|
||||
on: ProcessId,
|
||||
cap: Capability,
|
||||
},
|
||||
}
|
||||
|
||||
/// IPC format for all KernelCommand responses
|
||||
|
Loading…
Reference in New Issue
Block a user