Collapse Service into one class instead of three (#62)

* Merging two Services

* Moving stop() logic to destructor

* We have WITH_PTHREADS back

* string based constructor on Service

* Removing now empty service_base.* files

* Hiding away pcqueue_ construction

Ugliest ifdefs I have done in my life.

* Another ifdef to hide pcqueue header file

* Missing semicolons in WITH_PTHREADS path

* Fixing async_translate residue argument from copy

* Adding comments

* Initialize batchtranslator only at one place

To reduce the overhead of byte-buffer loads, initialize the batch translator
in only one place.

* \#ifdef WITH_PTHREADS -> #ifndef WASM_HIDE_THREADS

A sane (non-WASM) platform is the default. This only hides threads from the
compilation path; it does not switch pthreads (-lpthread) on or off.

* Review comments: Rearranging destructor, fix wrong comment

* Move loadVocabularies to service.cpp and put in anonymous namespace

* Prettifying diff: Removing unwanted empty lines

* Indicate in comments multithreaded has numWorkers translators

* Typo fix: bergamot_translator -> bergamot-translator

* Safety guards to avoid pcqueue illegal init

* Add WASM_HIDE_THREADS as a global WASM_COMPILE_FLAG

* Compile Defs: WASM_HIDE_THREADS -> __EMSCRIPTEN__

* Removing dead CMakeLists.txt code following __EMSCRIPTEN__

* Compile defs: __EMSCRIPTEN__ -> WASM
This commit is contained in:
Jerin Philip 2021-03-23 16:36:13 +00:00 committed by GitHub
parent d75dd85def
commit 34228d37bf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 248 additions and 245 deletions

View File

@ -56,6 +56,5 @@ int main(int argc, char *argv[]) {
marian_decoder_minimal(response.histories(), service.targetVocab(), options);
LOG(info, "Total time: {:.5f}s wall", decoderTimer.elapsed());
service.stop();
return 0;
}

View File

@ -30,9 +30,6 @@ int main(int argc, char *argv[]) {
Response response = responseFuture.get();
std::cout << response.translation() << std::endl;
// Stop Service.
service.stop();
// Clear the memory used for the byte array
free(model_bytes); // Ideally, this should be done after the translation model has been gracefully shut down.

View File

@ -27,7 +27,5 @@ int main(int argc, char *argv[]) {
Response response = responseFuture.get();
std::cout << response.translation() << std::endl;
// Stop Service.
service.stop();
return 0;
}

View File

@ -1,24 +1,18 @@
if (NOT USE_WASM_COMPATIBLE_SOURCES)
set(MULTITHREADED_SERVICE_SOURCE "service.cpp")
endif()
add_library(bergamot-translator STATIC
AbstractTranslationModel.cpp
TranslationModel.cpp
# Following files added from browsermt/mts@nuke
byteArrayExample.cpp
text_processor.cpp
sentence_splitter.cpp
batch_translator.cpp
multifactor_priority.cpp
request.cpp
service_base.cpp
${MULTITHREADED_SERVICE_SOURCE}
batcher.cpp
response.cpp
batch.cpp
sentence_ranges.cpp
service.cpp
)
if (COMPILE_DECODER_ONLY)
# A dirty hack because of marian's bad cmake practices

View File

@ -6,53 +6,14 @@
#include <future>
#include <vector>
// All 3rd party includes
#include "3rd_party/marian-dev/src/3rd_party/yaml-cpp/yaml.h"
#include "3rd_party/marian-dev/src/common/config_parser.h"
#include "common/config_validator.h"
#include "common/options.h"
// All local project includes
#include "TranslationModel.h"
#include "translator/parser.h"
#include "translator/service_base.h"
#include "translator/service.h"
std::shared_ptr<marian::Options> parseOptions(const std::string &config) {
marian::Options options;
// @TODO(jerinphilip) There's something off here, @XapaJIaMnu suggests
// that should not be using the defaultConfig. This function only has access
// to std::string config and needs to be able to construct Options from the
// same.
// Absent the following code-segment, there is a parsing exception thrown on
// rebuilding YAML.
//
// Error: Unhandled exception of type 'N4YAML11InvalidNodeE': invalid node;
// this may result from using a map iterator as a sequence iterator, or
// vice-versa
//
// Error: Aborted from void unhandledException() in
// 3rd_party/marian-dev/src/common/logging.cpp:113
marian::ConfigParser configParser = marian::bergamot::createConfigParser();
const YAML::Node &defaultConfig = configParser.getConfig();
options.merge(defaultConfig);
// Parse configs onto defaultConfig.
options.parse(config);
YAML::Node configCopy = options.cloneToYamlNode();
marian::ConfigValidator validator(configCopy);
validator.validateOptions(marian::cli::mode::translation);
return std::make_shared<marian::Options>(options);
}
TranslationModel::TranslationModel(const std::string &config, const void * model_memory)
: configOptions_(std::move(parseOptions(config))),
AbstractTranslationModel(), service_(configOptions_, model_memory) {}
// Constructs the model by forwarding the YAML configuration string and the
// optional model byte array straight to the string-based Service constructor;
// option parsing is no longer performed in this translation unit.
// NOTE(review): the header hunk still shows a configOptions_ member, which this
// constructor does not initialize -- confirm it is unused or remove it.
TranslationModel::TranslationModel(const std::string &config,
const void *model_memory)
: AbstractTranslationModel(), service_(config, model_memory) {}
// Empty destructor body: no explicit cleanup is performed here.
TranslationModel::~TranslationModel() {}

View File

@ -16,7 +16,7 @@
// All local project includes
#include "AbstractTranslationModel.h"
#include "translator/service_base.h"
#include "translator/service.h"
/* A Translation model that translates a plain (without any markups and emojis)
* UTF-8 encoded text. This implementation supports translation from 1 source
@ -29,9 +29,11 @@ public:
*/
/**
* @param config Marian yml config file in the form of a string
* @param model_memory optional byte array (aligned to 64!!!) that contains the bytes of a model.bin.
* @param model_memory optional byte array (aligned to 64!!!) that contains
* the bytes of a model.bin.
*/
TranslationModel(const std::string &config, const void * model_memory = nullptr);
TranslationModel(const std::string &config,
const void *model_memory = nullptr);
~TranslationModel();
@ -69,7 +71,7 @@ public:
private:
// Model configuration options
std::shared_ptr<marian::Options> configOptions_; // ORDER DEPENDECNY
marian::bergamot::NonThreadedService service_; // ORDER DEPENDENCY
marian::bergamot::Service service_; // ORDER DEPENDENCY
};
#endif /* SRC_TRANSLATOR_TRANSLATIONMODEL_H_ */

View File

@ -12,7 +12,7 @@
#include "translator/history.h"
#include "translator/scorers.h"
#ifdef WITH_PTHREADS
#ifndef WASM
#include "pcqueue.h"
#endif

View File

@ -7,7 +7,7 @@
#include "definitions.h"
#include "request.h"
#ifdef WITH_PTHREADS
#ifndef WASM
#include "pcqueue.h"
#endif

View File

@ -1,6 +1,10 @@
#ifndef SRC_BERGAMOT_PARSER_H
#define SRC_BERGAMOT_PARSER_H
#include "3rd_party/yaml-cpp/yaml.h"
#include "common/config_parser.h"
#include "common/config_validator.h"
#include "common/options.h"
#include "marian.h"
namespace marian {
@ -22,6 +26,40 @@ inline marian::ConfigParser createConfigParser() {
return cp;
}
inline std::shared_ptr<marian::Options>
parseOptions(const std::string &config) {
marian::Options options;
// @TODO(jerinphilip) There's something off here, @XapaJIaMnu suggests
// that should not be using the defaultConfig. This function only has access
// to std::string config and needs to be able to construct Options from the
// same.
// Absent the following code-segment, there is a parsing exception thrown on
// rebuilding YAML.
//
// Error: Unhandled exception of type 'N4YAML11InvalidNodeE': invalid node;
// this may result from using a map iterator as a sequence iterator, or
// vice-versa
//
// Error: Aborted from void unhandledException() in
// 3rd_party/marian-dev/src/common/logging.cpp:113
marian::ConfigParser configParser = createConfigParser();
const YAML::Node &defaultConfig = configParser.getConfig();
options.merge(defaultConfig);
// Parse configs onto defaultConfig.
options.parse(config);
YAML::Node configCopy = options.cloneToYamlNode();
marian::ConfigValidator validator(configCopy);
validator.validateOptions(marian::cli::mode::translation);
return std::make_shared<marian::Options>(options);
}
} // namespace bergamot
} // namespace marian

