mirror of
https://github.com/browsermt/bergamot-translator.git
synced 2024-08-15 16:40:26 +03:00
Rewriting batching for threadsafety (#155)
This does make the batcher a critical section across job submission and cleaving, though. If that becomes a problem, we should go back to incoming and outgoing queues with a batcher thread. This commit also removes blocking mode from native compiles. Note that translateMultiple no longer guarantees great batching; I guess we could lease the mutex from ThreadsafeBatcher and create a session. There is a risk that sentences come in one at a time and each thread grabs a single sentence instead of forming better batches. I am not sure what to do about that other than some sort of Nagle algorithm. Because batching is now non-deterministic, even with one thread, the regression tests will go haywire.
This commit is contained in:
parent
269edc7ce5
commit
b25f223fe4
@ -13,6 +13,7 @@ add_library(bergamot-translator STATIC
|
||||
batch.cpp
|
||||
annotation.cpp
|
||||
service.cpp
|
||||
threadsafe_batcher.cpp
|
||||
)
|
||||
if (USE_WASM_COMPATIBLE_SOURCE)
|
||||
# Using wasm compatible sources should include this compile definition;
|
||||
|
@ -7,20 +7,12 @@
|
||||
namespace marian {
|
||||
namespace bergamot {
|
||||
|
||||
// An empty batch is poison.
|
||||
class Batch {
|
||||
public:
|
||||
Batch() {}
|
||||
void clear() { sentences_.clear(); }
|
||||
|
||||
// Methods to construct and determine poison.
|
||||
static Batch poison() {
|
||||
Batch batch;
|
||||
batch.poison_ = true;
|
||||
return batch;
|
||||
}
|
||||
|
||||
bool isPoison() const { return poison_; }
|
||||
|
||||
size_t size() const { return sentences_.size(); }
|
||||
|
||||
void add(const RequestSentence &sentence);
|
||||
@ -42,7 +34,6 @@ public:
|
||||
void log();
|
||||
|
||||
private:
|
||||
bool poison_{false};
|
||||
RequestSentences sentences_;
|
||||
};
|
||||
|
||||
|
@ -13,10 +13,6 @@
|
||||
#include "translator/scorers.h"
|
||||
#include "vocabs.h"
|
||||
|
||||
#ifndef WASM_COMPATIBLE_SOURCE
|
||||
#include "pcqueue.h"
|
||||
#endif
|
||||
|
||||
namespace marian {
|
||||
namespace bergamot {
|
||||
|
||||
|
@ -20,8 +20,6 @@ void Batcher::addSentenceWithPriority(RequestSentence &sentence) {
|
||||
bucket_[bucket_id].insert(sentence);
|
||||
}
|
||||
|
||||
bool Batcher::operator>>(Batch &batch) { return cleaveBatch(batch); }
|
||||
|
||||
bool Batcher::cleaveBatch(Batch &batch) {
|
||||
// For now simply iterates on buckets and converts batches greedily. This
|
||||
// has to be enhanced with optimizing over priority. The baseline
|
||||
|
@ -7,10 +7,6 @@
|
||||
#include "definitions.h"
|
||||
#include "request.h"
|
||||
|
||||
#ifndef WASM_COMPATIBLE_SOURCE
|
||||
#include "pcqueue.h"
|
||||
#endif
|
||||
|
||||
#include <set>
|
||||
#include <vector>
|
||||
|
||||
@ -26,7 +22,10 @@ public:
|
||||
void addSentenceWithPriority(RequestSentence &sentence);
|
||||
void addWholeRequest(Ptr<Request> request);
|
||||
|
||||
bool operator>>(Batch &batch); // alias for cleaveBatch
|
||||
// Indicates that no more sentences will be added. Does nothing here; it exists for parity with the thread-safe version.
|
||||
void shutdown() {}
|
||||
|
||||
bool operator>>(Batch &batch) { return cleaveBatch(batch); }
|
||||
|
||||
private:
|
||||
// Loads sentences with sentences compiled from (tentatively) multiple
|
||||
|
@ -1,343 +0,0 @@
|
||||
#ifndef SRC_BERGAMOT_PCQUEUE_H_
|
||||
#define SRC_BERGAMOT_PCQUEUE_H_
|
||||
|
||||
#include "common/logging.h"
|
||||
|
||||
#include <algorithm>
|
||||
#include <cerrno>
|
||||
#include <iostream>
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
|
||||
#ifdef __APPLE__
|
||||
#include <mach/semaphore.h>
|
||||
#include <mach/task.h>
|
||||
#include <mach/mach_traps.h>
|
||||
#include <mach/mach.h>
|
||||
#elif defined(__linux)
|
||||
#include <semaphore.h>
|
||||
#elif defined(_WIN32) || defined(_WIN64)
|
||||
#include <windows.h>
|
||||
#else
|
||||
#include <boost/interprocess/sync/interprocess_semaphore.hpp>
|
||||
#endif
|
||||
|
||||
#if __GNUC__ >= 3
|
||||
#define UTIL_UNLIKELY(x) __builtin_expect(!!(x), 0)
|
||||
#else
|
||||
#define UTIL_UNLIKELY(x) (x)
|
||||
#endif
|
||||
|
||||
namespace marian {
|
||||
namespace bergamot {
|
||||
|
||||
/* OS X Maverick and Boost interprocess were doing "Function not implemented."
|
||||
* So this is my own wrapper around the mach kernel APIs.
|
||||
*/
|
||||
#ifdef __APPLE__
|
||||
|
||||
class Semaphore {
|
||||
public:
|
||||
explicit Semaphore(int value) : task_(mach_task_self()) {
|
||||
ABORT_IF(KERN_SUCCESS != semaphore_create(task_, &back_, SYNC_POLICY_FIFO, value), "Could not create semaphore");
|
||||
}
|
||||
|
||||
~Semaphore() {
|
||||
if (KERN_SUCCESS != semaphore_destroy(task_, back_)) {
|
||||
std::cerr << "Could not destroy semaphore" << std::endl;
|
||||
abort();
|
||||
}
|
||||
}
|
||||
|
||||
void wait() {
|
||||
ABORT_IF(KERN_SUCCESS != semaphore_wait(back_), "Wait for semaphore failed");
|
||||
}
|
||||
|
||||
void post() {
|
||||
ABORT_IF(KERN_SUCCESS != semaphore_signal(back_), "Could not post to semaphore");
|
||||
}
|
||||
|
||||
private:
|
||||
semaphore_t back_;
|
||||
task_t task_;
|
||||
};
|
||||
|
||||
/// Free-function spelling of Semaphore::wait(), so callers can use the same
/// call regardless of which semaphore implementation was selected.
inline void WaitSemaphore(Semaphore &semaphore) { semaphore.wait(); }
|
||||
|
||||
#elif defined(__linux)
|
||||
|
||||
class Semaphore {
|
||||
public:
|
||||
explicit Semaphore(unsigned int value) {
|
||||
ABORT_IF(sem_init(&sem_, 0, value), "Could not create semaphore");
|
||||
}
|
||||
|
||||
~Semaphore() {
|
||||
if (-1 == sem_destroy(&sem_)) {
|
||||
std::cerr << "Could not destroy semaphore" << std::endl;
|
||||
abort();
|
||||
}
|
||||
}
|
||||
|
||||
void wait() {
|
||||
while (-1 == sem_wait(&sem_)) {
|
||||
ABORT_IF(errno != EINTR, "Wait for semaphore failed");
|
||||
}
|
||||
}
|
||||
|
||||
void post() {
|
||||
ABORT_IF(-1 == sem_post(&sem_), "Could not post to semaphore");
|
||||
}
|
||||
|
||||
private:
|
||||
sem_t sem_;
|
||||
};
|
||||
|
||||
/// Free-function spelling of Semaphore::wait(), so callers can use the same
/// call regardless of which semaphore implementation was selected.
inline void WaitSemaphore(Semaphore &semaphore) { semaphore.wait(); }
|
||||
|
||||
#elif defined(_WIN32) || defined(_WIN64)
|
||||
|
||||
class Semaphore {
|
||||
public:
|
||||
explicit Semaphore(LONG value) : sem_(CreateSemaphoreA(NULL, value, 2147483647, NULL)) {
|
||||
ABORT_IF(!sem_, "Could not CreateSemaphore {}", GetLastError());
|
||||
}
|
||||
|
||||
~Semaphore() {
|
||||
CloseHandle(sem_);
|
||||
}
|
||||
|
||||
|
||||
void wait() {
|
||||
switch (WaitForSingleObject(sem_, INFINITE)) {
|
||||
case WAIT_OBJECT_0:
|
||||
return;
|
||||
case WAIT_ABANDONED:
|
||||
ABORT("A semaphore can't be abandoned, confused by Windows");
|
||||
case WAIT_TIMEOUT:
|
||||
ABORT("Timeout on an infinite wait?");
|
||||
case WAIT_FAILED:
|
||||
ABORT("Waiting on Semaphore failed {}", GetLastError());
|
||||
}
|
||||
}
|
||||
|
||||
void post() {
|
||||
ABORT_IF(!ReleaseSemaphore(sem_, 1, NULL), "Failed to release Semaphore {}", GetLastError());
|
||||
}
|
||||
|
||||
private:
|
||||
HANDLE sem_;
|
||||
};
|
||||
|
||||
/// Free-function spelling of Semaphore::wait(), so callers can use the same
/// call regardless of which semaphore implementation was selected.
inline void WaitSemaphore(Semaphore &semaphore) { semaphore.wait(); }
|
||||
|
||||
#else
|
||||
typedef boost::interprocess::interprocess_semaphore Semaphore;
|
||||
|
||||
/// Waits on the Boost interprocess semaphore. A wait that fails with native
/// error EINTR is retried; any other interprocess_exception is rethrown to
/// the caller.
inline void WaitSemaphore(Semaphore &on) {
  for (;;) {
    try {
      on.wait();
      return;
    } catch (boost::interprocess::interprocess_exception &e) {
      if (e.get_native_error() == EINTR) {
        continue;  // interrupted: try the wait again
      }
      throw;
    }
  }
}
|
||||
|
||||
#endif // Cases for semaphore support
|
||||
|
||||
/**
|
||||
* Producer consumer queue safe for multiple producers and multiple consumers.
|
||||
* T must be default constructable and have operator=.
|
||||
* The value is copied twice for Consume(T &out) or three times for Consume(),
|
||||
* so larger objects should be passed via pointer.
|
||||
* Strong exception guarantee if operator= throws. Undefined if semaphores
|
||||
* throw.
|
||||
*/
|
||||
template <class T> class PCQueue {
|
||||
public:
|
||||
explicit PCQueue(size_t size)
|
||||
: empty_(size), used_(0),
|
||||
storage_(new T[size]),
|
||||
end_(storage_.get() + size),
|
||||
produce_at_(storage_.get()),
|
||||
consume_at_(storage_.get()) {}
|
||||
|
||||
// Add a value to the queue.
|
||||
void Produce(const T &val) {
|
||||
WaitSemaphore(empty_);
|
||||
{
|
||||
std::lock_guard<std::mutex> produce_lock(produce_at_mutex_);
|
||||
try {
|
||||
*produce_at_ = val;
|
||||
} catch (...) {
|
||||
empty_.post();
|
||||
throw;
|
||||
}
|
||||
if (++produce_at_ == end_) produce_at_ = storage_.get();
|
||||
}
|
||||
used_.post();
|
||||
}
|
||||
|
||||
// Add a value to the queue, but swap it into place.
|
||||
void ProduceSwap(T &val) {
|
||||
WaitSemaphore(empty_);
|
||||
{
|
||||
std::lock_guard<std::mutex> produce_lock(produce_at_mutex_);
|
||||
try {
|
||||
std::swap(*produce_at_, val);
|
||||
} catch (...) {
|
||||
empty_.post();
|
||||
throw;
|
||||
}
|
||||
if (++produce_at_ == end_) produce_at_ = storage_.get();
|
||||
}
|
||||
used_.post();
|
||||
}
|
||||
|
||||
|
||||
// Consume a value, assigning it to out.
|
||||
T& Consume(T &out) {
|
||||
WaitSemaphore(used_);
|
||||
{
|
||||
std::lock_guard<std::mutex> consume_lock(consume_at_mutex_);
|
||||
try {
|
||||
out = *consume_at_;
|
||||
} catch (...) {
|
||||
used_.post();
|
||||
throw;
|
||||
}
|
||||
if (++consume_at_ == end_) consume_at_ = storage_.get();
|
||||
}
|
||||
empty_.post();
|
||||
return out;
|
||||
}
|
||||
|
||||
// Consume a value, swapping it to out.
|
||||
T& ConsumeSwap(T &out) {
|
||||
WaitSemaphore(used_);
|
||||
{
|
||||
std::lock_guard<std::mutex> consume_lock(consume_at_mutex_);
|
||||
try {
|
||||
std::swap(out, *consume_at_);
|
||||
} catch (...) {
|
||||
used_.post();
|
||||
throw;
|
||||
}
|
||||
if (++consume_at_ == end_) consume_at_ = storage_.get();
|
||||
}
|
||||
empty_.post();
|
||||
return out;
|
||||
}
|
||||
|
||||
|
||||
// Convenience version of Consume that copies the value to return.
|
||||
// The other version is faster.
|
||||
T Consume() {
|
||||
T ret;
|
||||
Consume(ret);
|
||||
return ret;
|
||||
}
|
||||
|
||||
private:
|
||||
// Number of empty spaces in storage_.
|
||||
Semaphore empty_;
|
||||
// Number of occupied spaces in storage_.
|
||||
Semaphore used_;
|
||||
|
||||
std::unique_ptr<T[]> storage_;
|
||||
|
||||
T *const end_;
|
||||
|
||||
// Index for next write in storage_.
|
||||
T *produce_at_;
|
||||
std::mutex produce_at_mutex_;
|
||||
|
||||
// Index for next read from storage_.
|
||||
T *consume_at_;
|
||||
std::mutex consume_at_mutex_;
|
||||
};
|
||||
|
||||
/// One page of queue storage: a fixed array of 1023 entries plus a link to
/// the next page, which is null on construction.
template <class T> struct UnboundedPage {
  UnboundedPage() : next{nullptr} {}

  UnboundedPage<T> *next;
  T entries[1023];
};
|
||||
|
||||
template <class T> class UnboundedSingleQueue {
|
||||
public:
|
||||
UnboundedSingleQueue() : valid_(0) {
|
||||
SetFilling(new UnboundedPage<T>());
|
||||
SetReading(filling_);
|
||||
}
|
||||
|
||||
void Produce(T &&val) {
|
||||
if (filling_current_ == filling_end_) {
|
||||
UnboundedPage<T> *next = new UnboundedPage<T>();
|
||||
filling_->next = next;
|
||||
SetFilling(next);
|
||||
}
|
||||
*(filling_current_++) = std::move(val);
|
||||
valid_.post();
|
||||
}
|
||||
|
||||
void Produce(const T &val) {
|
||||
Produce(T(val));
|
||||
}
|
||||
|
||||
T& Consume(T &out) {
|
||||
WaitSemaphore(valid_);
|
||||
if (reading_current_ == reading_end_) {
|
||||
SetReading(reading_->next);
|
||||
}
|
||||
out = std::move(*(reading_current_++));
|
||||
return out;
|
||||
}
|
||||
|
||||
// Warning: very much a no-guarantees race-condition-rich implementation!
|
||||
// But sufficient for our specific purpose: The single thread that consumes
|
||||
// is also the only one that checks Empty, and knows that it's racing.
|
||||
bool Empty() const {
|
||||
return reading_current_ == filling_current_;
|
||||
}
|
||||
|
||||
private:
|
||||
void SetFilling(UnboundedPage<T> *to) {
|
||||
filling_ = to;
|
||||
filling_current_ = to->entries;
|
||||
filling_end_ = filling_current_ + sizeof(to->entries) / sizeof(T);
|
||||
}
|
||||
void SetReading(UnboundedPage<T> *to) {
|
||||
reading_.reset(to);
|
||||
reading_current_ = to->entries;
|
||||
reading_end_ = reading_current_ + sizeof(to->entries) / sizeof(T);
|
||||
}
|
||||
|
||||
Semaphore valid_;
|
||||
|
||||
UnboundedPage<T> *filling_;
|
||||
|
||||
std::unique_ptr<UnboundedPage<T> > reading_;
|
||||
|
||||
T *filling_current_;
|
||||
T *filling_end_;
|
||||
T *reading_current_;
|
||||
T *reading_end_;
|
||||
|
||||
UnboundedSingleQueue(const UnboundedSingleQueue &) = delete;
|
||||
UnboundedSingleQueue &operator=(const UnboundedSingleQueue &) = delete;
|
||||
};
|
||||
|
||||
} // namespace bergamot
|
||||
} // namespace marian
|
||||
|
||||
#endif // SRC_BERGAMOT_PCQUEUE_H_
|
@ -12,91 +12,42 @@ Service::Service(Ptr<Options> options, MemoryBundle memoryBundle)
|
||||
: requestId_(0), options_(options),
|
||||
vocabs_(options, std::move(memoryBundle.vocabs)),
|
||||
text_processor_(vocabs_, options), batcher_(options),
|
||||
numWorkers_(options->get<int>("cpu-threads")),
|
||||
numWorkers_(std::max<int>(1, options->get<int>("cpu-threads"))),
|
||||
modelMemory_(std::move(memoryBundle.model)),
|
||||
shortlistMemory_(std::move(memoryBundle.shortlist))
|
||||
#ifndef WASM_COMPATIBLE_SOURCE
|
||||
// 0 elements in PCQueue is illegal and can lead to failures. Adding a
|
||||
// guard to have at least one entry allocated. In the single-threaded
|
||||
// case, while initialized pcqueue_ remains unused.
|
||||
,
|
||||
pcqueue_(std::max<size_t>(1, numWorkers_))
|
||||
shortlistMemory_(std::move(memoryBundle.shortlist))
|
||||
#ifdef WASM_COMPATIBLE_SOURCE
|
||||
, blocking_translator_(DeviceId(0, DeviceType::cpu), vocabs_, options_, &modelMemory_, &shortlistMemory_)
|
||||
#endif
|
||||
{
|
||||
|
||||
if (numWorkers_ == 0) {
|
||||
build_translators(options, /*numTranslators=*/1);
|
||||
initialize_blocking_translator();
|
||||
} else {
|
||||
build_translators(options, numWorkers_);
|
||||
initialize_async_translators();
|
||||
}
|
||||
}
|
||||
|
||||
void Service::build_translators(Ptr<Options> options, size_t numTranslators) {
|
||||
translators_.reserve(numTranslators);
|
||||
for (size_t cpuId = 0; cpuId < numTranslators; cpuId++) {
|
||||
marian::DeviceId deviceId(cpuId, DeviceType::cpu);
|
||||
translators_.emplace_back(deviceId, vocabs_, options, &modelMemory_, &shortlistMemory_);
|
||||
}
|
||||
}
|
||||
|
||||
void Service::initialize_blocking_translator() {
|
||||
translators_.back().initialize();
|
||||
}
|
||||
|
||||
void Service::blocking_translate() {
|
||||
Batch batch;
|
||||
while (batcher_ >> batch) {
|
||||
auto &translator = translators_.back();
|
||||
translator.translate(batch);
|
||||
}
|
||||
}
|
||||
|
||||
#ifndef WASM_COMPATIBLE_SOURCE
|
||||
void Service::initialize_async_translators() {
|
||||
{
|
||||
#ifdef WASM_COMPATIBLE_SOURCE
|
||||
blocking_translator_.initialize();
|
||||
#else
|
||||
workers_.reserve(numWorkers_);
|
||||
|
||||
for (size_t cpuId = 0; cpuId < numWorkers_; cpuId++) {
|
||||
auto &translator = translators_[cpuId];
|
||||
workers_.emplace_back([&translator, this] {
|
||||
workers_.emplace_back([cpuId, this] {
|
||||
marian::DeviceId deviceId(cpuId, DeviceType::cpu);
|
||||
BatchTranslator translator(deviceId, vocabs_, options_, &modelMemory_, &shortlistMemory_);
|
||||
translator.initialize();
|
||||
|
||||
// Run thread mainloop
|
||||
Batch batch;
|
||||
Histories histories;
|
||||
while (true) {
|
||||
pcqueue_.ConsumeSwap(batch);
|
||||
if (batch.isPoison()) {
|
||||
return;
|
||||
} else {
|
||||
translator.translate(batch);
|
||||
}
|
||||
// Run thread mainloop
|
||||
while (batcher_ >> batch) {
|
||||
translator.translate(batch);
|
||||
}
|
||||
});
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
void Service::async_translate() {
|
||||
void Service::blockIfWASM() {
|
||||
#ifdef WASM_COMPATIBLE_SOURCE
|
||||
Batch batch;
|
||||
// There's no need to do shutdown here because it's single threaded.
|
||||
while (batcher_ >> batch) {
|
||||
pcqueue_.ProduceSwap(batch);
|
||||
blocking_translator_.translate(batch);
|
||||
}
|
||||
}
|
||||
#else // WASM_COMPATIBLE_SOURCE
|
||||
void Service::initialize_async_translators() {
|
||||
ABORT("Cannot run in async mode without multithreading.");
|
||||
#endif
|
||||
}
|
||||
|
||||
void Service::async_translate() {
|
||||
ABORT("Cannot run in async mode without multithreading.");
|
||||
}
|
||||
#endif // WASM_COMPATIBLE_SOURCE
|
||||
|
||||
std::future<Response> Service::translate(std::string &&input) {
|
||||
ResponseOptions responseOptions; // Hardcode responseOptions for now
|
||||
return translate(std::move(input), responseOptions);
|
||||
}
|
||||
|
||||
std::vector<Response>
|
||||
Service::translateMultiple(std::vector<std::string> &&inputs,
|
||||
@ -113,7 +64,7 @@ Service::translateMultiple(std::vector<std::string> &&inputs,
|
||||
|
||||
// Dispatch is called once per request so compilation of sentences from
|
||||
// multiple Requests happen.
|
||||
dispatchTranslate();
|
||||
blockIfWASM();
|
||||
|
||||
// Now wait for all Requests to complete, the future to fire and return the
|
||||
// compiled Responses, we can probably return the future, but WASM quirks(?).
|
||||
@ -148,30 +99,16 @@ std::future<Response> Service::translate(std::string &&input,
|
||||
ResponseOptions responseOptions) {
|
||||
std::future<Response> future =
|
||||
queueRequest(std::move(input), responseOptions);
|
||||
dispatchTranslate();
|
||||
blockIfWASM();
|
||||
return future;
|
||||
}
|
||||
|
||||
void Service::dispatchTranslate() {
|
||||
if (numWorkers_ == 0) {
|
||||
blocking_translate();
|
||||
} else {
|
||||
async_translate();
|
||||
}
|
||||
}
|
||||
|
||||
Service::~Service() {
|
||||
batcher_.shutdown();
|
||||
#ifndef WASM_COMPATIBLE_SOURCE
|
||||
for (size_t workerId = 0; workerId < numWorkers_; workerId++) {
|
||||
|
||||
Batch poison = Batch::poison();
|
||||
pcqueue_.ProduceSwap(poison);
|
||||
}
|
||||
|
||||
for (size_t workerId = 0; workerId < numWorkers_; workerId++) {
|
||||
if (workers_[workerId].joinable()) {
|
||||
workers_[workerId].join();
|
||||
}
|
||||
for (std::thread &worker : workers_) {
|
||||
assert(worker.joinable());
|
||||
worker.join();
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
@ -2,16 +2,16 @@
|
||||
#define SRC_BERGAMOT_SERVICE_H_
|
||||
|
||||
#include "batch_translator.h"
|
||||
#include "batcher.h"
|
||||
#include "data/types.h"
|
||||
#include "response.h"
|
||||
#include "response_builder.h"
|
||||
#include "text_processor.h"
|
||||
#include "threadsafe_batcher.h"
|
||||
#include "translator/parser.h"
|
||||
#include "vocabs.h"
|
||||
|
||||
#ifndef WASM_COMPATIBLE_SOURCE
|
||||
#include "pcqueue.h"
|
||||
#include <thread>
|
||||
#endif
|
||||
|
||||
#include <queue>
|
||||
@ -83,12 +83,6 @@ public:
|
||||
/// asynchronous operation mode.
|
||||
~Service();
|
||||
|
||||
/// To stay efficient and to refer to the string for alignments, expects
|
||||
/// ownership be moved through `std::move(..)`
|
||||
///
|
||||
/// @param [in] source: rvalue reference of string to be translated.
|
||||
std::future<Response> translate(std::string &&source);
|
||||
|
||||
/// Translate an input, providing Options to construct Response. This is
|
||||
/// useful when one has to set/unset alignments or quality in the Response to
|
||||
/// save compute spent in constructing these objects.
|
||||
@ -98,7 +92,7 @@ public:
|
||||
/// some member in the Response, also specify any additional configurable
|
||||
/// parameters.
|
||||
std::future<Response> translate(std::string &&source,
|
||||
ResponseOptions options);
|
||||
ResponseOptions options = ResponseOptions());
|
||||
|
||||
/// Translate multiple text-blobs in a single *blocking* API call, providing
|
||||
/// ResponseOptions which applies across all text-blobs dictating how to
|
||||
@ -116,7 +110,6 @@ public:
|
||||
/// @param [in] translationRequest: ResponseOptions indicating whether or not
|
||||
/// to include some member in the Response, also specify any additional
|
||||
/// configurable parameters.
|
||||
|
||||
std::vector<Response>
|
||||
translateMultiple(std::vector<std::string> &&source,
|
||||
ResponseOptions responseOptions);
|
||||
@ -134,24 +127,11 @@ private:
|
||||
/// Dispatch call to translate after inserting in queue
|
||||
void dispatchTranslate();
|
||||
|
||||
/// Build numTranslators number of translators with options from options
|
||||
void build_translators(Ptr<Options> options, size_t numTranslators);
|
||||
/// Initializes a blocking translator without using std::thread
|
||||
void initialize_blocking_translator();
|
||||
/// Translates through direct interaction between batcher_ and translators_
|
||||
void blocking_translate();
|
||||
|
||||
/// Launches multiple workers of translators using std::thread
|
||||
/// Reduces to ABORT if called when not compiled WITH_PTHREAD
|
||||
void initialize_async_translators();
|
||||
/// Async translate produces to a producer-consumer queue as batches are
|
||||
/// generated by Batcher. In another thread, the translators consume from
|
||||
/// producer-consumer queue.
|
||||
/// Reduces to ABORT if called when not compiled WITH_PTHREAD
|
||||
void async_translate();
|
||||
void blockIfWASM();
|
||||
|
||||
/// Number of workers to launch.
|
||||
size_t numWorkers_; // ORDER DEPENDENCY (pcqueue_)
|
||||
size_t numWorkers_;
|
||||
|
||||
/// Options object holding the options Service was instantiated with.
|
||||
Ptr<Options> options_;
|
||||
@ -161,12 +141,6 @@ private:
|
||||
/// Shortlist memory passed as bytes.
|
||||
AlignedMemory shortlistMemory_; // ORDER DEPENDENCY (translators_)
|
||||
|
||||
/// Holds instances of batch translators, just one in case
|
||||
/// of single-threaded application, numWorkers_ in case of multithreaded
|
||||
/// setting.
|
||||
std::vector<BatchTranslator>
|
||||
translators_; // ORDER DEPENDENCY (modelMemory_, shortlistMemory_)
|
||||
|
||||
/// Stores requestId of active request. Used to establish
|
||||
/// ordering among requests and logging/book-keeping.
|
||||
|
||||
@ -180,12 +154,13 @@ private:
|
||||
|
||||
/// Batcher handles generation of batches from a request, subject to
|
||||
/// packing-efficiency and priority optimization heuristics.
|
||||
Batcher batcher_;
|
||||
ThreadsafeBatcher batcher_;
|
||||
|
||||
// The following constructs are available providing full capabilities on a non
|
||||
// WASM platform, where one does not have to hide threads.
|
||||
#ifndef WASM_COMPATIBLE_SOURCE
|
||||
PCQueue<Batch> pcqueue_; // ORDER DEPENDENCY (numWorkers_)
|
||||
#ifdef WASM_COMPATIBLE_SOURCE
|
||||
BatchTranslator blocking_translator_; // ORDER DEPENDENCY (modelMemory_, shortlistMemory_)
|
||||
#else
|
||||
std::vector<std::thread> workers_;
|
||||
#endif // WASM_COMPATIBLE_SOURCE
|
||||
};
|
||||
|
49
src/translator/threadsafe_batcher.cpp
Normal file
49
src/translator/threadsafe_batcher.cpp
Normal file
@ -0,0 +1,49 @@
|
||||
#ifndef WASM_COMPATIBLE_SOURCE
|
||||
#include "threadsafe_batcher.h"
|
||||
|
||||
#include <cassert>
|
||||
|
||||
namespace marian {
|
||||
namespace bergamot {
|
||||
|
||||
/// Builds the wrapped Batcher from `options`; the sentence counter starts at
/// zero and the shutdown flag is clear.
ThreadsafeBatcher::ThreadsafeBatcher(Ptr<Options> options)
    : backend_(options),
      enqueued_(0),
      shutdown_(false) {}
|
||||
|
||||
/// Sets the shutdown flag and wakes every waiter (see shutdown()) before the
/// members are destroyed.
ThreadsafeBatcher::~ThreadsafeBatcher() { shutdown(); }
|
||||
|
||||
/// Adds one sentence to the wrapped batcher under the lock, counts it, and
/// wakes one thread waiting in operator>>. Must not be called after
/// shutdown() (checked by assert).
void ThreadsafeBatcher::addSentenceWithPriority(RequestSentence &sentence) {
  std::lock_guard<std::mutex> guard(mutex_);
  assert(!shutdown_);
  backend_.addSentenceWithPriority(sentence);
  enqueued_ += 1;
  work_.notify_one();
}
|
||||
|
||||
/// Adds a whole request to the wrapped batcher under the lock, raises the
/// sentence counter by request->numSegments(), and wakes all threads waiting
/// in operator>>. Must not be called after shutdown() (checked by assert).
void ThreadsafeBatcher::addWholeRequest(Ptr<Request> request) {
  std::lock_guard<std::mutex> guard(mutex_);
  assert(!shutdown_);
  backend_.addWholeRequest(request);
  enqueued_ = enqueued_ + request->numSegments();
  work_.notify_all();
}
|
||||
|
||||
void ThreadsafeBatcher::shutdown() {
|
||||
std::unique_lock<std::mutex> lock(mutex_);
|
||||
shutdown_ = true;
|
||||
work_.notify_all();
|
||||
}
|
||||
|
||||
/// Blocks until sentences are enqueued or shutdown() was called, then asks
/// the wrapped batcher for a batch while still holding the lock.
///
/// @param [out] batch: filled by the wrapped batcher.
/// @return the wrapped batcher's result; a false result is only expected
/// after shutdown (checked by assert).
bool ThreadsafeBatcher::operator>>(Batch &batch) {
  std::unique_lock<std::mutex> lock(mutex_);
  // Explicit form of the predicate wait: sleep until there is work or we
  // are shutting down.
  while (!(enqueued_ || shutdown_)) {
    work_.wait(lock);
  }
  const bool produced = backend_ >> batch;
  assert(produced || shutdown_);
  // The sentences handed out in this batch are no longer pending.
  enqueued_ -= batch.size();
  return produced;
}
|
||||
|
||||
} // namespace bergamot
|
||||
} // namespace marian
|
||||
#endif // WASM_COMPATIBLE_SOURCE
|
58
src/translator/threadsafe_batcher.h
Normal file
58
src/translator/threadsafe_batcher.h
Normal file
@ -0,0 +1,58 @@
|
||||
/* Thread-safe wrapper around batcher. */
|
||||
#ifndef SRC_BERGAMOT_THREADSAFE_BATCHER_H_
|
||||
#define SRC_BERGAMOT_THREADSAFE_BATCHER_H_
|
||||
|
||||
#include "batcher.h"
|
||||
#include "common/options.h"
|
||||
#include "definitions.h"
|
||||
|
||||
#ifndef WASM_COMPATIBLE_SOURCE
|
||||
#include <condition_variable>
|
||||
#include <mutex>
|
||||
#endif
|
||||
|
||||
namespace marian {
|
||||
namespace bergamot {
|
||||
|
||||
#ifdef WASM_COMPATIBLE_SOURCE
|
||||
// No threads, no locks.
|
||||
typedef Batcher ThreadsafeBatcher;
|
||||
#else
|
||||
|
||||
class ThreadsafeBatcher {
|
||||
public:
|
||||
explicit ThreadsafeBatcher(Ptr<Options> options);
|
||||
|
||||
~ThreadsafeBatcher();
|
||||
|
||||
// Add sentences to be translated by calling these (see Batcher). When
|
||||
// done, call shutdown.
|
||||
void addSentenceWithPriority(RequestSentence &sentence);
|
||||
void addWholeRequest(Ptr<Request> request);
|
||||
void shutdown();
|
||||
|
||||
// Get a batch out of the batcher. Return false to shutdown worker.
|
||||
bool operator>>(Batch &batch);
|
||||
|
||||
private:
|
||||
Batcher backend_;
|
||||
|
||||
// Number of sentences in backend_;
|
||||
size_t enqueued_;
|
||||
|
||||
// Are we shutting down?
|
||||
bool shutdown_;
|
||||
|
||||
// Lock on this object.
|
||||
std::mutex mutex_;
|
||||
|
||||
// Signaled when there are sentences to translate.
|
||||
std::condition_variable work_;
|
||||
};
|
||||
|
||||
#endif
|
||||
|
||||
} // namespace bergamot
|
||||
} // namespace marian
|
||||
|
||||
#endif // SRC_BERGAMOT_THREADSAFE_BATCHER_H_
|
Loading…
Reference in New Issue
Block a user