mirror of
https://github.com/facebook/sapling.git
synced 2024-10-05 14:28:17 +03:00
add attachEventBase()/detachEventBase() to UnixSocket
Summary: Add methods to UnixSocket and FutureUnixSocket to attach and detach from an EventBase. This makes it possible to construct a UnixSocket object without having an EventBase yet and then attach it to an EventBase later. Reviewed By: bolinfest Differential Revision: D8053423 fbshipit-source-id: c4de00166dbc0e075b4e4cd81c3dd5b377ea9a52
This commit is contained in:
parent
dac67ee976
commit
163d92d617
@ -162,6 +162,10 @@ uid_t FutureUnixSocket::getRemoteUID() {
|
||||
return socket_->getRemoteUID();
|
||||
}
|
||||
|
||||
void FutureUnixSocket::closeNow() {
|
||||
socket_.reset();
|
||||
}
|
||||
|
||||
Future<Unit> FutureUnixSocket::send(Message&& msg) {
|
||||
if (!socket_) {
|
||||
return makeFuture<Unit>(
|
||||
|
@ -70,6 +70,31 @@ class FutureUnixSocket : private UnixSocket::ReceiveCallback {
|
||||
return socket_->getEventBase();
|
||||
}
|
||||
|
||||
/**
|
||||
* Attach this socket to an EventBase.
|
||||
*
|
||||
* This should only be called to set the EventBase if the UnixSocket
|
||||
* constructor was called with a null EventBase. If the EventBase was not
|
||||
* set in the constructor then attachEventBase() must be called before any
|
||||
* calls to send() or setReceiveCallback().
|
||||
*
|
||||
* This method may only be called from the EventBase's thread. If the
|
||||
* EventBase has not been started yet it may be called from another thread if
|
||||
* that thread is the only thread accessing the EventBase.
|
||||
*/
|
||||
void attachEventBase(folly::EventBase* eventBase) {
|
||||
socket_->attachEventBase(eventBase);
|
||||
}
|
||||
|
||||
/**
|
||||
* Detach from the EventBase that is being used to drive this socket.
|
||||
*
|
||||
* This may only be called from the EventBase thread.
|
||||
*/
|
||||
void detachEventBase() {
|
||||
socket_->detachEventBase();
|
||||
}
|
||||
|
||||
void setSendTimeout(std::chrono::milliseconds timeout) {
|
||||
return socket_->setSendTimeout(timeout);
|
||||
}
|
||||
@ -82,6 +107,13 @@ class FutureUnixSocket : private UnixSocket::ReceiveCallback {
|
||||
return socket_.get() != nullptr;
|
||||
}
|
||||
|
||||
/**
|
||||
* Close the socket immediately.
|
||||
*
|
||||
* This aborts any send() and receive() calls that are in progress.
|
||||
*/
|
||||
void closeNow();
|
||||
|
||||
/**
|
||||
* Get the user ID of the remote peer.
|
||||
*/
|
||||
|
@ -23,7 +23,6 @@
|
||||
#include <new>
|
||||
|
||||
#include "eden/fs/utils/Bug.h"
|
||||
#include "eden/fs/utils/ControlMsg.h"
|
||||
|
||||
using folly::ByteRange;
|
||||
using folly::EventBase;
|
||||
@ -45,6 +44,17 @@ using std::vector;
|
||||
namespace facebook {
|
||||
namespace eden {
|
||||
|
||||
namespace {
|
||||
/**
|
||||
* The maximum number of file descriptors that can be sent in a SCM_RIGHTS
|
||||
* control message.
|
||||
*
|
||||
* Linux internally defines this to 253 using the SCM_MAX_FD constant in
|
||||
* linux/include/net/scm.h
|
||||
*/
|
||||
constexpr size_t kMaxFDs = 253;
|
||||
} // namespace
|
||||
|
||||
class UnixSocket::Connector : private folly::EventHandler, folly::AsyncTimeout {
|
||||
public:
|
||||
Connector(ConnectCallback* callback, EventBase* eventBase, File socket)
|
||||
@ -95,7 +105,7 @@ UnixSocket::UnixSocket(EventBase* eventBase, File socket)
|
||||
socket_{std::move(socket)},
|
||||
// Create recvControlBuffer_ with enough capacity to receive
|
||||
// the maximum number of file descriptors that can be sent at once.
|
||||
recvControlBuffer_(CMSG_SPACE(ControlMsg::kMaxFDs * sizeof(int))) {}
|
||||
recvControlBuffer_(CMSG_SPACE(kMaxFDs * sizeof(int))) {}
|
||||
|
||||
UnixSocket::~UnixSocket() {
|
||||
// The destructor should generally remain empty.
|
||||
@ -111,6 +121,20 @@ void UnixSocket::destroy() {
|
||||
DelayedDestruction::destroy();
|
||||
}
|
||||
|
||||
void UnixSocket::attachEventBase(folly::EventBase* eventBase) {
|
||||
DCHECK(!eventBase_);
|
||||
eventBase_ = eventBase;
|
||||
EventHandler::attachEventBase(eventBase);
|
||||
AsyncTimeout::attachEventBase(eventBase);
|
||||
}
|
||||
|
||||
void UnixSocket::detachEventBase() {
|
||||
DCHECK(eventBase_);
|
||||
eventBase_ = nullptr;
|
||||
EventHandler::detachEventBase();
|
||||
AsyncTimeout::detachEventBase();
|
||||
}
|
||||
|
||||
void UnixSocket::connect(
|
||||
ConnectCallback* callback,
|
||||
EventBase* eventBase,
|
||||
@ -571,7 +595,7 @@ size_t UnixSocket::initializeFirstControlMsg(
|
||||
}
|
||||
|
||||
// Compute how much space we need for the control data
|
||||
size_t fdsToSend = std::min(ControlMsg::kMaxFDs, message.files.size());
|
||||
size_t fdsToSend = std::min(kMaxFDs, message.files.size());
|
||||
size_t cmsgSpace = CMSG_SPACE(fdsToSend * sizeof(int));
|
||||
|
||||
// Allocate the buffer
|
||||
@ -604,8 +628,7 @@ size_t UnixSocket::initializeAdditionalControlMsg(
|
||||
DCHECK(!message.files.empty());
|
||||
DCHECK_GT(entry->filesSent, 0);
|
||||
|
||||
size_t fdsToSend =
|
||||
std::min(ControlMsg::kMaxFDs, message.files.size() - entry->filesSent);
|
||||
size_t fdsToSend = std::min(kMaxFDs, message.files.size() - entry->filesSent);
|
||||
auto cmsgSpace = CMSG_SPACE(fdsToSend * sizeof(int));
|
||||
|
||||
controlBuf.resize(cmsgSpace);
|
||||
|
@ -176,6 +176,25 @@ class UnixSocket : public folly::DelayedDestruction,
|
||||
return eventBase_;
|
||||
}
|
||||
|
||||
/**
|
||||
* Attach this socket to an EventBase.
|
||||
*
|
||||
* This should only be called to set the EventBase if the UnixSocket
|
||||
* constructor was called with a null EventBase. If the EventBase was not
|
||||
* set in the constructor then attachEventBase() must be called before any
|
||||
* calls to send() or setReceiveCallback().
|
||||
*
|
||||
* This method may only be called from the EventBase's thread.
|
||||
*/
|
||||
void attachEventBase(folly::EventBase* eventBase);
|
||||
|
||||
/**
|
||||
* Detach from the EventBase that is being used to drive this socket.
|
||||
*
|
||||
* This may only be called from the EventBase thread.
|
||||
*/
|
||||
void detachEventBase();
|
||||
|
||||
/**
|
||||
* Destroy the UnixSocket object.
|
||||
*
|
||||
|
@ -244,3 +244,53 @@ TEST(FutureUnixSocket, receiveQueue) {
|
||||
}
|
||||
EXPECT_EQ(sendMessages.size(), receivedMessages.size());
|
||||
}
|
||||
|
||||
TEST(FutureUnixSocket, attachEventBase) {
|
||||
// A helper function to attach sockets to an EventBase, send a message, then
|
||||
// detach from the EventBase
|
||||
auto test = [&](EventBase* evb, FutureUnixSocket& s1, FutureUnixSocket& s2) {
|
||||
s1.attachEventBase(evb);
|
||||
s2.attachEventBase(evb);
|
||||
SCOPE_EXIT {
|
||||
s1.detachEventBase();
|
||||
s2.detachEventBase();
|
||||
};
|
||||
|
||||
const std::string msgData(100, 'a');
|
||||
s1.send(UnixSocket::Message(IOBuf(IOBuf::COPY_BUFFER, msgData)))
|
||||
.then([] { XLOG(DBG3) << "send complete"; })
|
||||
.onError([](const folly::exception_wrapper& ew) {
|
||||
ADD_FAILURE() << "send error: " << ew.what();
|
||||
});
|
||||
folly::Optional<UnixSocket::Message> receivedMessage;
|
||||
s2.receive(500ms)
|
||||
.then([&receivedMessage](UnixSocket::Message&& msg) {
|
||||
receivedMessage = std::move(msg);
|
||||
})
|
||||
.onError([](const folly::exception_wrapper& ew) {
|
||||
ADD_FAILURE() << "receive error: " << ew.what();
|
||||
})
|
||||
.ensure([&]() { evb->terminateLoopSoon(); });
|
||||
|
||||
evb->loopForever();
|
||||
|
||||
EXPECT_TRUE(receivedMessage.hasValue());
|
||||
EXPECT_EQ(msgData, receivedMessage->data.moveToFbString().toStdString());
|
||||
};
|
||||
|
||||
// Create two sockets that are initially not attached to an EventBase
|
||||
auto sockets = createSocketPair();
|
||||
auto socket1 = FutureUnixSocket(nullptr, std::move(sockets.first));
|
||||
auto socket2 = FutureUnixSocket(nullptr, std::move(sockets.second));
|
||||
|
||||
// Test on one EventBase
|
||||
{
|
||||
EventBase evb1;
|
||||
test(&evb1, socket1, socket2);
|
||||
}
|
||||
// Now test using another EventBase
|
||||
{
|
||||
EventBase evb2;
|
||||
test(&evb2, socket2, socket1);
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user