mirror of
https://github.com/uqbar-dao/nectar.git
synced 2025-01-05 00:10:23 +03:00
add builders for KernelMessage
and Printout
, WIP use them everywhere
This commit is contained in:
parent
cfd9959c82
commit
c842455314
@ -361,20 +361,10 @@ pub async fn parse_hello_message(
|
|||||||
|
|
||||||
/// Create a terminal printout at verbosity level 0.
|
/// Create a terminal printout at verbosity level 0.
|
||||||
pub async fn print_loud(print_tx: &PrintSender, content: &str) {
|
pub async fn print_loud(print_tx: &PrintSender, content: &str) {
|
||||||
let _ = print_tx
|
Printout::new(0, content).send(print_tx).await;
|
||||||
.send(Printout {
|
|
||||||
verbosity: 0,
|
|
||||||
content: content.into(),
|
|
||||||
})
|
|
||||||
.await;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Create a terminal printout at verbosity level 2.
|
/// Create a terminal printout at verbosity level 2.
|
||||||
pub async fn print_debug(print_tx: &PrintSender, content: &str) {
|
pub async fn print_debug(print_tx: &PrintSender, content: &str) {
|
||||||
let _ = print_tx
|
Printout::new(2, content).send(print_tx).await;
|
||||||
.send(Printout {
|
|
||||||
verbosity: 2,
|
|
||||||
content: content.into(),
|
|
||||||
})
|
|
||||||
.await;
|
|
||||||
}
|
}
|
||||||
|
@ -218,17 +218,12 @@ pub async fn terminal(
|
|||||||
2 => verbose_mode = 3,
|
2 => verbose_mode = 3,
|
||||||
_ => verbose_mode = 0,
|
_ => verbose_mode = 0,
|
||||||
}
|
}
|
||||||
let _ = print_tx.send(
|
Printout::new(0, format!("verbose mode: {}", match verbose_mode {
|
||||||
Printout {
|
0 => "off",
|
||||||
verbosity: 0,
|
1 => "debug",
|
||||||
content: match verbose_mode {
|
2 => "super-debug",
|
||||||
0 => "verbose mode: off".into(),
|
_ => "full event loop",
|
||||||
1 => "verbose mode: debug".into(),
|
})).send(&print_tx).await;
|
||||||
2 => "verbose mode: super-debug".into(),
|
|
||||||
_ => "verbose mode: full event loop".into(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
).await;
|
|
||||||
if verbose_mode == 3 {
|
if verbose_mode == 3 {
|
||||||
let _ = debug_event_loop.send(DebugCommand::ToggleEventLoop).await;
|
let _ = debug_event_loop.send(DebugCommand::ToggleEventLoop).await;
|
||||||
}
|
}
|
||||||
@ -243,15 +238,12 @@ pub async fn terminal(
|
|||||||
}) => {
|
}) => {
|
||||||
let _ = debug_event_loop.send(DebugCommand::ToggleStepthrough).await;
|
let _ = debug_event_loop.send(DebugCommand::ToggleStepthrough).await;
|
||||||
in_step_through = !in_step_through;
|
in_step_through = !in_step_through;
|
||||||
let _ = print_tx.send(
|
Printout::new(0, format!("debug mode {}", match in_step_through {
|
||||||
Printout {
|
false => "off",
|
||||||
verbosity: 0,
|
true => "on: use CTRL+S to step through events",
|
||||||
content: match in_step_through {
|
}))
|
||||||
false => "debug mode off".into(),
|
.send(&print_tx)
|
||||||
true => "debug mode on: use CTRL+S to step through events".into(),
|
.await;
|
||||||
}
|
|
||||||
}
|
|
||||||
).await;
|
|
||||||
|
|
||||||
},
|
},
|
||||||
//
|
//
|
||||||
@ -273,15 +265,12 @@ pub async fn terminal(
|
|||||||
..
|
..
|
||||||
}) => {
|
}) => {
|
||||||
logging_mode = !logging_mode;
|
logging_mode = !logging_mode;
|
||||||
let _ = print_tx.send(
|
Printout::new(
|
||||||
Printout {
|
0,
|
||||||
verbosity: 0,
|
format!("logging mode: {}", if logging_mode { "on" } else { "off" })
|
||||||
content: match logging_mode {
|
)
|
||||||
true => "logging mode: on".into(),
|
.send(&print_tx)
|
||||||
false => "logging mode: off".into(),
|
.await;
|
||||||
}
|
|
||||||
}
|
|
||||||
).await;
|
|
||||||
},
|
},
|
||||||
//
|
//
|
||||||
// UP / CTRL+P: go up one command in history
|
// UP / CTRL+P: go up one command in history
|
||||||
|
@ -39,23 +39,14 @@ pub async fn timer_service(
|
|||||||
// we only handle Requests
|
// we only handle Requests
|
||||||
let Message::Request(req) = km.message else { continue };
|
let Message::Request(req) = km.message else { continue };
|
||||||
let Ok(timer_action) = serde_json::from_slice::<TimerAction>(&req.body) else {
|
let Ok(timer_action) = serde_json::from_slice::<TimerAction>(&req.body) else {
|
||||||
let _ = print_tx.send(Printout {
|
Printout::new(1, "timer service received a request with an invalid body").send(&print_tx).await;
|
||||||
verbosity: 1,
|
|
||||||
content: "timer service received a request with an invalid body".to_string(),
|
|
||||||
}).await;
|
|
||||||
continue
|
continue
|
||||||
};
|
};
|
||||||
match timer_action {
|
match timer_action {
|
||||||
TimerAction::Debug => {
|
TimerAction::Debug => {
|
||||||
let _ = print_tx.send(Printout {
|
Printout::new(0, format!("timer service active timers ({}):", timer_map.timers.len())).send(&print_tx).await;
|
||||||
verbosity: 0,
|
|
||||||
content: format!("timer service active timers ({}):", timer_map.timers.len()),
|
|
||||||
}).await;
|
|
||||||
for (k, v) in timer_map.timers.iter() {
|
for (k, v) in timer_map.timers.iter() {
|
||||||
let _ = print_tx.send(Printout {
|
Printout::new(0, format!("{k}: {v:?}")).send(&print_tx).await;
|
||||||
verbosity: 0,
|
|
||||||
content: format!("{}: {:?}", k, v),
|
|
||||||
}).await;
|
|
||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@ -72,10 +63,7 @@ pub async fn timer_service(
|
|||||||
send_response(&our, km.id, km.rsvp.unwrap_or(km.source), &kernel_message_sender).await;
|
send_response(&our, km.id, km.rsvp.unwrap_or(km.source), &kernel_message_sender).await;
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
let _ = print_tx.send(Printout {
|
Printout::new(3, format!("set timer to pop in {timer_millis}ms")).send(&print_tx).await;
|
||||||
verbosity: 3,
|
|
||||||
content: format!("set timer to pop in {}ms", timer_millis),
|
|
||||||
}).await;
|
|
||||||
if !timer_map.contains(pop_time) {
|
if !timer_map.contains(pop_time) {
|
||||||
timer_tasks.spawn(async move {
|
timer_tasks.spawn(async move {
|
||||||
tokio::time::sleep(std::time::Duration::from_millis(timer_millis - 1)).await;
|
tokio::time::sleep(std::time::Duration::from_millis(timer_millis - 1)).await;
|
||||||
@ -121,25 +109,21 @@ impl TimerMap {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn send_response(our_node: &str, id: u64, target: Address, send_to_loop: &MessageSender) {
|
async fn send_response(our_node: &str, id: u64, target: Address, send_to_loop: &MessageSender) {
|
||||||
let _ = send_to_loop
|
KernelMessage::builder()
|
||||||
.send(KernelMessage {
|
.id(id)
|
||||||
id,
|
.source((our_node, TIMER_PROCESS_ID.clone()))
|
||||||
source: Address {
|
.target(target)
|
||||||
node: our_node.to_string(),
|
.message(Message::Response((
|
||||||
process: TIMER_PROCESS_ID.clone(),
|
Response {
|
||||||
|
inherit: false,
|
||||||
|
body: vec![],
|
||||||
|
metadata: None,
|
||||||
|
capabilities: vec![],
|
||||||
},
|
},
|
||||||
target,
|
None,
|
||||||
rsvp: None,
|
)))
|
||||||
message: Message::Response((
|
.build()
|
||||||
Response {
|
.unwrap()
|
||||||
inherit: false,
|
.send(send_to_loop)
|
||||||
body: vec![],
|
|
||||||
metadata: None,
|
|
||||||
capabilities: vec![],
|
|
||||||
},
|
|
||||||
None,
|
|
||||||
)),
|
|
||||||
lazy_load_blob: None,
|
|
||||||
})
|
|
||||||
.await;
|
.await;
|
||||||
}
|
}
|
||||||
|
@ -36,15 +36,15 @@ pub async fn vfs(
|
|||||||
|
|
||||||
while let Some(km) = recv_from_loop.recv().await {
|
while let Some(km) = recv_from_loop.recv().await {
|
||||||
if *our_node != km.source.node {
|
if *our_node != km.source.node {
|
||||||
let _ = send_to_terminal
|
Printout::new(
|
||||||
.send(Printout {
|
1,
|
||||||
verbosity: 1,
|
format!(
|
||||||
content: format!(
|
"vfs: got request from {}, but requests must come from our node {our_node}",
|
||||||
"vfs: got request from {}, but requests must come from our node {our_node}\r",
|
km.source.node
|
||||||
km.source.node,
|
|
||||||
),
|
),
|
||||||
})
|
)
|
||||||
.await;
|
.send(&send_to_terminal)
|
||||||
|
.await;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -81,20 +81,10 @@ pub async fn vfs(
|
|||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
let _ = send_to_terminal
|
Printout::new(1, format!("vfs: {e}"))
|
||||||
.send(Printout {
|
.send(&send_to_terminal)
|
||||||
verbosity: 1,
|
|
||||||
content: format!("vfs: {e}\r"),
|
|
||||||
})
|
|
||||||
.await;
|
|
||||||
let _ = send_to_loop
|
|
||||||
.send(make_error_message(
|
|
||||||
our_node.to_string(),
|
|
||||||
km_id,
|
|
||||||
km_source,
|
|
||||||
e,
|
|
||||||
))
|
|
||||||
.await;
|
.await;
|
||||||
|
make_error_message(&our_node, km_id, km_source, e, &send_to_loop).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
@ -152,15 +142,11 @@ async fn handle_request(
|
|||||||
entries.push(dir_entry);
|
entries.push(dir_entry);
|
||||||
}
|
}
|
||||||
|
|
||||||
let response = KernelMessage {
|
KernelMessage::builder()
|
||||||
id: km.id,
|
.id(km.id)
|
||||||
source: Address {
|
.source((our_node, VFS_PROCESS_ID.clone()))
|
||||||
node: our_node.to_string(),
|
.target(km.source)
|
||||||
process: VFS_PROCESS_ID.clone(),
|
.message(Message::Response((
|
||||||
},
|
|
||||||
target: km.source,
|
|
||||||
rsvp: None,
|
|
||||||
message: Message::Response((
|
|
||||||
Response {
|
Response {
|
||||||
inherit: false,
|
inherit: false,
|
||||||
body: serde_json::to_vec(&VfsResponse::ReadDir(entries)).unwrap(),
|
body: serde_json::to_vec(&VfsResponse::ReadDir(entries)).unwrap(),
|
||||||
@ -168,11 +154,11 @@ async fn handle_request(
|
|||||||
capabilities: vec![],
|
capabilities: vec![],
|
||||||
},
|
},
|
||||||
None,
|
None,
|
||||||
)),
|
)))
|
||||||
lazy_load_blob: None,
|
.build()
|
||||||
};
|
.unwrap()
|
||||||
|
.send(send_to_loop)
|
||||||
let _ = send_to_loop.send(response).await;
|
.await;
|
||||||
return Ok(());
|
return Ok(());
|
||||||
} else {
|
} else {
|
||||||
return Err(VfsError::NoCap {
|
return Err(VfsError::NoCap {
|
||||||
@ -451,29 +437,26 @@ async fn handle_request(
|
|||||||
process: km.source.process,
|
process: km.source.process,
|
||||||
})
|
})
|
||||||
}) {
|
}) {
|
||||||
let _ = send_to_loop
|
KernelMessage::builder()
|
||||||
.send(KernelMessage {
|
.id(km.id)
|
||||||
id: km.id,
|
.source((our_node, VFS_PROCESS_ID.clone()))
|
||||||
source: Address {
|
.target(target)
|
||||||
node: our_node.to_string(),
|
.message(Message::Response((
|
||||||
process: VFS_PROCESS_ID.clone(),
|
Response {
|
||||||
|
inherit: false,
|
||||||
|
body: serde_json::to_vec(&response_body).unwrap(),
|
||||||
|
metadata,
|
||||||
|
capabilities: vec![],
|
||||||
},
|
},
|
||||||
target,
|
None,
|
||||||
rsvp: None,
|
)))
|
||||||
message: Message::Response((
|
.lazy_load_blob(bytes.map(|bytes| LazyLoadBlob {
|
||||||
Response {
|
mime: Some("application/octet-stream".into()),
|
||||||
inherit: false,
|
bytes,
|
||||||
body: serde_json::to_vec(&response_body).unwrap(),
|
}))
|
||||||
metadata,
|
.build()
|
||||||
capabilities: vec![],
|
.unwrap()
|
||||||
},
|
.send(send_to_loop)
|
||||||
None,
|
|
||||||
)),
|
|
||||||
lazy_load_blob: bytes.map(|bytes| LazyLoadBlob {
|
|
||||||
mime: Some("application/octet-stream".into()),
|
|
||||||
bytes,
|
|
||||||
}),
|
|
||||||
})
|
|
||||||
.await;
|
.await;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -822,21 +805,18 @@ fn join_paths_safely(base: &PathBuf, extension: &str) -> PathBuf {
|
|||||||
base.join(extension_path)
|
base.join(extension_path)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn make_error_message(
|
async fn make_error_message(
|
||||||
our_node: String,
|
our_node: &str,
|
||||||
id: u64,
|
id: u64,
|
||||||
source: Address,
|
source: Address,
|
||||||
error: VfsError,
|
error: VfsError,
|
||||||
) -> KernelMessage {
|
send_to_loop: &MessageSender,
|
||||||
KernelMessage {
|
) {
|
||||||
id,
|
KernelMessage::builder()
|
||||||
source: Address {
|
.id(id)
|
||||||
node: our_node,
|
.source((our_node, VFS_PROCESS_ID.clone()))
|
||||||
process: VFS_PROCESS_ID.clone(),
|
.target(source)
|
||||||
},
|
.message(Message::Response((
|
||||||
target: source,
|
|
||||||
rsvp: None,
|
|
||||||
message: Message::Response((
|
|
||||||
Response {
|
Response {
|
||||||
inherit: false,
|
inherit: false,
|
||||||
body: serde_json::to_vec(&VfsResponse::Err(error)).unwrap(),
|
body: serde_json::to_vec(&VfsResponse::Err(error)).unwrap(),
|
||||||
@ -844,7 +824,9 @@ fn make_error_message(
|
|||||||
capabilities: vec![],
|
capabilities: vec![],
|
||||||
},
|
},
|
||||||
None,
|
None,
|
||||||
)),
|
)))
|
||||||
lazy_load_blob: None,
|
.build()
|
||||||
}
|
.unwrap()
|
||||||
|
.send(send_to_loop)
|
||||||
|
.await;
|
||||||
}
|
}
|
||||||
|
@ -1138,6 +1138,75 @@ pub struct KernelMessage {
|
|||||||
pub lazy_load_blob: Option<LazyLoadBlob>,
|
pub lazy_load_blob: Option<LazyLoadBlob>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl KernelMessage {
|
||||||
|
pub fn builder() -> KernelMessageBuilder {
|
||||||
|
KernelMessageBuilder::default()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn send(self, sender: &MessageSender) {
|
||||||
|
sender.send(self).await.expect("kernel message sender died");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Default)]
|
||||||
|
pub struct KernelMessageBuilder {
|
||||||
|
id: u64,
|
||||||
|
source: Option<Address>,
|
||||||
|
target: Option<Address>,
|
||||||
|
rsvp: Rsvp,
|
||||||
|
message: Option<Message>,
|
||||||
|
lazy_load_blob: Option<LazyLoadBlob>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl KernelMessageBuilder {
|
||||||
|
pub fn id(mut self, id: u64) -> Self {
|
||||||
|
self.id = id;
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn source<T>(mut self, source: T) -> Self
|
||||||
|
where
|
||||||
|
T: Into<Address>,
|
||||||
|
{
|
||||||
|
self.source = Some(source.into());
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn target<T>(mut self, target: T) -> Self
|
||||||
|
where
|
||||||
|
T: Into<Address>,
|
||||||
|
{
|
||||||
|
self.target = Some(target.into());
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn rsvp(mut self, rsvp: Rsvp) -> Self {
|
||||||
|
self.rsvp = rsvp;
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn message(mut self, message: Message) -> Self {
|
||||||
|
self.message = Some(message);
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn lazy_load_blob(mut self, blob: Option<LazyLoadBlob>) -> Self {
|
||||||
|
self.lazy_load_blob = blob;
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn build(self) -> Result<KernelMessage, String> {
|
||||||
|
Ok(KernelMessage {
|
||||||
|
id: self.id,
|
||||||
|
source: self.source.ok_or("Source address is required")?,
|
||||||
|
target: self.target.ok_or("Target address is required")?,
|
||||||
|
rsvp: self.rsvp,
|
||||||
|
message: self.message.ok_or("Message is required")?,
|
||||||
|
lazy_load_blob: self.lazy_load_blob,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl std::fmt::Display for KernelMessage {
|
impl std::fmt::Display for KernelMessage {
|
||||||
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||||
write!(
|
write!(
|
||||||
@ -1173,6 +1242,22 @@ pub struct Printout {
|
|||||||
pub content: String,
|
pub content: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl Printout {
|
||||||
|
pub fn new<T>(verbosity: u8, content: T) -> Self
|
||||||
|
where
|
||||||
|
T: Into<String>,
|
||||||
|
{
|
||||||
|
Self {
|
||||||
|
verbosity,
|
||||||
|
content: content.into(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn send(self, sender: &PrintSender) {
|
||||||
|
sender.send(self).await.expect("print sender died");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// kernel sets in case, e.g.,
|
// kernel sets in case, e.g.,
|
||||||
// A requests response from B does not request response from C
|
// A requests response from B does not request response from C
|
||||||
// -> kernel sets `Some(A) = Rsvp` for B's request to C
|
// -> kernel sets `Some(A) = Rsvp` for B's request to C
|
||||||
|
Loading…
Reference in New Issue
Block a user