small RpcServer simplifications

Summary:
I'm trying to track down a subtle lifetime rule violation in
EdenMount. While reading RpcServer, I noticed a few possible
simplifications.

Reviewed By: xavierd

Differential Revision: D44849773

fbshipit-source-id: 4c27c47a7e2c211dcd040acce955e9b2f617b55b
This commit is contained in:
Chad Austin 2023-04-20 18:49:40 -07:00 committed by Facebook GitHub Bot
parent 455e7c6efa
commit d8400c6faa
6 changed files with 42 additions and 52 deletions

View File

@ -230,10 +230,9 @@ void Mountd::initialize(folly::SocketAddress addr, bool registerWithRpcbind) {
}
}
void Mountd::initialize(folly::File&& socket) {
void Mountd::initialize(folly::File socket) {
XLOG(DBG7) << "initializing mountd: " << socket.fd();
server_->initialize(
std::move(socket), RpcServer::InitialSocketType::SERVER_SOCKET);
server_->initializeServerSocket(std::move(socket));
}
uint32_t Mountd::getProgramNumber() {

View File

@ -49,7 +49,7 @@ class Mountd {
* host, EdenFS won't be able to register itself.
*/
void initialize(folly::SocketAddress addr, bool registerWithRpcbind);
void initialize(folly::File&& socket);
void initialize(folly::File socket);
uint32_t getProgramNumber();

View File

@ -2046,12 +2046,10 @@ void Nfsd3::initialize(folly::SocketAddress addr, bool registerWithRpcbind) {
}
}
void Nfsd3::initialize(folly::File&& connectedSocket) {
void Nfsd3::initialize(folly::File connectedSocket) {
XLOG(DBG7) << "Initializing nfsd3 with connected socket: "
<< connectedSocket.fd();
server_->initialize(
std::move(connectedSocket),
RpcServer::InitialSocketType::CONNECTED_SOCKET);
server_->initializeConnectedSocket(std::move(connectedSocket));
}
void Nfsd3::invalidate(AbsolutePath path, mode_t mode) {

View File

@ -146,7 +146,7 @@ class Nfsd3 final : public FsChannel {
~Nfsd3();
void initialize(folly::SocketAddress addr, bool registerWithRpcbind);
void initialize(folly::File&& connectedSocket);
void initialize(folly::File connectedSocket);
/**
* Trigger an invalidation for the given path.

View File

@ -199,7 +199,7 @@ folly::SemiFuture<folly::Unit> RpcTcpHandler::resetReader(
.ensure([this, proc = proc_, stopReason]() {
XLOG(DBG7) << "Pending Requests complete;"
<< "finishing destroying this rpc tcp handler";
this->sock_->getEventBase()->dcheckIsInEventBaseThread();
this->sock_->getEventBase()->checkIsInEventBaseThread();
if (auto owningServer = this->owningServer_.lock()) {
owningServer->unregisterRpcHandler(this);
}
@ -555,12 +555,10 @@ RpcServer::RpcServer(
rpcTcpHandlers_{} {}
void RpcServer::initialize(folly::SocketAddress addr) {
evb_->checkIsInEventBaseThread();
acceptCb_.reset(new RpcServer::RpcAcceptCallback{
proc_,
evb_,
threadPool_,
structuredLogger_,
std::weak_ptr<RpcServer>{shared_from_this()}});
proc_, evb_, threadPool_, structuredLogger_, weak_from_this()});
// Ask kernel to assign us a port on the loopback interface
serverSocket_->bind(addr);
@ -570,39 +568,32 @@ void RpcServer::initialize(folly::SocketAddress addr) {
serverSocket_->startAccepting();
}
void RpcServer::initialize(folly::File&& socket, InitialSocketType type) {
switch (type) {
case InitialSocketType::CONNECTED_SOCKET:
XLOG(DBG7) << "Initializing server from connected socket: "
<< socket.fd();
// Note we don't initialize the accepting socket in this case. This is
// meant for server that only ever has one connected socket (nfsd3). Since
// we already have the one connected socket, we will not need the
// accepting socket to make any more connections.
rpcTcpHandlers_.wlock()->emplace_back(RpcTcpHandler::create(
proc_,
AsyncSocket::newSocket(
evb_, folly::NetworkSocket::fromFd(socket.release())),
threadPool_,
structuredLogger_,
shared_from_this()));
return;
case InitialSocketType::SERVER_SOCKET:
XLOG(DBG7) << "Initializing server from server socket: " << socket.fd();
acceptCb_.reset(new RpcServer::RpcAcceptCallback{
proc_,
evb_,
threadPool_,
structuredLogger_,
std::weak_ptr<RpcServer>{shared_from_this()}});
serverSocket_->useExistingSocket(
folly::NetworkSocket::fromFd(socket.release()));
void RpcServer::initializeConnectedSocket(folly::File socket) {
XLOG(DBG7) << "Initializing server from connected socket: " << socket.fd();
// Note we don't initialize the accepting socket in this case. This is
// meant for server that only ever has one connected socket (nfsd3). Since
// we already have the one connected socket, we will not need the
// accepting socket to make any more connections.
rpcTcpHandlers_.wlock()->emplace_back(RpcTcpHandler::create(
proc_,
AsyncSocket::newSocket(
evb_, folly::NetworkSocket::fromFd(socket.release())),
threadPool_,
structuredLogger_,
shared_from_this()));
}
serverSocket_->addAcceptCallback(acceptCb_.get(), evb_);
serverSocket_->startAccepting();
return;
}
throw std::runtime_error("Impossible socket type.");
void RpcServer::initializeServerSocket(folly::File socket) {
evb_->checkIsInEventBaseThread();
XLOG(DBG7) << "Initializing server from server socket: " << socket.fd();
acceptCb_.reset(new RpcServer::RpcAcceptCallback{
proc_, evb_, threadPool_, structuredLogger_, weak_from_this()});
serverSocket_->useExistingSocket(
folly::NetworkSocket::fromFd(socket.release()));
serverSocket_->addAcceptCallback(acceptCb_.get(), evb_);
serverSocket_->startAccepting();
}
void RpcServer::registerRpcHandler(RpcTcpHandler::UniquePtr handler) {
@ -622,7 +613,7 @@ void RpcServer::unregisterRpcHandler(RpcTcpHandler* handlerToErase) {
}
folly::SemiFuture<folly::File> RpcServer::takeoverStop() {
evb_->dcheckIsInEventBaseThread();
evb_->checkIsInEventBaseThread();
XLOG(DBG7) << "Removing accept callback";
if (acceptCb_) {

View File

@ -315,13 +315,15 @@ class RpcServer : public std::enable_shared_from_this<RpcServer> {
*/
void initialize(folly::SocketAddress addr);
enum class InitialSocketType { SERVER_SOCKET, CONNECTED_SOCKET };
/**
* Initialize this server from an existing connected socket.
*/
void initializeConnectedSocket(folly::File socket);
/**
* Initialize this server from an already existing socket. connected indicates
* if this is a connected socket or server socket.
* Initialize this server from an existing server socket.
*/
void initialize(folly::File&& socket, InitialSocketType type);
void initializeServerSocket(folly::File socket);
folly::SemiFuture<folly::File> takeoverStop();