mirror of
https://github.com/browsermt/bergamot-translator.git
synced 2024-08-15 16:40:26 +03:00
Windows PCQueue support without Boost (#106)
This commit is contained in:
parent
c00c263f8f
commit
1184875cc9
@ -10,12 +10,14 @@
|
||||
#include <mutex>
|
||||
|
||||
#ifdef __APPLE__
|
||||
#include <mach/mach.h>
|
||||
#include <mach/mach_traps.h>
|
||||
#include <mach/semaphore.h>
|
||||
#include <mach/task.h>
|
||||
#include <mach/mach_traps.h>
|
||||
#include <mach/mach.h>
|
||||
#elif defined(__linux)
|
||||
#include <semaphore.h>
|
||||
#elif defined(_WIN32) || defined(_WIN64)
|
||||
#include <windows.h>
|
||||
#else
|
||||
#include <boost/interprocess/sync/interprocess_semaphore.hpp>
|
||||
#endif
|
||||
@ -35,67 +37,107 @@ namespace bergamot {
|
||||
#ifdef __APPLE__
|
||||
|
||||
class Semaphore {
|
||||
public:
|
||||
explicit Semaphore(int value) : task_(mach_task_self()) {
|
||||
ABORT_IF(KERN_SUCCESS !=
|
||||
semaphore_create(task_, &back_, SYNC_POLICY_FIFO, value),
|
||||
"Could not create semaphore");
|
||||
}
|
||||
|
||||
~Semaphore() {
|
||||
if (KERN_SUCCESS != semaphore_destroy(task_, back_)) {
|
||||
std::cerr << "Could not destroy semaphore" << std::endl;
|
||||
abort();
|
||||
public:
|
||||
explicit Semaphore(int value) : task_(mach_task_self()) {
|
||||
ABORT_IF(KERN_SUCCESS != semaphore_create(task_, &back_, SYNC_POLICY_FIFO, value), "Could not create semaphore");
|
||||
}
|
||||
}
|
||||
|
||||
void wait() {
|
||||
ABORT_IF(KERN_SUCCESS != semaphore_wait(back_),
|
||||
"Wait for semaphore failed");
|
||||
}
|
||||
~Semaphore() {
|
||||
if (KERN_SUCCESS != semaphore_destroy(task_, back_)) {
|
||||
std::cerr << "Could not destroy semaphore" << std::endl;
|
||||
abort();
|
||||
}
|
||||
}
|
||||
|
||||
void post() {
|
||||
ABORT_IF(KERN_SUCCESS != semaphore_signal(back_),
|
||||
"Could not post to semaphore");
|
||||
}
|
||||
void wait() {
|
||||
ABORT_IF(KERN_SUCCESS != semaphore_wait(back_), "Wait for semaphore failed");
|
||||
}
|
||||
|
||||
private:
|
||||
semaphore_t back_;
|
||||
task_t task_;
|
||||
void post() {
|
||||
ABORT_IF(KERN_SUCCESS != semaphore_signal(back_), "Could not post to semaphore");
|
||||
}
|
||||
|
||||
private:
|
||||
semaphore_t back_;
|
||||
task_t task_;
|
||||
};
|
||||
|
||||
inline void WaitSemaphore(Semaphore &semaphore) { semaphore.wait(); }
|
||||
inline void WaitSemaphore(Semaphore &semaphore) {
|
||||
semaphore.wait();
|
||||
}
|
||||
|
||||
#elif defined(__linux)
|
||||
|
||||
class Semaphore {
|
||||
public:
|
||||
explicit Semaphore(unsigned int value) {
|
||||
ABORT_IF(sem_init(&sem_, 0, value), "Could not create semaphore");
|
||||
}
|
||||
|
||||
~Semaphore() {
|
||||
if (-1 == sem_destroy(&sem_)) {
|
||||
std::cerr << "Could not destroy semaphore " << std::endl;
|
||||
abort();
|
||||
public:
|
||||
explicit Semaphore(unsigned int value) {
|
||||
ABORT_IF(sem_init(&sem_, 0, value), "Could not create semaphore");
|
||||
}
|
||||
}
|
||||
|
||||
void wait() {
|
||||
while (UTIL_UNLIKELY(-1 == sem_wait(&sem_))) {
|
||||
ABORT_IF(errno != EINTR, "Wait for semaphore failed");
|
||||
~Semaphore() {
|
||||
if (-1 == sem_destroy(&sem_)) {
|
||||
std::cerr << "Could not destroy semaphore" << std::endl;
|
||||
abort();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void post() {
|
||||
ABORT_IF(-1 == sem_post(&sem_), "Could not post to semaphore");
|
||||
}
|
||||
void wait() {
|
||||
while (-1 == sem_wait(&sem_)) {
|
||||
ABORT_IF(errno != EINTR, "Wait for semaphore failed");
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
sem_t sem_;
|
||||
void post() {
|
||||
ABORT_IF(-1 == sem_post(&sem_), "Could not post to semaphore");
|
||||
}
|
||||
|
||||
private:
|
||||
sem_t sem_;
|
||||
};
|
||||
|
||||
inline void WaitSemaphore(Semaphore &semaphore) { semaphore.wait(); }
|
||||
inline void WaitSemaphore(Semaphore &semaphore) {
|
||||
semaphore.wait();
|
||||
}
|
||||
|
||||
#elif defined(_WIN32) || defined(_WIN64)
|
||||
|
||||
class Semaphore {
|
||||
public:
|
||||
explicit Semaphore(LONG value) : sem_(CreateSemaphoreA(NULL, value, 2147483647, NULL)) {
|
||||
ABORT_IF(!sem_, "Could not CreateSemaphore {}", GetLastError());
|
||||
}
|
||||
|
||||
~Semaphore() {
|
||||
CloseHandle(sem_);
|
||||
}
|
||||
|
||||
|
||||
void wait() {
|
||||
while (true) {
|
||||
switch (WaitForSingleObject(sem_, 0L)) {
|
||||
case WAIT_OBJECT_0:
|
||||
return;
|
||||
case WAIT_ABANDONED:
|
||||
ABORT("A semaphore can't be abandoned, confused by Windows");
|
||||
case WAIT_TIMEOUT:
|
||||
continue;
|
||||
case WAIT_FAILED:
|
||||
ABORT("Waiting on Semaphore failed {}", GetLastError());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void post() {
|
||||
ABORT_IF(!ReleaseSemaphore(sem_, 1, NULL), "Failed to release Semaphore {}", GetLastError());
|
||||
}
|
||||
|
||||
private:
|
||||
HANDLE sem_;
|
||||
};
|
||||
|
||||
inline void WaitSemaphore(Semaphore &semaphore) {
|
||||
semaphore.wait();
|
||||
}
|
||||
|
||||
#else
|
||||
typedef boost::interprocess::interprocess_semaphore Semaphore;
|
||||
@ -113,7 +155,7 @@ inline void WaitSemaphore(Semaphore &on) {
|
||||
}
|
||||
}
|
||||
|
||||
#endif // Apple
|
||||
#endif // Cases for semaphore support
|
||||
|
||||
/**
|
||||
* Producer consumer queue safe for multiple producers and multiple consumers.
|
||||
@ -124,11 +166,13 @@ inline void WaitSemaphore(Semaphore &on) {
|
||||
* throw.
|
||||
*/
|
||||
template <class T> class PCQueue {
|
||||
public:
|
||||
public:
|
||||
explicit PCQueue(size_t size)
|
||||
: empty_(size), used_(0), storage_(new T[size]),
|
||||
end_(storage_.get() + size), produce_at_(storage_.get()),
|
||||
consume_at_(storage_.get()) {}
|
||||
: empty_(size), used_(0),
|
||||
storage_(new T[size]),
|
||||
end_(storage_.get() + size),
|
||||
produce_at_(storage_.get()),
|
||||
consume_at_(storage_.get()) {}
|
||||
|
||||
// Add a value to the queue.
|
||||
void Produce(const T &val) {
|
||||
@ -141,8 +185,7 @@ public:
|
||||
empty_.post();
|
||||
throw;
|
||||
}
|
||||
if (++produce_at_ == end_)
|
||||
produce_at_ = storage_.get();
|
||||
if (++produce_at_ == end_) produce_at_ = storage_.get();
|
||||
}
|
||||
used_.post();
|
||||
}
|
||||
@ -158,14 +201,14 @@ public:
|
||||
empty_.post();
|
||||
throw;
|
||||
}
|
||||
if (++produce_at_ == end_)
|
||||
produce_at_ = storage_.get();
|
||||
if (++produce_at_ == end_) produce_at_ = storage_.get();
|
||||
}
|
||||
used_.post();
|
||||
}
|
||||
|
||||
|
||||
// Consume a value, assigning it to out.
|
||||
T &Consume(T &out) {
|
||||
T& Consume(T &out) {
|
||||
WaitSemaphore(used_);
|
||||
{
|
||||
std::lock_guard<std::mutex> consume_lock(consume_at_mutex_);
|
||||
@ -175,15 +218,14 @@ public:
|
||||
used_.post();
|
||||
throw;
|
||||
}
|
||||
if (++consume_at_ == end_)
|
||||
consume_at_ = storage_.get();
|
||||
if (++consume_at_ == end_) consume_at_ = storage_.get();
|
||||
}
|
||||
empty_.post();
|
||||
return out;
|
||||
}
|
||||
|
||||
// Consume a value, swapping it to out.
|
||||
T &ConsumeSwap(T &out) {
|
||||
T& ConsumeSwap(T &out) {
|
||||
WaitSemaphore(used_);
|
||||
{
|
||||
std::lock_guard<std::mutex> consume_lock(consume_at_mutex_);
|
||||
@ -193,13 +235,13 @@ public:
|
||||
used_.post();
|
||||
throw;
|
||||
}
|
||||
if (++consume_at_ == end_)
|
||||
consume_at_ = storage_.get();
|
||||
if (++consume_at_ == end_) consume_at_ = storage_.get();
|
||||
}
|
||||
empty_.post();
|
||||
return out;
|
||||
}
|
||||
|
||||
|
||||
// Convenience version of Consume that copies the value to return.
|
||||
// The other version is faster.
|
||||
T Consume() {
|
||||
@ -208,7 +250,7 @@ public:
|
||||
return ret;
|
||||
}
|
||||
|
||||
private:
|
||||
private:
|
||||
// Number of empty spaces in storage_.
|
||||
Semaphore empty_;
|
||||
// Number of occupied spaces in storage_.
|
||||
@ -234,63 +276,67 @@ template <class T> struct UnboundedPage {
|
||||
};
|
||||
|
||||
template <class T> class UnboundedSingleQueue {
|
||||
public:
|
||||
UnboundedSingleQueue() : valid_(0) {
|
||||
SetFilling(new UnboundedPage<T>());
|
||||
SetReading(filling_);
|
||||
}
|
||||
|
||||
void Produce(T &&val) {
|
||||
if (filling_current_ == filling_end_) {
|
||||
UnboundedPage<T> *next = new UnboundedPage<T>();
|
||||
filling_->next = next;
|
||||
SetFilling(next);
|
||||
public:
|
||||
UnboundedSingleQueue() : valid_(0) {
|
||||
SetFilling(new UnboundedPage<T>());
|
||||
SetReading(filling_);
|
||||
}
|
||||
*(filling_current_++) = std::move(val);
|
||||
valid_.post();
|
||||
}
|
||||
|
||||
void Produce(const T &val) { Produce(T(val)); }
|
||||
|
||||
T &Consume(T &out) {
|
||||
WaitSemaphore(valid_);
|
||||
if (reading_current_ == reading_end_) {
|
||||
SetReading(reading_->next);
|
||||
void Produce(T &&val) {
|
||||
if (filling_current_ == filling_end_) {
|
||||
UnboundedPage<T> *next = new UnboundedPage<T>();
|
||||
filling_->next = next;
|
||||
SetFilling(next);
|
||||
}
|
||||
*(filling_current_++) = std::move(val);
|
||||
valid_.post();
|
||||
}
|
||||
out = std::move(*(reading_current_++));
|
||||
return out;
|
||||
}
|
||||
|
||||
// Warning: very much a no-guarantees race-condition-rich implementation!
|
||||
// But sufficient for our specific purpose: The single thread that consumes
|
||||
// is also the only one that checks Empty, and knows that it's racing.
|
||||
bool Empty() const { return reading_current_ == filling_current_; }
|
||||
void Produce(const T &val) {
|
||||
Produce(T(val));
|
||||
}
|
||||
|
||||
private:
|
||||
void SetFilling(UnboundedPage<T> *to) {
|
||||
filling_ = to;
|
||||
filling_current_ = to->entries;
|
||||
filling_end_ = filling_current_ + sizeof(to->entries) / sizeof(T);
|
||||
}
|
||||
void SetReading(UnboundedPage<T> *to) {
|
||||
reading_.reset(to);
|
||||
reading_current_ = to->entries;
|
||||
reading_end_ = reading_current_ + sizeof(to->entries) / sizeof(T);
|
||||
}
|
||||
T& Consume(T &out) {
|
||||
WaitSemaphore(valid_);
|
||||
if (reading_current_ == reading_end_) {
|
||||
SetReading(reading_->next);
|
||||
}
|
||||
out = std::move(*(reading_current_++));
|
||||
return out;
|
||||
}
|
||||
|
||||
Semaphore valid_;
|
||||
// Warning: very much a no-guarantees race-condition-rich implementation!
|
||||
// But sufficient for our specific purpose: The single thread that consumes
|
||||
// is also the only one that checks Empty, and knows that it's racing.
|
||||
bool Empty() const {
|
||||
return reading_current_ == filling_current_;
|
||||
}
|
||||
|
||||
UnboundedPage<T> *filling_;
|
||||
private:
|
||||
void SetFilling(UnboundedPage<T> *to) {
|
||||
filling_ = to;
|
||||
filling_current_ = to->entries;
|
||||
filling_end_ = filling_current_ + sizeof(to->entries) / sizeof(T);
|
||||
}
|
||||
void SetReading(UnboundedPage<T> *to) {
|
||||
reading_.reset(to);
|
||||
reading_current_ = to->entries;
|
||||
reading_end_ = reading_current_ + sizeof(to->entries) / sizeof(T);
|
||||
}
|
||||
|
||||
std::unique_ptr<UnboundedPage<T>> reading_;
|
||||
Semaphore valid_;
|
||||
|
||||
T *filling_current_;
|
||||
T *filling_end_;
|
||||
T *reading_current_;
|
||||
T *reading_end_;
|
||||
UnboundedPage<T> *filling_;
|
||||
|
||||
UnboundedSingleQueue(const UnboundedSingleQueue &) = delete;
|
||||
UnboundedSingleQueue &operator=(const UnboundedSingleQueue &) = delete;
|
||||
std::unique_ptr<UnboundedPage<T> > reading_;
|
||||
|
||||
T *filling_current_;
|
||||
T *filling_end_;
|
||||
T *reading_current_;
|
||||
T *reading_end_;
|
||||
|
||||
UnboundedSingleQueue(const UnboundedSingleQueue &) = delete;
|
||||
UnboundedSingleQueue &operator=(const UnboundedSingleQueue &) = delete;
|
||||
};
|
||||
|
||||
} // namespace bergamot
|
||||
|
Loading…
Reference in New Issue
Block a user