From d8400c6faa2fcb3e611a914686680c5402e95ac6 Mon Sep 17 00:00:00 2001 From: Chad Austin Date: Thu, 20 Apr 2023 18:49:40 -0700 Subject: [PATCH] small RpcServer simplifications Summary: I'm trying to track down a subtle lifetime rule violation in EdenMount. While reading RpcServer, I noticed a few possible simplifications. Reviewed By: xavierd Differential Revision: D44849773 fbshipit-source-id: 4c27c47a7e2c211dcd040acce955e9b2f617b55b --- eden/fs/nfs/Mountd.cpp | 5 ++- eden/fs/nfs/Mountd.h | 2 +- eden/fs/nfs/Nfsd3.cpp | 6 ++-- eden/fs/nfs/Nfsd3.h | 2 +- eden/fs/nfs/rpc/Server.cpp | 69 +++++++++++++++++--------------------- eden/fs/nfs/rpc/Server.h | 10 +++--- 6 files changed, 42 insertions(+), 52 deletions(-) diff --git a/eden/fs/nfs/Mountd.cpp b/eden/fs/nfs/Mountd.cpp index d7187471de..9334a013e0 100644 --- a/eden/fs/nfs/Mountd.cpp +++ b/eden/fs/nfs/Mountd.cpp @@ -230,10 +230,9 @@ void Mountd::initialize(folly::SocketAddress addr, bool registerWithRpcbind) { } } -void Mountd::initialize(folly::File&& socket) { +void Mountd::initialize(folly::File socket) { XLOG(DBG7) << "initializing mountd: " << socket.fd(); - server_->initialize( - std::move(socket), RpcServer::InitialSocketType::SERVER_SOCKET); + server_->initializeServerSocket(std::move(socket)); } uint32_t Mountd::getProgramNumber() { diff --git a/eden/fs/nfs/Mountd.h b/eden/fs/nfs/Mountd.h index e4c9252e03..dc26f66baa 100644 --- a/eden/fs/nfs/Mountd.h +++ b/eden/fs/nfs/Mountd.h @@ -49,7 +49,7 @@ class Mountd { * host, EdenFS won't be able to register itself. */ void initialize(folly::SocketAddress addr, bool registerWithRpcbind); - void initialize(folly::File&& socket); + void initialize(folly::File socket); uint32_t getProgramNumber(); diff --git a/eden/fs/nfs/Nfsd3.cpp b/eden/fs/nfs/Nfsd3.cpp index f77b0ba3b9..b791fdb187 100644 --- a/eden/fs/nfs/Nfsd3.cpp +++ b/eden/fs/nfs/Nfsd3.cpp @@ -2046,12 +2046,10 @@ void Nfsd3::initialize(folly::SocketAddress addr, bool registerWithRpcbind) { } } -void Nfsd3::initialize(folly::File&& connectedSocket) { +void Nfsd3::initialize(folly::File connectedSocket) { XLOG(DBG7) << "Initializing nfsd3 with connected socket: " << connectedSocket.fd(); - server_->initialize( - std::move(connectedSocket), - RpcServer::InitialSocketType::CONNECTED_SOCKET); + server_->initializeConnectedSocket(std::move(connectedSocket)); } void Nfsd3::invalidate(AbsolutePath path, mode_t mode) { diff --git a/eden/fs/nfs/Nfsd3.h b/eden/fs/nfs/Nfsd3.h index d47fdcdea1..af33d99966 100644 --- a/eden/fs/nfs/Nfsd3.h +++ b/eden/fs/nfs/Nfsd3.h @@ -146,7 +146,7 @@ class Nfsd3 final : public FsChannel { ~Nfsd3(); void initialize(folly::SocketAddress addr, bool registerWithRpcbind); - void initialize(folly::File&& connectedSocket); + void initialize(folly::File connectedSocket); /** * Trigger an invalidation for the given path. diff --git a/eden/fs/nfs/rpc/Server.cpp b/eden/fs/nfs/rpc/Server.cpp index cf5566483b..d73f26d1a5 100644 --- a/eden/fs/nfs/rpc/Server.cpp +++ b/eden/fs/nfs/rpc/Server.cpp @@ -199,7 +199,7 @@ folly::SemiFuture RpcTcpHandler::resetReader( .ensure([this, proc = proc_, stopReason]() { XLOG(DBG7) << "Pending Requests complete;" << "finishing destroying this rpc tcp handler"; - this->sock_->getEventBase()->dcheckIsInEventBaseThread(); + this->sock_->getEventBase()->checkIsInEventBaseThread(); if (auto owningServer = this->owningServer_.lock()) { owningServer->unregisterRpcHandler(this); } @@ -555,12 +555,10 @@ RpcServer::RpcServer( rpcTcpHandlers_{} {} void RpcServer::initialize(folly::SocketAddress addr) { + evb_->checkIsInEventBaseThread(); + acceptCb_.reset(new RpcServer::RpcAcceptCallback{ - proc_, - evb_, - threadPool_, - structuredLogger_, - std::weak_ptr{shared_from_this()}}); + proc_, evb_, threadPool_, structuredLogger_, weak_from_this()}); // Ask kernel to assign us a port on the loopback interface serverSocket_->bind(addr); @@ -570,39 +568,32 @@ void RpcServer::initialize(folly::SocketAddress addr) { serverSocket_->startAccepting(); } -void RpcServer::initialize(folly::File&& socket, InitialSocketType type) { - switch (type) { - case InitialSocketType::CONNECTED_SOCKET: - XLOG(DBG7) << "Initializing server from connected socket: " - << socket.fd(); - // Note we don't initialize the accepting socket in this case. This is - // meant for server that only ever has one connected socket (nfsd3). Since - // we already have the one connected socket, we will not need the - // accepting socket to make any more connections. - rpcTcpHandlers_.wlock()->emplace_back(RpcTcpHandler::create( - proc_, - AsyncSocket::newSocket( - evb_, folly::NetworkSocket::fromFd(socket.release())), - threadPool_, - structuredLogger_, - shared_from_this())); - return; - case InitialSocketType::SERVER_SOCKET: - XLOG(DBG7) << "Initializing server from server socket: " << socket.fd(); - acceptCb_.reset(new RpcServer::RpcAcceptCallback{ - proc_, - evb_, - threadPool_, - structuredLogger_, - std::weak_ptr{shared_from_this()}}); - serverSocket_->useExistingSocket( - folly::NetworkSocket::fromFd(socket.release())); +void RpcServer::initializeConnectedSocket(folly::File socket) { + XLOG(DBG7) << "Initializing server from connected socket: " << socket.fd(); + // Note we don't initialize the accepting socket in this case. This is + // meant for server that only ever has one connected socket (nfsd3). Since + // we already have the one connected socket, we will not need the + // accepting socket to make any more connections. + rpcTcpHandlers_.wlock()->emplace_back(RpcTcpHandler::create( + proc_, + AsyncSocket::newSocket( + evb_, folly::NetworkSocket::fromFd(socket.release())), + threadPool_, + structuredLogger_, + shared_from_this())); +} - serverSocket_->addAcceptCallback(acceptCb_.get(), evb_); - serverSocket_->startAccepting(); - return; - } - throw std::runtime_error("Impossible socket type."); +void RpcServer::initializeServerSocket(folly::File socket) { + evb_->checkIsInEventBaseThread(); + + XLOG(DBG7) << "Initializing server from server socket: " << socket.fd(); + acceptCb_.reset(new RpcServer::RpcAcceptCallback{ + proc_, evb_, threadPool_, structuredLogger_, weak_from_this()}); + serverSocket_->useExistingSocket( + folly::NetworkSocket::fromFd(socket.release())); + + serverSocket_->addAcceptCallback(acceptCb_.get(), evb_); + serverSocket_->startAccepting(); } void RpcServer::registerRpcHandler(RpcTcpHandler::UniquePtr handler) { @@ -622,7 +613,7 @@ void RpcServer::unregisterRpcHandler(RpcTcpHandler* handlerToErase) { } folly::SemiFuture RpcServer::takeoverStop() { - evb_->dcheckIsInEventBaseThread(); + evb_->checkIsInEventBaseThread(); XLOG(DBG7) << "Removing accept callback"; if (acceptCb_) { diff --git a/eden/fs/nfs/rpc/Server.h b/eden/fs/nfs/rpc/Server.h index c3cb496090..2313e2b7e1 100644 --- a/eden/fs/nfs/rpc/Server.h +++ b/eden/fs/nfs/rpc/Server.h @@ -315,13 +315,15 @@ class RpcServer : public std::enable_shared_from_this { */ void initialize(folly::SocketAddress addr); - enum class InitialSocketType { SERVER_SOCKET, CONNECTED_SOCKET }; + /** + * Initialize this server from an existing connected socket. + */ + void initializeConnectedSocket(folly::File socket); /** - * Initialize this server from an already existing socket. connected indicates - * if this is a connected socket or server socket. + * Initialize this server from an existing server socket. */ - void initialize(folly::File&& socket, InitialSocketType type); + void initializeServerSocket(folly::File socket); folly::SemiFuture takeoverStop();