eden: decouple LocalStore from HgImporter

Summary: The only reason `HgImporter` still holds a pointer to LocalStore is for looking up proxy hashes. We can directly pass in the Mercurial hash and path instead.

Reviewed By: chadaustin

Differential Revision: D19049039

fbshipit-source-id: 45d1e1f363ed73dca447b22e5891495cf9ad202b
This commit is contained in:
Zeyi (Rice) Fan 2020-01-16 12:43:54 -08:00 committed by Facebook Github Bot
parent 283c6de0df
commit bd7b3a38c8
7 changed files with 65 additions and 72 deletions

View File

@ -105,19 +105,15 @@ class HgImporterThreadFactory : public folly::ThreadFactory {
public:
HgImporterThreadFactory(
AbsolutePathPiece repository,
LocalStore* localStore,
std::shared_ptr<EdenStats> stats)
: delegate_("HgImporter"),
repository_(repository),
localStore_(localStore),
stats_(std::move(stats)) {}
std::thread newThread(folly::Func&& func) override {
return delegate_.newThread([this, func = std::move(func)]() mutable {
threadLocalImporter.reset(new HgImporterManager(
repository_,
localStore_,
getSharedHgImporterStatsForCurrentThread(stats_)));
repository_, getSharedHgImporterStatsForCurrentThread(stats_)));
func();
});
}
@ -125,7 +121,6 @@ class HgImporterThreadFactory : public folly::ThreadFactory {
private:
folly::NamedThreadFactory delegate_;
AbsolutePath repository_;
LocalStore* localStore_;
std::shared_ptr<EdenStats> stats_;
};
@ -217,10 +212,7 @@ HgBackingStore::HgBackingStore(
*/
make_unique<folly::UnboundedBlockingQueue<
folly::CPUThreadPoolExecutor::CPUTask>>(),
std::make_shared<HgImporterThreadFactory>(
repository,
localStore,
stats))),
std::make_shared<HgImporterThreadFactory>(repository, stats))),
config_(config),
serverThreadPool_(serverThreadPool) {
#ifdef EDEN_HAVE_RUST_DATAPACK
@ -233,7 +225,7 @@ HgBackingStore::HgBackingStore(
}
#endif
HgImporter importer(
repository, localStore, getSharedHgImporterStatsForCurrentThread(stats));
repository, getSharedHgImporterStatsForCurrentThread(stats));
const auto& options = importer.getOptions();
initializeTreeManifestImport(options, repository);
repoName_ = options.repoName;
@ -719,12 +711,14 @@ SemiFuture<unique_ptr<Blob>> HgBackingStore::getBlob(const Hash& id) {
}));
}
futures.push_back(getBlobFromHgImporter(id).deferValue(
[stats = stats_, watch](SemiFuture<std::unique_ptr<Blob>>&& blob) {
stats->getHgBackingStoreStatsForCurrentThread()
.hgBackingStoreGetBlob.addValue(watch.elapsed().count());
return std::move(blob);
}));
futures.push_back(
getBlobFromHgImporter(hgInfo.path(), hgInfo.revHash())
.deferValue([stats = stats_,
watch](SemiFuture<std::unique_ptr<Blob>>&& blob) {
stats->getHgBackingStoreStatsForCurrentThread()
.hgBackingStoreGetBlob.addValue(watch.elapsed().count());
return std::move(blob);
}));
return folly::collectAnyWithoutException(futures).deferValue(
[](std::pair<size_t, unique_ptr<Blob>>&& result) {
@ -733,9 +727,10 @@ SemiFuture<unique_ptr<Blob>> HgBackingStore::getBlob(const Hash& id) {
}
SemiFuture<std::unique_ptr<Blob>> HgBackingStore::getBlobFromHgImporter(
const RelativePathPiece& path,
const Hash& id) {
return folly::via(importThreadPool_.get(), [id] {
return getThreadLocalImporter().importFileContents(id);
return folly::via(importThreadPool_.get(), [path = path.copy(), id] {
return getThreadLocalImporter().importFileContents(path, id);
});
}

View File

@ -156,6 +156,7 @@ class HgBackingStore : public BackingStore {
#endif
folly::SemiFuture<std::unique_ptr<Blob>> getBlobFromHgImporter(
const RelativePathPiece& path,
const Hash& id);
folly::Future<std::unique_ptr<Tree>> getTreeForCommitImpl(Hash commitID);

View File

@ -35,7 +35,6 @@
#include "eden/fs/model/Blob.h"
#include "eden/fs/model/Tree.h"
#include "eden/fs/model/TreeEntry.h"
#include "eden/fs/store/LocalStore.h"
#include "eden/fs/store/hg/HgImportPyError.h"
#include "eden/fs/store/hg/HgProxyHash.h"
#include "eden/fs/telemetry/EdenStats.h"
@ -77,11 +76,6 @@ DEFINE_string(
"this value is non-empty, the existing PYTHONPATH from the environment is "
"replaced with this value.");
DEFINE_int32(
hgManifestImportBufferSize,
256 * 1024 * 1024, // 256MB
"Buffer size for batching LocalStore writes during hg manifest imports");
namespace {
using namespace facebook::eden;
@ -135,10 +129,9 @@ class HgImporterEofError : public HgImporterError {
HgImporter::HgImporter(
AbsolutePathPiece repoPath,
LocalStore* store,
std::shared_ptr<HgImporterThreadStats> stats,
std::optional<AbsolutePath> importHelperScript)
: repoPath_{repoPath}, store_{store}, stats_{std::move(stats)} {
: repoPath_{repoPath}, stats_{std::move(stats)} {
std::vector<string> cmd;
// importHelperScript takes precedence if it was specified; this is used
@ -324,16 +317,14 @@ void HgImporter::stopHelperProcess() {
#endif
}
unique_ptr<Blob> HgImporter::importFileContents(Hash blobHash) {
// Look up the mercurial path and file revision hash,
// which we need to import the data from mercurial
HgProxyHash hgInfo(store_, blobHash, "importFileContents");
XLOG(DBG5) << "requesting file contents of '" << hgInfo.path() << "', "
<< hgInfo.revHash().toString();
unique_ptr<Blob> HgImporter::importFileContents(
RelativePathPiece path,
Hash blobHash) {
XLOG(DBG5) << "requesting file contents of '" << path << "', "
<< blobHash.toString();
// Ask the import helper process for the file contents
auto requestID = sendFileRequest(hgInfo.path(), hgInfo.revHash());
auto requestID = sendFileRequest(path, blobHash);
// Read the response. The response body contains the file contents,
// which is exactly what we want to return.
@ -347,9 +338,9 @@ unique_ptr<Blob> HgImporter::importFileContents(Hash blobHash) {
"CMD_CAT_FILE response for blob ",
blobHash,
" (",
hgInfo.path(),
path,
", ",
hgInfo.revHash(),
blobHash,
") from hg_import_helper.py is too "
"short for body length field: length = ",
header.dataLength);
@ -377,9 +368,9 @@ unique_ptr<Blob> HgImporter::importFileContents(Hash blobHash) {
"inconsistent body length received when importing blob ",
blobHash,
" (",
hgInfo.path(),
path,
", ",
hgInfo.revHash(),
blobHash,
"): bodyLength=",
bodyLength,
" responseLength=",
@ -388,8 +379,8 @@ unique_ptr<Blob> HgImporter::importFileContents(Hash blobHash) {
throw std::runtime_error(std::move(msg));
}
XLOG(DBG4) << "imported blob " << blobHash << " (" << hgInfo.path() << ", "
<< hgInfo.revHash() << "); length=" << bodyLength;
XLOG(DBG4) << "imported blob " << blobHash << " (" << path << ", " << blobHash
<< "); length=" << bodyLength;
return make_unique<Blob>(blobHash, std::move(buf));
}
@ -722,11 +713,9 @@ const ImporterOptions& HgImporter::getOptions() const {
HgImporterManager::HgImporterManager(
AbsolutePathPiece repoPath,
LocalStore* store,
std::shared_ptr<HgImporterThreadStats> stats,
std::optional<AbsolutePath> importHelperScript)
: repoPath_{repoPath},
store_{store},
stats_{std::move(stats)},
importHelperScript_{importHelperScript} {}
@ -767,9 +756,11 @@ Hash HgImporterManager::resolveManifestNode(StringPiece revName) {
});
}
unique_ptr<Blob> HgImporterManager::importFileContents(Hash blobHash) {
return retryOnError([&](HgImporter* importer) {
return importer->importFileContents(blobHash);
unique_ptr<Blob> HgImporterManager::importFileContents(
RelativePathPiece path,
Hash blobHash) {
return retryOnError([=](HgImporter* importer) {
return importer->importFileContents(path, blobHash);
});
}
@ -789,8 +780,7 @@ void HgImporterManager::fetchTree(
HgImporter* HgImporterManager::getImporter() {
if (!importer_) {
importer_ =
make_unique<HgImporter>(repoPath_, store_, stats_, importHelperScript_);
importer_ = make_unique<HgImporter>(repoPath_, stats_, importHelperScript_);
}
return importer_.get();
}

View File

@ -17,7 +17,6 @@
#endif
#include "eden/fs/eden-config.h"
#include "eden/fs/store/LocalStore.h"
#include "eden/fs/telemetry/EdenStats.h"
#include "eden/fs/utils/PathFuncs.h"
@ -90,7 +89,9 @@ class Importer {
*
* Returns an Blob containing the file contents.
*/
virtual std::unique_ptr<Blob> importFileContents(Hash blobHash) = 0;
virtual std::unique_ptr<Blob> importFileContents(
RelativePathPiece path,
Hash blobHash) = 0;
virtual void prefetchFiles(
const std::vector<std::pair<RelativePath, Hash>>& files) = 0;
@ -122,14 +123,10 @@ class HgImporter : public Importer {
public:
/**
* Create a new HgImporter object that will import data from the specified
* repository into the given LocalStore.
*
* The caller is responsible for ensuring that the LocalStore object remains
* valid for the lifetime of the HgImporter object.
* repository.
*/
HgImporter(
AbsolutePathPiece repoPath,
LocalStore* store,
std::shared_ptr<HgImporterThreadStats>,
std::optional<AbsolutePath> importHelperScript = std::nullopt);
@ -140,7 +137,9 @@ class HgImporter : public Importer {
#endif
Hash resolveManifestNode(folly::StringPiece revName) override;
std::unique_ptr<Blob> importFileContents(Hash blobHash) override;
std::unique_ptr<Blob> importFileContents(
RelativePathPiece path,
Hash blobHash) override;
void prefetchFiles(
const std::vector<std::pair<RelativePath, Hash>>& files) override;
void fetchTree(RelativePathPiece path, Hash pathManifestNode) override;
@ -277,7 +276,6 @@ class HgImporter : public Importer {
facebook::eden::Subprocess helper_;
#endif
const AbsolutePath repoPath_;
LocalStore* const store_{nullptr};
std::shared_ptr<HgImporterThreadStats> const stats_;
ImporterOptions options_;
uint32_t nextRequestID_{0};
@ -318,13 +316,14 @@ class HgImporterManager : public Importer {
public:
HgImporterManager(
AbsolutePathPiece repoPath,
LocalStore* store,
std::shared_ptr<HgImporterThreadStats>,
std::optional<AbsolutePath> importHelperScript = std::nullopt);
Hash resolveManifestNode(folly::StringPiece revName) override;
std::unique_ptr<Blob> importFileContents(Hash blobHash) override;
std::unique_ptr<Blob> importFileContents(
RelativePathPiece path,
Hash blobHash) override;
void prefetchFiles(
const std::vector<std::pair<RelativePath, Hash>>& files) override;
void fetchTree(RelativePathPiece path, Hash pathManifestNode) override;
@ -339,7 +338,6 @@ class HgImporterManager : public Importer {
std::unique_ptr<HgImporter> importer_;
const AbsolutePath repoPath_;
LocalStore* const store_{nullptr};
std::shared_ptr<HgImporterThreadStats> const stats_;
const std::optional<AbsolutePath> importHelperScript_;
};

View File

@ -50,7 +50,6 @@ struct HgBackingStoreTest : TestRepo, ::testing::Test {
std::make_shared<MemoryLocalStore>()};
std::shared_ptr<EdenStats> stats{std::make_shared<EdenStats>()};
HgImporter importer{repo.path(),
localStore.get(),
getSharedHgImporterStatsForCurrentThread(stats)};
std::shared_ptr<HgBackingStore> backingStore{std::make_shared<HgBackingStore>(
repo.path(),

View File

@ -14,8 +14,6 @@
#include "eden/fs/model/Blob.h"
#include "eden/fs/model/Hash.h"
#include "eden/fs/model/Tree.h"
#include "eden/fs/store/LocalStore.h"
#include "eden/fs/store/MemoryLocalStore.h"
#include "eden/fs/store/hg/HgImportPyError.h"
#include "eden/fs/store/hg/HgImporter.h"
#include "eden/fs/telemetry/EdenStats.h"
@ -41,7 +39,6 @@ class HgImportTest : public ::testing::Test {
TemporaryDirectory testDir_{"eden_hg_import_test"};
AbsolutePath testPath_{testDir_.path().string()};
HgRepo repo_{testPath_ + "repo"_pc};
MemoryLocalStore localStore_;
std::shared_ptr<HgImporterThreadStats> stats_ =
std::make_shared<HgImporterThreadStats>();
};
@ -55,31 +52,37 @@ TEST_F(HgImportTest, importTest) {
// Set up the initial commit
repo_.mkdir("foo");
StringPiece barData = "this is a test file\n";
repo_.writeFile("foo/bar.txt", barData);
RelativePathPiece filePath{"foo/bar.txt"};
repo_.writeFile(filePath, barData);
repo_.hg("add");
auto commit1 = repo_.commit("Initial commit");
// Import the root tree
HgImporter importer(repo_.path(), &localStore_, stats_);
HgImporter importer(repo_.path(), stats_);
auto fileHash = repo_.hg("manifest", "--debug").substr(0, 40);
auto blob = importer.importFileContents(filePath, Hash{fileHash});
EXPECT_BLOB_EQ(blob, barData);
// Test importing objects that do not exist
Hash noSuchHash = makeTestHash("123");
EXPECT_THROW_RE(
importer.importFileContents(noSuchHash),
importer.importFileContents(filePath, noSuchHash),
std::exception,
"value not present in store");
"no match found");
EXPECT_THROW_RE(
importer.importFileContents(commit1),
importer.importFileContents(RelativePathPiece{"hello"}, commit1),
std::exception,
"value not present in store");
"no match found");
}
// TODO(T33797958): Check hg_importer_helper's exit code on Windows (in
// HgImportTest).
#ifndef _WIN32
TEST_F(HgImportTest, importerHelperExitsCleanly) {
HgImporter importer(repo_.path(), &localStore_, stats_);
HgImporter importer(repo_.path(), stats_);
auto status = importer.debugStopHelperProcess();
EXPECT_EQ(status.str(), "exited with status 0");
}

View File

@ -704,13 +704,20 @@ class HgServer(object):
try:
return attr_of(fctx)
except KeyError:
except KeyError as e:
# There is a race condition in Mercurial's repacking which may be
# triggered by debugedenimporthelper since we have multiple
# processes for importing files. So we will retry once for this
# type of error to avoid restarting the importer when this happens.
self.repo.ui.develwarn("Retrying due to possible race condition")
if not hasattr(self.repo.fileslog, "contentstore"):
# we are not using remotefilelog in tests, and normal filelog
# does not have a contentstore. So fail immediately
raise ResetRepoError(e)
self.repo.fileslog.contentstore.markforrefresh()
try:
return attr_of(fctx)
except Exception as e: