From ba5bf412e53d71553893a7f3509b01816ee9ed96 Mon Sep 17 00:00:00 2001 From: Zaggy1024 Date: Thu, 13 Apr 2023 20:48:55 -0500 Subject: [PATCH] LibThreading: Create WorkerThread class run a single task concurrently This class can be used to run a task in another thread, and allows the caller to wait for the task to complete to retrieve any error that may have occurred. Currently, it doesn't support functions returning a value on success, but with some template magic that should be possible. :^) --- AK/Debug.h.in | 4 + Meta/CMake/all_the_debug_macros.cmake | 1 + Userland/Libraries/LibThreading/Forward.h | 15 ++ .../Libraries/LibThreading/WorkerThread.h | 172 ++++++++++++++++++ 4 files changed, 192 insertions(+) create mode 100644 Userland/Libraries/LibThreading/Forward.h create mode 100644 Userland/Libraries/LibThreading/WorkerThread.h diff --git a/AK/Debug.h.in b/AK/Debug.h.in index 79a8e95c321..08f8ec17d1e 100644 --- a/AK/Debug.h.in +++ b/AK/Debug.h.in @@ -538,6 +538,10 @@ # cmakedefine01 WINDOWMANAGER_DEBUG #endif +#ifndef WORKER_THREAD_DEBUG +# cmakedefine01 WORKER_THREAD_DEBUG +#endif + #ifndef WSMESSAGELOOP_DEBUG # cmakedefine01 WSMESSAGELOOP_DEBUG #endif diff --git a/Meta/CMake/all_the_debug_macros.cmake b/Meta/CMake/all_the_debug_macros.cmake index 1df7a89e55f..a44e5b13e3b 100644 --- a/Meta/CMake/all_the_debug_macros.cmake +++ b/Meta/CMake/all_the_debug_macros.cmake @@ -222,6 +222,7 @@ set(WEB_FETCH_DEBUG ON) set(WEB_WORKER_DEBUG ON) set(WEBP_DEBUG ON) set(WINDOWMANAGER_DEBUG ON) +set(WORKER_THREAD_DEBUG ON) set(WSMESSAGELOOP_DEBUG ON) set(WSSCREEN_DEBUG ON) set(XML_PARSER_DEBUG ON) diff --git a/Userland/Libraries/LibThreading/Forward.h b/Userland/Libraries/LibThreading/Forward.h new file mode 100644 index 00000000000..b15f5013f7c --- /dev/null +++ b/Userland/Libraries/LibThreading/Forward.h @@ -0,0 +1,15 @@ +/* + * Copyright (c) 2019-2020, Sergey Bugaev + * Copyright (c) 2021, Spencer Dixon + * + * SPDX-License-Identifier: BSD-2-Clause + */ + +#pragma once + +namespace Threading { + +template +class WorkerThread; + +} diff --git a/Userland/Libraries/LibThreading/WorkerThread.h b/Userland/Libraries/LibThreading/WorkerThread.h new file mode 100644 index 00000000000..690d1ffc1e8 --- /dev/null +++ b/Userland/Libraries/LibThreading/WorkerThread.h @@ -0,0 +1,172 @@ +/* + * Copyright (c) 2022, Gregory Bertilson + * + * SPDX-License-Identifier: BSD-2-Clause + */ + +#pragma once + +#include +#include +#include +#include +#include + +namespace Threading { + +// Macro to allow single-line logging prints with fields that only exist in debug mode. +#if WORKER_THREAD_DEBUG +# define WORKER_LOG(args...) ({ dbgln(args); }) +#else +# define WORKER_LOG(args...) +#endif + +template +class WorkerThread { + enum class State { + Idle, + Working, + Stopped, + }; + using WorkerTask = Function()>; + using WorkerState = Variant; + +public: + static ErrorOr> create(StringView name) + { + auto worker_thread = TRY(adopt_nonnull_own_or_enomem(new (nothrow) WorkerThread())); + worker_thread->m_thread = TRY(Threading::Thread::try_create([&self = *worker_thread]() { + WORKER_LOG("Starting worker loop {}", self.m_id); + + while (true) { + self.m_mutex.lock(); + if (self.m_stop) { + WORKER_LOG("Exiting {}", self.m_id); + self.m_state = State::Stopped; + self.m_condition.broadcast(); + self.m_mutex.unlock(); + return 0; + } + if (self.m_state.template has()) { + auto task = move(self.m_state.template get()); + self.m_state = State::Working; + self.m_mutex.unlock(); + + WORKER_LOG("Starting task on {}", self.m_id); + auto result = task(); + if (result.is_error()) { + WORKER_LOG("Task finished on {} with error", self.m_id); + self.m_mutex.lock(); + self.m_state = result.release_error(); + self.m_condition.broadcast(); + } else { + WORKER_LOG("Task finished successfully on {}", self.m_id); + self.m_mutex.lock(); + self.m_state = State::Idle; + self.m_condition.broadcast(); + } + } + WORKER_LOG("Awaiting new task in {}...", self.m_id); + self.m_condition.wait(); + WORKER_LOG("Worker thread awoken in {}", self.m_id); + self.m_mutex.unlock(); + } + + return 0; + }, + name)); + worker_thread->m_thread->start(); + return worker_thread; + } + + ~WorkerThread() + { + m_mutex.lock(); + m_stop = true; + m_condition.broadcast(); + while (!is_in_state(State::Stopped)) + m_condition.wait(); + m_mutex.unlock(); + (void)m_thread->join(); + WORKER_LOG("Worker thread {} joined successfully", m_id); + } + + // Returns whether the task is starting. + bool start_task(WorkerTask&& task) + { + m_mutex.lock(); + VERIFY(!is_in_state(State::Stopped)); + + bool start_work = false; + if (is_in_state(State::Idle)) { + start_work = true; + } else if (m_state.template has()) { + WORKER_LOG("Starting task and ignoring previous error: {}", m_state.template get().string_literal()); + start_work = true; + } + if (start_work) { + WORKER_LOG("Queuing task on {}", m_id); + m_state = move(task); + m_condition.broadcast(); + } + + m_mutex.unlock(); + return start_work; + } + + ErrorOr wait_until_task_is_finished() + { + WORKER_LOG("Waiting for task to finish on {}...", m_id); + m_mutex.lock(); + while (true) { + if (m_state.template has() || is_in_state(State::Working)) { + m_condition.wait(); + } else if (m_state.template has()) { + auto error = move(m_state.template get()); + m_state = State::Idle; + m_mutex.unlock(); + WORKER_LOG("Finished waiting with error on {}: {}", m_id, error.string_literal()); + return error; + } else { + m_mutex.unlock(); + WORKER_LOG("Finished waiting on {}", m_id); + return {}; + } + } + m_mutex.unlock(); + } + +private: +#if WORKER_THREAD_DEBUG + static inline size_t current_id = 0; +#endif + + WorkerThread() + : m_condition(m_mutex) +#if WORKER_THREAD_DEBUG + , m_id(current_id++) +#endif + { + } + WorkerThread(WorkerThread const&) = delete; + WorkerThread(WorkerThread&&) = delete; + + // Must be called with the mutex locked. + bool is_in_state(State state) + { + return m_state.template has() && m_state.template get() == state; + } + + RefPtr m_thread; + Threading::Mutex m_mutex; + Threading::ConditionVariable m_condition; + WorkerState m_state { State::Idle }; + bool m_stop { false }; +#if WORKER_THREAD_DEBUG + size_t m_id; +#endif +}; + +#undef WORKER_LOG + +}