1
1
mirror of https://github.com/Nukesor/pueue.git synced 2024-10-03 19:27:25 +03:00

refactor: Revisit some docs, restructure some code

This commit is contained in:
Arne Beer 2024-09-23 15:48:11 +02:00
parent e73d2a5261
commit cee1cc9812
No known key found for this signature in database
GPG Key ID: CC9408F679023B65
8 changed files with 186 additions and 169 deletions

View File

@ -13,7 +13,7 @@ use pueue_lib::{
use super::state_helper::LockedState;
/// Users can specify a callback that's fired whenever a task finishes.
/// Execute the callback by spawning a new subprocess.
/// The callback is performed by spawning a new subprocess.
pub fn spawn_callback(settings: &Settings, state: &mut LockedState, task: &Task) {
// Return early, if there's no callback specified
let Some(template_string) = &settings.daemon.callback else {
@ -110,8 +110,8 @@ pub fn build_callback_command(
handlebars.render_template(template_string, &parameters)
}
/// Look at all running callbacks and log any errors.
/// If everything went smoothly, simply remove them from the list.
/// Look at all running callbacks and check if they're still running.
/// Handle finished callbacks and log their outcome.
pub fn check_callbacks(state: &mut LockedState) {
let mut finished = Vec::new();
for (id, child) in state.callbacks.iter_mut().enumerate() {

View File

@ -1,143 +0,0 @@
use std::io::Read;
use std::path::Path;
use std::time::Duration;
use anyhow::Result;
use pueue_lib::log::*;
use pueue_lib::network::message::*;
use pueue_lib::network::protocol::{send_message, GenericStream};
use pueue_lib::state::SharedState;
/// Handle the continuous stream of a some log output.
///
/// It's not actually a stream in the sense of a low-level network stream, but rather a series of
/// `Message::Stream` messages, that each send a portion of new log output.
///
/// It's basically our own chunked stream implementation on top of the protocol we established.
pub async fn handle_follow(
pueue_directory: &Path,
stream: &mut GenericStream,
state: &SharedState,
message: StreamRequestMessage,
) -> Result<Message> {
// The user can specify the id of the task they want to follow
// If the id isn't specified and there's only a single running task, this task will be used.
// However, if there are multiple running tasks, the user will have to specify an id.
let task_id = if let Some(task_id) = message.task_id {
task_id
} else {
// Get all ids of running tasks
let state = state.lock().unwrap();
let running_ids: Vec<_> = state
.tasks
.iter()
.filter_map(|(&id, t)| if t.is_running() { Some(id) } else { None })
.collect();
// Return a message on "no" or multiple running tasks.
match running_ids.len() {
0 => {
return Ok(create_failure_message("There are no running tasks."));
}
1 => running_ids[0],
_ => {
let running_ids = running_ids
.iter()
.map(|id| id.to_string())
.collect::<Vec<_>>()
.join(", ");
return Ok(create_failure_message(format!(
"Multiple tasks are running, please select one of the following: {running_ids}"
)));
}
}
};
// It might be that the task is not yet running.
// Ensure that it exists and is started.
loop {
{
let state = state.lock().unwrap();
let Some(task) = state.tasks.get(&task_id) else {
return Ok(create_failure_message(
"Pueue: The task to be followed doesn't exist.",
));
};
// The task is running or finished, we can start to follow.
if task.is_running() || task.is_done() {
break;
}
}
tokio::time::sleep(Duration::from_millis(1000)).await;
}
let mut handle = match get_log_file_handle(task_id, pueue_directory) {
Err(_) => {
return Ok(create_failure_message(
"Couldn't find output files for task. Maybe it finished? Try `log`",
))
}
Ok(handle) => handle,
};
// Get the output path.
// We need to check continuously, whether the file still exists,
// since the file can go away (e.g. due to finishing a task).
let path = get_log_path(task_id, pueue_directory);
// If `lines` is passed as an option, we only want to show the last `X` lines.
// To achieve this, we seek the file handle to the start of the `Xth` line
// from the end of the file.
// The loop following this section will then only copy those last lines to stdout.
if let Some(lines) = message.lines {
if let Err(err) = seek_to_last_lines(&mut handle, lines) {
println!("Error seeking to last lines from log: {err}");
}
}
loop {
// Check whether the file still exists. Exit if it doesn't.
if !path.exists() {
return Ok(create_success_message(
"Pueue: Log file has gone away. Has the task been removed?",
));
}
// Read the next chunk of text from the last position.
let mut buffer = Vec::new();
if let Err(err) = handle.read_to_end(&mut buffer) {
return Ok(create_failure_message(format!("Pueue Error: {err}")));
};
let text = String::from_utf8_lossy(&buffer).to_string();
// Only send a message, if there's actual new content.
if !text.is_empty() {
// Send the next chunk.
let response = Message::Stream(text);
send_message(response, stream).await?;
}
// Check if the task in question does:
// 1. Still exist
// 2. Is still running
//
// In case it's not, close the stream.
{
let state = state.lock().unwrap();
let Some(task) = state.tasks.get(&task_id) else {
return Ok(create_failure_message(
"Pueue: The followed task has been removed.",
));
};
// The task is done, just close the stream.
if !task.is_running() {
return Ok(Message::Close);
}
}
// Wait for 1 second before sending the next chunk.
tokio::time::sleep(Duration::from_millis(1000)).await;
}
}

View File

@ -54,7 +54,7 @@ pub fn add_task(settings: &Settings, state: &SharedState, message: AddMessage) -
// If one is found, we expand the command, otherwise we just take the original command.
// Anyhow, we save this separately and keep the original command in a separate field.
//
// This allows us to have a debug experience and the user can opt to either show the
// This gives us better debugging capabilities and the user can opt to either show the
// original command or the expanded command in their `status` view.
task.command = insert_alias(settings, task.original_command.clone());

View File

@ -1,8 +1,15 @@
use std::collections::BTreeMap;
use std::io::Read;
use std::path::Path;
use std::time::Duration;
use anyhow::Result;
use pueue_lib::failure_msg;
use pueue_lib::log::read_and_compress_log_file;
use pueue_lib::log::*;
use pueue_lib::network::message::*;
use pueue_lib::network::protocol::{send_message, GenericStream};
use pueue_lib::settings::Settings;
use pueue_lib::state::SharedState;
@ -49,3 +56,136 @@ pub fn get_log(settings: &Settings, state: &SharedState, message: LogRequestMess
}
Message::LogResponse(tasks)
}
/// Handle the continuous stream of a some log output.
///
/// It's not actually a stream in the sense of a low-level network stream, but rather a series of
/// `Message::Stream` messages, that each send a portion of new log output.
///
/// It's basically our own chunked stream implementation on top of the protocol we established.
pub async fn follow_log(
pueue_directory: &Path,
stream: &mut GenericStream,
state: &SharedState,
message: StreamRequestMessage,
) -> Result<Message> {
// The user can specify the id of the task they want to follow
// If the id isn't specified and there's only a single running task, this task will be used.
// However, if there are multiple running tasks, the user will have to specify an id.
let task_id = if let Some(task_id) = message.task_id {
task_id
} else {
// Get all ids of running tasks
let state = state.lock().unwrap();
let running_ids: Vec<_> = state
.tasks
.iter()
.filter_map(|(&id, t)| if t.is_running() { Some(id) } else { None })
.collect();
// Return a message on "no" or multiple running tasks.
match running_ids.len() {
0 => {
return Ok(create_failure_message("There are no running tasks."));
}
1 => running_ids[0],
_ => {
let running_ids = running_ids
.iter()
.map(|id| id.to_string())
.collect::<Vec<_>>()
.join(", ");
return Ok(create_failure_message(format!(
"Multiple tasks are running, please select one of the following: {running_ids}"
)));
}
}
};
// It might be that the task is not yet running.
// Ensure that it exists and is started.
loop {
{
let state = state.lock().unwrap();
let Some(task) = state.tasks.get(&task_id) else {
return Ok(create_failure_message(
"Pueue: The task to be followed doesn't exist.",
));
};
// The task is running or finished, we can start to follow.
if task.is_running() || task.is_done() {
break;
}
}
tokio::time::sleep(Duration::from_millis(1000)).await;
}
let mut handle = match get_log_file_handle(task_id, pueue_directory) {
Err(_) => {
return Ok(create_failure_message(
"Couldn't find output files for task. Maybe it finished? Try `log`",
))
}
Ok(handle) => handle,
};
// Get the output path.
// We need to check continuously, whether the file still exists,
// since the file can go away (e.g. due to finishing a task).
let path = get_log_path(task_id, pueue_directory);
// If `lines` is passed as an option, we only want to show the last `X` lines.
// To achieve this, we seek the file handle to the start of the `Xth` line
// from the end of the file.
// The loop following this section will then only copy those last lines to stdout.
if let Some(lines) = message.lines {
if let Err(err) = seek_to_last_lines(&mut handle, lines) {
println!("Error seeking to last lines from log: {err}");
}
}
loop {
// Check whether the file still exists. Exit if it doesn't.
if !path.exists() {
return Ok(create_success_message(
"Pueue: Log file has gone away. Has the task been removed?",
));
}
// Read the next chunk of text from the last position.
let mut buffer = Vec::new();
if let Err(err) = handle.read_to_end(&mut buffer) {
return Ok(create_failure_message(format!("Pueue Error: {err}")));
};
let text = String::from_utf8_lossy(&buffer).to_string();
// Only send a message, if there's actual new content.
if !text.is_empty() {
// Send the next chunk.
let response = Message::Stream(text);
send_message(response, stream).await?;
}
// Check if the task in question does:
// 1. Still exist
// 2. Is still running
//
// In case it's not, close the stream.
{
let state = state.lock().unwrap();
let Some(task) = state.tasks.get(&task_id) else {
return Ok(create_failure_message(
"Pueue: The followed task has been removed.",
));
};
// The task is done, just close the stream.
if !task.is_running() {
return Ok(Message::Close);
}
}
// Wait for 1 second before sending the next chunk.
tokio::time::sleep(Duration::from_millis(1000)).await;
}
}

View File

@ -25,6 +25,8 @@ mod start;
mod stash;
mod switch;
pub use log::follow_log;
pub fn handle_message(message: Message, state: &SharedState, settings: &Settings) -> Message {
match message {
Message::Add(message) => add::add_task(settings, state, message),

View File

@ -1,4 +1,3 @@
pub mod follow_log;
pub mod message_handler;
pub mod response_helper;
pub mod socket;

View File

@ -21,11 +21,21 @@ pub fn ensure_group_exists<'state>(
)))
}
/// Compile a response for actions that affect several given tasks.
/// These actions can sometimes only succeed for a part of the given tasks.
/// Compile a response for an action that affect several given tasks.
/// That action can sometimes only succeed for a portion of the given tasks.
/// E.g. only running tasks can be killed.
///
/// That's why this helper exists, which determines for which tasks the action succeeded
/// and which tasks failed, based on a given `filter` criterion.
/// That's why this helper exists, which determines for which tasks an action succeeds
/// and which tasks fail, based on a given `filter` criterion.
/// ```rs
/// task_ids = vec![1, 2, 4];
/// task_action_response_helper(
/// "Tasks are being killed",
/// task_ids.clone(),
/// |task| task.is_running(),
/// &state,
/// ),
/// ```
pub fn task_action_response_helper<F>(
message: &str,
task_ids: Vec<usize>,

View File

@ -12,12 +12,14 @@ use pueue_lib::network::secret::read_shared_secret;
use pueue_lib::settings::Settings;
use pueue_lib::state::SharedState;
use crate::daemon::network::follow_log::handle_follow;
use crate::daemon::network::message_handler::handle_message;
use crate::daemon::process_handler::initiate_shutdown;
/// Poll the listener and accept new incoming connections.
/// Create a new future to handle the message and spawn it.
use super::message_handler::follow_log;
/// Listen for new connections on the socket.
/// On a new connection, the connected stream will be handled in a separate tokio task.
/// See [handle_incoming] for the actual connection handler function.
pub async fn accept_incoming(settings: Settings, state: SharedState) -> Result<()> {
let listener = get_listener(&settings.shared).await?;
// Read secret once to prevent multiple disk reads.
@ -43,9 +45,19 @@ pub async fn accept_incoming(settings: Settings, state: SharedState) -> Result<(
}
}
/// Continuously poll the existing incoming futures.
/// In case we received an instruction, handle it and create a response future.
/// The response future is added to unix_responses and handled in a separate function.
/// Handle a new connection from a client.
///
/// Pueue has a very simple protocol that needs to be followed.
/// 1. Client sends secret for authentication
/// 2. If secret is valid, the daemon sends its own version to the client.
/// 3. The Client sends the instruction message.
/// 4. The Daemon reads the instruction and acts upon it.
/// 5. The Daemon sends a response
///
/// There're two edge-cases where this pattern is not valid:
/// 1. Shutdown. In that case the message is sent first and the daemon shuts down afterwards.
/// 2. Streaming of logs. The Daemon will continuously send messages with log chunks until
/// the watched task finished or the client disconnects.
async fn handle_incoming(
mut stream: GenericStream,
state: SharedState,
@ -78,14 +90,11 @@ async fn handle_incoming(
bail!("Received invalid secret");
}
// Send a short `ok` byte to the client, so it knows that the secret has been accepted.
// This is also the current version of the daemon, so the client can inform the user if the
// daemon needs a restart in case a version difference exists.
// Send confirmation to the client, that the secret was valid.
// This is also the current version of the daemon, so the client can inform user if the
// daemon needs a restart in case of a version mismatch.
send_bytes(crate_version!().as_bytes(), &mut stream).await?;
// Get the directory for convenience purposes.
let pueue_directory = settings.shared.pueue_directory();
loop {
// Receive the actual instruction from the client
let message_result = receive_message(&mut stream).await;
@ -111,13 +120,13 @@ async fn handle_incoming(
// The client requested the output of a task.
// Since this involves streaming content, we have to do some special handling.
Message::StreamRequest(message) => {
handle_follow(&pueue_directory, &mut stream, &state, message).await?
let pueue_directory = settings.shared.pueue_directory();
follow_log(&pueue_directory, &mut stream, &state, message).await?
}
// Initialize the shutdown procedure.
// The message is forwarded to the TaskHandler, which is responsible for
// gracefully shutting down.
// To initiated a shutdown, a flag in Pueue's state is set that informs the TaskHandler
// to perform a graceful shutdown.
//
// This is an edge-case as we have respond to the client first.
// However, this is an edge-case as we have respond to the client first.
// Otherwise it might happen, that the daemon shuts down too fast and we aren't
// capable of actually sending the message back to the client.
Message::DaemonShutdown(shutdown_type) => {