mirror of
https://github.com/facebook/sapling.git
synced 2024-10-05 14:28:17 +03:00
prjfs: move notification handling to the background
Summary: Now that notifications are running in a serial executor, and are also issuing disk IO, notifications are significantly slower than they used to be. While writing to the working copy is an overall anti-pattern, some workflows (like Unity) do and it's thus critical that their performance isn't affected negatively. In order to solve this, we can simply move the handling of notifications to the background and answer the notification immediately, since notifications can no longer fail, we shouldn't need to send an error back to ProjectedFS which would anyway be ignored. The drawback is of course that applications are no longer blocked while the notification is being processed in EdenFS, and thus any operation that needs to get a synced up inode hierarchy will need to wait on all the pending notifications. Reviewed By: chadaustin, genevievehelsel Differential Revision: D32480993 fbshipit-source-id: 7ad6b07f540f7d9a52a35a0ff3b94911ef5267af
This commit is contained in:
parent
d557015f33
commit
4308f92515
@ -1195,6 +1195,15 @@ folly::Future<InodePtr> EdenMount::resolveSymlinkImpl(
|
||||
}
|
||||
#endif
|
||||
|
||||
ImmediateFuture<folly::Unit> EdenMount::waitForPendingNotifications() const {
|
||||
#ifdef _WIN32
|
||||
if (auto* channel = getPrjfsChannel()) {
|
||||
return channel->waitForPendingNotifications();
|
||||
}
|
||||
#endif
|
||||
return folly::unit;
|
||||
}
|
||||
|
||||
folly::Future<CheckoutResult> EdenMount::checkout(
|
||||
const RootId& snapshotHash,
|
||||
std::optional<pid_t> clientPid,
|
||||
@ -1249,6 +1258,15 @@ folly::Future<CheckoutResult> EdenMount::checkout(
|
||||
objectStore_->getRootTree(snapshotHash, ctx->getFetchContext());
|
||||
return collectSafe(fromTreeFuture, toTreeFuture);
|
||||
})
|
||||
.thenValue(
|
||||
[this](std::tuple<shared_ptr<const Tree>, shared_ptr<const Tree>>
|
||||
treeResults) {
|
||||
return waitForPendingNotifications()
|
||||
.thenValue([treeResults = std::move(treeResults)](auto&&) {
|
||||
return treeResults;
|
||||
})
|
||||
.semi();
|
||||
})
|
||||
.thenValue([this, ctx, checkoutTimes, stopWatch, journalDiffCallback](
|
||||
std::tuple<shared_ptr<const Tree>, shared_ptr<const Tree>>
|
||||
treeResults) {
|
||||
@ -1452,6 +1470,12 @@ Future<Unit> EdenMount::diff(DiffContext* ctxPtr, const RootId& commitHash)
|
||||
const {
|
||||
auto rootInode = getRootInode();
|
||||
return objectStore_->getRootTree(commitHash, ctxPtr->getFetchContext())
|
||||
.thenValue([this](std::shared_ptr<const Tree> rootTree) {
|
||||
return waitForPendingNotifications()
|
||||
.thenValue(
|
||||
[rootTree = std::move(rootTree)](auto&&) { return rootTree; })
|
||||
.semi();
|
||||
})
|
||||
.thenValue([ctxPtr, rootInode = std::move(rootInode)](
|
||||
std::shared_ptr<const Tree>&& rootTree) {
|
||||
return rootInode->diff(
|
||||
@ -1852,8 +1876,8 @@ folly::Promise<folly::Unit>& EdenMount::beginMount() {
|
||||
// then release the lock. This is safe for two reasons:
|
||||
//
|
||||
// * *channelMountPromise will never be destructed (e.g. by calling
|
||||
// std::optional<>::reset()) or reassigned. (channelMountPromise never goes
|
||||
// from `has_value() == true` to `has_value() == false`.)
|
||||
// std::optional<>::reset()) or reassigned. (channelMountPromise never
|
||||
// goes from `has_value() == true` to `has_value() == false`.)
|
||||
//
|
||||
// * folly::Promise is self-synchronizing; getFuture() can be called
|
||||
// concurrently with setValue()/setException().
|
||||
|
@ -317,6 +317,19 @@ class EdenMount : public std::enable_shared_from_this<EdenMount> {
|
||||
Nfsd3* FOLLY_NULLABLE getNfsdChannel() const;
|
||||
#endif
|
||||
|
||||
/**
|
||||
* Wait for all inflight notifications to complete.
|
||||
*
|
||||
* On Windows, inflight notifications are processed asynchronously and thus
|
||||
* the on-disk state of the the repository may differ from the inode state.
|
||||
* This ensures that all pending notifications have completed.
|
||||
*
|
||||
* On macOS and Linux, this immediately return.
|
||||
*
|
||||
* This can be called from any thread/executor.
|
||||
*/
|
||||
ImmediateFuture<folly::Unit> waitForPendingNotifications() const;
|
||||
|
||||
/**
|
||||
* Test if the working copy persist on disk after this mount will be
|
||||
* destroyed.
|
||||
|
@ -59,7 +59,9 @@ std::string makeDotEdenConfig(EdenMount& mount) {
|
||||
PrjfsDispatcherImpl::PrjfsDispatcherImpl(EdenMount* mount)
|
||||
: PrjfsDispatcher(mount->getStats()),
|
||||
mount_{mount},
|
||||
notificationExecutor_{folly::SerialExecutor::create()},
|
||||
executor_{1, "PrjfsDispatcher"},
|
||||
notificationExecutor_{
|
||||
folly::SerialExecutor::create(folly::getKeepAliveToken(&executor_))},
|
||||
dotEdenConfig_{makeDotEdenConfig(*mount)} {}
|
||||
|
||||
ImmediateFuture<std::vector<PrjfsDirEntry>> PrjfsDispatcherImpl::opendir(
|
||||
@ -472,28 +474,27 @@ ImmediateFuture<folly::Unit> fileNotification(
|
||||
RelativePath path,
|
||||
folly::Executor::KeepAlive<folly::SequencedExecutor> executor,
|
||||
ObjectFetchContext& context) {
|
||||
return ImmediateFuture{
|
||||
folly::via(
|
||||
executor,
|
||||
[&mount, path = std::move(path), &context]() mutable {
|
||||
return getOnDiskState(mount, path)
|
||||
.thenValue([&mount, path = std::move(path), &context](
|
||||
OnDiskState state) mutable {
|
||||
switch (state) {
|
||||
case OnDiskState::MaterializedFile:
|
||||
return handleMaterializedFileNotification(
|
||||
mount, std::move(path), InodeType::File, context);
|
||||
case OnDiskState::MaterializedDirectory:
|
||||
return handleMaterializedFileNotification(
|
||||
mount, std::move(path), InodeType::Tree, context);
|
||||
case OnDiskState::NotPresent:
|
||||
return handleNotPresentFileNotification(
|
||||
mount, std::move(path), context);
|
||||
}
|
||||
})
|
||||
.semi();
|
||||
})
|
||||
.semi()};
|
||||
folly::via(executor, [&mount, path, &context]() mutable {
|
||||
return getOnDiskState(mount, path)
|
||||
.thenValue([&mount, path = std::move(path), &context](
|
||||
OnDiskState state) mutable {
|
||||
switch (state) {
|
||||
case OnDiskState::MaterializedFile:
|
||||
return handleMaterializedFileNotification(
|
||||
mount, std::move(path), InodeType::File, context);
|
||||
case OnDiskState::MaterializedDirectory:
|
||||
return handleMaterializedFileNotification(
|
||||
mount, std::move(path), InodeType::Tree, context);
|
||||
case OnDiskState::NotPresent:
|
||||
return handleNotPresentFileNotification(
|
||||
mount, std::move(path), context);
|
||||
}
|
||||
})
|
||||
.get();
|
||||
}).thenError([path](const folly::exception_wrapper& ew) {
|
||||
XLOG(ERR) << "While handling notification on: " << path << ": " << ew;
|
||||
});
|
||||
return folly::unit;
|
||||
}
|
||||
|
||||
} // namespace
|
||||
@ -544,6 +545,21 @@ ImmediateFuture<folly::Unit> PrjfsDispatcherImpl::dirDeleted(
|
||||
return fileNotification(*mount_, path, notificationExecutor_, context);
|
||||
}
|
||||
|
||||
ImmediateFuture<folly::Unit>
|
||||
PrjfsDispatcherImpl::waitForPendingNotifications() {
|
||||
// Since the executor is a SequencedExecutor, and the fileNotification
|
||||
// function blocks in the executor, the body of the lambda will only be
|
||||
// executed when all previously enqueued notifications have completed.
|
||||
//
|
||||
// Note that this synchronization only guarantees that writes from a the
|
||||
// calling application thread have completed when the future complete. Writes
|
||||
// made by a concurrent process or a different thread may still be in
|
||||
// ProjectedFS queue and therefore may still be pending when the future
|
||||
// complete. This is expected and therefore not a bug.
|
||||
return ImmediateFuture{
|
||||
folly::via(notificationExecutor_, []() { return folly::unit; }).semi()};
|
||||
}
|
||||
|
||||
} // namespace facebook::eden
|
||||
|
||||
#endif
|
||||
|
@ -9,6 +9,7 @@
|
||||
|
||||
#include <folly/executors/SequencedExecutor.h>
|
||||
#include "eden/fs/prjfs/PrjfsDispatcher.h"
|
||||
#include "eden/fs/utils/UnboundedQueueExecutor.h"
|
||||
|
||||
namespace facebook::eden {
|
||||
|
||||
@ -58,11 +59,16 @@ class PrjfsDispatcherImpl : public PrjfsDispatcher {
|
||||
RelativePath oldPath,
|
||||
ObjectFetchContext& context) override;
|
||||
|
||||
ImmediateFuture<folly::Unit> waitForPendingNotifications() override;
|
||||
|
||||
private:
|
||||
// The EdenMount associated with this dispatcher.
|
||||
EdenMount* const mount_;
|
||||
|
||||
// All the notifications are dispatched to this executor.
|
||||
UnboundedQueueExecutor executor_;
|
||||
// All the notifications are dispatched to this executor. The
|
||||
// waitForPendingNotifications implementation depends on this being a
|
||||
// SequencedExecutor.
|
||||
folly::Executor::KeepAlive<folly::SequencedExecutor> notificationExecutor_;
|
||||
|
||||
const std::string dotEdenConfig_;
|
||||
|
@ -22,6 +22,8 @@
|
||||
|
||||
namespace facebook::eden {
|
||||
|
||||
using namespace std::literals::chrono_literals;
|
||||
|
||||
namespace {
|
||||
|
||||
detail::RcuLockedPtr getChannel(
|
||||
@ -222,6 +224,10 @@ PrjfsChannelInner::PrjfsChannelInner(
|
||||
straceLogger_(straceLogger),
|
||||
processAccessLog_(processAccessLog) {}
|
||||
|
||||
ImmediateFuture<folly::Unit> PrjfsChannelInner::waitForPendingNotifications() {
|
||||
return dispatcher_->waitForPendingNotifications();
|
||||
}
|
||||
|
||||
HRESULT PrjfsChannelInner::startEnumeration(
|
||||
std::shared_ptr<PrjfsRequestContext> context,
|
||||
const PRJ_CALLBACK_DATA* callbackData,
|
||||
@ -866,29 +872,21 @@ HRESULT PrjfsChannelInner::notification(
|
||||
return HRESULT_FROM_WIN32(ERROR_ACCESS_DENIED);
|
||||
}
|
||||
|
||||
auto fut = makeImmediateFutureWith([this,
|
||||
context,
|
||||
stat = stat,
|
||||
handler = handler,
|
||||
renderer = renderer,
|
||||
relPath = std::move(relPath),
|
||||
destPath = std::move(destPath),
|
||||
isDirectory]() mutable {
|
||||
auto requestWatch =
|
||||
std::shared_ptr<RequestMetricsScope::LockedRequestWatchList>(nullptr);
|
||||
context->startRequest(dispatcher_->getStats(), stat, requestWatch);
|
||||
auto requestWatch =
|
||||
std::shared_ptr<RequestMetricsScope::LockedRequestWatchList>(nullptr);
|
||||
context->startRequest(dispatcher_->getStats(), stat, requestWatch);
|
||||
|
||||
FB_LOG(getStraceLogger(), DBG7, renderer(relPath, destPath, isDirectory));
|
||||
return (this->*handler)(
|
||||
std::move(relPath), std::move(destPath), isDirectory, *context)
|
||||
.thenValue([context = std::move(context)](auto&&) {
|
||||
context->sendNotificationSuccess();
|
||||
});
|
||||
});
|
||||
FB_LOG(getStraceLogger(), DBG7, renderer(relPath, destPath, isDirectory));
|
||||
auto fut = (this->*handler)(
|
||||
std::move(relPath), std::move(destPath), isDirectory, *context);
|
||||
|
||||
detachAndCompleteCallback(std::move(fut), std::move(context));
|
||||
SCOPE_EXIT {
|
||||
context->finishRequest();
|
||||
};
|
||||
|
||||
return HRESULT_FROM_WIN32(ERROR_IO_PENDING);
|
||||
// Since the future should just be enqueing to an executor, it should
|
||||
// always be ready.
|
||||
return tryToHResult(std::move(fut).getTry(0ms));
|
||||
}
|
||||
}
|
||||
|
||||
@ -1010,6 +1008,12 @@ void PrjfsChannel::start(bool readOnly, bool useNegativePathCaching) {
|
||||
XLOG(INFO) << "Started PrjfsChannel for: " << mountPath_;
|
||||
}
|
||||
|
||||
ImmediateFuture<folly::Unit> PrjfsChannel::waitForPendingNotifications() {
|
||||
auto inner = getInner();
|
||||
return inner->waitForPendingNotifications().ensure(
|
||||
[inner = std::move(inner)] {});
|
||||
}
|
||||
|
||||
folly::SemiFuture<folly::Unit> PrjfsChannel::stop() {
|
||||
XLOG(INFO) << "Stopping PrjfsChannel for: " << mountPath_;
|
||||
XCHECK(!stopPromise_.isFulfilled());
|
||||
|
@ -44,6 +44,8 @@ class PrjfsChannelInner {
|
||||
PrjfsChannelInner(const PrjfsChannelInner&) = delete;
|
||||
PrjfsChannelInner& operator=(const PrjfsChannelInner&) = delete;
|
||||
|
||||
ImmediateFuture<folly::Unit> waitForPendingNotifications();
|
||||
|
||||
/**
|
||||
* Start a directory listing.
|
||||
*
|
||||
@ -260,6 +262,23 @@ class PrjfsChannel {
|
||||
|
||||
void start(bool readOnly, bool useNegativePathCaching);
|
||||
|
||||
/**
|
||||
* Wait for all the received notifications to be fully handled.
|
||||
*
|
||||
* The PrjfsChannel will receive notifications and immediately dispatch the
|
||||
* work to a background executor and return S_OK to ProjectedFS to unblock
|
||||
* applications writing to the EdenFS repository.
|
||||
*
|
||||
* Thus an application writing to the repository may have their file creation
|
||||
* be successful prior to EdenFS having updated its inode hierarchy. This
|
||||
* discrepancy can cause issues in EdenFS for operations that exclusively
|
||||
* look at the inode hierarchy such as status/checkout/glob.
|
||||
*
|
||||
* The returned ImmediateFuture will complete when all the previously
|
||||
* received notifications have completed.
|
||||
*/
|
||||
ImmediateFuture<folly::Unit> waitForPendingNotifications();
|
||||
|
||||
/**
|
||||
* Stop the PrjfsChannel.
|
||||
*
|
||||
|
@ -118,6 +118,11 @@ class PrjfsDispatcher {
|
||||
RelativePath relPath,
|
||||
ObjectFetchContext& context) = 0;
|
||||
|
||||
/**
|
||||
* Wait for all received notifications to complete.
|
||||
*/
|
||||
virtual ImmediateFuture<folly::Unit> waitForPendingNotifications() = 0;
|
||||
|
||||
private:
|
||||
EdenStats* stats_{nullptr};
|
||||
};
|
||||
|
@ -271,16 +271,6 @@ Future<ReturnType> wrapFuture(
|
||||
return std::move(f).ensure([logHelper = std::move(logHelper)]() {});
|
||||
}
|
||||
|
||||
template <typename ReturnType>
|
||||
folly::SemiFuture<ReturnType> wrapSemiFuture(
|
||||
std::unique_ptr<ThriftLogHelper> logHelper,
|
||||
folly::SemiFuture<ReturnType>&& f) {
|
||||
return std::move(f).defer(
|
||||
[logHelper = std::move(logHelper)](folly::Try<ReturnType>&& ret) {
|
||||
return std::forward<folly::Try<ReturnType>>(ret);
|
||||
});
|
||||
}
|
||||
|
||||
template <typename ReturnType>
|
||||
ImmediateFuture<ReturnType> wrapImmediateFuture(
|
||||
std::unique_ptr<ThriftLogHelper> logHelper,
|
||||
@ -534,19 +524,31 @@ std::chrono::seconds getSyncTimeout(const SyncBehavior& sync) {
|
||||
auto seconds = sync.syncTimeoutSeconds_ref().value_or(60);
|
||||
return std::chrono::seconds{seconds};
|
||||
}
|
||||
|
||||
ImmediateFuture<folly::Unit> waitForPendingNotifications(
|
||||
const EdenMount& mount,
|
||||
std::chrono::seconds timeout) {
|
||||
if (timeout.count() == 0) {
|
||||
return folly::unit;
|
||||
}
|
||||
|
||||
return mount.waitForPendingNotifications().semi().within(timeout);
|
||||
}
|
||||
} // namespace
|
||||
|
||||
folly::SemiFuture<folly::Unit>
|
||||
EdenServiceHandler::semifuture_synchronizeWorkingCopy(
|
||||
std::unique_ptr<std::string> mountPoint,
|
||||
std::unique_ptr<SynchronizeWorkingCopyParams> /*params*/) {
|
||||
auto helper = INSTRUMENT_THRIFT_CALL(DBG3, *mountPoint);
|
||||
std::unique_ptr<SynchronizeWorkingCopyParams> params) {
|
||||
auto timeout = getSyncTimeout(*params->sync_ref());
|
||||
auto helper = INSTRUMENT_THRIFT_CALL(DBG3, *mountPoint, timeout.count());
|
||||
auto mountPath = AbsolutePathPiece{*mountPoint};
|
||||
auto edenMount = server_->getMount(mountPath);
|
||||
|
||||
// TODO(xavierd): Actually synchronize the working copy.
|
||||
|
||||
return folly::unit;
|
||||
return wrapImmediateFuture(
|
||||
std::move(helper),
|
||||
waitForPendingNotifications(*edenMount, timeout))
|
||||
.semi();
|
||||
}
|
||||
|
||||
void EdenServiceHandler::getSHA1(
|
||||
@ -555,25 +557,31 @@ void EdenServiceHandler::getSHA1(
|
||||
unique_ptr<vector<string>> paths,
|
||||
std::unique_ptr<SyncBehavior> sync) {
|
||||
TraceBlock block("getSHA1");
|
||||
auto syncTimeout = getSyncTimeout(*sync);
|
||||
auto helper = INSTRUMENT_THRIFT_CALL(
|
||||
DBG3, *mountPoint, getSyncTimeout(*sync).count(), toLogArg(*paths));
|
||||
DBG3, *mountPoint, syncTimeout.count(), toLogArg(*paths));
|
||||
vector<ImmediateFuture<Hash20>> futures;
|
||||
auto mountPath = AbsolutePathPiece{*mountPoint};
|
||||
for (const auto& path : *paths) {
|
||||
futures.emplace_back(
|
||||
getSHA1ForPathDefensively(mountPath, path, helper->getFetchContext()));
|
||||
}
|
||||
|
||||
auto results = collectAll(std::move(futures)).get();
|
||||
for (auto& result : results) {
|
||||
out.emplace_back();
|
||||
SHA1Result& sha1Result = out.back();
|
||||
if (result.hasValue()) {
|
||||
sha1Result.sha1_ref() = thriftHash20(result.value());
|
||||
} else {
|
||||
sha1Result.error_ref() = newEdenError(result.exception());
|
||||
}
|
||||
}
|
||||
waitForPendingNotifications(*server_->getMount(mountPath), syncTimeout)
|
||||
.thenValue([&](auto&&) {
|
||||
for (const auto& path : *paths) {
|
||||
futures.emplace_back(getSHA1ForPathDefensively(
|
||||
mountPath, path, helper->getFetchContext()));
|
||||
}
|
||||
|
||||
auto results = collectAll(std::move(futures)).get();
|
||||
for (auto& result : results) {
|
||||
out.emplace_back();
|
||||
SHA1Result& sha1Result = out.back();
|
||||
if (result.hasValue()) {
|
||||
sha1Result.sha1_ref() = thriftHash20(result.value());
|
||||
} else {
|
||||
sha1Result.error_ref() = newEdenError(result.exception());
|
||||
}
|
||||
}
|
||||
})
|
||||
.get();
|
||||
}
|
||||
|
||||
ImmediateFuture<Hash20> EdenServiceHandler::getSHA1ForPathDefensively(
|
||||
@ -1249,8 +1257,9 @@ EdenServiceHandler::semifuture_getEntryInformation(
|
||||
std::unique_ptr<std::string> mountPoint,
|
||||
std::unique_ptr<std::vector<std::string>> paths,
|
||||
std::unique_ptr<SyncBehavior> sync) {
|
||||
auto syncTimeout = getSyncTimeout(*sync);
|
||||
auto helper = INSTRUMENT_THRIFT_CALL(
|
||||
DBG3, *mountPoint, getSyncTimeout(*sync).count(), toLogArg(*paths));
|
||||
DBG3, *mountPoint, syncTimeout.count(), toLogArg(*paths));
|
||||
auto mountPath = AbsolutePathPiece{*mountPoint};
|
||||
auto edenMount = server_->getMount(mountPath);
|
||||
auto rootInode = edenMount->getRootInode();
|
||||
@ -1261,29 +1270,40 @@ EdenServiceHandler::semifuture_getEntryInformation(
|
||||
// data. In the future, this should be changed to avoid allocating inodes when
|
||||
// possible.
|
||||
|
||||
return wrapSemiFuture(
|
||||
std::move(helper),
|
||||
collectAll(applyToInodes(
|
||||
rootInode,
|
||||
*paths,
|
||||
[](InodePtr inode) { return inode->getType(); },
|
||||
fetchContext))
|
||||
.deferValue([](vector<Try<dtype_t>> done) {
|
||||
auto out = std::make_unique<vector<EntryInformationOrError>>();
|
||||
out->reserve(done.size());
|
||||
for (auto& item : done) {
|
||||
EntryInformationOrError result;
|
||||
if (item.hasException()) {
|
||||
result.error_ref() = newEdenError(item.exception());
|
||||
} else {
|
||||
EntryInformation info;
|
||||
info.dtype_ref() = static_cast<Dtype>(item.value());
|
||||
result.info_ref() = info;
|
||||
}
|
||||
out->emplace_back(std::move(result));
|
||||
}
|
||||
return out;
|
||||
}));
|
||||
return wrapImmediateFuture(
|
||||
std::move(helper),
|
||||
waitForPendingNotifications(*edenMount, syncTimeout)
|
||||
.thenValue([rootInode = std::move(rootInode),
|
||||
paths = std::move(paths),
|
||||
&fetchContext](auto&&) {
|
||||
return collectAll(applyToInodes(
|
||||
rootInode,
|
||||
*paths,
|
||||
[](InodePtr inode) {
|
||||
return inode->getType();
|
||||
},
|
||||
fetchContext))
|
||||
.deferValue([](vector<Try<dtype_t>> done) {
|
||||
auto out = std::make_unique<
|
||||
vector<EntryInformationOrError>>();
|
||||
out->reserve(done.size());
|
||||
for (auto& item : done) {
|
||||
EntryInformationOrError result;
|
||||
if (item.hasException()) {
|
||||
result.error_ref() =
|
||||
newEdenError(item.exception());
|
||||
} else {
|
||||
EntryInformation info;
|
||||
info.dtype_ref() =
|
||||
static_cast<Dtype>(item.value());
|
||||
result.info_ref() = info;
|
||||
}
|
||||
out->emplace_back(std::move(result));
|
||||
}
|
||||
return out;
|
||||
});
|
||||
}))
|
||||
.semi();
|
||||
}
|
||||
|
||||
folly::SemiFuture<std::unique_ptr<std::vector<FileInformationOrError>>>
|
||||
@ -1291,8 +1311,9 @@ EdenServiceHandler::semifuture_getFileInformation(
|
||||
std::unique_ptr<std::string> mountPoint,
|
||||
std::unique_ptr<std::vector<std::string>> paths,
|
||||
std::unique_ptr<SyncBehavior> sync) {
|
||||
auto syncTimeout = getSyncTimeout(*sync);
|
||||
auto helper = INSTRUMENT_THRIFT_CALL(
|
||||
DBG3, *mountPoint, getSyncTimeout(*sync).count(), toLogArg(*paths));
|
||||
DBG3, *mountPoint, syncTimeout.count(), toLogArg(*paths));
|
||||
auto mountPath = AbsolutePathPiece{*mountPoint};
|
||||
auto edenMount = server_->getMount(mountPath);
|
||||
auto rootInode = edenMount->getRootInode();
|
||||
@ -1301,43 +1322,55 @@ EdenServiceHandler::semifuture_getFileInformation(
|
||||
// paths. It's possible to resolve this request directly from source control
|
||||
// data. In the future, this should be changed to avoid allocating inodes when
|
||||
// possible.
|
||||
return wrapSemiFuture(
|
||||
std::move(helper),
|
||||
collectAll(applyToInodes(
|
||||
rootInode,
|
||||
*paths,
|
||||
[&fetchContext](InodePtr inode) {
|
||||
return inode->stat(fetchContext)
|
||||
.thenValue([](struct stat st) {
|
||||
FileInformation info;
|
||||
info.size_ref() = st.st_size;
|
||||
auto ts = stMtime(st);
|
||||
info.mtime_ref()->seconds_ref() = ts.tv_sec;
|
||||
info.mtime_ref()->nanoSeconds_ref() = ts.tv_nsec;
|
||||
info.mode_ref() = st.st_mode;
|
||||
return wrapImmediateFuture(
|
||||
std::move(helper),
|
||||
waitForPendingNotifications(*edenMount, syncTimeout)
|
||||
.thenValue([rootInode = std::move(rootInode),
|
||||
paths = std::move(paths),
|
||||
&fetchContext](auto&&) {
|
||||
return collectAll(
|
||||
applyToInodes(
|
||||
rootInode,
|
||||
*paths,
|
||||
[&fetchContext](InodePtr inode) {
|
||||
return inode->stat(fetchContext)
|
||||
.thenValue([](struct stat st) {
|
||||
FileInformation info;
|
||||
info.size_ref() = st.st_size;
|
||||
auto ts = stMtime(st);
|
||||
info.mtime_ref()->seconds_ref() =
|
||||
ts.tv_sec;
|
||||
info.mtime_ref()->nanoSeconds_ref() =
|
||||
ts.tv_nsec;
|
||||
info.mode_ref() = st.st_mode;
|
||||
|
||||
FileInformationOrError result;
|
||||
result.info_ref() = info;
|
||||
|
||||
return result;
|
||||
})
|
||||
.semi();
|
||||
},
|
||||
fetchContext))
|
||||
.deferValue([](vector<Try<FileInformationOrError>>&&
|
||||
done) {
|
||||
auto out =
|
||||
std::make_unique<vector<FileInformationOrError>>();
|
||||
out->reserve(done.size());
|
||||
for (auto& item : done) {
|
||||
if (item.hasException()) {
|
||||
FileInformationOrError result;
|
||||
result.info_ref() = info;
|
||||
|
||||
return result;
|
||||
})
|
||||
.semi();
|
||||
},
|
||||
fetchContext))
|
||||
.deferValue([](vector<Try<FileInformationOrError>>&& done) {
|
||||
auto out = std::make_unique<vector<FileInformationOrError>>();
|
||||
out->reserve(done.size());
|
||||
for (auto& item : done) {
|
||||
if (item.hasException()) {
|
||||
FileInformationOrError result;
|
||||
result.error_ref() = newEdenError(item.exception());
|
||||
out->emplace_back(std::move(result));
|
||||
} else {
|
||||
out->emplace_back(item.value());
|
||||
}
|
||||
}
|
||||
return out;
|
||||
}));
|
||||
result.error_ref() =
|
||||
newEdenError(item.exception());
|
||||
out->emplace_back(std::move(result));
|
||||
} else {
|
||||
out->emplace_back(item.value());
|
||||
}
|
||||
}
|
||||
return out;
|
||||
});
|
||||
}))
|
||||
.semi();
|
||||
}
|
||||
|
||||
#define ATTR_BITMASK(req, attr) \
|
||||
@ -1354,43 +1387,58 @@ EdenServiceHandler::semifuture_getAttributesFromFiles(
|
||||
// Get requested attributes for each path
|
||||
auto helper = INSTRUMENT_THRIFT_CALL(
|
||||
DBG3, mountPoint, syncTimeout.count(), toLogArg(paths));
|
||||
vector<ImmediateFuture<BlobMetadata>> futures;
|
||||
for (const auto& p : paths) {
|
||||
futures.emplace_back(
|
||||
getBlobMetadataForPath(mountPath, p, helper->getFetchContext()));
|
||||
}
|
||||
auto& fetchContext = helper->getFetchContext();
|
||||
|
||||
// Collect all futures into a single tuple
|
||||
auto allRes = facebook::eden::collectAll(std::move(futures));
|
||||
return wrapImmediateFuture(
|
||||
std::move(helper),
|
||||
std::move(allRes).thenValue(
|
||||
[paths = std::move(paths),
|
||||
reqBitmask](std::vector<folly::Try<BlobMetadata>>&& allRes) {
|
||||
auto res = std::make_unique<GetAttributesFromFilesResult>();
|
||||
auto sizeRequested = ATTR_BITMASK(reqBitmask, FILE_SIZE);
|
||||
auto sha1Requested = ATTR_BITMASK(reqBitmask, SHA1_HASH);
|
||||
for (const auto& tryMetadata : allRes) {
|
||||
FileAttributeDataOrError file_res;
|
||||
// check for exceptions. if found, return EdenError early
|
||||
if (tryMetadata.hasException()) {
|
||||
file_res.set_error(
|
||||
newEdenError(tryMetadata.exception()));
|
||||
} else { /* No exceptions, fill in data */
|
||||
FileAttributeData file_data;
|
||||
const auto& metadata = tryMetadata.value();
|
||||
// Only fill in requested fields
|
||||
if (sha1Requested) {
|
||||
file_data.sha1_ref() = thriftHash20(metadata.sha1);
|
||||
}
|
||||
if (sizeRequested) {
|
||||
file_data.fileSize_ref() = metadata.size;
|
||||
}
|
||||
file_res.data_ref() = file_data;
|
||||
}
|
||||
res->res_ref()->emplace_back(file_res);
|
||||
waitForPendingNotifications(
|
||||
*server_->getMount(mountPath), syncTimeout)
|
||||
.thenValue([this,
|
||||
paths = std::move(paths),
|
||||
&fetchContext,
|
||||
mountPath,
|
||||
reqBitmask](auto&&) mutable {
|
||||
vector<ImmediateFuture<BlobMetadata>> futures;
|
||||
for (const auto& p : paths) {
|
||||
futures.emplace_back(
|
||||
getBlobMetadataForPath(mountPath, p, fetchContext));
|
||||
}
|
||||
return res;
|
||||
|
||||
// Collect all futures into a single tuple
|
||||
return facebook::eden::collectAll(std::move(futures))
|
||||
.thenValue([paths = std::move(paths), reqBitmask](
|
||||
std::vector<folly::Try<BlobMetadata>>&&
|
||||
allRes) {
|
||||
auto res =
|
||||
std::make_unique<GetAttributesFromFilesResult>();
|
||||
auto sizeRequested =
|
||||
ATTR_BITMASK(reqBitmask, FILE_SIZE);
|
||||
auto sha1Requested =
|
||||
ATTR_BITMASK(reqBitmask, SHA1_HASH);
|
||||
for (const auto& tryMetadata : allRes) {
|
||||
FileAttributeDataOrError file_res;
|
||||
// check for exceptions. if found, return EdenError
|
||||
// early
|
||||
if (tryMetadata.hasException()) {
|
||||
file_res.set_error(
|
||||
newEdenError(tryMetadata.exception()));
|
||||
} else { /* No exceptions, fill in data */
|
||||
FileAttributeData file_data;
|
||||
const auto& metadata = tryMetadata.value();
|
||||
// Only fill in requested fields
|
||||
if (sha1Requested) {
|
||||
file_data.sha1_ref() =
|
||||
thriftHash20(metadata.sha1);
|
||||
}
|
||||
if (sizeRequested) {
|
||||
file_data.fileSize_ref() = metadata.size;
|
||||
}
|
||||
file_res.data_ref() = file_data;
|
||||
}
|
||||
res->res_ref()->emplace_back(file_res);
|
||||
}
|
||||
return res;
|
||||
});
|
||||
}))
|
||||
.semi();
|
||||
}
|
||||
@ -2103,18 +2151,24 @@ void EdenServiceHandler::debugInodeStatus(
|
||||
eden_constants::DIS_COMPUTE_BLOB_SIZES_;
|
||||
}
|
||||
|
||||
auto syncTimeout = getSyncTimeout(*sync);
|
||||
auto helper = INSTRUMENT_THRIFT_CALL(
|
||||
DBG2, *mountPoint, *path, flags, getSyncTimeout(*sync).count());
|
||||
DBG2, *mountPoint, *path, flags, syncTimeout.count());
|
||||
auto mountPath = AbsolutePathPiece{*mountPoint};
|
||||
auto edenMount = server_->getMount(mountPath);
|
||||
|
||||
auto inode = inodeFromUserPath(*edenMount, *path, helper->getFetchContext())
|
||||
.asTreePtr();
|
||||
auto inodePath = inode->getPath().value();
|
||||
waitForPendingNotifications(*edenMount, syncTimeout)
|
||||
.thenValue([&](auto&&) {
|
||||
auto inode =
|
||||
inodeFromUserPath(*edenMount, *path, helper->getFetchContext())
|
||||
.asTreePtr();
|
||||
auto inodePath = inode->getPath().value();
|
||||
|
||||
InodeStatusCallbacks callbacks{edenMount.get(), flags, inodeInfo};
|
||||
traverseObservedInodes(*inode, inodePath, callbacks);
|
||||
callbacks.fillBlobSizes(helper->getFetchContext());
|
||||
InodeStatusCallbacks callbacks{edenMount.get(), flags, inodeInfo};
|
||||
traverseObservedInodes(*inode, inodePath, callbacks);
|
||||
callbacks.fillBlobSizes(helper->getFetchContext());
|
||||
})
|
||||
.get();
|
||||
}
|
||||
|
||||
void EdenServiceHandler::debugOutstandingFuseCalls(
|
||||
|
@ -11,7 +11,11 @@ import typing
|
||||
from pathlib import Path, PurePath
|
||||
|
||||
from facebook.eden.constants import STATS_MOUNTS_STATS
|
||||
from facebook.eden.ttypes import GetStatInfoParams, JournalInfo
|
||||
from facebook.eden.ttypes import (
|
||||
GetStatInfoParams,
|
||||
JournalInfo,
|
||||
SynchronizeWorkingCopyParams,
|
||||
)
|
||||
|
||||
from .lib import testcase
|
||||
from .lib.hgrepo import HgRepository
|
||||
@ -218,6 +222,10 @@ class JournalInfoTest(testcase.EdenRepoTest):
|
||||
|
||||
def journal_stats(self) -> JournalInfo:
|
||||
with self.get_thrift_client() as thrift_client:
|
||||
thrift_client.synchronizeWorkingCopy(
|
||||
self.mount.encode("utf-8"), SynchronizeWorkingCopyParams()
|
||||
)
|
||||
|
||||
stats = thrift_client.getStatInfo(
|
||||
GetStatInfoParams(statsMask=STATS_MOUNTS_STATS)
|
||||
)
|
||||
|
Loading…
Reference in New Issue
Block a user