View File

@ -5,25 +5,77 @@
#include <string>
#include <utility>
/// Loads the vocabularies listed under the "vocabs" option.
///
/// Each distinct vocabulary path is loaded once; when the same path occurs
/// several times, every occurrence shares the instance created for the first.
/// Aborts when fewer than two vocabularies are configured.
///
/// @param options options object providing the "vocabs" list.
/// @return one vocabulary pointer per configured path, in configuration order.
inline std::vector<marian::Ptr<const marian::Vocab>>
loadVocabularies(marian::Ptr<marian::Options> options) {
  // @TODO: parallelize vocab loading for faster startup
  auto vocabPaths = options->get<std::vector<std::string>>("vocabs");

  // with the current setup, we need at least two vocabs: src and trg
  ABORT_IF(vocabPaths.size() < 2, "Insufficient number of vocabularies.");

  // Cache of vocabularies already loaded, keyed by path.
  std::unordered_map<std::string, marian::Ptr<marian::Vocab>> loaded;

  std::vector<marian::Ptr<marian::Vocab const>> result;
  result.reserve(vocabPaths.size());
  for (size_t idx = 0; idx < vocabPaths.size(); ++idx) {
    const std::string &path = vocabPaths[idx];
    auto hit = loaded.find(path);
    if (hit == loaded.end()) {
      // First time this path is seen: construct and load the vocabulary.
      marian::Ptr<marian::Vocab> fresh = marian::New<marian::Vocab>(options, idx);
      fresh->load(path);
      hit = loaded.emplace(path, fresh).first;
    }
    result.push_back(hit->second);
  }
  return result;
}
namespace marian {
namespace bergamot {
Service::Service(Ptr<Options> options, const void * model_memory)
: ServiceBase(options), numWorkers_(options->get<int>("cpu-threads")),
pcqueue_(numWorkers_), model_memory_{model_memory} {
if (numWorkers_ == 0) {
ABORT("Fatal: Attempt to create multithreaded instance with --cpu-threads "
"0. ");
}
/// Constructs a Service from parsed options.
///
/// The "cpu-threads" option selects the mode of operation: 0 builds a single
/// translator that is driven synchronously from the calling thread, any other
/// value builds that many translators and initializes them asynchronously.
///
/// @param options Marian options object.
/// @param model_memory byte array holding the model, or nullptr to load the
/// model from the file named in the options.
Service::Service(Ptr<Options> options, const void *model_memory)
    // Initializers are listed in member declaration order, which is the order
    // in which they actually run.
    : numWorkers_(options->get<int>("cpu-threads")),
      model_memory_(model_memory), requestId_(0),
      // loadVocabularies returns by value; wrapping the result in std::move
      // would only inhibit copy elision.
      vocabs_(loadVocabularies(options)), text_processor_(vocabs_, options),
      batcher_(options)
#ifndef WASM
      // 0 elements in PCQueue is illegal and can lead to failures. Adding a
      // guard to have at least one entry allocated. In the single-threaded
      // case, while initialized pcqueue_ remains unused.
      ,
      pcqueue_(std::max<size_t>(1, numWorkers_))
#endif
{
  // build_translators reserves the required capacity itself, so no separate
  // reserve is needed here.
  if (numWorkers_ == 0) {
    build_translators(options, /*numTranslators=*/1);
    initialize_blocking_translator();
  } else {
    build_translators(options, numWorkers_);
    initialize_async_translators();
  }
}
/// Appends `numTranslators` translators to translators_, one per CPU device id
/// in the range [0, numTranslators). Capacity is reserved up front.
void Service::build_translators(Ptr<Options> options, size_t numTranslators) {
  translators_.reserve(numTranslators);
  size_t nextCpu = 0;
  while (nextCpu < numTranslators) {
    marian::DeviceId device(nextCpu, DeviceType::cpu);
    translators_.emplace_back(device, vocabs_, options, model_memory_);
    ++nextCpu;
  }
}
/// Initializes the last translator in translators_ on the calling thread.
void Service::initialize_blocking_translator() {
  auto &blockingTranslator = translators_.back();
  blockingTranslator.initialize();
}
void Service::blocking_translate() {
Batch batch;
while (batcher_ >> batch) {
auto &translator = translators_.back();
translator.translate(batch);
}
}
#ifndef WASM
void Service::initialize_async_translators() {
workers_.reserve(numWorkers_);
for (size_t cpuId = 0; cpuId < numWorkers_; cpuId++) {
marian::DeviceId deviceId(cpuId, DeviceType::cpu);
translators_.emplace_back(deviceId, vocabs_, options, model_memory_);
auto &translator = translators_.back();
auto &translator = translators_[cpuId];
workers_.emplace_back([&translator, this] {
translator.initialize();
@ -42,29 +94,58 @@ Service::Service(Ptr<Options> options, const void * model_memory)
}
}
void Service::enqueue() {
void Service::async_translate() {
Batch batch;
while (batcher_ >> batch) {
pcqueue_.ProduceSwap(batch);
}
}
#else // WASM
// Variant compiled when WASM is defined (the #else branch of #ifndef WASM):
// asynchronous initialization is unavailable, so any call aborts.
void Service::initialize_async_translators() {
ABORT("Cannot run in async mode without multithreading.");
}
// Variant compiled when WASM is defined (the #else branch of #ifndef WASM):
// asynchronous translation is unavailable, so any call aborts.
void Service::async_translate() {
ABORT("Cannot run in async mode without multithreading.");
}
#endif // WASM
std::future<Response> Service::translate(std::string &&input) {
Segments segments;
SentenceRanges sourceRanges;
text_processor_.process(input, segments, sourceRanges);
std::promise<Response> responsePromise;
auto future = responsePromise.get_future();
Ptr<Request> request = New<Request>(
requestId_++, /* lineNumberBegin = */ 0, vocabs_, std::move(input),
std::move(segments), std::move(sourceRanges), std::move(responsePromise));
batcher_.addWholeRequest(request);
if (numWorkers_ == 0) {
blocking_translate();
} else {
async_translate();
}
return future;
}
Service::~Service() {
#ifndef WASM
for (size_t workerId = 0; workerId < numWorkers_; workerId++) {
void Service::stop() {
for (auto &worker : workers_) {
Batch poison = Batch::poison();
pcqueue_.ProduceSwap(poison);
}
for (auto &worker : workers_) {
if (worker.joinable()) {
worker.join();
for (size_t workerId = 0; workerId < numWorkers_; workerId++) {
if (workers_[workerId].joinable()) {
workers_[workerId].join();
}
}
workers_.clear();
#endif
}
Service::~Service() { stop(); }
} // namespace bergamot
} // namespace marian

View File

@ -4,10 +4,13 @@
#include "batch_translator.h"
#include "batcher.h"
#include "data/types.h"
#include "pcqueue.h"
#include "response.h"
#include "service_base.h"
#include "text_processor.h"
#include "translator/parser.h"
#ifndef WASM
#include "pcqueue.h"
#endif
#include <queue>
#include <vector>
@ -15,44 +18,103 @@
namespace marian {
namespace bergamot {
class Service : public ServiceBase {
// Service exposes methods to translate an incoming blob of text to the
// Consumer of bergamot API.
//
// An example use of this API looks as follows:
//
// options = ...;
// service = Service(options);
// std::string input_blob = "Hello World";
// std::future<Response>
// response = service.translate(std::move(input_blob));
// response.wait();
// Response result = response.get();
/// Service exposes methods to translate an incoming blob of text to the
/// Consumer of bergamot API.
///
/// An example use of this API looks as follows:
///
/// options = ...;
/// service = Service(options);
/// std::string input_blob = "Hello World";
/// std::future<Response>
/// response = service.translate(std::move(input_blob));
/// response.wait();
/// Response result = response.get();
///
/// Optionally Service can be initialized by also passing model_memory for
/// purposes of efficiency (which defaults to nullpointer and then reads from
/// file supplied through config).
class Service {
public:
/**
* @param options Marian options object
* @param model_memory byte array (aligned to 64!!!) that contains the bytes of a model.bin. Optional, defaults to nullptr when not used
*/
explicit Service(Ptr<Options> options, const void * model_memory=nullptr);
// Implements enqueue and top through blocking methods.
void stop() override;
/// @param options Marian options object
/// @param model_memory byte array (aligned to 64!!!) that contains the bytes
/// of a model.bin. Optional, defaults to nullptr when not used
explicit Service(Ptr<Options> options, const void *model_memory = nullptr);
/// Construct Service from a string configuration.
/// @param [in] config string parsable as YAML expected to adhere with marian
/// config
/// @param [in] model_memory byte array (aligned to 64!!!) that contains the
/// bytes of a model.bin. Optional, defaults to nullptr when not used
explicit Service(const std::string &config,
const void *model_memory = nullptr)
: Service(parseOptions(config), model_memory) {}
/// Explicit destructor to clean up after any threads initialized in
/// asynchronous operation mode.
~Service();
/// Shared pointer to source-vocabulary.
Ptr<Vocab const> sourceVocab() const { return vocabs_.front(); }
/// Shared pointer to target vocabulary.
Ptr<Vocab const> targetVocab() const { return vocabs_.back(); }
/// To stay efficient and to refer to the string for alignments, expects
/// ownership be moved through std::move(..)
///
/// @param [in] input rvalue reference of the string to be translated.
std::future<Response> translate(std::string &&input);
private:
void enqueue() override;
/// Build numTranslators number of translators with options from options
void build_translators(Ptr<Options> options, size_t numTranslators);
/// Initializes a blocking translator without using std::thread
void initialize_blocking_translator();
/// Translates through direct interaction between batcher_ and translators_
void blocking_translate();
// In addition to the common members (text_processor, requestId, vocabs_,
// batcher) extends with a producer-consumer queue, vector of translator
// instances owned by service each listening to the pcqueue in separate
// threads.
/// Launches multiple workers of translators using std::thread
/// Reduces to ABORT if called in a build where WASM is defined.
void initialize_async_translators();
/// Async translate produces to a producer-consumer queue as batches are
/// generated by Batcher. In another thread, the translators consume from
/// producer-consumer queue.
/// Reduces to ABORT if called in a build where WASM is defined.
void async_translate();
size_t numWorkers_; // ORDER DEPENDENCY
PCQueue<Batch> pcqueue_; // ORDER DEPENDENCY
const void * model_memory_;
std::vector<std::thread> workers_;
/// Number of workers to launch.
size_t numWorkers_; // ORDER DEPENDENCY (pcqueue_)
const void *model_memory_; /// Model memory to load model passed as bytes.
/// Holds instances of batch translators, just one in case
/// of single-threaded application, numWorkers_ in case of multithreaded
/// setting.
std::vector<BatchTranslator> translators_;
/// Stores requestId of active request. Used to establish
/// ordering among requests and logging/book-keeping.
size_t requestId_;
/// Store vocabs representing source and target.
std::vector<Ptr<Vocab const>> vocabs_; // ORDER DEPENDENCY (text_processor_)
/// TextProcessor takes a blob of text, converts it into a format consumable by
/// the batch-translator, and annotates sentences and words.
TextProcessor text_processor_; // ORDER DEPENDENCY (vocabs_)
/// Batcher handles generation of batches from a request, subject to
/// packing-efficiency and priority optimization heuristics.
Batcher batcher_;
// The following constructs are available providing full capabilities on a non
// WASM platform, where one does not have to hide threads.
#ifndef WASM
PCQueue<Batch> pcqueue_; // ORDER DEPENDENCY (numWorkers_)
std::vector<std::thread> workers_;
#endif // WASM
};
} // namespace bergamot

View File

@ -1,42 +0,0 @@
#include "service_base.h"
namespace marian {
namespace bergamot {
ServiceBase::ServiceBase(Ptr<Options> options)
: requestId_(0), vocabs_(std::move(loadVocabularies(options))),
text_processor_(vocabs_, options), batcher_(options) {}
std::future<Response> ServiceBase::translate(std::string &&input) {
Segments segments;
SentenceRanges sourceRanges;
text_processor_.process(input, segments, sourceRanges);
std::promise<Response> responsePromise;
auto future = responsePromise.get_future();
Ptr<Request> request = New<Request>(
requestId_++, /* lineNumberBegin = */ 0, vocabs_, std::move(input),
std::move(segments), std::move(sourceRanges), std::move(responsePromise));
batcher_.addWholeRequest(request);
enqueue();
return future;
}
NonThreadedService::NonThreadedService(Ptr<Options> options, const void * model_memory)
: ServiceBase(options),
translator_(DeviceId(0, DeviceType::cpu), vocabs_, options, model_memory) {
translator_.initialize();
}
void NonThreadedService::enqueue() {
// Queue single-threaded
Batch batch;
while (batcher_ >> batch) {
translator_.translate(batch);
}
}
} // namespace bergamot
} // namespace marian

View File

@ -1,87 +0,0 @@
#ifndef SRC_BERGAMOT_SERVICE_BASE_H_
#define SRC_BERGAMOT_SERVICE_BASE_H_
#include "batch_translator.h"
#include "batcher.h"
#include "data/types.h"
#include "response.h"
#include "text_processor.h"
#include <queue>
#include <vector>
namespace marian {
namespace bergamot {
// This file describes the base class ServiceBase, and a non-threaded subclass
// implementing translation functionality called NonThreadedService.
class ServiceBase {
public:
/**
* @param options Marian options object
*/
explicit ServiceBase(Ptr<Options> options);
// Transfers ownership of input string to Service, returns a future containing
// an object which provides access to translations, other features like
// sentencemappings and (tentatively) alignments.
std::future<Response> translate(std::string &&input);
// Convenience accessor methods to extract these vocabulary outside service.
// e.g: For use in decoding histories for marian-decoder replacement.
Ptr<Vocab const> sourceVocab() const { return vocabs_.front(); }
Ptr<Vocab const> targetVocab() const { return vocabs_.back(); }
// Wraps up any thread related destruction code.
virtual void stop() = 0;
protected:
// Enqueue queues a request for translation, this can be synchronous, blocking
// or asynchronous and queued in the background.
virtual void enqueue() = 0;
size_t requestId_;
std::vector<Ptr<Vocab const>> vocabs_; // ORDER DEPENDENCY
TextProcessor text_processor_; // ORDER DEPENDENCY
Batcher batcher_;
};
class NonThreadedService : public ServiceBase {
public:
/**
* @param options Marian options object
* @param model_memory byte array (aligned to 64!!!) that contains the bytes of a model.bin. Provide a nullptr if not used.
*/
explicit NonThreadedService(Ptr<Options> options, const void * model_memory);
void stop() override{};
private:
// NonThreaded service overrides unimplemented functions in base-class using
// blocking mechanisms.
void enqueue() override;
// There's a single translator, launched as part of the main process.
BatchTranslator translator_;
};
// Used across Services
inline std::vector<Ptr<const Vocab>> loadVocabularies(Ptr<Options> options) {
// @TODO: parallelize vocab loading for faster startup
auto vfiles = options->get<std::vector<std::string>>("vocabs");
// with the current setup, we need at least two vocabs: src and trg
ABORT_IF(vfiles.size() < 2, "Insufficient number of vocabularies.");
std::vector<Ptr<Vocab const>> vocabs(vfiles.size());
std::unordered_map<std::string, Ptr<Vocab>> vmap;
for (size_t i = 0; i < vocabs.size(); ++i) {
auto m = vmap.emplace(std::make_pair(vfiles[i], Ptr<Vocab>()));
if (m.second) { // new: load the vocab
m.first->second = New<Vocab>(options, i);
m.first->second->load(vfiles[i]);
}
vocabs[i] = m.first->second;
}
return vocabs;
}
} // namespace bergamot
} // namespace marian
#endif // SRC_BERGAMOT_SERVICE_BASE_H_