batch imports blobs from Rust stores via EdenAPI

Summary:
NOTE: This stack works together, they are separated for easier reviewing. Check D21723465 if you want to read this stack in one place.

This diff finally connects EdenAPI with EdenFS. Previously we only had a basic EdenAPI implementation, but its performance was not acceptable, largely due to the overhead of talking to the remote server. Now that EdenFS is able to import multiple files at once, we can use EdenAPI efficiently to import this data.

Reviewed By: chadaustin

Differential Revision: D21576569

fbshipit-source-id: a45e832ec63d057730138551393ff7547fa2c22f
This commit is contained in:
Zeyi (Rice) Fan 2020-06-05 15:38:25 -07:00 committed by Facebook GitHub Bot
parent 854637e854
commit 3754c14ec3
3 changed files with 75 additions and 51 deletions

View File

@ -109,15 +109,54 @@ std::unique_ptr<Blob> HgDatapackStore::getBlobRemote(
return nullptr;
}
std::unique_ptr<Tree> HgDatapackStore::getTree(
const RelativePath& path,
const Hash& manifestId,
const Hash& edenTreeId,
LocalStore::WriteBatch* writeBatch) {
if (auto tree = store_.getTree(manifestId.getBytes())) {
return fromRawTree(tree.get(), edenTreeId, path, writeBatch);
void HgDatapackStore::getBlobBatch(
    const std::vector<Hash>& ids,
    const std::vector<HgProxyHash>& hashes,
    std::vector<folly::Promise<std::unique_ptr<Blob>>*> promises) {
  // Import a batch of blobs with a single call into the underlying store.
  // `ids`, `hashes` and `promises` are parallel vectors: element i of each
  // describes the same blob. A promise is fulfilled only when its blob is
  // successfully imported; otherwise it is left untouched so the caller can
  // satisfy it through another import path.
  std::vector<Hash> blobhashes;
  std::vector<std::pair<folly::ByteRange, folly::ByteRange>> requests;
  size_t count = hashes.size();
  requests.reserve(count);
  blobhashes.reserve(count);

  // `.revHash()` will return an owned `Hash` and `getBytes()` will return a
  // reference into that `Hash`. We must keep these `Hash` objects alive (in
  // `blobhashes`) so the `ByteRange`s stored in `requests` do not dangle.
  // For the same reason we cannot call `revHash().getBytes()` inline when
  // building `requests` below — the range would point into a destroyed
  // temporary.
  for (size_t i = 0; i < count; i++) {
    blobhashes.emplace_back(hashes[i].revHash());
  }

  // Build (path, node-hash) request pairs. Both ByteRanges are non-owning
  // views: the first into the caller's `hashes`, the second into the local
  // `blobhashes` — both outlive the store call below.
  auto blobhash = blobhashes.begin();
  auto hash = hashes.begin();
  for (; blobhash != blobhashes.end(); blobhash++, hash++) {
    CHECK(hash != hashes.end());
    requests.emplace_back(std::make_pair<>(
        folly::ByteRange{hash->path().stringPiece()}, blobhash->getBytes()));
  }

  store_.getBlobBatch(
      requests,
      false,
      // NOTE(review): `requests` is copied into the closure, but the
      // ByteRanges inside still point into the local `blobhashes` and the
      // caller's `hashes`. This assumes store_.getBlobBatch() invokes the
      // callback synchronously before returning — confirm the Rust store
      // cannot defer the callback past this function's scope.
      [promises = std::move(promises), ids, requests](
          size_t index, std::unique_ptr<folly::IOBuf> content) {
        XLOGF(
            DBG9,
            "Imported name={} node={}",
            folly::StringPiece{requests[index].first},
            folly::hexlify(requests[index].second));
        // `index` identifies which request succeeded; fulfill the matching
        // promise with a Blob built from the returned content.
        auto blob = std::make_unique<Blob>(ids[index], *content);
        promises[index]->setValue(std::move(blob));
      });
}
// Stub: tree import is not implemented by this store. Returning nullptr
// presumably lets the caller fall back to a different importer — TODO
// confirm against the call site, which is not visible here.
std::unique_ptr<Tree> HgDatapackStore::getTree(
    const RelativePath&,
    const Hash&,
    const Hash&,
    LocalStore::WriteBatch*) {
  return nullptr;
}

View File

@ -8,6 +8,7 @@
#pragma once
#include <folly/Range.h>
#include <folly/futures/Promise.h>
#include "eden/fs/model/Blob.h"
#include "eden/fs/store/LocalStore.h"
@ -34,6 +35,16 @@ class HgDatapackStore {
const Hash& id,
const HgProxyHash& hgInfo);
/**
 * Import multiple blobs at once.
 *
 * The three vector parameters must have the same length and are parallel:
 * element i of `ids`, `hashes` and `promises` all describe the same blob.
 * A promise is fulfilled only if its blob is successfully imported;
 * otherwise it is left untouched so the caller can satisfy it through a
 * different import path. The promises are held by raw pointer, so the
 * caller must keep them alive for the duration of the call.
 */
void getBlobBatch(
    const std::vector<Hash>& ids,
    const std::vector<HgProxyHash>& hashes,
    std::vector<folly::Promise<std::unique_ptr<Blob>>*> promises);
std::unique_ptr<Tree> getTree(
const RelativePath& path,
const Hash& manifestId,

View File

@ -55,15 +55,24 @@ HgQueuedBackingStore::~HgQueuedBackingStore() {
void HgQueuedBackingStore::processBlobImportRequests(
std::vector<HgImportRequest>&& requests) {
std::vector<Hash> hashes;
std::vector<folly::Promise<HgImportRequest::BlobImport::Response>*> promises;
folly::stop_watch<std::chrono::milliseconds> watch;
hashes.reserve(requests.size());
promises.reserve(requests.size());
XLOG(DBG4) << "Processing blob import batch size=" << requests.size();
for (auto& request : requests) {
auto& hash = request.getRequest<HgImportRequest::BlobImport>()->hash;
XLOG(DBG4) << "Processing blob request for " << hash;
auto* promise = request.getPromise<HgImportRequest::BlobImport::Response>();
XLOGF(
DBG4,
"Processing blob request for {} ({:p})",
hash.toString(),
static_cast<void*>(promise));
hashes.emplace_back(hash);
promises.emplace_back(promise);
}
auto proxyHashesTry =
@ -85,49 +94,7 @@ void HgQueuedBackingStore::processBlobImportRequests(
auto proxyHashes = proxyHashesTry.value();
// logic:
// check with hgcache with the Rust code, if it does not exist there, try
// hgimporthelper and mononoke if possible
{
// check hgcache
auto request = requests.begin();
auto proxyHash = proxyHashes.begin();
auto& stats = stats_->getHgBackingStoreStatsForCurrentThread();
size_t count = 0;
XCHECK_EQ(requests.size(), proxyHashes.size());
for (; request != requests.end();) {
auto hash = request->getRequest<HgImportRequest::BlobImport>()->hash;
if (auto blob = backingStore_->getBlobFromHgCache(hash, *proxyHash)) {
XLOG(DBG4) << "Imported blob from hgcache for " << hash;
request->getPromise<decltype(blob)>()->setValue(std::move(blob));
stats.hgBackingStoreGetBlob.addValue(watch.elapsed().count());
count += 1;
// Swap-and-pop, removing fulfilled request from the list.
// It is fine to call `.back()` here because if we are in the loop
// `requests` are guaranteed to be nonempty.
// @lint-ignore HOWTOEVEN ParameterUncheckedArrayBounds
std::swap(*request, requests.back());
requests.pop_back();
// Same reason as above, if `proxyHashes` is empty it would be caught
// by the XHCECK_EQ call above.
// @lint-ignore HOWTOEVEN LocalUncheckedArrayBounds
std::swap(*proxyHash, proxyHashes.back());
proxyHashes.pop_back();
} else {
request++;
proxyHash++;
}
}
XLOG(DBG4) << "Fetched " << count << " requests from hgcache";
}
// TODO: check EdenAPI
backingStore_->getDatapackStore().getBlobBatch(hashes, proxyHashes, promises);
{
auto request = requests.begin();
@ -137,6 +104,13 @@ void HgQueuedBackingStore::processBlobImportRequests(
XCHECK_EQ(requests.size(), proxyHashes.size());
for (; request != requests.end(); request++, proxyHash++) {
if (request->getPromise<HgImportRequest::BlobImport::Response>()
->isFulfilled()) {
stats_->getHgBackingStoreStatsForCurrentThread()
.hgBackingStoreGetBlob.addValue(watch.elapsed().count());
continue;
}
futures.emplace_back(
backingStore_->fetchBlobFromHgImporter(*proxyHash)
.defer([request = std::move(*request), watch, stats = stats_](