move waitForPendingWrites into FsChannel

Summary:
waitForPendingWrites is general to any userspace filesystem with
asynchronous write notifications. This could even include FUSE in
writeback caching mode. To remove another ifdef, move
waitForPendingWrites into FsChannel.

Reviewed By: kmancini

Differential Revision: D45167206

fbshipit-source-id: 4daec70845864d90b94739fe2011e8ed90a6a200
This commit is contained in:
Chad Austin 2023-05-11 11:55:24 -07:00 committed by Facebook GitHub Bot
parent f55c686316
commit f851c66fd0
8 changed files with 46 additions and 26 deletions

View File

@ -474,6 +474,10 @@ class FuseChannel final : public FsChannel {
return processAccessLog_;
}
ImmediateFuture<folly::Unit> waitForPendingWrites() override {
return folly::unit;
}
std::shared_ptr<Notifier> getNotifier() const {
return notifier_;
}

View File

@ -1424,13 +1424,14 @@ ImmediateFuture<VirtualInode> EdenMount::getVirtualInode(
[p = std::move(processor)]() mutable { p.reset(); });
}
ImmediateFuture<folly::Unit> EdenMount::waitForPendingNotifications() const {
#ifdef _WIN32
if (auto* channel = getPrjfsChannel()) {
return channel->waitForPendingNotifications();
ImmediateFuture<folly::Unit> EdenMount::waitForPendingWrites() const {
// TODO: This is a race condition since channel_ can be destroyed
// concurrently. We need to change EdenMount to never unset channel_.
if (channel_) {
return channel_->waitForPendingWrites();
} else {
return folly::unit;
}
#endif
return folly::unit;
}
folly::Future<CheckoutResult> EdenMount::checkout(
@ -1511,8 +1512,8 @@ folly::Future<CheckoutResult> EdenMount::checkout(
.thenValue(
[this](std::tuple<shared_ptr<const Tree>, shared_ptr<const Tree>>
treeResults) {
XLOG(DBG7) << "Checkout: waitForPendingNotifications";
return waitForPendingNotifications()
XLOG(DBG7) << "Checkout: waitForPendingWrites";
return waitForPendingWrites()
.thenValue([treeResults = std::move(treeResults)](auto&&) {
return treeResults;
})
@ -1785,7 +1786,7 @@ ImmediateFuture<Unit> EdenMount::diff(
const RootId& commitHash) const {
return objectStore_->getRootTree(commitHash, ctxPtr->getFetchContext())
.thenValue([this](std::shared_ptr<const Tree> rootTree) {
return waitForPendingNotifications().thenValue(
return waitForPendingWrites().thenValue(
[rootTree = std::move(rootTree)](auto&&) { return rootTree; });
})
.thenValue([ctxPtr, rootInode = std::move(rootInode)](

View File

@ -453,7 +453,7 @@ class EdenMount : public std::enable_shared_from_this<EdenMount> {
*
* This can be called from any thread/executor.
*/
ImmediateFuture<folly::Unit> waitForPendingNotifications() const;
ImmediateFuture<folly::Unit> waitForPendingWrites() const;
/**
* Test if the working copy persist on disk after this mount will be

View File

@ -85,6 +85,17 @@ class FsChannel {
*/
virtual ProcessAccessLog& getProcessAccessLog() = 0;
/**
* Some user-space filesystem implementations (notably Projected FS, but also
* FUSE in writeback-cache mode) receive write notifications asynchronously.
*
* In situations like Thrift requests where EdenFS must guarantee previous
* writes have been observed, call waitForPendingWrites. The returned future
* will complete when all pending write operations have been observed.
*/
FOLLY_NODISCARD virtual ImmediateFuture<folly::Unit>
waitForPendingWrites() = 0;
/**
* During checkout or other Thrift calls that modify the filesystem, those
* modifications may be invisible to the filesystem's own caches. Therefore,

View File

@ -188,6 +188,10 @@ class Nfsd3 final : public FsChannel {
bool takeoverStop() override;
ImmediateFuture<folly::Unit> waitForPendingWrites() override {
return folly::unit;
}
/**
* Wait for all pending invalidation to complete.
*

View File

@ -1311,7 +1311,7 @@ void PrjfsChannel::start(
XLOG(INFO) << "Started PrjfsChannel for: " << mountPath_;
}
ImmediateFuture<folly::Unit> PrjfsChannel::waitForPendingNotifications() {
ImmediateFuture<folly::Unit> PrjfsChannel::waitForPendingWrites() {
auto inner = getInner();
if (!inner) {
return makeImmediateFuture<folly::Unit>(std::runtime_error(fmt::format(

View File

@ -471,7 +471,7 @@ class PrjfsChannel : public FsChannel {
* The returned ImmediateFuture will complete when all the previously
* received notifications have completed.
*/
ImmediateFuture<folly::Unit> waitForPendingNotifications();
ImmediateFuture<folly::Unit> waitForPendingWrites() override;
const char* getName() const override {
return "prjfs";

View File

@ -655,7 +655,7 @@ int64_t getSyncTimeout(const SyncBehavior& sync) {
* When the SyncBehavior is unset, this default to a timeout of 60 seconds. A
* negative SyncBehavior mean to wait indefinitely.
*/
ImmediateFuture<folly::Unit> waitForPendingNotifications(
ImmediateFuture<folly::Unit> waitForPendingWrites(
const EdenMount& mount,
const SyncBehavior& sync) {
auto seconds = getSyncTimeout(sync);
@ -663,7 +663,7 @@ ImmediateFuture<folly::Unit> waitForPendingNotifications(
return folly::unit;
}
auto future = mount.waitForPendingNotifications().semi();
auto future = mount.waitForPendingWrites().semi();
if (seconds > 0) {
future = std::move(future).within(std::chrono::seconds{seconds});
}
@ -682,7 +682,7 @@ EdenServiceHandler::semifuture_synchronizeWorkingCopy(
return wrapImmediateFuture(
std::move(helper),
waitForPendingNotifications(*edenMount, *params->sync()))
waitForPendingWrites(*edenMount, *params->sync()))
.semi();
}
@ -698,7 +698,7 @@ EdenServiceHandler::semifuture_getSHA1(
auto mountPath = absolutePathFromThrift(*mountPoint);
auto [mount, rootInode] = server_->getMountAndRootInode(mountPath);
auto notificationFuture = waitForPendingNotifications(*mount, *sync);
auto notificationFuture = waitForPendingWrites(*mount, *sync);
return wrapImmediateFuture(
std::move(helper),
std::move(notificationFuture)
@ -1880,7 +1880,7 @@ EdenServiceHandler::semifuture_getEntryInformation(
return wrapImmediateFuture(
std::move(helper),
waitForPendingNotifications(*edenMount, *sync)
waitForPendingWrites(*edenMount, *sync)
.thenValue([rootInode = std::move(rootInode),
paths = std::move(paths),
objectStore,
@ -1931,7 +1931,7 @@ EdenServiceHandler::semifuture_getFileInformation(
return wrapImmediateFuture(
std::move(helper),
waitForPendingNotifications(*edenMount, *sync)
waitForPendingWrites(*edenMount, *sync)
.thenValue([rootInode = std::move(rootInode),
paths = std::move(paths),
lastCheckoutTime,
@ -2139,7 +2139,7 @@ EdenServiceHandler::semifuture_readdir(std::unique_ptr<ReaddirParams> params) {
EntryAttributeFlags::raw(*params->requestedAttributes());
return wrapImmediateFuture(
std::move(helper),
waitForPendingNotifications(*edenMount, *params->sync())
waitForPendingWrites(*edenMount, *params->sync())
.thenValue(
[edenMount = std::move(edenMount),
rootInode = std::move(rootInode),
@ -2198,7 +2198,7 @@ EdenServiceHandler::getEntryAttributes(
SyncBehavior sync,
const ObjectFetchContextPtr& fetchContext) {
auto [edenMount, _] = server_->getMountAndRootInode(mountPath);
return waitForPendingNotifications(*edenMount, sync)
return waitForPendingWrites(*edenMount, sync)
.thenValue([this,
&paths,
fetchContext = fetchContext.copy(),
@ -2433,7 +2433,7 @@ folly::SemiFuture<folly::Unit> EdenServiceHandler::semifuture_removeRecursively(
return wrapImmediateFuture(
std::move(helper),
waitForPendingNotifications(*edenMount, *params->sync())
waitForPendingWrites(*edenMount, *params->sync())
.thenValue([edenMount = edenMount,
relativePath,
fetchContext = fetchContext.copy()](folly::Unit) {
@ -2565,10 +2565,10 @@ EdenServiceHandler::semifuture_ensureMaterialized(
// execution starting by read large files on the background.
bool background = *params->background();
auto waitForPendingNotificationsFuture =
waitForPendingNotifications(*edenMount, *params->sync());
auto waitForPendingWritesFuture =
waitForPendingWrites(*edenMount, *params->sync());
auto ensureMaterializedFuture =
std::move(waitForPendingNotificationsFuture)
std::move(waitForPendingWritesFuture)
.thenValue([params = std::move(params),
edenMount = std::move(edenMount),
helper = std::move(helper)](auto&&) mutable {
@ -3352,7 +3352,7 @@ void EdenServiceHandler::debugInodeStatus(
auto mountPath = absolutePathFromThrift(*mountPoint);
auto [edenMount, rootInode] = server_->getMountAndRootInode(mountPath);
waitForPendingNotifications(*edenMount, *sync)
waitForPendingWrites(*edenMount, *sync)
.thenValue([&, edenMount = edenMount](auto&&) {
auto inode =
inodeFromUserPath(*edenMount, *path, helper->getFetchContext())
@ -3696,7 +3696,7 @@ EdenServiceHandler::semifuture_debugInvalidateNonMaterialized(
auto invalFut =
std::move(backgroundFuture)
.thenValue([edenMount = edenMount, sync = *params->sync()](auto&&) {
return waitForPendingNotifications(*edenMount, sync);
return waitForPendingWrites(*edenMount, sync);
})
.thenValue([edenMount = edenMount,
path = *params->path(),