use shared pointers in HgImportRequestQueue

Summary: This diff sets up the ability to track requests as they are shuffled around inside the queue's backing `std::vector`. Because the queue is a vector, and it is re-sorted every time an element is added or removed, we cannot keep track of elements in the queue with indices or references. Instead, we store each request in a `std::shared_ptr`, so we can hold a stable pointer to a request no matter where it is moved within the queue (a small standalone sketch of this pattern follows the commit metadata below).

Reviewed By: kmancini

Differential Revision: D26355907

fbshipit-source-id: d714d689963106a4f495221dbcfcbab758ffc7b2
Genevieve Helsel 2021-02-25 09:17:33 -08:00 committed by Facebook GitHub Bot
parent 30ad76433e
commit 8b057b96e2
5 changed files with 63 additions and 36 deletions
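
For illustration only (not part of this commit): a minimal, self-contained sketch of the pattern the summary describes: a priority heap kept in a `std::vector` of `std::shared_ptr`, with a comparator that dereferences the pointers, so a reference held outside the queue stays valid while `push_heap`/`pop_heap` reorder the vector. The `Request` type and its `priority` field are hypothetical stand-ins for `HgImportRequest` and its ordering.

```cpp
#include <algorithm>
#include <iostream>
#include <memory>
#include <vector>

// Hypothetical stand-in for HgImportRequest: only an ordering is needed here.
struct Request {
  int priority;
  bool operator<(const Request& other) const {
    return priority < other.priority;
  }
};

int main() {
  std::vector<std::shared_ptr<Request>> queue;

  // The comparator dereferences the shared_ptrs, mirroring the lambdas added
  // in this diff; without it the heap would be ordered by pointer value.
  auto byPriority = [](const std::shared_ptr<Request>& lhs,
                       const std::shared_ptr<Request>& rhs) {
    return *lhs < *rhs;
  };

  // Keep a second reference to one request before enqueueing it.
  auto tracked = std::make_shared<Request>(Request{5});
  queue.push_back(tracked);
  std::push_heap(queue.begin(), queue.end(), byPriority);

  queue.push_back(std::make_shared<Request>(Request{1}));
  std::push_heap(queue.begin(), queue.end(), byPriority);
  queue.push_back(std::make_shared<Request>(Request{9}));
  std::push_heap(queue.begin(), queue.end(), byPriority);

  // The heap operations may have moved the element within the vector, but
  // `tracked` still points at the same Request: only the shared_ptrs moved.
  std::cout << "tracked priority: " << tracked->priority << "\n"; // prints 5

  // The highest-priority request comes out first, as in dequeue().
  std::pop_heap(queue.begin(), queue.end(), byPriority);
  std::cout << "dequeued priority: " << queue.back()->priority << "\n"; // 9
  queue.pop_back();
  return 0;
}
```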


@@ -30,14 +30,22 @@ void HgImportRequestQueue::enqueue(HgImportRequest request) {
       return;
     }
-    state->queue.emplace_back(std::move(request));
-    std::push_heap(state->queue.begin(), state->queue.end());
+    state->queue.emplace_back(
+        std::make_shared<HgImportRequest>(std::move(request)));
+    std::push_heap(
+        state->queue.begin(),
+        state->queue.end(),
+        [](const std::shared_ptr<HgImportRequest>& lhs,
+           const std::shared_ptr<HgImportRequest>& rhs) {
+          return (*lhs) < (*rhs);
+        });
   }
   queueCV_.notify_one();
 }
-std::vector<HgImportRequest> HgImportRequestQueue::dequeue(size_t count) {
+std::vector<std::shared_ptr<HgImportRequest>> HgImportRequestQueue::dequeue(
+    size_t count) {
   auto state = state_.lock();
   while (state->running && state->queue.empty()) {
@@ -46,13 +54,13 @@ std::vector<HgImportRequest> HgImportRequestQueue::dequeue(size_t count) {
   if (!state->running) {
     state->queue.clear();
-    return std::vector<HgImportRequest>();
+    return std::vector<std::shared_ptr<HgImportRequest>>();
   }
   auto& queue = state->queue;
-  std::vector<HgImportRequest> result;
-  std::vector<HgImportRequest> putback;
+  std::vector<std::shared_ptr<HgImportRequest>> result;
+  std::vector<std::shared_ptr<HgImportRequest>> putback;
   std::optional<size_t> type;
   for (size_t i = 0; i < count * 3; i++) {
@@ -60,16 +68,22 @@ std::vector<HgImportRequest> HgImportRequestQueue::dequeue(size_t count) {
       break;
     }
-    std::pop_heap(queue.begin(), queue.end());
+    std::pop_heap(
+        queue.begin(),
+        queue.end(),
+        [](const std::shared_ptr<HgImportRequest>& lhs,
+           const std::shared_ptr<HgImportRequest>& rhs) {
+          return (*lhs) < (*rhs);
+        });
     auto request = std::move(queue.back());
     queue.pop_back();
     if (!type) {
-      type = request.getType();
+      type = request->getType();
       result.emplace_back(std::move(request));
     } else {
-      if (*type == request.getType()) {
+      if (*type == request->getType()) {
         result.emplace_back(std::move(request));
       } else {
         putback.emplace_back(std::move(request));
@@ -79,7 +93,13 @@ std::vector<HgImportRequest> HgImportRequestQueue::dequeue(size_t count) {
   for (auto& item : putback) {
     queue.emplace_back(std::move(item));
-    std::push_heap(queue.begin(), queue.end());
+    std::push_heap(
+        queue.begin(),
+        queue.end(),
+        [](const std::shared_ptr<HgImportRequest>& lhs,
+           const std::shared_ptr<HgImportRequest>& rhs) {
+          return (*lhs) < (*rhs);
+        });
   }
   return result;


@@ -34,7 +34,7 @@ class HgImportRequestQueue {
    * The returned vector may have fewer requests than it requested, and all
    * requests in the vector are guaranteed to be the same type.
    */
-  std::vector<HgImportRequest> dequeue(size_t count);
+  std::vector<std::shared_ptr<HgImportRequest>> dequeue(size_t count);
   void stop();
@@ -44,7 +44,7 @@ class HgImportRequestQueue {
   struct State {
     bool running = true;
-    std::vector<HgImportRequest> queue;
+    std::vector<std::shared_ptr<HgImportRequest>> queue;
   };
   folly::Synchronized<State, std::mutex> state_;
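
Illustration only, not part of the diff: a hypothetical consumer of the `dequeue()` contract stated in the header comment above. The worker-loop shape mirrors what HgQueuedBackingStore does with the result; the batch size of 8 and the name `workerLoop` are made up for the example, and the EdenFS headers for `HgImportRequestQueue`/`HgImportRequest` are assumed to be included.

```cpp
// Hypothetical worker loop built on the dequeue() contract: the batch may be
// smaller than requested, every element in it has the same request type, and
// an empty batch is only returned once the queue has been stopped.
void workerLoop(HgImportRequestQueue& queue) {
  for (;;) {
    auto requests = queue.dequeue(8);
    if (requests.empty()) {
      // dequeue() blocks while the queue is running, so empty means stopped.
      break;
    }
    // One type check on the first element covers the whole batch.
    if (requests.at(0)->isType<HgImportRequest::BlobImport>()) {
      // hand the whole batch to the blob importer ...
    } else if (requests.at(0)->isType<HgImportRequest::TreeImport>()) {
      // ... and so on for trees and prefetches.
    }
  }
}
```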


@@ -91,7 +91,7 @@ HgQueuedBackingStore::~HgQueuedBackingStore() {
 }
 void HgQueuedBackingStore::processBlobImportRequests(
-    std::vector<HgImportRequest>&& requests) {
+    std::vector<std::shared_ptr<HgImportRequest>>&& requests) {
   std::vector<Hash> hashes;
   std::vector<HgProxyHash> proxyHashes;
   std::vector<folly::Promise<HgImportRequest::BlobImport::Response>*> promises;
@@ -104,12 +104,13 @@ void HgQueuedBackingStore::processBlobImportRequests(
   XLOG(DBG4) << "Processing blob import batch size=" << requests.size();
   for (auto& request : requests) {
-    auto* blobImport = request.getRequest<HgImportRequest::BlobImport>();
+    auto* blobImport = request->getRequest<HgImportRequest::BlobImport>();
     auto& hash = blobImport->hash;
-    auto* promise = request.getPromise<HgImportRequest::BlobImport::Response>();
+    auto* promise =
+        request->getPromise<HgImportRequest::BlobImport::Response>();
     traceBus_->publish(HgImportTraceEvent::start(
-        request.getUnique(), HgImportTraceEvent::BLOB, blobImport->proxyHash));
+        request->getUnique(), HgImportTraceEvent::BLOB, blobImport->proxyHash));
     XLOGF(
         DBG4,
@@ -143,11 +144,11 @@ void HgQueuedBackingStore::processBlobImportRequests(
             .defer([request = std::move(*request), watch, stats = stats_](
                        auto&& result) mutable {
               auto hash =
-                  request.getRequest<HgImportRequest::BlobImport>()->hash;
+                  request->getRequest<HgImportRequest::BlobImport>()->hash;
               XLOG(DBG4) << "Imported blob from HgImporter for " << hash;
               stats->getHgBackingStoreStatsForCurrentThread()
                   .hgBackingStoreGetBlob.addValue(watch.elapsed().count());
-              request.getPromise<HgImportRequest::BlobImport::Response>()
+              request->getPromise<HgImportRequest::BlobImport::Response>()
                   ->setTry(std::forward<decltype(result)>(result));
             }));
   }
@@ -157,14 +158,14 @@ void HgQueuedBackingStore::processBlobImportRequests(
 }
 void HgQueuedBackingStore::processTreeImportRequests(
-    std::vector<HgImportRequest>&& requests) {
+    std::vector<std::shared_ptr<HgImportRequest>>&& requests) {
   for (auto& request : requests) {
-    auto treeImport = request.getRequest<HgImportRequest::TreeImport>();
+    auto treeImport = request->getRequest<HgImportRequest::TreeImport>();
     traceBus_->publish(HgImportTraceEvent::start(
-        request.getUnique(), HgImportTraceEvent::TREE, treeImport->proxyHash));
-    request.getPromise<HgImportRequest::TreeImport::Response>()->setWith(
+        request->getUnique(), HgImportTraceEvent::TREE, treeImport->proxyHash));
+    request->getPromise<HgImportRequest::TreeImport::Response>()->setWith(
         [store = backingStore_.get(),
          hash = treeImport->hash,
          proxyHash = treeImport->proxyHash,
@@ -181,10 +182,10 @@ void HgQueuedBackingStore::processTreeImportRequests(
 }
 void HgQueuedBackingStore::processPrefetchRequests(
-    std::vector<HgImportRequest>&& requests) {
+    std::vector<std::shared_ptr<HgImportRequest>>&& requests) {
   for (auto& request : requests) {
-    auto parameter = request.getRequest<HgImportRequest::Prefetch>();
-    request.getPromise<HgImportRequest::Prefetch::Response>()->setWith(
+    auto parameter = request->getRequest<HgImportRequest::Prefetch>();
+    request->getPromise<HgImportRequest::Prefetch::Response>()->setWith(
         [store = backingStore_.get(),
          proxyHashes = parameter->proxyHashes]() mutable {
           return store
@@ -206,11 +207,11 @@ void HgQueuedBackingStore::processRequest() {
     const auto& first = requests.at(0);
-    if (first.isType<HgImportRequest::BlobImport>()) {
+    if (first->isType<HgImportRequest::BlobImport>()) {
       processBlobImportRequests(std::move(requests));
-    } else if (first.isType<HgImportRequest::TreeImport>()) {
+    } else if (first->isType<HgImportRequest::TreeImport>()) {
       processTreeImportRequests(std::move(requests));
-    } else if (first.isType<HgImportRequest::Prefetch>()) {
+    } else if (first->isType<HgImportRequest::Prefetch>()) {
       processPrefetchRequests(std::move(requests));
     }
   }


@@ -157,9 +157,12 @@ class HgQueuedBackingStore : public BackingStore {
   HgQueuedBackingStore(const HgQueuedBackingStore&) = delete;
   HgQueuedBackingStore& operator=(const HgQueuedBackingStore&) = delete;
-  void processBlobImportRequests(std::vector<HgImportRequest>&& requests);
-  void processTreeImportRequests(std::vector<HgImportRequest>&& requests);
-  void processPrefetchRequests(std::vector<HgImportRequest>&& requests);
+  void processBlobImportRequests(
+      std::vector<std::shared_ptr<HgImportRequest>>&& requests);
+  void processTreeImportRequests(
+      std::vector<std::shared_ptr<HgImportRequest>>&& requests);
+  void processPrefetchRequests(
+      std::vector<std::shared_ptr<HgImportRequest>>&& requests);
   /**
    * The worker runloop function.


@@ -82,12 +82,15 @@ TEST(HgImportRequestQueueTest, getRequestByPriority) {
     EXPECT_EQ(
         expected,
-        queue.dequeue(1).at(0).getRequest<HgImportRequest::BlobImport>()->hash);
+        queue.dequeue(1)
+            .at(0)
+            ->getRequest<HgImportRequest::BlobImport>()
+            ->hash);
   }
   EXPECT_EQ(
       smallHash,
-      queue.dequeue(1).at(0).getRequest<HgImportRequest::BlobImport>()->hash);
+      queue.dequeue(1).at(0)->getRequest<HgImportRequest::BlobImport>()->hash);
 }
 TEST(HgImportRequestQueueTest, getRequestByPriorityReverse) {
@@ -109,7 +112,7 @@ TEST(HgImportRequestQueueTest, getRequestByPriorityReverse) {
   queue.enqueue(std::move(largeRequest));
   EXPECT_EQ(
       largeHash,
-      queue.dequeue(1).at(0).getRequest<HgImportRequest::BlobImport>()->hash);
+      queue.dequeue(1).at(0)->getRequest<HgImportRequest::BlobImport>()->hash);
   while (!enqueued.empty()) {
     auto expected = enqueued.front();
@@ -119,7 +122,7 @@ TEST(HgImportRequestQueueTest, getRequestByPriorityReverse) {
     EXPECT_EQ(
         expected,
-        request.at(0).getRequest<HgImportRequest::BlobImport>()->hash);
+        request.at(0)->getRequest<HgImportRequest::BlobImport>()->hash);
   }
 }
@@ -151,7 +154,7 @@ TEST(HgImportRequestQueueTest, getMultipleRequests) {
   for (int i = 0; i < 10; i++) {
     EXPECT_TRUE(
        enqueued_blob.find(
-            dequeued.at(i).getRequest<HgImportRequest::BlobImport>()->hash) !=
+            dequeued.at(i)->getRequest<HgImportRequest::BlobImport>()->hash) !=
        enqueued_blob.end());
   }
 }