prjfs: make readdir asynchronous

Summary:
As of right now, opendir is the expensive callbacks due to fetching the sizes
for all the files in a directory. This strategy however breaks down when
timeouts are added as very large directories would trigger the timeout, not
because EdenFS is having a hard time reaching Mononoke, but because of
bandwidth limitation.

To avoid this issue, we need to have a per-file timeout and thus makes opendir
just trigger the futures, but not wait on them. The waiting bit will happen
at readdir time which will also enable having a timeout per file.

The one drawback to this is that the ObjectFetchContext that is passed in by
opendir cannot live long enough to be used in the size future. For now, we can
use a null context, but a proper context will need to be passed in, in the
future.

Reviewed By: wez

Differential Revision: D24895089

fbshipit-source-id: e10ceae2f7c49b4a006b15a34f85d06a2863ae3a
This commit is contained in:
Xavier Deguillard 2020-11-13 14:26:17 -08:00 committed by Facebook GitHub Bot
parent dca309c1e5
commit 8b82dc96cb
12 changed files with 199 additions and 201 deletions

View File

@ -475,102 +475,16 @@ EdenDispatcher::EdenDispatcher(EdenMount* mount)
mount_{mount},
dotEdenConfig_{makeDotEdenConfig(*mount)} {}
folly::Future<folly::Unit> EdenDispatcher::opendir(
folly::Future<std::vector<FileMetadata>> EdenDispatcher::opendir(
RelativePathPiece path,
const Guid guid,
ObjectFetchContext& context) {
return mount_->getInode(path, context)
.thenValue([&context](const InodePtr inode) {
.thenValue([this](const InodePtr inode) {
auto treePtr = inode.asTreePtr();
return treePtr->readdir(context);
})
.thenValue([this, guid = std::move(guid)](auto&& dirents) {
auto [iterator, inserted] =
enumSessions_.wlock()->emplace(guid, std::move(dirents));
XDCHECK(inserted);
return folly::unit;
return treePtr->readdir();
});
}
void EdenDispatcher::closedir(const Guid& guid) {
auto erasedCount = enumSessions_.wlock()->erase(guid);
XDCHECK(erasedCount == 1);
}
HRESULT EdenDispatcher::getEnumerationData(
const PRJ_CALLBACK_DATA& callbackData,
const GUID& enumerationId,
PCWSTR searchExpression,
PRJ_DIR_ENTRY_BUFFER_HANDLE bufferHandle) noexcept {
try {
auto guid = Guid(enumerationId);
auto lockedSessions = enumSessions_.rlock();
auto sessionIterator = lockedSessions->find(guid);
if (sessionIterator == lockedSessions->end()) {
XLOG(DBG5) << "Enum instance not found: "
<< RelativePath(callbackData.FilePathName);
return HRESULT_FROM_WIN32(ERROR_INVALID_PARAMETER);
}
auto shouldRestart =
bool(callbackData.Flags & PRJ_CB_DATA_FLAG_ENUM_RESTART_SCAN);
// We won't ever get concurrent callbacks for a given enumeration, it is
// therefore safe to modify the session here even though we do not hold an
// exclusive lock to it.
auto& session = const_cast<Enumerator&>(sessionIterator->second);
if (session.isSearchExpressionEmpty() || shouldRestart) {
if (searchExpression != nullptr) {
session.saveExpression(searchExpression);
} else {
session.saveExpression(L"*");
}
}
if (shouldRestart) {
session.restart();
}
// Traverse the list enumeration list and fill the remaining entry. Start
// from where the last call left off.
bool added = false;
for (const FileMetadata* entry; (entry = session.current());
session.advance()) {
auto fileInfo = PRJ_FILE_BASIC_INFO();
fileInfo.IsDirectory = entry->isDirectory;
fileInfo.FileSize = entry->size;
XLOGF(
DBG6,
"Enum {} {} size= {}",
PathComponent(entry->name),
fileInfo.IsDirectory ? "Dir" : "File",
fileInfo.FileSize);
auto result =
PrjFillDirEntryBuffer(entry->name.c_str(), &fileInfo, bufferHandle);
if (result != S_OK) {
if (result == HRESULT_FROM_WIN32(ERROR_INSUFFICIENT_BUFFER) && added) {
// We are out of buffer space. This entry didn't make it. Return
// without increment.
break;
} else {
return result;
}
}
added = true;
}
return S_OK;
} catch (const std::exception& ex) {
return exceptionToHResult(ex);
}
}
folly::Future<std::optional<InodeMetadata>> EdenDispatcher::lookup(
RelativePath path,
ObjectFetchContext& context) {

View File

@ -11,14 +11,7 @@
#ifndef _WIN32
#include "eden/fs/fuse/Dispatcher.h"
#else
#include "folly/portability/Windows.h"
#include <ProjectedFSLib.h> // @manual
#include "eden/fs/prjfs/Dispatcher.h"
#include "eden/fs/prjfs/Enumerator.h"
#include "eden/fs/utils/Guid.h"
#include "folly/Synchronized.h"
#include "folly/container/F14Map.h"
#endif
namespace facebook {
@ -116,19 +109,10 @@ class EdenDispatcher : public Dispatcher {
override;
folly::Future<std::vector<std::string>> listxattr(InodeNumber ino) override;
#else
folly::Future<folly::Unit> opendir(
folly::Future<std::vector<FileMetadata>> opendir(
RelativePathPiece path,
const Guid guid,
ObjectFetchContext& context) override;
void closedir(const Guid& guid) override;
HRESULT getEnumerationData(
const PRJ_CALLBACK_DATA& callbackData,
const GUID& enumerationId,
PCWSTR searchExpression,
PRJ_DIR_ENTRY_BUFFER_HANDLE dirEntryBufferHandle) noexcept override;
folly::Future<std::optional<InodeMetadata>> lookup(
RelativePath path,
ObjectFetchContext& context) override;
@ -196,9 +180,6 @@ class EdenDispatcher : public Dispatcher {
// mount_ first.
InodeMap* const inodeMap_;
#else
// Set of currently active directory enumerations.
folly::Synchronized<folly::F14FastMap<Guid, Enumerator>> enumSessions_;
const std::string dotEdenConfig_;
#endif
};

View File

@ -1909,42 +1909,37 @@ TreeInode::readdir(DirList&& list, off_t off, ObjectFetchContext& context) {
#else
folly::Future<std::vector<FileMetadata>> TreeInode::readdir(
ObjectFetchContext& context) {
vector<Future<FileMetadata>> futures;
{
auto dir = contents_.rlock();
auto& entries = dir->entries;
futures.reserve(entries.size());
std::vector<FileMetadata> TreeInode::readdir() {
vector<FileMetadata> ret;
for (auto& [name, entry] : entries) {
auto isDir = entry.getDtype() == dtype_t::Dir;
auto winName = name.wide();
auto dir = contents_.rlock();
auto& entries = dir->entries;
ret.reserve(entries.size());
if (!isDir) {
// We only populates the file size for non-materialized files. For
// the materialized files, ProjectedFS will use the on-disk size.
auto hash = entry.getOptionalHash();
if (hash.has_value()) {
futures.emplace_back(
getMount()
->getObjectStore()
->getBlobSize(hash.value(), context)
.thenValue(
[winName = std::move(winName)](uint64_t size) mutable {
return FileMetadata(std::move(winName), false, size);
}));
continue;
}
for (auto& [name, entry] : entries) {
auto isDir = entry.getDtype() == dtype_t::Dir;
auto winName = name.wide();
if (!isDir) {
// We only populates the file size for non-materialized files. For
// the materialized files, ProjectedFS will use the on-disk size.
auto hash = entry.getOptionalHash();
if (hash.has_value()) {
static ObjectFetchContext* context =
ObjectFetchContext::getNullContextWithCauseDetail(
"TreeInode::readdir");
ret.emplace_back(
std::move(winName),
false,
getMount()->getObjectStore()->getBlobSize(hash.value(), *context));
continue;
}
futures.emplace_back(FileMetadata(std::move(winName), isDir, 0));
}
// We can release the content lock here.
ret.emplace_back(std::move(winName), isDir, folly::Future<size_t>(0));
}
return folly::collect(std::move(futures))
.via(getMount()->getServerState()->getThreadPool().get());
return ret;
}
#endif // _WIN32

View File

@ -158,8 +158,7 @@ class TreeInode final : public InodeBaseMetadata<DirContents> {
* way to get it, so in this function as an optimization we don't populate the
* size of materialized files.
*/
FOLLY_NODISCARD folly::Future<std::vector<FileMetadata>> readdir(
ObjectFetchContext& context);
FOLLY_NODISCARD std::vector<FileMetadata> readdir();
#endif
const folly::Synchronized<TreeInodeState>& getContents() const {

View File

@ -89,7 +89,7 @@ TEST(TreeInode, readdirTest) {
TestMount mount{builder};
auto root = mount.getEdenMount()->getRootInode();
auto result = root->readdir(ObjectFetchContext::getNullContext()).get(0ms);
auto result = root->readdir();
ASSERT_EQ(2, result.size());
EXPECT_EQ(L".eden", result[0].name);
@ -105,7 +105,7 @@ TEST(TreeInode, updateAndReaddir) {
// Test creating a new file
auto somedir = mount.getTreeInode("somedir"_relpath);
auto result = somedir->readdir(ObjectFetchContext::getNullContext()).get(0ms);
auto result = somedir->readdir();
ASSERT_EQ(3, result.size());
EXPECT_EQ(L"file1", result[0].name);
@ -114,7 +114,7 @@ TEST(TreeInode, updateAndReaddir) {
auto resultInode =
somedir->mknod("newfile.txt"_pc, S_IFREG, 0, InvalidationRequired::No);
result = somedir->readdir(ObjectFetchContext::getNullContext()).get(0ms);
result = somedir->readdir();
ASSERT_EQ(4, result.size());
EXPECT_EQ(L"file1", result[0].name);
EXPECT_EQ(L"file2", result[1].name);
@ -122,7 +122,7 @@ TEST(TreeInode, updateAndReaddir) {
EXPECT_EQ(L"newfile.txt", result[3].name);
somedir->unlink("file2"_pc, InvalidationRequired::No).get(0ms);
result = somedir->readdir(ObjectFetchContext::getNullContext()).get(0ms);
result = somedir->readdir();
ASSERT_EQ(3, result.size());
EXPECT_EQ(L"file1", result[0].name);
EXPECT_EQ(L"file3", result[1].name);
@ -132,7 +132,7 @@ TEST(TreeInode, updateAndReaddir) {
->rename(
"file3"_pc, somedir, "renamedfile.txt"_pc, InvalidationRequired::No)
.get(0ms);
result = somedir->readdir(ObjectFetchContext::getNullContext()).get(0ms);
result = somedir->readdir();
ASSERT_EQ(3, result.size());
EXPECT_EQ(L"file1", result[0].name);
EXPECT_EQ(L"newfile.txt", result[1].name);

View File

@ -10,6 +10,7 @@
#include "folly/portability/Windows.h"
#include <ProjectedFSLib.h> // @manual
#include "eden/fs/prjfs/Enumerator.h"
#include "eden/fs/utils/Guid.h"
#include "eden/fs/utils/PathFuncs.h"
@ -42,28 +43,10 @@ class Dispatcher {
/**
* Open a directory
*/
virtual folly::Future<folly::Unit> opendir(
virtual folly::Future<std::vector<FileMetadata>> opendir(
RelativePathPiece path,
const Guid guid,
ObjectFetchContext& context) = 0;
/**
* Close a directory
*/
virtual void closedir(const Guid& guid) = 0;
/**
* Read a directory
*
* @param dirEntryBufferHandle output buffer where the directory entry will be
* written to.
*/
virtual HRESULT getEnumerationData(
const PRJ_CALLBACK_DATA& callbackData,
const GUID& enumerationId,
PCWSTR searchExpression,
PRJ_DIR_ENTRY_BUFFER_HANDLE dirEntryBufferHandle) noexcept = 0;
/**
* Lookup the specified file and get its attributes.
*/

View File

@ -26,7 +26,7 @@ Enumerator::Enumerator(std::vector<FileMetadata>&& entryList)
});
}
const FileMetadata* Enumerator::current() {
FileMetadata* Enumerator::current() {
for (; listIndex_ < metadataList_.size(); listIndex_++) {
if (PrjFileNameMatch(
metadataList_[listIndex_].name.c_str(),

View File

@ -7,9 +7,11 @@
#pragma once
#include <optional>
#include <string>
#include <vector>
#include "eden/fs/model/Hash.h"
#include "folly/futures/Future.h"
namespace facebook {
namespace eden {
@ -26,15 +28,30 @@ struct FileMetadata {
//
bool isDirectory{false};
//
// File size. For directories it will ignored
//
size_t size{0};
folly::Future<size_t> getSize() {
if (cachedSize_) {
return *cachedSize_;
}
FileMetadata(std::wstring&& name, bool isDir, size_t size)
: name(std::move(name)), isDirectory(isDir), size(size) {}
return std::move(sizeFuture_).thenValue([this](size_t size) {
cachedSize_ = size;
return size;
});
}
FileMetadata() = delete;
FileMetadata(
std::wstring&& name,
bool isDir,
folly::Future<size_t> sizeFuture)
: name(std::move(name)),
isDirectory(isDir),
sizeFuture_(std::move(sizeFuture)) {}
FileMetadata() = default;
private:
folly::Future<size_t> sizeFuture_;
std::optional<size_t> cachedSize_;
};
class Enumerator {
@ -51,7 +68,7 @@ class Enumerator {
explicit Enumerator() = delete;
const FileMetadata* current();
FileMetadata* current();
void advance() {
++listIndex_;

View File

@ -20,20 +20,7 @@
namespace {
using facebook::eden::ChannelThreadStats;
using facebook::eden::Dispatcher;
using facebook::eden::exceptionToHResult;
using facebook::eden::Guid;
using facebook::eden::InodeMetadata;
using facebook::eden::makeHResultErrorExplicit;
using facebook::eden::ObjectFetchContext;
using facebook::eden::PrjfsChannel;
using facebook::eden::PrjfsRequestContext;
using facebook::eden::RelativePath;
using facebook::eden::RelativePathPiece;
using facebook::eden::RequestMetricsScope;
using facebook::eden::wideToMultibyteString;
using facebook::eden::win32ErrorToString;
using namespace facebook::eden;
#define BAIL_ON_RECURSIVE_CALL(callbackData) \
do { \
@ -76,8 +63,13 @@ HRESULT startEnumeration(
FB_LOGF(
channel->getStraceLogger(), DBG7, "opendir({}, guid={})", path, guid);
return dispatcher->opendir(path, std::move(guid), *context)
.thenValue([context](auto&&) { context->sendSuccess(); });
return dispatcher->opendir(path, *context)
.thenValue(
[context, guid = std::move(guid), channel](auto&& dirents) {
channel->addDirectoryEnumeration(
std::move(guid), std::move(dirents));
context->sendSuccess();
});
});
context->catchErrors(std::move(fut)).ensure([context] {});
@ -97,7 +89,8 @@ HRESULT endEnumeration(
auto guid = Guid(*enumerationId);
auto* channel = getChannel(callbackData);
FB_LOGF(channel->getStraceLogger(), DBG7, "closedir({})", guid);
channel->getDispatcher()->closedir(guid);
channel->removeDirectoryEnumeration(guid);
return S_OK;
} catch (const std::exception& ex) {
@ -111,17 +104,97 @@ HRESULT getEnumerationData(
PCWSTR searchExpression,
PRJ_DIR_ENTRY_BUFFER_HANDLE dirEntryBufferHandle) noexcept {
BAIL_ON_RECURSIVE_CALL(callbackData);
auto* channel = getChannel(callbackData);
FB_LOGF(
channel->getStraceLogger(),
DBG7,
"readdir({}, searchExpression={})",
Guid{*enumerationId},
searchExpression == nullptr
? "<nullptr>"
: wideToMultibyteString<std::string>(searchExpression));
return channel->getDispatcher()->getEnumerationData(
*callbackData, *enumerationId, searchExpression, dirEntryBufferHandle);
try {
auto guid = Guid(*enumerationId);
auto* channel = getChannel(callbackData);
FB_LOGF(
channel->getStraceLogger(),
DBG7,
"readdir({}, searchExpression={})",
guid,
searchExpression == nullptr
? "<nullptr>"
: wideToMultibyteString<std::string>(searchExpression));
auto optEnumerator = channel->findDirectoryEnumeration(guid);
if (!optEnumerator.has_value()) {
XLOG(DBG5) << "Directory enumeration not found: " << guid;
return HRESULT_FROM_WIN32(ERROR_INVALID_PARAMETER);
}
auto enumerator = std::move(optEnumerator).value();
auto shouldRestart =
bool(callbackData->Flags & PRJ_CB_DATA_FLAG_ENUM_RESTART_SCAN);
if (enumerator->isSearchExpressionEmpty() || shouldRestart) {
if (searchExpression != nullptr) {
enumerator->saveExpression(searchExpression);
} else {
enumerator->saveExpression(L"*");
}
}
if (shouldRestart) {
enumerator->restart();
}
auto context =
std::make_shared<PrjfsRequestContext>(channel, *callbackData);
auto fut = folly::makeFutureWith([context,
dispatcher = channel->getDispatcher(),
enumerator = std::move(enumerator),
buffer = dirEntryBufferHandle] {
auto requestWatch =
std::shared_ptr<RequestMetricsScope::LockedRequestWatchList>(nullptr);
auto histogram = &ChannelThreadStats::readDir;
context->startRequest(dispatcher->getStats(), histogram, requestWatch);
bool added = false;
for (FileMetadata* entry; (entry = enumerator->current());
enumerator->advance()) {
auto fileInfo = PRJ_FILE_BASIC_INFO();
fileInfo.IsDirectory = entry->isDirectory;
fileInfo.FileSize = entry->getSize().get();
XLOGF(
DBG6,
"Directory entry: {}, {}, size={}",
fileInfo.IsDirectory ? "Dir" : "File",
PathComponent(entry->name),
fileInfo.FileSize);
auto result =
PrjFillDirEntryBuffer(entry->name.c_str(), &fileInfo, buffer);
if (FAILED(result)) {
if (result == HRESULT_FROM_WIN32(ERROR_INSUFFICIENT_BUFFER) &&
added) {
// We are out of buffer space. This entry didn't make it. Return
// without increment.
break;
} else {
return folly::makeFuture<folly::Unit>(makeHResultErrorExplicit(
result,
fmt::format(
FMT_STRING("Adding directory entry {}"),
PathComponent(entry->name))));
}
}
added = true;
}
context->sendEnumerationSuccess(buffer);
return folly::makeFuture(folly::unit);
});
context->catchErrors(std::move(fut)).ensure([context] {});
return HRESULT_FROM_WIN32(ERROR_IO_PENDING);
} catch (const std::exception& ex) {
return exceptionToHResult(ex);
}
}
HRESULT getPlaceholderInfo(const PRJ_CALLBACK_DATA* callbackData) noexcept {

View File

@ -10,6 +10,7 @@
#include "folly/portability/Windows.h"
#include <ProjectedFSLib.h> // @manual
#include "eden/fs/prjfs/Enumerator.h"
#include "eden/fs/utils/Guid.h"
#include "eden/fs/utils/PathFuncs.h"
#include "eden/fs/utils/ProcessAccessLog.h"
@ -83,6 +84,29 @@ class PrjfsChannel {
void sendError(int32_t commandId, HRESULT error);
void addDirectoryEnumeration(Guid guid, std::vector<FileMetadata> dirents) {
auto [iterator, inserted] = enumSessions_.wlock()->emplace(
std::move(guid), std::make_shared<Enumerator>(std::move(dirents)));
XDCHECK(inserted);
}
std::optional<std::shared_ptr<Enumerator>> findDirectoryEnumeration(
Guid& guid) {
auto enumerators = enumSessions_.rlock();
auto it = enumerators->find(guid);
if (it == enumerators->end()) {
return std::nullopt;
}
return it->second;
}
void removeDirectoryEnumeration(Guid& guid) {
auto erasedCount = enumSessions_.wlock()->erase(guid);
XDCHECK(erasedCount == 1);
}
private:
//
// Channel to talk to projectedFS.
@ -98,6 +122,10 @@ class PrjfsChannel {
folly::Promise<StopData> stopPromise_;
ProcessAccessLog processAccessLog_;
// Set of currently active directory enumerations.
folly::Synchronized<folly::F14FastMap<Guid, std::shared_ptr<Enumerator>>>
enumSessions_;
};
} // namespace eden

View File

@ -57,6 +57,13 @@ class PrjfsRequestContext : public RequestContext {
return channel_->sendSuccess(commandId_, &extra);
}
void sendEnumerationSuccess(PRJ_DIR_ENTRY_BUFFER_HANDLE buffer) const {
PRJ_COMPLETE_COMMAND_EXTENDED_PARAMETERS extra{};
extra.CommandType = PRJ_COMPLETE_COMMAND_TYPE_ENUMERATION;
extra.Enumeration.DirEntryBufferHandle = buffer;
return channel_->sendSuccess(commandId_, &extra);
}
void sendError(HRESULT result) const {
return channel_->sendError(commandId_, result);
}

View File

@ -156,6 +156,7 @@ class ChannelThreadStats : public EdenThreadStatsBase {
Histogram preSetHardlink{createHistogram("prjfs.preSetHardlink_us")};
Histogram openDir{createHistogram("prjfs.opendir_us")};
Histogram readDir{createHistogram("prjfs.readdir_us")};
Histogram lookup{createHistogram("prjfs.lookup_us")};
Histogram access{createHistogram("prjfs.access_us")};
Histogram read{createHistogram("prjfs.read_us")};