ServiceBase -> [NonThreadedService, Service]

Through inheritance, a non-threaded and multithreaded Service are
created, both derived of the same ServiceBase class which holds the
common elements.

In preparation to solve SIGSEGV in #41. First inspections gave aborts in
thread part, and repeated SIGSEGV's in lock-policy's of shared_pointers
even in non-threaded paths.

Solving this first, to avoid ifdef or tricky paths. The non-threaded
implementation is not included in WASM builds at all, by separating out
the single-threaded logic. DRY is achieved through inheritance and
operator overloading.
This commit is contained in:
Jerin Philip 2021-02-25 23:11:09 +00:00
parent dd3dc6f932
commit cd01d7552a
10 changed files with 143 additions and 151 deletions

View File

@ -1,8 +1,10 @@
add_executable(bergamot-translator-app main.cpp)
target_link_libraries(bergamot-translator-app PRIVATE bergamot-translator)
add_executable(service-cli main-mts.cpp)
target_link_libraries(service-cli PRIVATE bergamot-translator)
if (NOT COMPILE_WASM)
add_executable(service-cli main-mts.cpp)
target_link_libraries(service-cli PRIVATE bergamot-translator)
add_executable(marian-decoder-new marian-decoder-new.cpp)
target_link_libraries(marian-decoder-new PRIVATE bergamot-translator)
add_executable(marian-decoder-new marian-decoder-new.cpp)
target_link_libraries(marian-decoder-new PRIVATE bergamot-translator)
endif()

View File

@ -1,3 +1,7 @@
if (NOT COMPILE_WASM)
set(SERVICE "service.cpp")
endif()
add_library(bergamot-translator STATIC
AbstractTranslationModel.cpp
TranslationModel.cpp
@ -8,7 +12,8 @@ add_library(bergamot-translator STATIC
batch_translator.cpp
multifactor_priority.cpp
request.cpp
service.cpp
service_base.cpp
${SERVICE}
batcher.cpp
response.cpp
batch.cpp

View File

@ -15,7 +15,7 @@
// All local project includes
#include "TranslationModel.h"
#include "translator/parser.h"
#include "translator/service.h"
#include "translator/service_base.h"
std::shared_ptr<marian::Options> parseOptions(const std::string &config) {
marian::Options options;

View File

@ -16,7 +16,7 @@
// All local project includes
#include "AbstractTranslationModel.h"
#include "translator/service.h"
#include "translator/service_base.h"
/* A Translation model that translates a plain (without any markups and emojis)
* UTF-8 encoded text. This implementation supports translation from 1 source
@ -55,9 +55,8 @@ public:
* entry of texts list will be moved to its corresponding TranslationResult
* object).
*/
std::vector<TranslationResult>
translate(std::vector<std::string> &&texts,
TranslationRequest request) override;
std::vector<TranslationResult> translate(std::vector<std::string> &&texts,
TranslationRequest request) override;
/* Check if the model can provide alignment information b/w original and
* translated text. */
@ -66,7 +65,7 @@ public:
private:
// Model configuration options
std::shared_ptr<marian::Options> configOptions_; // ORDER DEPENDECNY
marian::bergamot::Service service_; // ORDER DEPENDENCY
marian::bergamot::NonThreadedService service_; // ORDER DEPENDENCY
};
#endif /* SRC_TRANSLATOR_TRANSLATIONMODEL_H_ */

View File

@ -57,14 +57,5 @@ void Batcher::addWholeRequest(Ptr<Request> request) {
}
}
#ifdef WITH_PTHREADS
void Batcher::produceTo(PCQueue<Batch> &pcqueue) {
Batch batch;
while (cleaveBatch(batch)) {
pcqueue.ProduceSwap(batch);
}
}
#endif
} // namespace bergamot
} // namespace marian

View File

@ -25,16 +25,13 @@ public:
// which maintains priority among sentences from multiple concurrent requests.
void addSentenceWithPriority(RequestSentence &sentence);
void addWholeRequest(Ptr<Request> request);
#ifdef WITH_PTHREADS
void produceTo(PCQueue<Batch> &pcqueue);
#endif
bool operator>>(Batch &batch); // alias for cleaveBatch
private:
// Loads sentences with sentences compiled from (tentatively) multiple
// requests optimizing for both padding and priority.
bool cleaveBatch(Batch &batch);
bool operator>>(Batch &batch); // alias
private:
size_t miniBatchWords;
std::vector<std::set<RequestSentence>> bucket_;
size_t batchNumber_{0};

View File

