From 690141ff8b37aa39ec261afce9f1ad435ac54915 Mon Sep 17 00:00:00 2001 From: Sergey Bugaev Date: Mon, 5 Jul 2021 15:10:34 +0300 Subject: [PATCH] LibPthread: Reimplement semaphores This implementation does not use locking or condition variables internally; it's purely based on atomics and futexes. Notably, concurrent sem_wait() and sem_post() calls can run *completely in parallel* without slowing each other down, as long as there are empty slots for them all to succeed without blocking. Additionally, sem_wait() never executes an atomic operation with release ordering, and sem_post() never executes an atomic operation with acquire ordering (unless you count the syscall). This means the compiler and the hardware are free to reorder code *into* the critical section. --- Userland/Libraries/LibPthread/semaphore.cpp | 234 +++++++++----------- Userland/Libraries/LibPthread/semaphore.h | 5 +- 2 files changed, 107 insertions(+), 132 deletions(-) diff --git a/Userland/Libraries/LibPthread/semaphore.cpp b/Userland/Libraries/LibPthread/semaphore.cpp index 3983e6da4b7..bf579b7c148 100644 --- a/Userland/Libraries/LibPthread/semaphore.cpp +++ b/Userland/Libraries/LibPthread/semaphore.cpp @@ -1,12 +1,26 @@ /* - * Copyright (c) 2021, the SerenityOS developers. + * Copyright (c) 2021, Gunnar Beutner + * Copyright (c) 2021, Sergey Bugaev * * SPDX-License-Identifier: BSD-2-Clause */ #include +#include +#include #include #include +#include + +// Whether sem_wait() or sem_post() is responsible for waking any sleeping +// threads. +static constexpr u32 POST_WAKES = 1 << 31; + +sem_t* sem_open(const char*, int, ...) +{ + errno = ENOSYS; + return nullptr; +} int sem_close(sem_t*) { @@ -14,42 +28,10 @@ int sem_close(sem_t*) return -1; } -int sem_destroy(sem_t* sem) +int sem_unlink(const char*) { - auto rc = pthread_mutex_destroy(&sem->mtx); - if (rc != 0) { - errno = rc; - return -1; - } - - rc = pthread_cond_destroy(&sem->cv); - if (rc != 0) { - errno = rc; - return -1; - } - - return 0; -} - -int sem_getvalue(sem_t* sem, int* sval) -{ - auto rc = pthread_mutex_trylock(&sem->mtx); - - if (rc == EBUSY) { - *sval = 0; - return 0; - } - - if (rc != 0) { - errno = rc; - return -1; - } - - *sval = sem->value; - - pthread_mutex_unlock(&sem->mtx); - - return 0; + errno = ENOSYS; + return -1; } int sem_init(sem_t* sem, int shared, unsigned int value) @@ -64,116 +46,110 @@ int sem_init(sem_t* sem, int shared, unsigned int value) return -1; } - auto rc = pthread_mutex_init(&sem->mtx, nullptr); - if (rc != 0) { - errno = rc; - return -1; - } - - rc = pthread_cond_init(&sem->cv, nullptr); - if (rc != 0) { - errno = rc; - return -1; - } - sem->value = value; - return 0; } -sem_t* sem_open(const char*, int, ...) +int sem_destroy(sem_t*) { - errno = ENOSYS; - return nullptr; + return 0; +} + +int sem_getvalue(sem_t* sem, int* sval) +{ + u32 value = AK::atomic_load(&sem->value, AK::memory_order_relaxed); + *sval = value & ~POST_WAKES; + return 0; } int sem_post(sem_t* sem) { - auto rc = pthread_mutex_lock(&sem->mtx); - if (rc != 0) { - errno = rc; - return -1; - } - - if (sem->value == SEM_VALUE_MAX) { - pthread_mutex_unlock(&sem->mtx); - errno = EOVERFLOW; - return -1; - } - - sem->value++; - - rc = pthread_cond_signal(&sem->cv); - if (rc != 0) { - pthread_mutex_unlock(&sem->mtx); - errno = rc; - return -1; - } - - rc = pthread_mutex_unlock(&sem->mtx); - if (rc != 0) { - errno = rc; - return -1; - } + u32 value = AK::atomic_fetch_add(&sem->value, 1u, AK::memory_order_release); + // Fast path: no need to wake. + if (!(value & POST_WAKES)) [[likely]] + return 0; + // Pass the responsibility for waking more threads if more slots become + // available later to sem_wait() in the thread we're about to wake, as + // opposed to further sem_post() calls that free up those slots. + value = AK::atomic_fetch_and(&sem->value, ~POST_WAKES, AK::memory_order_relaxed); + // Check if another sem_post() call has handled it already. + if (!(value & POST_WAKES)) [[likely]] + return 0; + int rc = futex_wake(&sem->value, 1); + VERIFY(rc == 0); return 0; } int sem_trywait(sem_t* sem) { - auto rc = pthread_mutex_lock(&sem->mtx); - if (rc != 0) { - errno = rc; - return -1; - } - - if (sem->value == 0) { - pthread_mutex_unlock(&sem->mtx); - errno = EAGAIN; - return -1; - } - - sem->value--; - - rc = pthread_mutex_unlock(&sem->mtx); - if (rc != 0) { - errno = rc; - return -1; - } - - return 0; -} - -int sem_unlink(const char*) -{ - errno = ENOSYS; - return -1; + u32 value = AK::atomic_load(&sem->value, AK::memory_order_relaxed); + u32 count = value & ~POST_WAKES; + if (count == 0) + return EAGAIN; + // Decrement the count without touching the flag. + u32 desired = (count - 1) | (value & POST_WAKES); + bool exchanged = AK::atomic_compare_exchange_strong(&sem->value, value, desired, AK::memory_order_acquire); + if (exchanged) [[likely]] + return 0; + else + return EAGAIN; } int sem_wait(sem_t* sem) { - auto rc = pthread_mutex_lock(&sem->mtx); - if (rc != 0) { - errno = rc; - return -1; - } - - while (sem->value == 0) { - rc = pthread_cond_wait(&sem->cv, &sem->mtx); - if (rc != 0) { - pthread_mutex_unlock(&sem->mtx); - errno = rc; - return -1; - } - } - - sem->value--; - - rc = pthread_mutex_unlock(&sem->mtx); - if (rc != 0) { - errno = rc; - return -1; - } - - return 0; + return sem_timedwait(sem, nullptr); +} + +int sem_timedwait(sem_t* sem, const struct timespec* abstime) +{ + u32 value = AK::atomic_load(&sem->value, AK::memory_order_relaxed); + bool responsible_for_waking = false; + + while (true) { + u32 count = value & ~POST_WAKES; + if (count > 0) [[likely]] { + // It looks like there are some free slots. + u32 whether_post_wakes = value & POST_WAKES; + bool going_to_wake = false; + if (responsible_for_waking && !whether_post_wakes) { + // If we have ourselves been woken up previously, and the + // POST_WAKES flag is not set, that means some more slots might + // be available now, and it's us who has to wake up additional + // threads. + if (count > 1) [[unlikely]] + going_to_wake = true; + // Pass the responsibility for waking up further threads back to + // sem_post() calls. In particular, we don't want the threads + // we're about to wake to try to wake anyone else. + whether_post_wakes = POST_WAKES; + } + // Now, try to commit this. + u32 desired = (count - 1) | whether_post_wakes; + bool exchanged = AK::atomic_compare_exchange_strong(&sem->value, value, desired, AK::memory_order_acquire); + if (!exchanged) [[unlikely]] + // Re-evaluate. + continue; + if (going_to_wake) [[unlikely]] { + int rc = futex_wake(&sem->value, count - 1); + VERIFY(rc >= 0); + } + return 0; + } + // We're probably going to sleep, so attempt to set the flag. We do not + // commit to sleeping yet, though, as setting the flag may fail and + // cause us to reevaluate what we're doing. + if (value == 0) { + bool exchanged = AK::atomic_compare_exchange_strong(&sem->value, value, POST_WAKES, AK::memory_order_relaxed); + if (!exchanged) [[unlikely]] + // Re-evaluate. + continue; + value = POST_WAKES; + } + // At this point, we're committed to sleeping. + responsible_for_waking = true; + futex_wait(&sem->value, value, abstime, CLOCK_REALTIME); + // This is the state we will probably see upon being waked: + value = 1; + } } diff --git a/Userland/Libraries/LibPthread/semaphore.h b/Userland/Libraries/LibPthread/semaphore.h index f75b779c703..8e71c813cf9 100644 --- a/Userland/Libraries/LibPthread/semaphore.h +++ b/Userland/Libraries/LibPthread/semaphore.h @@ -14,9 +14,7 @@ __BEGIN_DECLS typedef struct { - pthread_mutex_t mtx; - pthread_cond_t cv; - int value; + uint32_t value; } sem_t; int sem_close(sem_t*); @@ -28,6 +26,7 @@ int sem_post(sem_t*); int sem_trywait(sem_t*); int sem_unlink(const char*); int sem_wait(sem_t*); +int sem_timedwait(sem_t*, const struct timespec* abstime); #define SEM_VALUE_MAX INT_MAX