Add API to trigger fast shutdown of AsyncService (#297)

Add `AsyncService::clear()`, which lets an AsyncService shut down without
first working through its full queue. By default,
`AsyncService::~AsyncService()` waits for all pending translation
requests to finish.

One can call `AsyncService::clear()` before the destructor runs to ensure
that there is no queued work left for the service to finish before the
workers can stop and join. Marian batches that are already in progress
will not be stopped: we are not trying to interrupt threads or do anything
that complex. However, a single batch usually does not take long to
complete.

Changes:

 - Add clear() to AsyncService
 - Add clear() to BatchingPool
 - Documentation

See also:  XapaJIaMnu/translateLocally#80
This commit is contained in:
Jelmer 2022-01-21 13:14:57 +00:00 committed by GitHub
parent 7099b9e9ad
commit aef76c03a4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 35 additions and 0 deletions

View File

@ -30,5 +30,7 @@ size_t AggregateBatchingPool::generateBatch(Ptr<TranslationModel>& model, Batch&
return /*numSentences=*/0;
}
// Empties the aggregate queue. Only the queue itself is touched here; the
// per-model request pools are not visited by this function.
void AggregateBatchingPool::clear() {
  aggregateQueue_.clear();
}
} // namespace bergamot
} // namespace marian

View File

@ -58,6 +58,10 @@ class AggregateBatchingPool {
/// @returns Number of sentences in the generated batch.
size_t generateBatch(Ptr<TranslationModel>& model, Batch& batch);
/// Clear the aggregate queue. Does not clear the underlying model/request pairs, but the next call
/// to `generateBatch()` will return 0 (unless `enqueueRequest()` was called in the meantime).
void clear();
private:
std::unordered_set<std::shared_ptr<TranslationModel>, HashPtr<TranslationModel>> aggregateQueue_;
};

View File

@ -79,5 +79,11 @@ size_t BatchingPool::enqueueRequest(Ptr<Request> request) {
return toBeFreshlyTranslated;
}
// Removes every pending request from the pool by emptying each bucket in
// place. The number of buckets is left unchanged.
void BatchingPool::clear() {
  for (auto &pending : bucket_) {
    pending.clear();
  }
}
} // namespace bergamot
} // namespace marian

View File

@ -26,6 +26,9 @@ class BatchingPool {
// requests optimizing for both padding and priority.
size_t generateBatch(Batch &batch);
// Removes any pending requests from the pool.
void clear();
private:
size_t miniBatchWords_;
std::vector<std::set<RequestSentence>> bucket_;

View File

@ -139,12 +139,15 @@ AsyncService::AsyncService(const AsyncService::Config &config)
}
}
// Drops all pending requests by clearing the thread-safe batching pool.
void AsyncService::clear() {
  safeBatchingPool_.clear();
}
// Shuts the service down: signals the batching pool, then blocks until every
// worker thread has been joined.
AsyncService::~AsyncService() {
  // Signal shutdown before joining; the join loop below blocks until each
  // worker thread has returned.
  safeBatchingPool_.shutdown();

  for (auto &workerThread : workers_) {
    // Every worker is expected to still be joinable at this point.
    assert(workerThread.joinable());
    workerThread.join();
  }

  // All threads have been joined; release the thread objects.
  workers_.clear();
}
void AsyncService::pivot(std::shared_ptr<TranslationModel> first, std::shared_ptr<TranslationModel> second,

View File

@ -159,7 +159,11 @@ class AsyncService {
void pivot(std::shared_ptr<TranslationModel> first, std::shared_ptr<TranslationModel> second, std::string &&source,
CallbackType clientCallback, const ResponseOptions &options = ResponseOptions());
/// Clears all pending requests.
void clear();
/// Thread joins and proper shutdown are required to be handled explicitly.
/// If you do not want to wait, call `clear()` before the destructor runs.
~AsyncService();
TranslationCache::Stats cacheStats() { return cache_.stats(); }

View File

@ -27,6 +27,13 @@ void ThreadsafeBatchingPool<BatchingPoolType>::enqueueRequest(Args &&... args) {
work_.notify_all();
}
// Removes any pending requests from the wrapped batching pool.
// The mutex is held for the whole operation so that the backend and the
// `enqueued_` counter are updated together.
template <class BatchingPoolType>
void ThreadsafeBatchingPool<BatchingPoolType>::clear() {
  std::lock_guard<std::mutex> guard(mutex_);
  backend_.clear();
  // The backend is now empty, so reset the counter to match.
  enqueued_ = 0;
}
template <class BatchingPoolType>
void ThreadsafeBatchingPool<BatchingPoolType>::shutdown() {
std::unique_lock<std::mutex> lock(mutex_);

View File

@ -43,6 +43,12 @@ class ThreadsafeBatchingPool {
template <class... Args>
size_t generateBatch(Args &&... args);
// Removes any pending requests from the batching pool.
void clear();
// Signals shut down of batching pool. After this no new requests can be enqueued,
// but all enqueued requests will be processed. To prevent this from happening,
// call `clear()` before `shutdown()`.
void shutdown();
private: