nfs: move servicing of NFS callback to a threadpool

Summary:
By moving the work to a background threadpool, we can more quickly go back to
servicing incoming NFS requests and thus allow more work to be done
concurrently. This would allow tools like ripgrep to use multiple cores when
searching the code base.
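
For illustration only (this sketch is not code from the commit), the pattern at
work is handing tasks from a folly::EventBase over to a
folly::CPUThreadPoolExecutor with folly::via(), so the I/O thread can return to
reading the next request right away. The handleRequest function below is a
hypothetical stand-in for the NFS dispatch work.

#include <folly/executors/CPUThreadPoolExecutor.h>
#include <folly/futures/Future.h>
#include <folly/io/async/EventBase.h>

#include <cstdio>

// Hypothetical stand-in for deserializing and servicing one NFS request.
void handleRequest(int requestId) {
  std::printf("handled request %d\n", requestId);
}

int main() {
  folly::EventBase evb;                  // plays the role of the RPC I/O thread
  folly::CPUThreadPoolExecutor pool(8);  // plays the role of the new threadpool

  evb.runInEventBaseThread([&pool] {
    // Instead of running the handler inline (and blocking the EventBase),
    // hand the work to the pool and return immediately.
    folly::via(&pool, [] { handleRequest(1); });
  });

  evb.loopOnce();  // drains the queued callback on this thread
  pool.join();     // waits for the offloaded work to finish
  return 0;
}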

Reviewed By: genevievehelsel

Differential Revision: D27194040

fbshipit-source-id: 7f1775ddaaa7eaf8776a06d05951cb936cd3fbb5
Xavier Deguillard 2021-03-29 09:19:29 -07:00 committed by Facebook GitHub Bot
parent 8011ba700f
commit a598d6ceb4
12 changed files with 182 additions and 41 deletions

View File

@ -503,6 +503,20 @@ class EdenConfig : private ConfigSettingManager {
*/
ConfigSetting<bool> registerMountd{"nfs:register-mountd", false, this};
/**
* Number of threads that will service the NFS requests.
*/
ConfigSetting<uint64_t> numNfsThreads{"nfs:num-servicing-threads", 8, this};
/**
* Maximum number of pending NFS requests. If more requests than this are in
* flight, the NFS code will block.
*/
ConfigSetting<uint64_t> maxNfsInflightRequests{
"nfs:max-inflight-requests",
1000,
this};
/**
* Kill switch for the prefetch profiles feature.
*/
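
A side note on the nfs:max-inflight-requests knob above (illustration only, not
part of this diff): the blocking behaviour comes from folly's bounded
DynamicBoundedQueue. A standalone sketch with a made-up capacity of 2:

#include <folly/concurrency/DynamicBoundedQueue.h>

#include <cstdio>
#include <thread>

int main() {
  // MayBlock=true: enqueue() blocks the producer once capacity is reached,
  // which is how exceeding the inflight limit applies backpressure.
  folly::DMPMCQueue<int, /* MayBlock = */ true> queue{2};

  queue.enqueue(1);
  queue.enqueue(2);  // the queue is now at capacity

  std::thread consumer([&queue] {
    int item;
    queue.dequeue(item);  // frees a slot, unblocking the producer below
    std::printf("dequeued %d\n", item);
  });

  queue.enqueue(3);  // blocks until the consumer dequeues an item
  consumer.join();
  return 0;
}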

View File

@ -53,7 +53,6 @@ target_link_libraries(
PUBLIC
eden_nfs_dispatcher
eden_nfs_rpc_server
eden_utils
PRIVATE
eden_nfs_nfsd_rpc
Folly::folly
@ -69,6 +68,8 @@ target_link_libraries(
PUBLIC
eden_nfs_mountd
eden_nfs_nfsd3
PRIVATE
Folly::folly
)
add_library(

View File

@ -212,8 +212,12 @@ void MountdServerProcessor::unregisterMount(AbsolutePathPiece path) {
XCHECK_EQ(numRemoved, 1u);
}
Mountd::Mountd(bool registerWithRpcbind, folly::EventBase* evb)
: proc_(std::make_shared<MountdServerProcessor>()), server_(proc_, evb) {
Mountd::Mountd(
bool registerWithRpcbind,
folly::EventBase* evb,
std::shared_ptr<folly::Executor> threadPool)
: proc_(std::make_shared<MountdServerProcessor>()),
server_(proc_, evb, std::move(threadPool)) {
if (registerWithRpcbind) {
server_.registerService(kMountdProgNumber, kMountdProgVersion);
}

View File

@ -16,6 +16,10 @@
#include "eden/fs/nfs/rpc/Server.h"
#include "eden/fs/utils/PathFuncs.h"
namespace folly {
class Executor;
}
namespace facebook::eden {
class MountdServerProcessor;
@ -38,7 +42,10 @@ class Mountd {
* to manually specify the port on which this server is bound, so registering
* is not necessary for a properly behaving EdenFS.
*/
Mountd(bool registerWithRpcbind, folly::EventBase* evb);
Mountd(
bool registerWithRpcbind,
folly::EventBase* evb,
std::shared_ptr<folly::Executor> threadPool);
/**
* Register a path as the root of a mount point.

View File

@ -8,10 +8,69 @@
#ifndef _WIN32
#include "eden/fs/nfs/NfsServer.h"
#include <folly/concurrency/DynamicBoundedQueue.h>
#include <folly/executors/CPUThreadPoolExecutor.h>
#include <folly/executors/thread_factory/NamedThreadFactory.h>
#include "eden/fs/nfs/Nfsd3.h"
namespace facebook::eden {
namespace {
using Task = folly::CPUThreadPoolExecutor::CPUTask;
using Queue = folly::DMPMCQueue<Task, true>;
/**
* Task queue that will hold the pending NFS requests.
*
* This is backed by a bounded DMPMCQueue (capacity maxInflightRequests) paired
* with a LifoSem that counts the queued tasks: add() blocks once the queue is
* full, and take() blocks until a task is available.
*/
class NfsTaskQueue : public folly::BlockingQueue<Task> {
public:
explicit NfsTaskQueue(uint64_t maxInflightRequests)
: queue_(Queue{maxInflightRequests}) {}
folly::BlockingQueueAddResult add(Task item) override {
queue_.enqueue(std::move(item));
return sem_.post();
}
Task take() override {
sem_.wait();
Task res;
queue_.dequeue(res);
return res;
}
folly::Optional<Task> try_take_for(std::chrono::milliseconds time) override {
if (!sem_.try_wait_for(time)) {
return folly::none;
}
Task res;
queue_.dequeue(res);
return res;
}
size_t size() override {
return queue_.size();
}
private:
folly::LifoSem sem_;
Queue queue_;
};
} // namespace
NfsServer::NfsServer(
bool registerMountdWithRpcbind,
folly::EventBase* evb,
uint64_t numServicingThreads,
uint64_t maxInflightRequests)
: evb_(evb),
threadPool_(std::make_shared<folly::CPUThreadPoolExecutor>(
numServicingThreads,
std::make_unique<NfsTaskQueue>(maxInflightRequests),
std::make_unique<folly::NamedThreadFactory>("NfsThreadPool"))),
mountd_(registerMountdWithRpcbind, evb_, threadPool_) {}
NfsServer::NfsMountInfo NfsServer::registerMount(
AbsolutePathPiece path,
InodeNumber rootIno,
@ -24,6 +83,7 @@ NfsServer::NfsMountInfo NfsServer::registerMount(
auto nfsd = std::make_unique<Nfsd3>(
false,
evb_,
threadPool_,
std::move(dispatcher),
straceLogger,
std::move(processNameCache),

View File

@ -13,6 +13,10 @@
#include "eden/fs/nfs/Mountd.h"
#include "eden/fs/nfs/Nfsd3.h"
namespace folly {
class Executor;
}
namespace facebook::eden {
class Notifications;
@ -24,7 +28,9 @@ class NfsServer {
* Create a new NFS server.
*
* This will handle the lifetime of the various programs involved in the NFS
* protocol including mountd and nfsd.
* protocol including mountd and nfsd. The requests will be serviced by a
* thread pool with numServicingThreads threads and a bounded queue holding at
* most maxInflightRequests pending requests.
*
* One mountd program will be created per NfsServer, while one nfsd program
* will be created per mount point; this allows each nfsd program to only be aware
@ -32,8 +38,11 @@ class NfsServer {
*
* See Mountd constructor for the meaning of registerMountdWithRpcbind.
*/
NfsServer(bool registerMountdWithRpcbind, folly::EventBase* evb)
: evb_(evb), mountd_(registerMountdWithRpcbind, evb_) {}
NfsServer(
bool registerMountdWithRpcbind,
folly::EventBase* evb,
uint64_t numServicingThreads,
uint64_t maxInflightRequests);
/**
* Return value of registerMount.
@ -85,6 +94,7 @@ class NfsServer {
private:
folly::EventBase* evb_;
std::shared_ptr<folly::Executor> threadPool_;
Mountd mountd_;
};

View File

@ -15,6 +15,10 @@
#include "eden/fs/utils/Clock.h"
#include "eden/fs/utils/SystemError.h"
namespace folly {
class Executor;
}
namespace facebook::eden {
namespace {
@ -1507,6 +1511,7 @@ folly::Future<folly::Unit> Nfsd3ServerProcessor::dispatchRpc(
Nfsd3::Nfsd3(
bool registerWithRpcbind,
folly::EventBase* evb,
std::shared_ptr<folly::Executor> threadPool,
std::unique_ptr<NfsDispatcher> dispatcher,
const folly::Logger* straceLogger,
std::shared_ptr<ProcessNameCache> processNameCache,
@ -1518,7 +1523,8 @@ Nfsd3::Nfsd3(
std::move(dispatcher),
straceLogger,
caseSensitive),
evb),
evb,
std::move(threadPool)),
processAccessLog_(std::move(processNameCache)) {
if (registerWithRpcbind) {
server_.registerService(kNfsdProgNumber, kNfsd3ProgVersion);

View File

@ -16,6 +16,10 @@
#include "eden/fs/nfs/rpc/Server.h"
#include "eden/fs/utils/ProcessAccessLog.h"
namespace folly {
class Executor;
}
namespace facebook::eden {
class Notifications;
@ -42,6 +46,7 @@ class Nfsd3 {
Nfsd3(
bool registerWithRpcbind,
folly::EventBase* evb,
std::shared_ptr<folly::Executor> threadPool,
std::unique_ptr<NfsDispatcher> dispatcher,
const folly::Logger* straceLogger,
std::shared_ptr<ProcessNameCache> processNameCache,

View File

@ -26,6 +26,8 @@ target_link_libraries(
eden_nfs_rpc
eden_nfs_portmap
Folly::folly
PRIVATE
eden_utils
)
add_subdirectory(test)

View File

@ -82,9 +82,9 @@ class RpcTcpHandler : public folly::DelayedDestruction {
}
};
void dispatchAndReply(std::unique_ptr<folly::IOBuf> input) {
DestructorGuard guard(this);
void dispatchAndReply(
std::unique_ptr<folly::IOBuf> input,
DestructorGuard guard) {
folly::makeFutureWith([this, input = std::move(input)]() mutable {
folly::io::Cursor deser(input.get());
rpc_msg_call call = XdrTrait<rpc_msg_call>::deserialize(deser);
@ -182,6 +182,7 @@ class RpcTcpHandler : public folly::DelayedDestruction {
std::shared_ptr<RpcServerProcessor> proc_;
AsyncSocket::UniquePtr sock_;
std::shared_ptr<folly::Executor> threadPool_;
std::unique_ptr<Reader> reader_;
Writer writer_;
folly::IOBufQueue readBuf_;
@ -189,9 +190,11 @@ class RpcTcpHandler : public folly::DelayedDestruction {
public:
RpcTcpHandler(
std::shared_ptr<RpcServerProcessor> proc,
std::unique_ptr<AsyncSocket, ReleasableDestructor>&& socket_)
std::unique_ptr<AsyncSocket, ReleasableDestructor>&& socket,
std::shared_ptr<folly::Executor> threadPool)
: proc_(proc),
sock_(std::move(socket_)),
sock_(std::move(socket)),
threadPool_(std::move(threadPool)),
reader_(std::make_unique<Reader>(this)),
writer_() {}
@ -225,31 +228,38 @@ class RpcTcpHandler : public folly::DelayedDestruction {
}
auto buf = readBuf_.split(c.getCurrentPosition());
buf->coalesce();
XLOG(DBG8) << "Received:\n" << folly::hexDump(buf->data(), buf->length());
DestructorGuard guard(this);
folly::via(
threadPool_.get(),
[this, buf = std::move(buf), guard = std::move(guard)]() mutable {
buf->coalesce();
// Remove the fragment framing from the buffer
// XXX: This is O(N^2) in the number of fragments.
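// Each fragment is preceded by a 4-byte record mark (ONC RPC record marking):
// the top bit flags the last fragment, the low 31 bits give its length.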
auto data = buf->writableData();
auto remain = buf->length();
size_t totalLength = 0;
while (true) {
auto fragmentHeader = folly::Endian::big(*(uint32_t*)data);
auto len = fragmentHeader & 0x7fffffff;
bool isLast = (fragmentHeader & 0x80000000) != 0;
memmove(data, data + sizeof(uint32_t), remain - sizeof(uint32_t));
totalLength += len;
remain -= len + sizeof(uint32_t);
data += len;
if (isLast) {
break;
}
}
XLOG(DBG8) << "Received:\n"
<< folly::hexDump(buf->data(), buf->length());
buf->trimEnd(buf->length() - totalLength);
// Remove the fragment framing from the buffer
// XXX: This is O(N^2) in the number of fragments.
auto data = buf->writableData();
auto remain = buf->length();
size_t totalLength = 0;
while (true) {
auto fragmentHeader = folly::Endian::big(*(uint32_t*)data);
auto len = fragmentHeader & 0x7fffffff;
bool isLast = (fragmentHeader & 0x80000000) != 0;
memmove(data, data + sizeof(uint32_t), remain - sizeof(uint32_t));
totalLength += len;
remain -= len + sizeof(uint32_t);
data += len;
if (isLast) {
break;
}
}
dispatchAndReply(std::move(buf));
buf->trimEnd(buf->length() - totalLength);
dispatchAndReply(std::move(buf), std::move(guard));
});
}
};
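
For context, a simplified standalone sketch (not the actual RpcTcpHandler) of
the DelayedDestruction / DestructorGuard pattern used above: the guard is taken
on the I/O thread and moved into the lambda, so destroy() is deferred and the
handler cannot be freed while the thread pool still holds work referencing it.

#include <folly/Executor.h>
#include <folly/executors/CPUThreadPoolExecutor.h>
#include <folly/futures/Future.h>
#include <folly/io/async/DelayedDestruction.h>

#include <cstdio>
#include <memory>

class Handler : public folly::DelayedDestruction {
 public:
  explicit Handler(folly::Executor* pool) : pool_(pool) {}

  void onData() {
    // Take the guard before handing work to the pool: any destroy() call is
    // deferred until every outstanding guard has been released.
    DestructorGuard guard(this);
    folly::via(pool_, [this, guard = std::move(guard)] {
      process();  // safe: the guard keeps `this` alive until we are done
    });
  }

 private:
  void process() { std::printf("processing on a pool thread\n"); }
  folly::Executor* pool_;
};

int main() {
  folly::CPUThreadPoolExecutor pool(2);
  auto handler =
      std::unique_ptr<Handler, folly::DelayedDestruction::Destructor>(
          new Handler(&pool), folly::DelayedDestruction::Destructor());
  handler->onData();
  handler.reset();  // requests destruction; it is deferred while a guard lives
  pool.join();      // by now the task has run and the handler has been freed
  return 0;
}
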
@ -263,7 +273,7 @@ void RpcServer::RpcAcceptCallback::connectionAccepted(
using UniquePtr =
std::unique_ptr<RpcTcpHandler, folly::DelayedDestruction::Destructor>;
auto handler = UniquePtr(
new RpcTcpHandler(proc_, std::move(socket)),
new RpcTcpHandler(proc_, std::move(socket), threadPool_),
folly::DelayedDestruction::Destructor());
handler->setup();
}
@ -298,9 +308,11 @@ Future<folly::Unit> RpcServerProcessor::dispatchRpc(
RpcServer::RpcServer(
std::shared_ptr<RpcServerProcessor> proc,
folly::EventBase* evb)
folly::EventBase* evb,
std::shared_ptr<folly::Executor> threadPool)
: evb_(evb),
acceptCb_(new RpcServer::RpcAcceptCallback(proc, evb_)),
acceptCb_(
new RpcServer::RpcAcceptCallback(proc, evb_, std::move(threadPool))),
serverSocket_(new AsyncServerSocket(evb_)) {
// Ask kernel to assign us a port on the loopback interface
serverSocket_->bind(SocketAddress("127.0.0.1", 0));

View File

@ -15,6 +15,10 @@
#include "eden/fs/nfs/portmap/PortmapClient.h"
#include "eden/fs/nfs/rpc/Rpc.h"
namespace folly {
class Executor;
}
namespace facebook::eden {
class RpcServerProcessor {
@ -32,7 +36,16 @@ class RpcServerProcessor {
class RpcServer {
public:
RpcServer(std::shared_ptr<RpcServerProcessor> proc, folly::EventBase* evb);
/**
* Create an RPC server.
*
* Requests will be received on the passed EventBase and dispatched to the
* RpcServerProcessor on the passed-in threadPool.
*/
RpcServer(
std::shared_ptr<RpcServerProcessor> proc,
folly::EventBase* evb,
std::shared_ptr<folly::Executor> threadPool);
~RpcServer();
void registerService(uint32_t progNumber, uint32_t progVersion);
@ -58,8 +71,12 @@ class RpcServer {
explicit RpcAcceptCallback(
std::shared_ptr<RpcServerProcessor> proc,
folly::EventBase* evb)
: evb_(evb), proc_(proc), guard_(this) {}
folly::EventBase* evb,
std::shared_ptr<folly::Executor> threadPool)
: evb_(evb),
proc_(proc),
threadPool_(std::move(threadPool)),
guard_(this) {}
private:
void connectionAccepted(
@ -74,6 +91,7 @@ class RpcServer {
folly::EventBase* evb_;
std::shared_ptr<RpcServerProcessor> proc_;
std::shared_ptr<folly::Executor> threadPool_;
/**
* Hold a guard to ourself to avoid being deleted until the callback is

View File

@ -346,7 +346,9 @@ EdenServer::EdenServer(
edenConfig->enableNfsServer.getValue()
? std::make_shared<NfsServer>(
edenConfig->registerMountd.getValue(),
mainEventBase_)
mainEventBase_,
edenConfig->numNfsThreads.getValue(),
edenConfig->maxNfsInflightRequests.getValue())
:
#endif
nullptr,