mirror of
https://github.com/facebook/sapling.git
synced 2024-10-05 14:28:17 +03:00
nfs: open files to do invalidation
Summary: An NFS client caches the attributes of files to avoid having to request these very frequently. What this means is that a file changed by another client (or by the server itself) may take some time to be reflected on the client, that time depends on the attribute caching configuration of the mount point. For EdenFS, files can changed in 2 ways: - Either it is changed by the user via the mount point, - Or the user runs an `hg update` For the first one, the client will simply update its attributes appropriately, but for the second one, the cached attributes will only be updated when the user does opens the file, any calls to stat prior will return the old attributes. Since EdenFS runs on the same host, we can force the attributes caches to be discarded by simply issuing an open call on the file that changed. Reviewed By: chadaustin Differential Revision: D28456482 fbshipit-source-id: 91022d35a33e436c47d94403d0c139992f880cf9
This commit is contained in:
parent
2e6735be81
commit
8391057f7d
@ -58,29 +58,35 @@ Future<vector<CheckoutConflict>> CheckoutContext::finish(Hash newSnapshot) {
|
||||
// This allows any filesystem unlink() or rename() operations to proceed.
|
||||
renameLock_.unlock();
|
||||
|
||||
if (!isDryRun()) {
|
||||
#ifndef _WIN32
|
||||
// If we have a FUSE channel, flush all invalidations we sent to the kernel
|
||||
// as part of the checkout operation. This will ensure that other processes
|
||||
// will see up-to-date data once we return.
|
||||
//
|
||||
// We do this after releasing the rename lock since some of the invalidation
|
||||
// operations may be blocked waiting on FUSE unlink() and rename() operations
|
||||
// complete.
|
||||
auto* fuseChannel = mount_->getFuseChannel();
|
||||
if (!isDryRun() && fuseChannel) {
|
||||
// If we have a FUSE channel, flush all invalidations we sent to the kernel
|
||||
// as part of the checkout operation. This will ensure that other processes
|
||||
// will see up-to-date data once we return.
|
||||
//
|
||||
// We do this after releasing the rename lock since some of the invalidation
|
||||
// operations may be blocked waiting on FUSE unlink() and rename()
|
||||
// operations complete.
|
||||
XLOG(DBG4) << "waiting for inode invalidations to complete";
|
||||
return fuseChannel->flushInvalidations().thenValue([this](auto&&) {
|
||||
folly::Future<folly::Unit> flushInvalidationsFuture;
|
||||
if (auto* fuseChannel = mount_->getFuseChannel()) {
|
||||
flushInvalidationsFuture = fuseChannel->flushInvalidations();
|
||||
} else if (auto* nfsdChannel = mount_->getNfsdChannel()) {
|
||||
flushInvalidationsFuture = nfsdChannel->flushInvalidations();
|
||||
}
|
||||
|
||||
return std::move(flushInvalidationsFuture).thenValue([this](auto&&) {
|
||||
XLOG(DBG4) << "finished processing inode invalidations";
|
||||
parentLock_.unlock();
|
||||
return std::move(*conflicts_.wlock());
|
||||
});
|
||||
}
|
||||
#else
|
||||
auto* channel = mount_->getPrjfsChannel();
|
||||
if (!isDryRun() && channel) {
|
||||
channel->flushNegativePathCache();
|
||||
}
|
||||
auto* channel = mount_->getPrjfsChannel();
|
||||
if (auto* channel = mount_->getPrjfsChannel()) {
|
||||
channel->flushNegativePathCache();
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
// Release the parentLock_.
|
||||
// Once this is released other checkout operations may proceed.
|
||||
|
@ -3065,6 +3065,8 @@ folly::Try<void> TreeInode::invalidateChannelEntryCache(
|
||||
if (auto* fuseChannel = getMount()->getFuseChannel()) {
|
||||
fuseChannel->invalidateEntry(getNodeId(), name);
|
||||
}
|
||||
// For NFS, the entry cache is flushed when the directory mtime is changed.
|
||||
// Directly invalidating an entry is not possible.
|
||||
#else
|
||||
if (auto* fsChannel = getMount()->getPrjfsChannel()) {
|
||||
const auto path = getPath();
|
||||
@ -3090,6 +3092,11 @@ folly::Try<void> TreeInode::invalidateChannelDirCache(TreeInodeState&) {
|
||||
// when an entry is removed or modified. But when new entries are
|
||||
// added, the inode itself must be invalidated.
|
||||
fuseChannel->invalidateInode(getNodeId(), 0, 0);
|
||||
} else if (auto* nfsdChannel = getMount()->getNfsdChannel()) {
|
||||
const auto path = getPath();
|
||||
if (path.has_value()) {
|
||||
nfsdChannel->invalidate(getMount()->getPath() + *path);
|
||||
}
|
||||
}
|
||||
#else
|
||||
if (auto* fsChannel = getMount()->getPrjfsChannel()) {
|
||||
|
@ -14,6 +14,7 @@
|
||||
#endif
|
||||
|
||||
#include <folly/Utility.h>
|
||||
#include <folly/executors/SerialExecutor.h>
|
||||
#include <folly/futures/Future.h>
|
||||
#include "eden/fs/nfs/NfsdRpc.h"
|
||||
#include "eden/fs/utils/Clock.h"
|
||||
@ -1705,7 +1706,9 @@ Nfsd3::Nfsd3(
|
||||
iosize),
|
||||
evb,
|
||||
std::move(threadPool)),
|
||||
processAccessLog_(std::move(processNameCache)) {}
|
||||
processAccessLog_(std::move(processNameCache)),
|
||||
invalidationExecutor_{
|
||||
folly::SerialExecutor::create(folly::getGlobalCPUExecutor())} {}
|
||||
|
||||
void Nfsd3::initialize(folly::SocketAddress addr, bool registerWithRpcbind) {
|
||||
server_.initialize(addr);
|
||||
@ -1714,6 +1717,35 @@ void Nfsd3::initialize(folly::SocketAddress addr, bool registerWithRpcbind) {
|
||||
}
|
||||
}
|
||||
|
||||
void Nfsd3::invalidate(AbsolutePath path) {
|
||||
invalidationExecutor_->add([path = std::move(path)]() {
|
||||
try {
|
||||
folly::File(path.c_str());
|
||||
} catch (const std::exception& ex) {
|
||||
if (const auto* system_error =
|
||||
dynamic_cast<const std::system_error*>(&ex)) {
|
||||
if (isEnoent(*system_error)) {
|
||||
// A removed path would result in an ENOENT error, this is expected,
|
||||
// don't warn about it.
|
||||
return;
|
||||
}
|
||||
}
|
||||
XLOGF(ERR, "Couldn't invalidate {}: {}", path, folly::exceptionStr(ex));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
folly::Future<folly::Unit> Nfsd3::flushInvalidations() {
|
||||
folly::Promise<folly::Unit> promise;
|
||||
auto result = promise.getFuture();
|
||||
invalidationExecutor_->add([promise = std::move(promise)]() mutable {
|
||||
// Since the invalidationExecutor_ is a SerialExecutor, this lambda will
|
||||
// run only when all the previously added open have completed.
|
||||
promise.setValue(folly::unit);
|
||||
});
|
||||
return result;
|
||||
}
|
||||
|
||||
Nfsd3::~Nfsd3() {
|
||||
// TODO(xavierd): wait for the pending requests, and the sockets being tore
|
||||
// down
|
||||
|
@ -59,6 +59,31 @@ class Nfsd3 {
|
||||
|
||||
void initialize(folly::SocketAddress addr, bool registerWithRpcbind);
|
||||
|
||||
/**
|
||||
* Trigger an invalidation for the given path.
|
||||
*
|
||||
* To avoid a very large amount of traffic between an NFS client and the
|
||||
* server, the client will cache attributes that the server previously
|
||||
* returned for a file. This allows stat(2) calls to be fully resolved on the
|
||||
* client. However, clients do respect a close-to-open consistency (CTO)
|
||||
* whereas opening a file will refresh the client attributes. This invalidate
|
||||
* method simply tries to open the given file in a background thread.
|
||||
*
|
||||
* Note that the open(2) call runs asynchronously in a background thread as
|
||||
* both the kernel and EdenFS are holding locks that would otherwise cause
|
||||
* EdenFS to deadlock. The flushInvalidations method below should be called
|
||||
* with all the locks released to wait for all the invalidation to complete.
|
||||
*/
|
||||
void invalidate(AbsolutePath path);
|
||||
|
||||
/**
|
||||
* Wait for all pending invalidation to complete.
|
||||
*
|
||||
* The future will complete when all the previously triggered invalidation
|
||||
* completed.
|
||||
*/
|
||||
folly::Future<folly::Unit> flushInvalidations();
|
||||
|
||||
/**
|
||||
* Obtain the address that this NFSv3 program is listening on.
|
||||
*/
|
||||
@ -86,6 +111,7 @@ class Nfsd3 {
|
||||
RpcServer server_;
|
||||
ProcessAccessLog processAccessLog_;
|
||||
folly::Promise<StopData> stopPromise_;
|
||||
folly::Executor::KeepAlive<folly::Executor> invalidationExecutor_;
|
||||
};
|
||||
|
||||
} // namespace facebook::eden
|
||||
|
@ -673,6 +673,19 @@ class UpdateCacheInvalidationTest(EdenHgTestCase):
|
||||
|
||||
self.assertEqual({"file1", "file2"}, self._list_contents("dir"))
|
||||
|
||||
def test_update_change_stat(self) -> None:
|
||||
self.repo.write_file("dir/file2", "foobar")
|
||||
self.repo.commit("Change file2")
|
||||
|
||||
filepath = self.get_path("dir/file2")
|
||||
prestats = os.stat(filepath)
|
||||
self.assertEqual(prestats.st_size, 6)
|
||||
|
||||
self.repo.update(self.commit4)
|
||||
|
||||
poststats = os.stat(filepath)
|
||||
self.assertEqual(poststats.st_size, 7)
|
||||
|
||||
if sys.platform == "win32":
|
||||
|
||||
def _open_locked(self, path: str, directory: bool = False) -> Handle:
|
||||
|
Loading…
Reference in New Issue
Block a user