diff --git a/src/translator/pcqueue.h b/src/translator/pcqueue.h index f0b3541..d6f4582 100644 --- a/src/translator/pcqueue.h +++ b/src/translator/pcqueue.h @@ -10,12 +10,14 @@ #include #ifdef __APPLE__ -#include -#include #include #include +#include +#include #elif defined(__linux) #include +#elif defined(_WIN32) || defined(_WIN64) +#include #else #include #endif @@ -35,67 +37,107 @@ namespace bergamot { #ifdef __APPLE__ class Semaphore { -public: - explicit Semaphore(int value) : task_(mach_task_self()) { - ABORT_IF(KERN_SUCCESS != - semaphore_create(task_, &back_, SYNC_POLICY_FIFO, value), - "Could not create semaphore"); - } - - ~Semaphore() { - if (KERN_SUCCESS != semaphore_destroy(task_, back_)) { - std::cerr << "Could not destroy semaphore" << std::endl; - abort(); + public: + explicit Semaphore(int value) : task_(mach_task_self()) { + ABORT_IF(KERN_SUCCESS != semaphore_create(task_, &back_, SYNC_POLICY_FIFO, value), "Could not create semaphore"); } - } - void wait() { - ABORT_IF(KERN_SUCCESS != semaphore_wait(back_), - "Wait for semaphore failed"); - } + ~Semaphore() { + if (KERN_SUCCESS != semaphore_destroy(task_, back_)) { + std::cerr << "Could not destroy semaphore" << std::endl; + abort(); + } + } - void post() { - ABORT_IF(KERN_SUCCESS != semaphore_signal(back_), - "Could not post to semaphore"); - } + void wait() { + ABORT_IF(KERN_SUCCESS != semaphore_wait(back_), "Wait for semaphore failed"); + } -private: - semaphore_t back_; - task_t task_; + void post() { + ABORT_IF(KERN_SUCCESS != semaphore_signal(back_), "Could not post to semaphore"); + } + + private: + semaphore_t back_; + task_t task_; }; -inline void WaitSemaphore(Semaphore &semaphore) { semaphore.wait(); } +inline void WaitSemaphore(Semaphore &semaphore) { + semaphore.wait(); +} #elif defined(__linux) class Semaphore { -public: - explicit Semaphore(unsigned int value) { - ABORT_IF(sem_init(&sem_, 0, value), "Could not create semaphore"); - } - - ~Semaphore() { - if (-1 == sem_destroy(&sem_)) { - std::cerr << "Could not destroy semaphore " << std::endl; - abort(); + public: + explicit Semaphore(unsigned int value) { + ABORT_IF(sem_init(&sem_, 0, value), "Could not create semaphore"); } - } - void wait() { - while (UTIL_UNLIKELY(-1 == sem_wait(&sem_))) { - ABORT_IF(errno != EINTR, "Wait for semaphore failed"); + ~Semaphore() { + if (-1 == sem_destroy(&sem_)) { + std::cerr << "Could not destroy semaphore" << std::endl; + abort(); + } } - } - void post() { - ABORT_IF(-1 == sem_post(&sem_), "Could not post to semaphore"); - } + void wait() { + while (-1 == sem_wait(&sem_)) { + ABORT_IF(errno != EINTR, "Wait for semaphore failed"); + } + } -private: - sem_t sem_; + void post() { + ABORT_IF(-1 == sem_post(&sem_), "Could not post to semaphore"); + } + + private: + sem_t sem_; }; -inline void WaitSemaphore(Semaphore &semaphore) { semaphore.wait(); } +inline void WaitSemaphore(Semaphore &semaphore) { + semaphore.wait(); +} + +#elif defined(_WIN32) || defined(_WIN64) + +class Semaphore { + public: + explicit Semaphore(LONG value) : sem_(CreateSemaphoreA(NULL, value, 2147483647, NULL)) { + ABORT_IF(!sem_, "Could not CreateSemaphore {}", GetLastError()); + } + + ~Semaphore() { + CloseHandle(sem_); + } + + + void wait() { + while (true) { + switch (WaitForSingleObject(sem_, 0L)) { + case WAIT_OBJECT_0: + return; + case WAIT_ABANDONED: + ABORT("A semaphore can't be abandoned, confused by Windows"); + case WAIT_TIMEOUT: + continue; + case WAIT_FAILED: + ABORT("Waiting on Semaphore failed {}", GetLastError()); + } + } + } + + void post() { + ABORT_IF(!ReleaseSemaphore(sem_, 1, NULL), "Failed to release Semaphore {}", GetLastError()); + } + + private: + HANDLE sem_; +}; + +inline void WaitSemaphore(Semaphore &semaphore) { + semaphore.wait(); +} #else typedef boost::interprocess::interprocess_semaphore Semaphore; @@ -113,7 +155,7 @@ inline void WaitSemaphore(Semaphore &on) { } } -#endif // Apple +#endif // Cases for semaphore support /** * Producer consumer queue safe for multiple producers and multiple consumers. @@ -124,11 +166,13 @@ inline void WaitSemaphore(Semaphore &on) { * throw. */ template class PCQueue { -public: + public: explicit PCQueue(size_t size) - : empty_(size), used_(0), storage_(new T[size]), - end_(storage_.get() + size), produce_at_(storage_.get()), - consume_at_(storage_.get()) {} + : empty_(size), used_(0), + storage_(new T[size]), + end_(storage_.get() + size), + produce_at_(storage_.get()), + consume_at_(storage_.get()) {} // Add a value to the queue. void Produce(const T &val) { @@ -141,8 +185,7 @@ public: empty_.post(); throw; } - if (++produce_at_ == end_) - produce_at_ = storage_.get(); + if (++produce_at_ == end_) produce_at_ = storage_.get(); } used_.post(); } @@ -158,14 +201,14 @@ public: empty_.post(); throw; } - if (++produce_at_ == end_) - produce_at_ = storage_.get(); + if (++produce_at_ == end_) produce_at_ = storage_.get(); } used_.post(); } + // Consume a value, assigning it to out. - T &Consume(T &out) { + T& Consume(T &out) { WaitSemaphore(used_); { std::lock_guard consume_lock(consume_at_mutex_); @@ -175,15 +218,14 @@ public: used_.post(); throw; } - if (++consume_at_ == end_) - consume_at_ = storage_.get(); + if (++consume_at_ == end_) consume_at_ = storage_.get(); } empty_.post(); return out; } // Consume a value, swapping it to out. - T &ConsumeSwap(T &out) { + T& ConsumeSwap(T &out) { WaitSemaphore(used_); { std::lock_guard consume_lock(consume_at_mutex_); @@ -193,13 +235,13 @@ public: used_.post(); throw; } - if (++consume_at_ == end_) - consume_at_ = storage_.get(); + if (++consume_at_ == end_) consume_at_ = storage_.get(); } empty_.post(); return out; } + // Convenience version of Consume that copies the value to return. // The other version is faster. T Consume() { @@ -208,7 +250,7 @@ public: return ret; } -private: + private: // Number of empty spaces in storage_. Semaphore empty_; // Number of occupied spaces in storage_. @@ -234,63 +276,67 @@ template struct UnboundedPage { }; template class UnboundedSingleQueue { -public: - UnboundedSingleQueue() : valid_(0) { - SetFilling(new UnboundedPage()); - SetReading(filling_); - } - - void Produce(T &&val) { - if (filling_current_ == filling_end_) { - UnboundedPage *next = new UnboundedPage(); - filling_->next = next; - SetFilling(next); + public: + UnboundedSingleQueue() : valid_(0) { + SetFilling(new UnboundedPage()); + SetReading(filling_); } - *(filling_current_++) = std::move(val); - valid_.post(); - } - void Produce(const T &val) { Produce(T(val)); } - - T &Consume(T &out) { - WaitSemaphore(valid_); - if (reading_current_ == reading_end_) { - SetReading(reading_->next); + void Produce(T &&val) { + if (filling_current_ == filling_end_) { + UnboundedPage *next = new UnboundedPage(); + filling_->next = next; + SetFilling(next); + } + *(filling_current_++) = std::move(val); + valid_.post(); } - out = std::move(*(reading_current_++)); - return out; - } - // Warning: very much a no-guarantees race-condition-rich implementation! - // But sufficient for our specific purpose: The single thread that consumes - // is also the only one that checks Empty, and knows that it's racing. - bool Empty() const { return reading_current_ == filling_current_; } + void Produce(const T &val) { + Produce(T(val)); + } -private: - void SetFilling(UnboundedPage *to) { - filling_ = to; - filling_current_ = to->entries; - filling_end_ = filling_current_ + sizeof(to->entries) / sizeof(T); - } - void SetReading(UnboundedPage *to) { - reading_.reset(to); - reading_current_ = to->entries; - reading_end_ = reading_current_ + sizeof(to->entries) / sizeof(T); - } + T& Consume(T &out) { + WaitSemaphore(valid_); + if (reading_current_ == reading_end_) { + SetReading(reading_->next); + } + out = std::move(*(reading_current_++)); + return out; + } - Semaphore valid_; + // Warning: very much a no-guarantees race-condition-rich implementation! + // But sufficient for our specific purpose: The single thread that consumes + // is also the only one that checks Empty, and knows that it's racing. + bool Empty() const { + return reading_current_ == filling_current_; + } - UnboundedPage *filling_; + private: + void SetFilling(UnboundedPage *to) { + filling_ = to; + filling_current_ = to->entries; + filling_end_ = filling_current_ + sizeof(to->entries) / sizeof(T); + } + void SetReading(UnboundedPage *to) { + reading_.reset(to); + reading_current_ = to->entries; + reading_end_ = reading_current_ + sizeof(to->entries) / sizeof(T); + } - std::unique_ptr> reading_; + Semaphore valid_; - T *filling_current_; - T *filling_end_; - T *reading_current_; - T *reading_end_; + UnboundedPage *filling_; - UnboundedSingleQueue(const UnboundedSingleQueue &) = delete; - UnboundedSingleQueue &operator=(const UnboundedSingleQueue &) = delete; + std::unique_ptr > reading_; + + T *filling_current_; + T *filling_end_; + T *reading_current_; + T *reading_end_; + + UnboundedSingleQueue(const UnboundedSingleQueue &) = delete; + UnboundedSingleQueue &operator=(const UnboundedSingleQueue &) = delete; }; } // namespace bergamot