diff --git a/eden/fs/nfs/Nfsd3.cpp b/eden/fs/nfs/Nfsd3.cpp index 265c2761c2..45be6a8378 100644 --- a/eden/fs/nfs/Nfsd3.cpp +++ b/eden/fs/nfs/Nfsd3.cpp @@ -18,6 +18,7 @@ #include #include "eden/fs/nfs/NfsdRpc.h" #include "eden/fs/utils/Clock.h" +#include "eden/fs/utils/IDGen.h" #include "eden/fs/utils/SystemError.h" namespace folly { @@ -27,6 +28,10 @@ class Executor; namespace facebook::eden { namespace { +constexpr size_t kTraceBusCapacity = 25000; +static_assert(sizeof(NfsTraceEvent) == 40); +static_assert(kTraceBusCapacity * sizeof(NfsTraceEvent) == 1000000); + class Nfsd3ServerProcessor final : public RpcServerProcessor { public: explicit Nfsd3ServerProcessor( @@ -34,12 +39,16 @@ class Nfsd3ServerProcessor final : public RpcServerProcessor { const folly::Logger* straceLogger, CaseSensitivity caseSensitive, uint32_t iosize, - folly::Promise& stopPromise) + folly::Promise& stopPromise, + std::atomic& traceDetailedArguments, + std::shared_ptr>& traceBus) : dispatcher_(std::move(dispatcher)), straceLogger_(straceLogger), caseSensitive_(caseSensitive), iosize_(iosize), - stopPromise_{stopPromise} {} + stopPromise_{stopPromise}, + traceDetailedArguments_(traceDetailedArguments), + traceBus_(traceBus) {} Nfsd3ServerProcessor(const Nfsd3ServerProcessor&) = delete; Nfsd3ServerProcessor(Nfsd3ServerProcessor&&) = delete; @@ -113,6 +122,8 @@ class Nfsd3ServerProcessor final : public RpcServerProcessor { // lifetime of nfs3d. The way we currently enforce this is by waiting for // this promise to be set before destroying of the nfs3d. folly::Promise& stopPromise_; + std::atomic& traceDetailedArguments_; + std::shared_ptr>& traceBus_; }; /** @@ -1742,7 +1753,16 @@ ImmediateFuture Nfsd3ServerProcessor::dispatchRpc( "{}({})", handlerEntry.name, handlerEntry.formatArgs(deser)); - return (this->*handlerEntry.handler)(std::move(deser), std::move(ser), xid); + if (traceDetailedArguments_.load(std::memory_order_acquire)) { + traceBus_->publish( + NfsTraceEvent::start(xid, procNumber, handlerEntry.formatArgs(deser))); + } else { + traceBus_->publish(NfsTraceEvent::start(xid, procNumber)); + } + return (this->*handlerEntry.handler)(std::move(deser), std::move(ser), xid) + .ensure([this, xid, procNumber]() { + traceBus_->publish(NfsTraceEvent::finish(xid, procNumber)); + }); } void Nfsd3ServerProcessor::onSocketClosed() { @@ -1769,12 +1789,36 @@ Nfsd3::Nfsd3( straceLogger, caseSensitive, iosize, - stopPromise_), + stopPromise_, + traceDetailedArguments_, + traceBus_), evb, std::move(threadPool)), processAccessLog_(std::move(processNameCache)), invalidationExecutor_{ - folly::SerialExecutor::create(folly::getGlobalCPUExecutor())} {} + folly::SerialExecutor::create(folly::getGlobalCPUExecutor())}, + traceDetailedArguments_{0}, + traceBus_{ + TraceBus::create("NfsTrace", kTraceBusCapacity)} { + traceSubscriptionHandles_.push_back(traceBus_->subscribeFunction( + "NFS request tracking", [this](const NfsTraceEvent& event) { + switch (event.getType()) { + case NfsTraceEvent::START: { + auto state = telemetryState_.wlock(); + // allow duplicated calls since the client may retry a request + (void)state->requests.emplace( + event.getXid(), OutstandingRequest{event.getXid()}); + break; + } + case NfsTraceEvent::FINISH: { + auto state = telemetryState_.wlock(); + // allow duplicated calls since the client may retry a request + (void)state->requests.erase(event.getXid()); + break; + } + } + })); +} void Nfsd3::initialize(folly::SocketAddress addr, bool registerWithRpcbind) { server_.initialize(addr); @@ -1812,6 +1856,24 @@ folly::Future Nfsd3::flushInvalidations() { return result; } +std::vector Nfsd3::getOutstandingRequests() { + std::vector outstandingCalls; + + for (const auto& entry : telemetryState_.rlock()->requests) { + outstandingCalls.push_back(entry.second); + } + return outstandingCalls; +} + +TraceDetailedArgumentsHandle Nfsd3::traceDetailedArguments() { + auto handle = + std::shared_ptr(nullptr, [© = traceDetailedArguments_](void*) { + copy.fetch_sub(1, std::memory_order_acq_rel); + }); + traceDetailedArguments_.fetch_add(1, std::memory_order_acq_rel); + return handle; +}; + Nfsd3::~Nfsd3() { // TODO(xavierd): wait for the pending requests, // Note the socket will already have been torn down, as this is only destroyed diff --git a/eden/fs/nfs/Nfsd3.h b/eden/fs/nfs/Nfsd3.h index d204d52681..4dfe6e0247 100644 --- a/eden/fs/nfs/Nfsd3.h +++ b/eden/fs/nfs/Nfsd3.h @@ -14,6 +14,7 @@ #include "eden/fs/nfs/NfsDispatcher.h" #include "eden/fs/nfs/rpc/Server.h" +#include "eden/fs/telemetry/TraceBus.h" #include "eden/fs/utils/CaseSensitivity.h" #include "eden/fs/utils/ProcessAccessLog.h" @@ -26,6 +27,61 @@ namespace facebook::eden { class Notifications; class ProcessNameCache; +using TraceDetailedArgumentsHandle = std::shared_ptr; + +struct NfsTraceEvent : TraceEventBase { + enum Type : unsigned char { + START, + FINISH, + }; + + NfsTraceEvent() = delete; + + static NfsTraceEvent start(uint32_t xid, uint32_t procNumber) { + return NfsTraceEvent{ + xid, procNumber, StartDetails{std::unique_ptr{}}}; + } + + static NfsTraceEvent + start(uint32_t xid, uint32_t procNumber, std::string&& args) { + return NfsTraceEvent{ + xid, procNumber, StartDetails{std::make_unique(args)}}; + } + + static NfsTraceEvent finish(uint32_t xid, uint32_t procNumber) { + return NfsTraceEvent{xid, procNumber, FinishDetails{}}; + } + + Type getType() const { + return std::holds_alternative(details_) ? Type::START + : Type::FINISH; + } + + uint32_t getXid() const { + return xid_; + } + + uint32_t getProcNumber() const { + return procNumber_; + } + + private: + struct StartDetails { + std::unique_ptr arguments; + }; + + struct FinishDetails {}; + + using Details = std::variant; + + NfsTraceEvent(uint32_t xid, uint32_t procNumber, Details&& details) + : xid_{xid}, procNumber_{procNumber}, details_{std::move(details)} {} + + uint32_t xid_; + uint32_t procNumber_; + Details details_; +}; + class Nfsd3 { public: /** @@ -97,6 +153,10 @@ class Nfsd3 { struct StopData {}; + struct OutstandingRequest { + uint32_t xid; + }; + /** * Return a future that will be triggered on unmount. */ @@ -111,11 +171,38 @@ class Nfsd3 { Nfsd3& operator=(const Nfsd3&) = delete; Nfsd3& operator=(Nfsd3&&) = delete; + /** + * Returns the approximate set of outstanding NFS requests. Since + * telemetry is tracked on a background thread, the result may very slightly + * lag reality. + */ + std::vector getOutstandingRequests(); + + /** + * While the returned handle is alive, NfsTraceEvents published on the + * TraceBus will have detailed argument strings. + */ + TraceDetailedArgumentsHandle traceDetailedArguments(); + + TraceBus& getTraceBus() { + return *traceBus_; + } + private: + struct TelemetryState { + std::unordered_map requests; + }; + folly::Synchronized telemetryState_; + std::vector> traceSubscriptionHandles_; + folly::Promise stopPromise_; RpcServer server_; ProcessAccessLog processAccessLog_; folly::Executor::KeepAlive invalidationExecutor_; + std::atomic traceDetailedArguments_; + // The TraceBus must be the last member because its subscribed functions may + // close over `this` and can run until the TraceBus itself is deallocated. + std::shared_ptr> traceBus_; }; } // namespace facebook::eden