@ -9,105 +9,47 @@ namespace marian {
namespace bergamot {
Service::Service(Ptr<Options> options)
: requestId_(0), numWorkers_(options->get<int>("cpu-threads")),
vocabs_(std::move(loadVocabularies(options))),
text_processor_(vocabs_, options), batcher_(options)
#ifdef WITH_PTHREADS
,
pcqueue_(2 * options->get<int>("cpu-threads"))
#endif // WITH_PTHREADS
{
: ServiceBase(options), numWorkers_(options->get<int>("cpu-threads")),
pcqueue_(numWorkers_) {
if (numWorkers_ <= 0) {
ABORT("Fatal: numWorkers should be greater than 1");
}
if (numWorkers_ == 0) {
// In case workers are 0, a single-translator is created and initialized
// in the main thread.
marian::DeviceId deviceId(/*cpuId=*/0, DeviceType::cpu);
translators_.reserve(numWorkers_);
workers_.reserve(numWorkers_);
for (size_t cpuId = 0; cpuId < numWorkers_; cpuId++) {
marian::DeviceId deviceId(cpuId, DeviceType::cpu);
translators_.emplace_back(deviceId, vocabs_, options);
translators_.back().initialize();
} else {
#ifdef WITH_PTHREADS
// If workers specified are greater than 0, translators_ are populated with
// unitialized instances. These are then initialized inside
// individual threads and set to consume from producer-consumer queue.
workers_.reserve(numWorkers_);
translators_.reserve(numWorkers_);
for (size_t cpuId = 0; cpuId < numWorkers_; cpuId++) {
marian::DeviceId deviceId(cpuId, DeviceType::cpu);
translators_.emplace_back(deviceId, vocabs_, options);
auto &translator = translators_.back();
workers_.emplace_back([&translator, this] {
translator.initialize();
translator.consumeFrom(pcqueue_);
});
}
#else // WITH_PTHREADS
ABORT(
"Fatal: Service started requesting multiple threadswhile compiled with "
"COMPILE_THREAD_VARIANT=off. Please check your cmake build "
"configuration");
#endif
auto &translator = translators_.back();
workers_.emplace_back([&translator, this] {
translator.initialize();
translator.consumeFrom(pcqueue_);
});
}
}
std::future<Response> Service::translateWithCopy(std::string input) {
return translate(std::move(input));
}
std::future<Response> Service::translate(std::string &&input) {
// Takes in a blob of text. Segments and SentenceRanges are
// extracted from the input (blob of text) and used to construct a Request
// along with a promise. promise value is set by the worker completing a
// request.
//
// Batcher, which currently runs on the main thread constructs batches out of
// a single request (at the moment) and adds them into a Producer-Consumer
// queue holding a bunch of requestSentences used to construct batches.
// TODO(jerin): Make asynchronous and compile from multiple requests.
//
// returns future corresponding to the promise.
Segments segments;
SentenceRanges sourceRanges;
text_processor_.process(input, segments, sourceRanges);
std::promise<Response> responsePromise;
auto future = responsePromise.get_future();
Ptr<Request> request = New<Request>(
requestId_++, /* lineNumberBegin = */ 0, vocabs_, std::move(input),
std::move(segments), std::move(sourceRanges), std::move(responsePromise));
batcher_.addWholeRequest(request);
if (numWorkers_ > 0) {
#ifdef WITH_PTHREADS
batcher_.produceTo(pcqueue_);
#endif
} else {
// Queue single-threaded
Batch batch;
while (batcher_ >> batch) {
translators_[0].translate(batch);
}
void Service::enqueue() {
Batch batch;
while (batcher_ >> batch) {
pcqueue_.ProduceSwap(batch);
}
return future;
}
void Service::stop() {
#ifdef WITH_PTHREADS
for (auto &worker : workers_) {
Batch poison = Batch::poison();
pcqueue_.ProduceSwap(poison);
}
for (auto &worker : workers_) {
worker.join();
if (worker.joinable()) {
worker.join();
}
}
workers_.clear(); // Takes care of idempotency.
#endif
workers_.clear();
}
Service::~Service() { stop(); }

View File

@ -1,24 +1,22 @@
#ifndef SRC_BERGAMOT_SERVICE_H_
#define SRC_BERGAMOT_SERVICE_H_
#include "batch_translator.h"
#include "batcher.h"
#include "data/types.h"
#include "pcqueue.h"
#include "response.h"
#include "service_base.h"
#include "text_processor.h"
#include <queue>
#include <vector>
#include "data/types.h"
#ifdef WITH_PTHREADS
#include "pcqueue.h"
#endif
namespace marian {
namespace bergamot {
class Service {
class Service : public ServiceBase {
// Service exposes methods to translate an incoming blob of text to the
// Consumer of bergamot API.
@ -35,47 +33,15 @@ class Service {
public:
explicit Service(Ptr<Options> options);
// Constructs new string copying, calls translate internally.
std::future<Response> translateWithCopy(std::string input);
std::future<Response> translate(std::string &&input);
void stop();
Ptr<Vocab const> sourceVocab() const { return vocabs_.front(); }
Ptr<Vocab const> targetVocab() const { return vocabs_.back(); }
void enqueue() override;
void stop() override;
~Service();
private:
size_t requestId_;
size_t numWorkers_;
// vocabs are used to construct a Request, which later uses it to construct
// Response (decode from words to string).
std::vector<Ptr<Vocab const>> vocabs_; // ORDER DEPENDENCY
// Consists of:
//
// 1. text-processing class (TextProcessor), which handles breaking a blob of
// text into sentences and providing them representated by finite
// vocabulary for further processing by hte neural machine translation.
// 2. a Batcher class which handles efficient batching by minimizing
// padding wasting compute.
// 3. Multiple workers - which are instances of BatchTranslators are
// spawned in separate threads.
//
// Batcher acts as a producer for a producer-consumer queue (pcqueue_), with
// idle BatchTranslators being consumers requesting batches as they're ready.
TextProcessor text_processor_; // ORDER DEPENDENCY
Batcher batcher_;
std::vector<BatchTranslator> translators_;
#ifdef WITH_PTHREADS
PCQueue<Batch> pcqueue_;
size_t numWorkers_; // ORDER DEPENDENCY
PCQueue<Batch> pcqueue_; // ORDER DEPENDENCY
std::vector<std::thread> workers_;
#endif
std::vector<BatchTranslator> translators_;
};
std::vector<Ptr<const Vocab>> loadVocabularies(Ptr<Options> options);

View File

@ -0,0 +1,40 @@
#include "service_base.h"
namespace marian {
namespace bergamot {
ServiceBase::ServiceBase(Ptr<Options> options)
: requestId_(0), vocabs_(std::move(loadVocabularies(options))),
text_processor_(vocabs_, options), batcher_(options) {}
std::future<Response> ServiceBase::translate(std::string &&input) {
Segments segments;
SentenceRanges sourceRanges;
text_processor_.process(input, segments, sourceRanges);
std::promise<Response> responsePromise;
auto future = responsePromise.get_future();
Ptr<Request> request = New<Request>(
requestId_++, /* lineNumberBegin = */ 0, vocabs_, std::move(input),
std::move(segments), std::move(sourceRanges), std::move(responsePromise));
batcher_.addWholeRequest(request);
enqueue();
return future;
}
NonThreadedService::NonThreadedService(Ptr<Options> options)
: ServiceBase(options),
translator_(DeviceId(0, DeviceType::cpu), vocabs_, options) {}
void NonThreadedService::enqueue() {
// Queue single-threaded
Batch batch;
while (batcher_ >> batch) {
translator_.translate(batch);
}
}
} // namespace bergamot
} // namespace marian

View File

@ -0,0 +1,50 @@
#ifndef SRC_BERGAMOT_SUBSTANDARD_SERVICE_H_
#define SRC_BERGAMOT_SUBSTANDARD_SERVICE_H_
#include "batch_translator.h"
#include "batcher.h"
#include "data/types.h"
#include "response.h"
#include "text_processor.h"
#include <queue>
#include <vector>
namespace marian {
namespace bergamot {
class ServiceBase {
public:
explicit ServiceBase(Ptr<Options> options);
std::future<Response> translateWithCopy(std::string input) {
return translate(std::move(input));
};
std::future<Response> translate(std::string &&input);
Ptr<Vocab const> sourceVocab() const { return vocabs_.front(); }
Ptr<Vocab const> targetVocab() const { return vocabs_.back(); }
virtual void enqueue() = 0;
virtual void stop() = 0;
protected:
size_t requestId_;
std::vector<Ptr<Vocab const>> vocabs_; // ORDER DEPENDENCY
TextProcessor text_processor_; // ORDER DEPENDENCY
Batcher batcher_;
};
class NonThreadedService : public ServiceBase {
public:
explicit NonThreadedService(Ptr<Options> options);
void enqueue();
void stop() override{};
private:
BatchTranslator translator_;
};
std::vector<Ptr<const Vocab>> loadVocabularies(Ptr<Options> options);
} // namespace bergamot
} // namespace marian
#endif // SRC_BERGAMOT_SUBSTANDARD_SERVICE_H_