Merge pull request #617 from kinode-dao/hf/terminal-per-process-verbosity

terminal: per process verbosity
This commit is contained in:
nick.kino 2024-12-10 13:27:13 -08:00 committed by GitHub
commit e971e92106
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
22 changed files with 618 additions and 107 deletions

4
Cargo.lock generated
View File

@ -3965,7 +3965,7 @@ dependencies = [
[[package]] [[package]]
name = "kinode_process_lib" name = "kinode_process_lib"
version = "0.10.0" version = "0.10.0"
source = "git+https://github.com/kinode-dao/process_lib?rev=5d6e2af#5d6e2af88ef0716822cc1638be5b71a1ec9c4fb3" source = "git+https://github.com/kinode-dao/process_lib?rev=8a5b040#8a5b040dd699ae69d5bc1826fdd6e0bd5d84e766"
dependencies = [ dependencies = [
"alloy 0.1.4", "alloy 0.1.4",
"alloy-primitives 0.7.7", "alloy-primitives 0.7.7",
@ -4265,7 +4265,7 @@ version = "0.1.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"clap", "clap",
"kinode_process_lib 0.10.0 (git+https://github.com/kinode-dao/process_lib?rev=5d6e2af)", "kinode_process_lib 0.10.0 (git+https://github.com/kinode-dao/process_lib?rev=8a5b040)",
"regex", "regex",
"serde", "serde",
"serde_json", "serde_json",

View File

@ -154,6 +154,8 @@ The `sys` publisher is not a real node ID, but it's also not a special case valu
- UpArrow/DownArrow or CTRL+P/CTRL+N to move up and down through command history - UpArrow/DownArrow or CTRL+P/CTRL+N to move up and down through command history
- CTRL+R to search history, CTRL+R again to toggle through search results, CTRL+G to cancel search - CTRL+R to search history, CTRL+R again to toggle through search results, CTRL+G to cancel search
- CTRL+W to set process-level verbosities that override the verbosity mode set with CTRL+V (0-3, 0 is default and lowest verbosity)
### Built-in terminal scripts ### Built-in terminal scripts
The terminal package contains a number of built-in scripts. The terminal package contains a number of built-in scripts.

View File

@ -1682,7 +1682,7 @@ dependencies = [
[[package]] [[package]]
name = "kinode_process_lib" name = "kinode_process_lib"
version = "0.10.0" version = "0.10.0"
source = "git+https://github.com/kinode-dao/process_lib?rev=5d6e2af#5d6e2af88ef0716822cc1638be5b71a1ec9c4fb3" source = "git+https://github.com/kinode-dao/process_lib?rev=8a5b040#8a5b040dd699ae69d5bc1826fdd6e0bd5d84e766"
dependencies = [ dependencies = [
"alloy", "alloy",
"alloy-primitives 0.7.7", "alloy-primitives 0.7.7",
@ -1762,7 +1762,7 @@ version = "0.1.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"clap", "clap",
"kinode_process_lib 0.10.0 (git+https://github.com/kinode-dao/process_lib?rev=5d6e2af)", "kinode_process_lib 0.10.0 (git+https://github.com/kinode-dao/process_lib?rev=8a5b040)",
"regex", "regex",
"serde", "serde",
"serde_json", "serde_json",

View File

@ -9,7 +9,7 @@ simulation-mode = []
[dependencies] [dependencies]
anyhow = "1.0" anyhow = "1.0"
clap = "4.4" clap = "4.4"
kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", rev = "5d6e2af" } kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", rev = "8a5b040" }
regex = "1.10.3" regex = "1.10.3"
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0" serde_json = "1.0"

View File

@ -1138,10 +1138,11 @@ async fn check_for_root_cap(
async fn verbose_print(print_tx: &PrintSender, content: &str) { async fn verbose_print(print_tx: &PrintSender, content: &str) {
let _ = print_tx let _ = print_tx
.send(Printout { .send(Printout::new(
verbosity: 2, 2,
content: content.to_string(), NET_PROCESS_ID.clone(),
}) content.to_string(),
))
.await; .await;
} }

View File

@ -190,9 +190,13 @@ async fn build_subscription(
return Err(EthError::PermissionDenied); // will never hit return Err(EthError::PermissionDenied); // will never hit
}; };
if *kind == SubscriptionKind::NewHeads { if *kind == SubscriptionKind::NewHeads {
Printout::new(0, format!("newHeads subscription requested by {target}!")) Printout::new(
.send(print_tx) 0,
.await; ETH_PROCESS_ID.clone(),
format!("newHeads subscription requested by {target}!"),
)
.send(print_tx)
.await;
} }
let mut urls = { let mut urls = {
// in code block to drop providers lock asap to avoid deadlock // in code block to drop providers lock asap to avoid deadlock

View File

@ -145,10 +145,10 @@ pub async fn fd_manager(
&send_to_loop, &send_to_loop,
).await { ).await {
Ok(Some(to_print)) => { Ok(Some(to_print)) => {
Printout::new(2, to_print).send(&send_to_terminal).await; Printout::new(2, FD_MANAGER_PROCESS_ID.clone(), to_print).send(&send_to_terminal).await;
} }
Err(e) => { Err(e) => {
Printout::new(1, &format!("handle_message error: {e:?}")) Printout::new(1, FD_MANAGER_PROCESS_ID.clone(), &format!("handle_message error: {e:?}"))
.send(&send_to_terminal) .send(&send_to_terminal)
.await; .await;
} }
@ -163,7 +163,7 @@ pub async fn fd_manager(
state.update_all_fds_limits(our_node.as_str(), &send_to_loop).await; state.update_all_fds_limits(our_node.as_str(), &send_to_loop).await;
} }
} }
Err(e) => Printout::new(1, &format!("update_max_fds error: {e:?}")) Err(e) => Printout::new(1, FD_MANAGER_PROCESS_ID.clone(), &format!("update_max_fds error: {e:?}"))
.send(&send_to_terminal) .send(&send_to_terminal)
.await, .await,
} }
@ -173,13 +173,19 @@ pub async fn fd_manager(
if let Some(message) = recv_from_loop.recv().await { if let Some(message) = recv_from_loop.recv().await {
match handle_message(&our_node, message, &mut state, &send_to_loop).await { match handle_message(&our_node, message, &mut state, &send_to_loop).await {
Ok(Some(to_print)) => { Ok(Some(to_print)) => {
Printout::new(2, to_print).send(&send_to_terminal).await; Printout::new(2, FD_MANAGER_PROCESS_ID.clone(), to_print)
}
Err(e) => {
Printout::new(1, &format!("handle_message error: {e:?}"))
.send(&send_to_terminal) .send(&send_to_terminal)
.await; .await;
} }
Err(e) => {
Printout::new(
1,
FD_MANAGER_PROCESS_ID.clone(),
&format!("handle_message error: {e:?}"),
)
.send(&send_to_terminal)
.await;
}
_ => {} _ => {}
} }
} }

View File

@ -217,10 +217,11 @@ async fn connect_websocket(
Ok((ws_stream, _)) => ws_stream, Ok((ws_stream, _)) => ws_stream,
Err(e) => { Err(e) => {
let _ = print_tx let _ = print_tx
.send(Printout { .send(Printout::new(
verbosity: 1, 1,
content: format!("http-client: underlying lib connection error {e:?}"), HTTP_CLIENT_PROCESS_ID.clone(),
}) format!("http-client: underlying lib connection error {e:?}"),
))
.await; .await;
return Err(HttpClientError::WsOpenFailed { return Err(HttpClientError::WsOpenFailed {
@ -404,10 +405,11 @@ async fn handle_http_request(
}; };
let _ = print_tx let _ = print_tx
.send(Printout { .send(Printout::new(
verbosity: 2, 2,
content: format!("http-client: {req_method} request to {}", url), HTTP_CLIENT_PROCESS_ID.clone(),
}) format!("http-client: {req_method} request to {}", url),
))
.await; .await;
// Build the request // Build the request
@ -498,10 +500,11 @@ async fn handle_http_request(
} }
Err(e) => { Err(e) => {
let _ = print_tx let _ = print_tx
.send(Printout { .send(Printout::new(
verbosity: 2, 2,
content: "http-client: executed request but got error".to_string(), HTTP_CLIENT_PROCESS_ID.clone(),
}) "http-client: executed request but got error".to_string(),
))
.await; .await;
// Forward the error to the target process // Forward the error to the target process
http_error_message( http_error_message(

View File

@ -254,9 +254,13 @@ async fn serve(
send_to_loop: MessageSender, send_to_loop: MessageSender,
print_tx: PrintSender, print_tx: PrintSender,
) { ) {
Printout::new(0, format!("http-server: running on port {our_port}")) Printout::new(
.send(&print_tx) 0,
.await; HTTP_SERVER_PROCESS_ID.clone(),
format!("http-server: running on port {our_port}"),
)
.send(&print_tx)
.await;
// filter to receive websockets // filter to receive websockets
let cloned_our = our.clone(); let cloned_our = our.clone();
@ -450,13 +454,18 @@ async fn ws_handler(
print_tx: PrintSender, print_tx: PrintSender,
) -> Result<impl warp::Reply, warp::Rejection> { ) -> Result<impl warp::Reply, warp::Rejection> {
let original_path = utils::normalize_path(path.as_str()); let original_path = utils::normalize_path(path.as_str());
Printout::new(2, format!("http-server: ws request for {original_path}")) Printout::new(
.send(&print_tx) 2,
.await; HTTP_SERVER_PROCESS_ID.clone(),
format!("http-server: ws request for {original_path}"),
)
.send(&print_tx)
.await;
if ws_senders.len() >= WS_SELF_IMPOSED_MAX_CONNECTIONS as usize { if ws_senders.len() >= WS_SELF_IMPOSED_MAX_CONNECTIONS as usize {
Printout::new( Printout::new(
0, 0,
HTTP_SERVER_PROCESS_ID.clone(),
format!( format!(
"http-server: too many open websockets ({})! rejecting incoming", "http-server: too many open websockets ({})! rejecting incoming",
ws_senders.len() ws_senders.len()
@ -559,9 +568,13 @@ async fn http_handler(
) -> Result<impl warp::Reply, warp::Rejection> { ) -> Result<impl warp::Reply, warp::Rejection> {
let original_path = utils::normalize_path(path.as_str()); let original_path = utils::normalize_path(path.as_str());
let base_path = original_path.split('/').skip(1).next().unwrap_or(""); let base_path = original_path.split('/').skip(1).next().unwrap_or("");
Printout::new(2, format!("http-server: request for {original_path}")) Printout::new(
.send(&print_tx) 2,
.await; HTTP_SERVER_PROCESS_ID.clone(),
format!("http-server: request for {original_path}"),
)
.send(&print_tx)
.await;
let id: u64 = rand::random(); let id: u64 = rand::random();
let serialized_headers = utils::serialize_headers(&headers); let serialized_headers = utils::serialize_headers(&headers);
@ -577,6 +590,7 @@ async fn http_handler(
} else { } else {
Printout::new( Printout::new(
2, 2,
HTTP_SERVER_PROCESS_ID.clone(),
format!("http-server: no route found for {original_path}"), format!("http-server: no route found for {original_path}"),
) )
.send(&print_tx) .send(&print_tx)
@ -816,6 +830,7 @@ async fn handle_rpc_message(
Printout::new( Printout::new(
2, 2,
HTTP_SERVER_PROCESS_ID.clone(),
format!("http-server: passing on RPC message to {target_process}"), format!("http-server: passing on RPC message to {target_process}"),
) )
.send(&print_tx) .send(&print_tx)
@ -988,6 +1003,7 @@ async fn maintain_websocket(
Printout::new( Printout::new(
2, 2,
HTTP_SERVER_PROCESS_ID.clone(),
format!("http-server: new websocket connection to {app} with id {channel_id}"), format!("http-server: new websocket connection to {app} with id {channel_id}"),
) )
.send(&print_tx) .send(&print_tx)
@ -1062,6 +1078,7 @@ async fn maintain_websocket(
} }
Printout::new( Printout::new(
2, 2,
HTTP_SERVER_PROCESS_ID.clone(),
format!("http-server: websocket connection {channel_id} closed"), format!("http-server: websocket connection {channel_id} closed"),
) )
.send(&print_tx) .send(&print_tx)
@ -1197,6 +1214,7 @@ async fn handle_app_message(
let mut path_bindings = path_bindings.write().await; let mut path_bindings = path_bindings.write().await;
Printout::new( Printout::new(
2, 2,
HTTP_SERVER_PROCESS_ID.clone(),
format!( format!(
"http: binding {path}, {}, {}, {}", "http: binding {path}, {}, {}, {}",
if authenticated { if authenticated {
@ -1268,6 +1286,7 @@ async fn handle_app_message(
let mut path_bindings = path_bindings.write().await; let mut path_bindings = path_bindings.write().await;
Printout::new( Printout::new(
2, 2,
HTTP_SERVER_PROCESS_ID.clone(),
format!( format!(
"http: binding subdomain {subdomain} with path {path}, {}", "http: binding subdomain {subdomain} with path {path}, {}",
if cache { "cached" } else { "dynamic" }, if cache { "cached" } else { "dynamic" },

View File

@ -105,9 +105,13 @@ async fn handle_kernel_request(
}; };
let command: t::KernelCommand = match serde_json::from_slice(&request.body) { let command: t::KernelCommand = match serde_json::from_slice(&request.body) {
Err(e) => { Err(e) => {
t::Printout::new(0, format!("kernel: couldn't parse command: {e:?}")) t::Printout::new(
.send(send_to_terminal) 0,
.await; KERNEL_PROCESS_ID.clone(),
format!("kernel: couldn't parse command: {e:?}"),
)
.send(send_to_terminal)
.await;
return None; return None;
} }
Ok(c) => c, Ok(c) => c,
@ -159,9 +163,13 @@ async fn handle_kernel_request(
public, public,
} => { } => {
let Some(blob) = km.lazy_load_blob else { let Some(blob) = km.lazy_load_blob else {
t::Printout::new(0, "kernel: process startup requires bytes") t::Printout::new(
.send(send_to_terminal) 0,
.await; KERNEL_PROCESS_ID.clone(),
"kernel: process startup requires bytes",
)
.send(send_to_terminal)
.await;
// fire an error back // fire an error back
t::KernelMessage::builder() t::KernelMessage::builder()
.id(km.id) .id(km.id)
@ -184,7 +192,7 @@ async fn handle_kernel_request(
return None; return None;
}; };
if let Err(e) = t::check_process_id_kimap_safe(&id) { if let Err(e) = t::check_process_id_kimap_safe(&id) {
t::Printout::new(0, &format!("kernel: {e}")) t::Printout::new(0, KERNEL_PROCESS_ID.clone(), &format!("kernel: {e}"))
.send(send_to_terminal) .send(send_to_terminal)
.await; .await;
// fire an error back // fire an error back
@ -228,6 +236,7 @@ async fn handle_kernel_request(
None => { None => {
t::Printout::new( t::Printout::new(
0, 0,
KERNEL_PROCESS_ID.clone(),
format!( format!(
"kernel: InitializeProcess caller {} doesn't have capability {}", "kernel: InitializeProcess caller {} doesn't have capability {}",
km.source.process, km.source.process,
@ -301,9 +310,13 @@ async fn handle_kernel_request(
t::KernelResponse::InitializedProcess t::KernelResponse::InitializedProcess
} }
Err(e) => { Err(e) => {
t::Printout::new(0, format!("kernel: error initializing process: {e:?}")) t::Printout::new(
.send(send_to_terminal) 0,
.await; KERNEL_PROCESS_ID.clone(),
format!("kernel: error initializing process: {e:?}"),
)
.send(send_to_terminal)
.await;
t::KernelResponse::InitializeProcessError t::KernelResponse::InitializeProcessError
} }
}; };
@ -381,9 +394,13 @@ async fn handle_kernel_request(
t::KernelResponse::RunProcessError t::KernelResponse::RunProcessError
} }
} else { } else {
t::Printout::new(0, format!("kernel: no such process {process_id} to run")) t::Printout::new(
.send(send_to_terminal) 0,
.await; KERNEL_PROCESS_ID.clone(),
format!("kernel: no such process {process_id} to run"),
)
.send(send_to_terminal)
.await;
t::KernelResponse::RunProcessError t::KernelResponse::RunProcessError
}; };
t::KernelMessage::builder() t::KernelMessage::builder()
@ -416,9 +433,13 @@ async fn handle_kernel_request(
let process_handle = match process_handles.remove(&process_id) { let process_handle = match process_handles.remove(&process_id) {
Some(ph) => ph, Some(ph) => ph,
None => { None => {
t::Printout::new(2, format!("kernel: no such process {process_id} to kill")) t::Printout::new(
.send(send_to_terminal) 2,
.await; KERNEL_PROCESS_ID.clone(),
format!("kernel: no such process {process_id} to kill"),
)
.send(send_to_terminal)
.await;
return None; return None;
} }
}; };
@ -435,14 +456,22 @@ async fn handle_kernel_request(
.expect("event loop: fatal: sender died"); .expect("event loop: fatal: sender died");
} }
if request.expects_response.is_none() { if request.expects_response.is_none() {
t::Printout::new(2, format!("kernel: killing process {process_id}")) t::Printout::new(
.send(send_to_terminal) 2,
.await; KERNEL_PROCESS_ID.clone(),
return None; format!("kernel: killing process {process_id}"),
} )
t::Printout::new(0, format!("kernel: killing process {process_id}"))
.send(send_to_terminal) .send(send_to_terminal)
.await; .await;
return None;
}
t::Printout::new(
0,
KERNEL_PROCESS_ID.clone(),
format!("kernel: killing process {process_id}"),
)
.send(send_to_terminal)
.await;
t::KernelMessage::builder() t::KernelMessage::builder()
.id(km.id) .id(km.id)
.source(("our", KERNEL_PROCESS_ID.clone())) .source(("our", KERNEL_PROCESS_ID.clone()))
@ -628,6 +657,8 @@ pub async fn kernel(
// skip sending prints for every event. // skip sending prints for every event.
let mut print_full_event_loop: bool = true; let mut print_full_event_loop: bool = true;
let mut print_full_event_loop_for_process: HashSet<t::ProcessId> = HashSet::new();
// create a list of processes which are successfully rebooted, // create a list of processes which are successfully rebooted,
// keeping only them in the updated post-boot process map // keeping only them in the updated post-boot process map
let mut non_rebooted_processes: HashSet<t::ProcessId> = HashSet::new(); let mut non_rebooted_processes: HashSet<t::ProcessId> = HashSet::new();
@ -655,6 +686,7 @@ pub async fn kernel(
Err(e) => { Err(e) => {
t::Printout::new( t::Printout::new(
0, 0,
KERNEL_PROCESS_ID.clone(),
format!("kernel: couldn't read wasm bytes for process: {process_id} at {path:?}: {e}"), format!("kernel: couldn't read wasm bytes for process: {process_id} at {path:?}: {e}"),
) )
.send(&send_to_terminal) .send(&send_to_terminal)
@ -717,9 +749,13 @@ pub async fn kernel(
{ {
Ok(()) => {} Ok(()) => {}
Err(e) => { Err(e) => {
t::Printout::new(0, format!("kernel: couldn't reboot process: {e}")) t::Printout::new(
.send(&send_to_terminal) 0,
.await; KERNEL_PROCESS_ID.clone(),
format!("kernel: couldn't reboot process: {e}"),
)
.send(&send_to_terminal)
.await;
non_rebooted_processes.insert(process_id.clone()); non_rebooted_processes.insert(process_id.clone());
} }
} }
@ -780,6 +816,13 @@ pub async fn kernel(
t::DebugCommand::ToggleEventLoop => { t::DebugCommand::ToggleEventLoop => {
print_full_event_loop = !print_full_event_loop; print_full_event_loop = !print_full_event_loop;
} }
t::DebugCommand::ToggleEventLoopForProcess(ref process) => {
if print_full_event_loop_for_process.contains(process) {
print_full_event_loop_for_process.remove(process);
} else {
print_full_event_loop_for_process.insert(process.clone());
}
}
} }
}, },
// network error message receiver: handle `timeout` and `offline` errors // network error message receiver: handle `timeout` and `offline` errors
@ -788,7 +831,11 @@ pub async fn kernel(
Some(wrapped_network_error) = network_error_recv.recv() => { Some(wrapped_network_error) = network_error_recv.recv() => {
// display every single event when verbose // display every single event when verbose
if print_full_event_loop { if print_full_event_loop {
t::Printout::new(3, format!("{wrapped_network_error:?}")).send(&send_to_terminal).await; t::Printout::new(3, KERNEL_PROCESS_ID.clone(), format!("{wrapped_network_error:?}")).send(&send_to_terminal).await;
} else if print_full_event_loop_for_process.contains(&wrapped_network_error.source.process) && wrapped_network_error.source.node == our.name {
t::Printout::new(3, wrapped_network_error.source.process.clone(), format!("{wrapped_network_error:?}")).send(&send_to_terminal).await;
} else if print_full_event_loop_for_process.contains(&wrapped_network_error.error.target.process) && wrapped_network_error.error.target.node == our.name {
t::Printout::new(3, wrapped_network_error.error.target.process.clone(), format!("{wrapped_network_error:?}")).send(&send_to_terminal).await;
} }
// forward the error to the relevant process // forward the error to the relevant process
match senders.get(&wrapped_network_error.source.process) { match senders.get(&wrapped_network_error.source.process) {
@ -803,6 +850,7 @@ pub async fn kernel(
None => { None => {
t::Printout::new( t::Printout::new(
0, 0,
KERNEL_PROCESS_ID.clone(),
format!( format!(
"event loop: {} failed to deliver a message {}; but process has already terminated", "event loop: {} failed to deliver a message {}; but process has already terminated",
wrapped_network_error.source.process, wrapped_network_error.source.process,
@ -841,6 +889,7 @@ pub async fn kernel(
// capabilities are not correct! skip this message. // capabilities are not correct! skip this message.
t::Printout::new( t::Printout::new(
0, 0,
KERNEL_PROCESS_ID.clone(),
format!( format!(
"event loop: process {} doesn't have capability to send networked messages", "event loop: process {} doesn't have capability to send networked messages",
kernel_message.source.process kernel_message.source.process
@ -848,6 +897,7 @@ pub async fn kernel(
).send(&send_to_terminal).await; ).send(&send_to_terminal).await;
t::Printout::new( t::Printout::new(
0, 0,
KERNEL_PROCESS_ID.clone(),
format!("their capabilities: {:?}", proc.capabilities) format!("their capabilities: {:?}", proc.capabilities)
).send(&send_to_terminal).await; ).send(&send_to_terminal).await;
throw_timeout(&our.name, &senders, kernel_message).await; throw_timeout(&our.name, &senders, kernel_message).await;
@ -860,6 +910,7 @@ pub async fn kernel(
let Some(persisted) = process_map.get(&kernel_message.target.process) else { let Some(persisted) = process_map.get(&kernel_message.target.process) else {
t::Printout::new( t::Printout::new(
2, 2,
KERNEL_PROCESS_ID.clone(),
format!( format!(
"event loop: got {} from network for {}, but process does not exist{}", "event loop: got {} from network for {}, but process does not exist{}",
match kernel_message.message { match kernel_message.message {
@ -882,6 +933,7 @@ pub async fn kernel(
// capabilities are not correct! skip this message. // capabilities are not correct! skip this message.
t::Printout::new( t::Printout::new(
0, 0,
KERNEL_PROCESS_ID.clone(),
format!( format!(
"event loop: process {} got a message from over the network, but doesn't have capability to receive networked messages", "event loop: process {} got a message from over the network, but doesn't have capability to receive networked messages",
kernel_message.target.process kernel_message.target.process
@ -903,6 +955,7 @@ pub async fn kernel(
let Some(persisted_target) = process_map.get(&kernel_message.target.process) else { let Some(persisted_target) = process_map.get(&kernel_message.target.process) else {
t::Printout::new( t::Printout::new(
2, 2,
KERNEL_PROCESS_ID.clone(),
format!( format!(
"event loop: process {} sent message to non-existing {}; dropping message", "event loop: process {} sent message to non-existing {}; dropping message",
kernel_message.source.process, kernel_message.target.process kernel_message.source.process, kernel_message.target.process
@ -918,6 +971,7 @@ pub async fn kernel(
// capabilities are not correct! skip this message. // capabilities are not correct! skip this message.
t::Printout::new( t::Printout::new(
0, 0,
KERNEL_PROCESS_ID.clone(),
format!( format!(
"event loop: process {} doesn't have capability to message process {}", "event loop: process {} doesn't have capability to message process {}",
kernel_message.source.process, kernel_message.target.process kernel_message.source.process, kernel_message.target.process
@ -937,11 +991,22 @@ pub async fn kernel(
t::DebugCommand::ToggleStepthrough => in_stepthrough_mode = !in_stepthrough_mode, t::DebugCommand::ToggleStepthrough => in_stepthrough_mode = !in_stepthrough_mode,
t::DebugCommand::Step => break, t::DebugCommand::Step => break,
t::DebugCommand::ToggleEventLoop => print_full_event_loop = !print_full_event_loop, t::DebugCommand::ToggleEventLoop => print_full_event_loop = !print_full_event_loop,
t::DebugCommand::ToggleEventLoopForProcess(ref process) => {
if print_full_event_loop_for_process.contains(process) {
print_full_event_loop_for_process.remove(process);
} else {
print_full_event_loop_for_process.insert(process.clone());
}
}
} }
} }
// display every single event when verbose // display every single event when verbose
if print_full_event_loop { if print_full_event_loop {
t::Printout::new(3, format!("{kernel_message}")).send(&send_to_terminal).await; t::Printout::new(3, KERNEL_PROCESS_ID.clone(), format!("{kernel_message}")).send(&send_to_terminal).await;
} else if print_full_event_loop_for_process.contains(&kernel_message.source.process) && kernel_message.source.node == our.name {
t::Printout::new(3, kernel_message.source.process.clone(), format!("{kernel_message}")).send(&send_to_terminal).await;
} else if print_full_event_loop_for_process.contains(&kernel_message.target.process) && kernel_message.target.node == our.name {
t::Printout::new(3, kernel_message.target.process.clone(), format!("{kernel_message}")).send(&send_to_terminal).await;
} }
if our.name != kernel_message.target.node { if our.name != kernel_message.target.node {
@ -982,6 +1047,7 @@ pub async fn kernel(
None => { None => {
t::Printout::new( t::Printout::new(
0, 0,
KERNEL_PROCESS_ID.clone(),
format!( format!(
"event loop: got {} from {:?} for {:?}, but target doesn't exist (perhaps it terminated): {}", "event loop: got {} from {:?} for {:?}, but target doesn't exist (perhaps it terminated): {}",
match kernel_message.message { match kernel_message.message {
@ -1001,7 +1067,19 @@ pub async fn kernel(
// capabilities oracle: handles all requests to add, drop, and check capabilities // capabilities oracle: handles all requests to add, drop, and check capabilities
Some(cap_message) = caps_oracle_receiver.recv() => { Some(cap_message) = caps_oracle_receiver.recv() => {
if print_full_event_loop { if print_full_event_loop {
t::Printout::new(3, format!("{cap_message}")).send(&send_to_terminal).await; t::Printout::new(3, KERNEL_PROCESS_ID.clone(), format!("{cap_message}")).send(&send_to_terminal).await;
} else {
let on = match cap_message {
t::CapMessage::Add { ref on, .. } => on,
t::CapMessage::Drop { ref on, .. } => on,
t::CapMessage::Has { ref on, .. } => on,
t::CapMessage::GetAll { ref on, .. } => on,
t::CapMessage::RevokeAll { ref on, .. } => on,
t::CapMessage::FilterCaps { ref on, .. } => on,
};
if print_full_event_loop_for_process.contains(on) {
t::Printout::new(3, on.clone(), format!("{cap_message}")).send(&send_to_terminal).await;
}
} }
match cap_message { match cap_message {
t::CapMessage::Add { on, caps, responder } => { t::CapMessage::Add { on, caps, responder } => {

View File

@ -166,6 +166,7 @@ async fn make_component(
Err(e) => { Err(e) => {
t::Printout::new( t::Printout::new(
0, 0,
t::KERNEL_PROCESS_ID.clone(),
format!("kernel: process {our_process_id} failed to instantiate: {e:?}"), format!("kernel: process {our_process_id} failed to instantiate: {e:?}"),
) )
.send(&send_to_terminal) .send(&send_to_terminal)
@ -209,6 +210,7 @@ async fn make_component_v0(
Err(e) => { Err(e) => {
t::Printout::new( t::Printout::new(
0, 0,
t::KERNEL_PROCESS_ID.clone(),
format!("kernel: process {our_process_id} failed to instantiate: {e:?}"), format!("kernel: process {our_process_id} failed to instantiate: {e:?}"),
) )
.send(&send_to_terminal) .send(&send_to_terminal)
@ -294,9 +296,13 @@ pub async fn make_process_loop(
// the process will run until it returns from init() or crashes // the process will run until it returns from init() or crashes
match bindings.call_init(&mut store, &our.to_string()).await { match bindings.call_init(&mut store, &our.to_string()).await {
Ok(()) => { Ok(()) => {
t::Printout::new(1, format!("process {our} returned without error")) t::Printout::new(
.send(&send_to_terminal) 1,
.await; t::KERNEL_PROCESS_ID.clone(),
format!("process {our} returned without error"),
)
.send(&send_to_terminal)
.await;
} }
Err(e) => { Err(e) => {
let stderr = wasi_stderr.contents().into(); let stderr = wasi_stderr.contents().into();
@ -308,6 +314,7 @@ pub async fn make_process_loop(
}; };
t::Printout::new( t::Printout::new(
0, 0,
t::KERNEL_PROCESS_ID.clone(),
format!("\x1b[38;5;196mprocess {our} ended with error:\x1b[0m\n{output}"), format!("\x1b[38;5;196mprocess {our} ended with error:\x1b[0m\n{output}"),
) )
.send(&send_to_terminal) .send(&send_to_terminal)
@ -327,9 +334,13 @@ pub async fn make_process_loop(
// the process will run until it returns from init() or crashes // the process will run until it returns from init() or crashes
match bindings.call_init(&mut store, &our.to_string()).await { match bindings.call_init(&mut store, &our.to_string()).await {
Ok(()) => { Ok(()) => {
t::Printout::new(1, format!("process {our} returned without error")) t::Printout::new(
.send(&send_to_terminal) 1,
.await; t::KERNEL_PROCESS_ID.clone(),
format!("process {our} returned without error"),
)
.send(&send_to_terminal)
.await;
} }
Err(e) => { Err(e) => {
let stderr = wasi_stderr.contents().into(); let stderr = wasi_stderr.contents().into();
@ -341,6 +352,7 @@ pub async fn make_process_loop(
}; };
t::Printout::new( t::Printout::new(
0, 0,
t::KERNEL_PROCESS_ID.clone(),
format!("\x1b[38;5;196mprocess {our} ended with error:\x1b[0m\n{output}"), format!("\x1b[38;5;196mprocess {our} ended with error:\x1b[0m\n{output}"),
) )
.send(&send_to_terminal) .send(&send_to_terminal)
@ -359,6 +371,7 @@ pub async fn make_process_loop(
t::Printout::new( t::Printout::new(
1, 1,
t::KERNEL_PROCESS_ID.clone(),
format!( format!(
"process {} has OnExit behavior {}", "process {} has OnExit behavior {}",
metadata.our.process, metadata.on_exit metadata.our.process, metadata.on_exit

View File

@ -6,9 +6,18 @@ use lib::wit::Host as StandardHost;
use ring::signature::{self, KeyPair}; use ring::signature::{self, KeyPair};
async fn print_debug(proc: &process::ProcessState, content: &str) { async fn print_debug(proc: &process::ProcessState, content: &str) {
t::Printout::new(2, format!("{}: {}", proc.metadata.our.process, content)) t::Printout::new(
.send(&proc.send_to_terminal) 2,
.await; &proc.metadata.our.process,
format!(
"{}:{}: {}",
proc.metadata.our.process.package(),
proc.metadata.our.process.publisher(),
content
),
)
.send(&proc.send_to_terminal)
.await;
} }
impl process::ProcessState { impl process::ProcessState {
@ -339,6 +348,7 @@ impl process::ProcessState {
let Some(ref prompting_message) = self.prompting_message else { let Some(ref prompting_message) = self.prompting_message else {
t::Printout::new( t::Printout::new(
0, 0,
KERNEL_PROCESS_ID.clone(),
format!("kernel: need non-None prompting_message to handle Response {response:?}"), format!("kernel: need non-None prompting_message to handle Response {response:?}"),
) )
.send(&self.send_to_terminal) .send(&self.send_to_terminal)
@ -449,15 +459,16 @@ impl StandardHost for process::ProcessWasi {
async fn print_to_terminal(&mut self, verbosity: u8, content: String) -> Result<()> { async fn print_to_terminal(&mut self, verbosity: u8, content: String) -> Result<()> {
self.process self.process
.send_to_terminal .send_to_terminal
.send(t::Printout { .send(t::Printout::new(
verbosity, verbosity,
content: format!( &self.process.metadata.our.process,
format!(
"{}:{}: {}", "{}:{}: {}",
self.process.metadata.our.process.package(), self.process.metadata.our.process.package(),
self.process.metadata.our.process.publisher(), self.process.metadata.our.process.publisher(),
content content
), ),
}) ))
.await .await
.map_err(|e| anyhow::anyhow!("fatal: couldn't send to terminal: {e:?}")) .map_err(|e| anyhow::anyhow!("fatal: couldn't send to terminal: {e:?}"))
} }

View File

@ -6,13 +6,18 @@ use lib::v0::wit::Host as StandardHost;
use ring::signature::{self, KeyPair}; use ring::signature::{self, KeyPair};
async fn print_debug(proc: &process::ProcessState, content: &str) { async fn print_debug(proc: &process::ProcessState, content: &str) {
let _ = proc t::Printout::new(
.send_to_terminal 2,
.send(t::Printout { &proc.metadata.our.process,
verbosity: 2, format!(
content: format!("{}: {}", proc.metadata.our.process, content), "{}:{}: {}",
}) proc.metadata.our.process.package(),
.await; proc.metadata.our.process.publisher(),
content
),
)
.send(&proc.send_to_terminal)
.await;
} }
impl process::ProcessState { impl process::ProcessState {
@ -342,6 +347,7 @@ impl process::ProcessState {
let Some(ref prompting_message) = self.prompting_message else { let Some(ref prompting_message) = self.prompting_message else {
t::Printout::new( t::Printout::new(
0, 0,
KERNEL_PROCESS_ID.clone(),
format!("kernel: need non-None prompting_message to handle Response {response:?}"), format!("kernel: need non-None prompting_message to handle Response {response:?}"),
) )
.send(&self.send_to_terminal) .send(&self.send_to_terminal)
@ -457,15 +463,16 @@ impl StandardHost for process::ProcessWasiV0 {
async fn print_to_terminal(&mut self, verbosity: u8, content: String) -> Result<()> { async fn print_to_terminal(&mut self, verbosity: u8, content: String) -> Result<()> {
self.process self.process
.send_to_terminal .send_to_terminal
.send(t::Printout { .send(t::Printout::new(
verbosity, verbosity,
content: format!( &self.process.metadata.our.process,
format!(
"{}:{}: {}", "{}:{}: {}",
self.process.metadata.our.process.package(), self.process.metadata.our.process.package(),
self.process.metadata.our.process.publisher(), self.process.metadata.our.process.publisher(),
content content
), ),
}) ))
.await .await
.map_err(|e| anyhow::anyhow!("fatal: couldn't send to terminal: {e:?}")) .map_err(|e| anyhow::anyhow!("fatal: couldn't send to terminal: {e:?}"))
} }

View File

@ -124,6 +124,7 @@ pub async fn kv(
if state.our.node != km.source.node { if state.our.node != km.source.node {
Printout::new( Printout::new(
1, 1,
KV_PROCESS_ID.clone(),
format!( format!(
"kv: got request from {}, but requests must come from our node {}", "kv: got request from {}, but requests must come from our node {}",
km.source.node, state.our.node, km.source.node, state.our.node,
@ -138,6 +139,7 @@ pub async fn kv(
if let Err(e) = handle_fd_request(km, &mut state).await { if let Err(e) = handle_fd_request(km, &mut state).await {
Printout::new( Printout::new(
1, 1,
KV_PROCESS_ID.clone(),
format!("kv: got request from fd-manager that errored: {e:?}"), format!("kv: got request from fd-manager that errored: {e:?}"),
) )
.send(&state.send_to_terminal) .send(&state.send_to_terminal)
@ -167,7 +169,7 @@ pub async fn kv(
(km.id.clone(), km.rsvp.clone().unwrap_or(km.source.clone())); (km.id.clone(), km.rsvp.clone().unwrap_or(km.source.clone()));
if let Err(e) = handle_request(km, &mut state, &send_to_caps_oracle).await { if let Err(e) = handle_request(km, &mut state, &send_to_caps_oracle).await {
Printout::new(1, format!("kv: {e}")) Printout::new(1, KV_PROCESS_ID.clone(), format!("kv: {e}"))
.send(&state.send_to_terminal) .send(&state.send_to_terminal)
.await; .await;
KernelMessage::builder() KernelMessage::builder()

View File

@ -3,11 +3,12 @@ use clap::{arg, value_parser, Command};
use lib::types::core::{ use lib::types::core::{
CapMessageReceiver, CapMessageSender, DebugReceiver, DebugSender, Identity, KernelCommand, CapMessageReceiver, CapMessageSender, DebugReceiver, DebugSender, Identity, KernelCommand,
KernelMessage, Keyfile, Message, MessageReceiver, MessageSender, NetworkErrorReceiver, KernelMessage, Keyfile, Message, MessageReceiver, MessageSender, NetworkErrorReceiver,
NetworkErrorSender, NodeRouting, PrintReceiver, PrintSender, ProcessId, Request, NetworkErrorSender, NodeRouting, PrintReceiver, PrintSender, ProcessId, ProcessVerbosity,
KERNEL_PROCESS_ID, Request, KERNEL_PROCESS_ID,
}; };
#[cfg(feature = "simulation-mode")] #[cfg(feature = "simulation-mode")]
use ring::{rand::SystemRandom, signature, signature::KeyPair}; use ring::{rand::SystemRandom, signature, signature::KeyPair};
use std::collections::HashMap;
use std::env; use std::env;
use std::path::Path; use std::path::Path;
use std::sync::Arc; use std::sync::Arc;
@ -106,6 +107,14 @@ async fn main() {
// detached determines whether terminal is interactive // detached determines whether terminal is interactive
let detached = *matches.get_one::<bool>("detached").unwrap(); let detached = *matches.get_one::<bool>("detached").unwrap();
let process_verbosity = matches.get_one::<String>("process-verbosity").unwrap();
let process_verbosity: ProcessVerbosity = if process_verbosity.is_empty() {
HashMap::new()
} else {
serde_json::from_str(&process_verbosity)
.expect("failed to parse given --process-verbosity to HashMap<ProcessId, u8>")
};
#[cfg(feature = "simulation-mode")] #[cfg(feature = "simulation-mode")]
let (fake_node_name, fakechain_port) = ( let (fake_node_name, fakechain_port) = (
matches.get_one::<String>("fake-node-name"), matches.get_one::<String>("fake-node-name"),
@ -477,6 +486,7 @@ async fn main() {
is_logging, is_logging,
max_log_size.copied(), max_log_size.copied(),
number_log_files.copied(), number_log_files.copied(),
process_verbosity,
) => { ) => {
match quit { match quit {
Ok(()) => { Ok(()) => {
@ -726,6 +736,10 @@ fn build_command() -> Command {
.arg( .arg(
arg!(--"soft-ulimit" <SOFT_ULIMIT> "Enforce a static maximum number of file descriptors (default fetched from system)") arg!(--"soft-ulimit" <SOFT_ULIMIT> "Enforce a static maximum number of file descriptors (default fetched from system)")
.value_parser(value_parser!(u64)), .value_parser(value_parser!(u64)),
)
.arg(
arg!(--"process-verbosity" <JSON_STRING> "ProcessId: verbosity JSON object")
.default_value("")
); );
#[cfg(feature = "simulation-mode")] #[cfg(feature = "simulation-mode")]

View File

@ -5,7 +5,7 @@ use crate::net::types::{
use lib::types::core::{ use lib::types::core::{
Identity, KernelMessage, KnsUpdate, Message, MessageSender, NetAction, NetworkErrorSender, Identity, KernelMessage, KnsUpdate, Message, MessageSender, NetAction, NetworkErrorSender,
NodeId, NodeRouting, PrintSender, Printout, Request, Response, SendError, SendErrorKind, NodeId, NodeRouting, PrintSender, Printout, Request, Response, SendError, SendErrorKind,
WrappedSendError, WrappedSendError, NET_PROCESS_ID,
}; };
use { use {
futures::{SinkExt, StreamExt}, futures::{SinkExt, StreamExt},
@ -429,12 +429,16 @@ pub async fn parse_hello_message(
/// Create a terminal printout at verbosity level 0. /// Create a terminal printout at verbosity level 0.
pub async fn print_loud(print_tx: &PrintSender, content: &str) { pub async fn print_loud(print_tx: &PrintSender, content: &str) {
Printout::new(0, content).send(print_tx).await; Printout::new(0, NET_PROCESS_ID.clone(), content)
.send(print_tx)
.await;
} }
/// Create a terminal printout at verbosity level 2. /// Create a terminal printout at verbosity level 2.
pub async fn print_debug(print_tx: &PrintSender, content: &str) { pub async fn print_debug(print_tx: &PrintSender, content: &str) {
Printout::new(2, content).send(print_tx).await; Printout::new(2, NET_PROCESS_ID.clone(), content)
.send(print_tx)
.await;
} }
pub fn get_now() -> u64 { pub fn get_now() -> u64 {

View File

@ -135,6 +135,7 @@ pub async fn sqlite(
if state.our.node != km.source.node { if state.our.node != km.source.node {
Printout::new( Printout::new(
1, 1,
SQLITE_PROCESS_ID.clone(),
format!( format!(
"sqlite: got request from {}, but requests must come from our node {}", "sqlite: got request from {}, but requests must come from our node {}",
km.source.node, state.our.node km.source.node, state.our.node
@ -149,6 +150,7 @@ pub async fn sqlite(
if let Err(e) = handle_fd_request(km, &mut state).await { if let Err(e) = handle_fd_request(km, &mut state).await {
Printout::new( Printout::new(
1, 1,
SQLITE_PROCESS_ID.clone(),
format!("sqlite: got request from fd-manager that errored: {e:?}"), format!("sqlite: got request from fd-manager that errored: {e:?}"),
) )
.send(&state.send_to_terminal) .send(&state.send_to_terminal)
@ -178,7 +180,7 @@ pub async fn sqlite(
(km.id.clone(), km.rsvp.clone().unwrap_or(km.source.clone())); (km.id.clone(), km.rsvp.clone().unwrap_or(km.source.clone()));
if let Err(e) = handle_request(km, &mut state, &send_to_caps_oracle).await { if let Err(e) = handle_request(km, &mut state, &send_to_caps_oracle).await {
Printout::new(1, format!("sqlite: {e}")) Printout::new(1, SQLITE_PROCESS_ID.clone(), format!("sqlite: {e}"))
.send(&state.send_to_terminal) .send(&state.send_to_terminal)
.await; .await;
KernelMessage::builder() KernelMessage::builder()

View File

@ -110,6 +110,7 @@ pub async fn state_sender(
if *our_node != km.source.node { if *our_node != km.source.node {
Printout::new( Printout::new(
1, 1,
STATE_PROCESS_ID.clone(),
format!( format!(
"state: got request from {}, but requests must come from our node {our_node}", "state: got request from {}, but requests must come from our node {our_node}",
km.source.node km.source.node

View File

@ -9,9 +9,11 @@ use crossterm::{
use futures::{future::FutureExt, StreamExt}; use futures::{future::FutureExt, StreamExt};
use lib::types::core::{ use lib::types::core::{
DebugCommand, DebugSender, Identity, KernelMessage, Message, MessageSender, PrintReceiver, DebugCommand, DebugSender, Identity, KernelMessage, Message, MessageSender, PrintReceiver,
PrintSender, Printout, Request, TERMINAL_PROCESS_ID, PrintSender, Printout, ProcessId, ProcessVerbosity, ProcessVerbosityVal, Request,
TERMINAL_PROCESS_ID,
}; };
use std::{ use std::{
collections::{HashMap, VecDeque},
fs::{read_to_string, OpenOptions}, fs::{read_to_string, OpenOptions},
io::BufWriter, io::BufWriter,
path::PathBuf, path::PathBuf,
@ -22,6 +24,9 @@ use unicode_segmentation::UnicodeSegmentation;
pub mod utils; pub mod utils;
// TODO: add a flag & `terminal::terminal()` arg so can be set at run time
const MAX_PRINTOUT_QUEUE_LEN_DEFAULT: usize = 256;
struct State { struct State {
pub stdout: std::io::Stdout, pub stdout: std::io::Stdout,
/// handle and settings for on-disk log (disabled by default, triggered by CTRL+L) /// handle and settings for on-disk log (disabled by default, triggered by CTRL+L)
@ -44,6 +49,16 @@ struct State {
pub logging_mode: bool, pub logging_mode: bool,
/// verbosity mode (increased by CTRL+V) /// verbosity mode (increased by CTRL+V)
pub verbose_mode: u8, pub verbose_mode: u8,
/// process-level verbosities: override verbose_mode when populated
pub process_verbosity: HashMap<ProcessId, ProcessVerbosityVal>,
/// flag representing whether we are in process verbosity mode (activated by CTRL+W, exited by CTRL+W)
pub process_verbosity_mode: bool,
/// line to be restored when exiting process_verbosity_mode
pub saved_line: Option<String>,
/// if in alternate screen, queue up max_printout_queue_len printouts
pub printout_queue: VecDeque<Printout>,
pub max_printout_queue_len: usize,
pub printout_queue_number_dropped_printouts: u64,
} }
impl State { impl State {
@ -112,6 +127,122 @@ impl State {
) )
} }
} }
fn enter_process_verbosity_mode(&mut self) -> Result<(), std::io::Error> {
// Save current line and switch to alternate screen
execute!(
self.stdout,
terminal::EnterAlternateScreen,
cursor::Hide, // Hide cursor while in alternate screen
)?;
self.display_process_verbosity()?;
Ok(())
}
fn exit_process_verbosity_mode(&mut self) -> anyhow::Result<()> {
// Leave alternate screen and restore cursor
execute!(self.stdout, cursor::Show, terminal::LeaveAlternateScreen)?;
// Print queued messages to main screen
if self.printout_queue_number_dropped_printouts != 0 {
let number_dropped_printout = Printout::new(
0,
TERMINAL_PROCESS_ID.clone(),
format!(
"Dropped {} prints while on alternate screen",
self.printout_queue_number_dropped_printouts,
),
);
handle_printout(number_dropped_printout, self)?;
self.printout_queue_number_dropped_printouts = 0;
}
while let Some(printout) = self.printout_queue.pop_front() {
handle_printout(printout, self)?;
}
Ok(())
}
fn display_process_verbosity(&mut self) -> Result<(), std::io::Error> {
// Clear the entire screen from the input line up
execute!(
self.stdout,
cursor::MoveTo(0, 0),
terminal::Clear(ClearType::FromCursorDown),
)?;
// Display the header with overall verbosity
execute!(
self.stdout,
cursor::MoveTo(0, 0),
style::SetForegroundColor(style::Color::Green),
Print("=== Process Verbosity Mode ===\n\r"),
style::SetForegroundColor(style::Color::Reset),
Print(format!(
"Overall verbosity: {}\n\r",
match self.verbose_mode {
0 => "off",
1 => "debug",
2 => "super-debug",
3 => "full event loop",
_ => "unknown",
}
)),
Print("\n\rProcess-specific verbosity levels:\n\r"),
)?;
// Display current process verbosities
let mut row = 4;
if self.process_verbosity.is_empty() {
execute!(self.stdout, cursor::MoveTo(0, row), Print("(none set)\n\r"),)?;
row += 1;
} else {
for (process_id, verbosity) in &self.process_verbosity {
execute!(
self.stdout,
cursor::MoveTo(0, row),
Print(format!("{}: {}\n\r", process_id, verbosity)),
)?;
row += 1;
}
}
// Display instructions
execute!(
self.stdout,
cursor::MoveTo(0, row + 1),
Print("To set process verbosity, input '<ProcessId> <verbosity (u8)>' and then press <Enter>\n\r e.g.\n\r chat:chat:template.os 3\n\rTo mute a process, input '<ProcessId> m' or 'mute' or 'muted' and then press <Enter>.\n\rTo remove a previously set process verbosity, input '<ProcessId>' and then press <Enter>.\n\r"),
Print("Press CTRL+W to exit\n\r"),
)?;
// Display input line at the bottom
execute!(
self.stdout,
cursor::MoveTo(0, self.win_rows),
terminal::Clear(ClearType::CurrentLine),
Print("> "),
Print(&self.current_line.line),
cursor::MoveTo(
self.current_line.cursor_col + 2, // +2 for the "> " prompt
self.win_rows
),
)?;
Ok(())
}
fn parse_process_verbosity(input: &str) -> Option<(ProcessId, ProcessVerbosityVal)> {
let parts: Vec<&str> = input.trim().split_whitespace().collect();
if parts.len() != 2 {
return None;
}
let process_id: ProcessId = parts[0].parse().ok()?;
let verbosity = parts[1].parse::<ProcessVerbosityVal>().ok()?;
Some((process_id, verbosity))
}
} }
struct CurrentLine { struct CurrentLine {
@ -186,6 +317,7 @@ pub async fn terminal(
is_logging: bool, is_logging: bool,
max_log_size: Option<u64>, max_log_size: Option<u64>,
number_log_files: Option<u64>, number_log_files: Option<u64>,
process_verbosity: ProcessVerbosity,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let (stdout, _maybe_raw_mode) = utils::splash(&our, version, is_detached)?; let (stdout, _maybe_raw_mode) = utils::splash(&our, version, is_detached)?;
@ -220,6 +352,13 @@ pub async fn terminal(
let log_dir_path = home_directory_path.join(".terminal_logs"); let log_dir_path = home_directory_path.join(".terminal_logs");
let logger = utils::Logger::new(log_dir_path, max_log_size, number_log_files); let logger = utils::Logger::new(log_dir_path, max_log_size, number_log_files);
let process_verbosity_mode = false;
let saved_line = None;
let printout_queue = VecDeque::new();
let max_printout_queue_len = MAX_PRINTOUT_QUEUE_LEN_DEFAULT.clone();
let printout_queue_number_dropped_printouts = 0;
let mut state = State { let mut state = State {
stdout, stdout,
logger, logger,
@ -238,6 +377,12 @@ pub async fn terminal(
search_depth, search_depth,
logging_mode, logging_mode,
verbose_mode, verbose_mode,
process_verbosity,
process_verbosity_mode,
saved_line,
printout_queue,
max_printout_queue_len,
printout_queue_number_dropped_printouts,
}; };
// use to trigger cleanup if receive signal to kill process // use to trigger cleanup if receive signal to kill process
@ -272,6 +417,19 @@ pub async fn terminal(
.expect("failed to toggle full event loop off"); .expect("failed to toggle full event loop off");
} }
// in contrast, "full event loop" per-process is default off:
// here, we toggle it ON if we have any given at that level
for (process, verbosity) in state.process_verbosity.iter() {
if let ProcessVerbosityVal::U8(verbosity) = verbosity {
if *verbosity == 3 {
debug_event_loop
.send(DebugCommand::ToggleEventLoopForProcess(process.clone()))
.await
.expect("failed to toggle process-level full event loop on");
}
}
}
// only create event stream if not in detached mode // only create event stream if not in detached mode
if !is_detached { if !is_detached {
let mut reader = EventStream::new(); let mut reader = EventStream::new();
@ -333,6 +491,15 @@ pub async fn terminal(
} }
fn handle_printout(printout: Printout, state: &mut State) -> anyhow::Result<()> { fn handle_printout(printout: Printout, state: &mut State) -> anyhow::Result<()> {
if state.process_verbosity_mode {
if state.printout_queue.len() >= state.max_printout_queue_len {
// remove oldest if queue is overflowing
state.printout_queue.pop_front();
state.printout_queue_number_dropped_printouts += 1;
}
state.printout_queue.push_back(printout);
return Ok(());
}
// lock here so that runtime can still use println! without freezing.. // lock here so that runtime can still use println! without freezing..
// can lock before loop later if we want to reduce overhead // can lock before loop later if we want to reduce overhead
let mut stdout = state.stdout.lock(); let mut stdout = state.stdout.lock();
@ -342,7 +509,14 @@ fn handle_printout(printout: Printout, state: &mut State) -> anyhow::Result<()>
} }
// skip writing print to terminal if it's of a greater // skip writing print to terminal if it's of a greater
// verbosity level than our current mode // verbosity level than our current mode
if printout.verbosity > state.verbose_mode { let current_verbosity = match state.process_verbosity.get(&printout.source) {
None => &state.verbose_mode,
Some(cv) => match cv.get_verbosity() {
Some(v) => v,
None => return Ok(()), // process is muted
},
};
if &printout.verbosity > current_verbosity {
return Ok(()); return Ok(());
} }
let now = Local::now(); let now = Local::now();
@ -451,6 +625,8 @@ async fn handle_event(
} }
if state.search_mode { if state.search_mode {
state.search(&our.name)?; state.search(&our.name)?;
} else if state.process_verbosity_mode {
state.display_process_verbosity()?;
} else { } else {
state.display_current_input_line(false)?; state.display_current_input_line(false)?;
} }
@ -514,6 +690,9 @@ async fn handle_key_event(
modifiers: KeyModifiers::CONTROL, modifiers: KeyModifiers::CONTROL,
.. ..
} => { } => {
if state.process_verbosity_mode {
return Ok(Some(false));
}
// go from low to high, then reset to 0 // go from low to high, then reset to 0
match verbose_mode { match verbose_mode {
0 => *verbose_mode = 1, 0 => *verbose_mode = 1,
@ -536,6 +715,7 @@ async fn handle_key_event(
} }
Printout::new( Printout::new(
0, 0,
TERMINAL_PROCESS_ID.clone(),
format!( format!(
"verbose mode: {}", "verbose mode: {}",
match verbose_mode { match verbose_mode {
@ -559,10 +739,14 @@ async fn handle_key_event(
modifiers: KeyModifiers::CONTROL, modifiers: KeyModifiers::CONTROL,
.. ..
} => { } => {
if state.process_verbosity_mode {
return Ok(Some(false));
}
let _ = debug_event_loop.send(DebugCommand::ToggleStepthrough).await; let _ = debug_event_loop.send(DebugCommand::ToggleStepthrough).await;
*in_step_through = !*in_step_through; *in_step_through = !*in_step_through;
Printout::new( Printout::new(
0, 0,
TERMINAL_PROCESS_ID.clone(),
format!( format!(
"debug mode {}", "debug mode {}",
match in_step_through { match in_step_through {
@ -583,6 +767,9 @@ async fn handle_key_event(
modifiers: KeyModifiers::CONTROL, modifiers: KeyModifiers::CONTROL,
.. ..
} => { } => {
if state.process_verbosity_mode {
return Ok(Some(false));
}
let _ = debug_event_loop.send(DebugCommand::Step).await; let _ = debug_event_loop.send(DebugCommand::Step).await;
return Ok(Some(false)); return Ok(Some(false));
} }
@ -594,9 +781,13 @@ async fn handle_key_event(
modifiers: KeyModifiers::CONTROL, modifiers: KeyModifiers::CONTROL,
.. ..
} => { } => {
if state.process_verbosity_mode {
return Ok(Some(false));
}
*logging_mode = !*logging_mode; *logging_mode = !*logging_mode;
Printout::new( Printout::new(
0, 0,
TERMINAL_PROCESS_ID.clone(),
format!("logging mode: {}", if *logging_mode { "on" } else { "off" }), format!("logging mode: {}", if *logging_mode { "on" } else { "off" }),
) )
.send(&print_tx) .send(&print_tx)
@ -614,7 +805,7 @@ async fn handle_key_event(
modifiers: KeyModifiers::CONTROL, modifiers: KeyModifiers::CONTROL,
.. ..
} => { } => {
if state.search_mode { if state.search_mode || state.process_verbosity_mode {
return Ok(Some(false)); return Ok(Some(false));
} }
// go up one command in history // go up one command in history
@ -646,7 +837,7 @@ async fn handle_key_event(
modifiers: KeyModifiers::CONTROL, modifiers: KeyModifiers::CONTROL,
.. ..
} => { } => {
if state.search_mode { if state.search_mode || state.process_verbosity_mode {
return Ok(Some(false)); return Ok(Some(false));
} }
// go down one command in history // go down one command in history
@ -706,6 +897,9 @@ async fn handle_key_event(
modifiers: KeyModifiers::CONTROL, modifiers: KeyModifiers::CONTROL,
.. ..
} => { } => {
if state.process_verbosity_mode {
return Ok(Some(false));
}
if state.search_mode { if state.search_mode {
*search_depth += 1; *search_depth += 1;
} }
@ -724,6 +918,46 @@ async fn handle_key_event(
*search_depth = 0; *search_depth = 0;
} }
// //
// CTRL+W: enter/exit process_verbosity_mode
//
KeyEvent {
code: KeyCode::Char('w'),
modifiers: KeyModifiers::CONTROL,
..
} => {
if state.search_mode {
return Ok(Some(false));
}
if state.process_verbosity_mode {
// Exit process verbosity mode
state.process_verbosity_mode = false;
// Restore previous line if it exists
if let Some(saved_line) = state.saved_line.take() {
current_line.line = saved_line;
current_line.line_col = current_line.line.graphemes(true).count();
current_line.cursor_col = std::cmp::min(
utils::display_width(&current_line.line) as u16,
*win_cols - current_line.prompt_len as u16,
);
}
state.exit_process_verbosity_mode()?;
} else {
// Enter process verbosity mode
state.process_verbosity_mode = true;
// Save current line
state.saved_line = Some(current_line.line.clone());
current_line.line.clear();
current_line.line_col = 0;
current_line.cursor_col = 0;
state.enter_process_verbosity_mode()?;
}
return Ok(Some(false));
}
//
// KEY: handle keypress events // KEY: handle keypress events
// //
k => { k => {
@ -813,6 +1047,61 @@ async fn handle_key_event(
// ENTER: send current input to terminal process, clearing input line // ENTER: send current input to terminal process, clearing input line
// //
KeyCode::Enter => { KeyCode::Enter => {
// if we were in process verbosity mode, update state
if state.process_verbosity_mode {
if let Some((process_id, verbosity)) =
State::parse_process_verbosity(&current_line.line)
{
// add ProcessId
let old_verbosity = state
.process_verbosity
.insert(process_id.clone(), verbosity.clone())
.and_then(|ov| ov.get_verbosity().map(|ov| ov.clone()))
.unwrap_or_default();
let verbosity = verbosity
.get_verbosity()
.map(|v| v.clone())
.unwrap_or_default();
if (old_verbosity == 3 && verbosity != 3)
|| (verbosity == 3 && old_verbosity != 3)
{
debug_event_loop
.send(DebugCommand::ToggleEventLoopForProcess(
process_id.clone(),
))
.await
.expect("failed to toggle process-level full event loop on");
}
current_line.line.clear();
current_line.line_col = 0;
current_line.cursor_col = 0;
state.display_process_verbosity()?;
} else if let Ok(process_id) = &current_line.line.parse() {
// remove ProcessId
if let Some(old_verbosity) = state.process_verbosity.remove(&process_id)
{
let old_verbosity = old_verbosity
.get_verbosity()
.map(|ov| ov.clone())
.unwrap_or_default();
if old_verbosity == 3 {
debug_event_loop
.send(DebugCommand::ToggleEventLoopForProcess(
process_id.clone(),
))
.await
.expect(
"failed to toggle process-level full event loop on",
);
}
}
current_line.line.clear();
current_line.line_col = 0;
current_line.cursor_col = 0;
state.display_process_verbosity()?;
}
return Ok(Some(false));
}
// if we were in search mode, pull command from that // if we were in search mode, pull command from that
let command = if !state.search_mode { let command = if !state.search_mode {
current_line.line.clone() current_line.line.clone()

View File

@ -59,14 +59,14 @@ pub async fn timer_service(
// we only handle Requests // we only handle Requests
let Message::Request(req) = km.message else { continue }; let Message::Request(req) = km.message else { continue };
let Ok(timer_action) = serde_json::from_slice::<TimerAction>(&req.body) else { let Ok(timer_action) = serde_json::from_slice::<TimerAction>(&req.body) else {
Printout::new(1, "timer service received a request with an invalid body").send(&print_tx).await; Printout::new(1, TIMER_PROCESS_ID.clone(), "timer service received a request with an invalid body").send(&print_tx).await;
continue continue
}; };
match timer_action { match timer_action {
TimerAction::Debug => { TimerAction::Debug => {
Printout::new(0, format!("timer service active timers ({}):", timer_map.timers.len())).send(&print_tx).await; Printout::new(0, TIMER_PROCESS_ID.clone(), format!("timer service active timers ({}):", timer_map.timers.len())).send(&print_tx).await;
for (k, v) in timer_map.timers.iter() { for (k, v) in timer_map.timers.iter() {
Printout::new(0, format!("{k}: {v:?}")).send(&print_tx).await; Printout::new(0, TIMER_PROCESS_ID.clone(), format!("{k}: {v:?}")).send(&print_tx).await;
} }
continue continue
} }
@ -98,7 +98,7 @@ pub async fn timer_service(
.send(&kernel_message_sender).await; .send(&kernel_message_sender).await;
continue continue
} }
Printout::new(3, format!("set timer to pop in {timer_millis}ms")).send(&print_tx).await; Printout::new(3, TIMER_PROCESS_ID.clone(), format!("set timer to pop in {timer_millis}ms")).send(&print_tx).await;
if !timer_map.contains(pop_time) { if !timer_map.contains(pop_time) {
timer_tasks.spawn(async move { timer_tasks.spawn(async move {
tokio::time::sleep(std::time::Duration::from_millis(timer_millis - 1)).await; tokio::time::sleep(std::time::Duration::from_millis(timer_millis - 1)).await;

View File

@ -63,6 +63,7 @@ pub async fn vfs(
if *our_node != km.source.node { if *our_node != km.source.node {
Printout::new( Printout::new(
1, 1,
VFS_PROCESS_ID.clone(),
format!( format!(
"vfs: got request from {}, but requests must come from our node {our_node}", "vfs: got request from {}, but requests must come from our node {our_node}",
km.source.node km.source.node
@ -77,6 +78,7 @@ pub async fn vfs(
if let Err(e) = handle_fd_request(km, &mut files).await { if let Err(e) = handle_fd_request(km, &mut files).await {
Printout::new( Printout::new(
1, 1,
VFS_PROCESS_ID.clone(),
format!("vfs: got request from fd-manager that errored: {e:?}"), format!("vfs: got request from fd-manager that errored: {e:?}"),
) )
.send(&send_to_terminal) .send(&send_to_terminal)

View File

@ -1332,16 +1332,19 @@ pub struct WrappedSendError {
/// - `3`: very verbose: shows every event in event loop /// - `3`: very verbose: shows every event in event loop
pub struct Printout { pub struct Printout {
pub verbosity: u8, pub verbosity: u8,
pub source: ProcessId,
pub content: String, pub content: String,
} }
impl Printout { impl Printout {
pub fn new<T>(verbosity: u8, content: T) -> Self pub fn new<T, U>(verbosity: u8, source: T, content: U) -> Self
where where
T: Into<String>, T: Into<ProcessId>,
U: Into<String>,
{ {
Self { Self {
verbosity, verbosity,
source: source.into(),
content: content.into(), content: content.into(),
} }
} }
@ -1352,6 +1355,55 @@ impl Printout {
} }
} }
#[derive(Error, Debug)]
pub enum ProcessVerbosityValError {
#[error("Parse failed; must be `m` `mute` or `muted` (-> `Muted`) OR a u8")]
ParseFailed,
}
#[derive(Clone, Deserialize, Serialize)]
pub enum ProcessVerbosityVal {
U8(u8),
Muted,
}
impl ProcessVerbosityVal {
pub fn get_verbosity(&self) -> Option<&u8> {
match self {
ProcessVerbosityVal::U8(v) => Some(v),
ProcessVerbosityVal::Muted => None,
}
}
}
impl std::str::FromStr for ProcessVerbosityVal {
type Err = ProcessVerbosityValError;
fn from_str(input: &str) -> Result<Self, Self::Err> {
if input == "m" || input == "mute" || input == "muted" {
return Ok(Self::Muted);
}
let Ok(u) = input.parse::<u8>() else {
return Err(ProcessVerbosityValError::ParseFailed);
};
Ok(Self::U8(u))
}
}
impl std::fmt::Display for ProcessVerbosityVal {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(
f,
"{}",
match self {
ProcessVerbosityVal::U8(verbosity) => format!("{verbosity}"),
ProcessVerbosityVal::Muted => "muted".to_string(),
},
)
}
}
pub type ProcessVerbosity = HashMap<ProcessId, ProcessVerbosityVal>;
/// kernel sets in case, e.g., /// kernel sets in case, e.g.,
/// A requests response from B does not request response from C /// A requests response from B does not request response from C
/// -> kernel sets `Some(A) = Rsvp` for B's request to C /// -> kernel sets `Some(A) = Rsvp` for B's request to C
@ -1362,6 +1414,7 @@ pub enum DebugCommand {
ToggleStepthrough, ToggleStepthrough,
Step, Step,
ToggleEventLoop, ToggleEventLoop,
ToggleEventLoopForProcess(ProcessId),
} }
/// IPC format for requests sent to kernel runtime module /// IPC format for requests sent to kernel runtime module