tidy up journal subscribers on unmount

Summary:
We have a couple of issues with the watchman/eden integration:

1. If you "win" the race, you can cause a segfault during shutdown
by changing files during unmount.  The file change causes the journal-updating
code to trigger a send to the client after the associated eventBase
has already been destroyed.

2. We don't proactively send any signal to the subscriber (in practice: watchman)
when we unmount.  Watchman detects the eden shutdown by noticing that its
socket has closed but has no way to detect an unmount.

This diff connects the unmount to the set of journal subscribers: unmounting
cancels every subscriber, which in turn tries to cause the thrift socket to
close out so that the client can notice the unmount.

Reviewed By: bolinfest

Differential Revision: D6162717

fbshipit-source-id: 42d4a005089cd9cddf204997b1643570488f04c3
This commit is contained in:
Wez Furlong 2017-10-27 08:44:19 -07:00 committed by Facebook Github Bot
parent a7caae52fc
commit d3202383c1
5 changed files with 110 additions and 34 deletions

View File

@ -363,6 +363,7 @@ Future<Unit> EdenMount::shutdown() {
} }
Future<Unit> EdenMount::shutdownImpl() { Future<Unit> EdenMount::shutdownImpl() {
journal_.wlock()->cancelAllSubscribers();
XLOG(DBG1) << "beginning shutdown for EdenMount " << getPath(); XLOG(DBG1) << "beginning shutdown for EdenMount " << getPath();
return inodeMap_->shutdown().then([this] { return inodeMap_->shutdown().then([this] {
auto oldState = state_.exchange(State::SHUT_DOWN); auto oldState = state_.exchange(State::SHUT_DOWN);

View File

@ -53,5 +53,13 @@ uint64_t Journal::registerSubscriber(folly::Function<void()>&& callback) {
void Journal::cancelSubscriber(uint64_t id) { void Journal::cancelSubscriber(uint64_t id) {
subscribers_.erase(id); subscribers_.erase(id);
} }
// Drops every registered subscriber in one step by clearing subscribers_.
// EdenMount::shutdownImpl() calls this through journal_.wlock() as its first
// action when a mount is being shut down.
void Journal::cancelAllSubscribers() {
subscribers_.clear();
}
bool Journal::isSubscriberValid(uint64_t id) const {
return subscribers_.find(id) != subscribers_.end();
} }
} }
} // namespace facebook

View File

@ -68,6 +68,9 @@ class Journal {
uint64_t registerSubscriber(folly::Function<void()>&& callback); uint64_t registerSubscriber(folly::Function<void()>&& callback);
void cancelSubscriber(uint64_t id); void cancelSubscriber(uint64_t id);
void cancelAllSubscribers();
bool isSubscriberValid(uint64_t id) const;
private: private:
/** The sequence number that we'll use for the next entry /** The sequence number that we'll use for the next entry
* that we link into the chain */ * that we link into the chain */

View File

@ -16,23 +16,62 @@ using folly::StringPiece;
namespace facebook { namespace facebook {
namespace eden { namespace eden {
// Builds the state held inside folly::Synchronized<State>: takes ownership of
// the streaming callback and keeps only a weak reference to the mount.
// NOTE(review): edenMount is received by value but copied (not moved) into
// the member; std::move(edenMount) would avoid the extra copy -- confirm.
StreamingSubscriber::State::State(
StreamingSubscriber::Callback callback,
std::weak_ptr<EdenMount> edenMount)
: callback(std::move(callback)), edenMount(edenMount) {}
StreamingSubscriber::StreamingSubscriber( StreamingSubscriber::StreamingSubscriber(
std::unique_ptr<apache::thrift::StreamingHandlerCallback< Callback callback,
std::unique_ptr<JournalPosition>>> callback,
std::shared_ptr<EdenMount> edenMount) std::shared_ptr<EdenMount> edenMount)
: callback_(std::move(callback)), edenMount_(std::move(edenMount)) {} : state_(folly::in_place, std::move(callback), std::move(edenMount)) {
auto state = state_.wlock();
// Arrange to be told when the eventBase is about to be destroyed
state->callback->getEventBase()->runOnDestruction(this);
}
// Invoked when the eventBase is about to be destroyed; the constructor
// registers this object for that notification via runOnDestruction(this).
// Completes and releases the streaming callback if it is still held, then
// records that the eventBase is gone so the destructor will not try to
// schedule work on it.
void StreamingSubscriber::runLoopCallback() noexcept {
auto state = state_.wlock();
if (state->callback) {
// We're called on the eventBase thread so we can call these
// methods directly and tear down the peer. Note that we
// should only get here in the case that the server is being
// shutdown. The individual unmount case is handled by the
// destructor.
state->callback->done();
state->callback.reset();
}
// Checked by the destructor before it touches the eventBase.
state->eventBaseAlive = false;
}
StreamingSubscriber::~StreamingSubscriber() { StreamingSubscriber::~StreamingSubscriber() {
// NOTE: we can't call callback_->done() directly from here as there is no auto state = state_.wlock();
// guarantee that we'd be destroyed on the correct thread! // If the eventBase is still live then we should tear down the peer
if (state->callback && state->eventBaseAlive) {
auto evb = state->callback->getEventBase();
// Move the callback away; we won't be able to use it
// via state-> again.
evb->runInEventBaseThread(
[callback = std::move(state->callback)]() mutable {
callback->done();
callback.reset();
});
}
} }
void StreamingSubscriber::subscribe() { void StreamingSubscriber::subscribe() {
subscriberId_ = edenMount_->getJournal() // Separately scope the wlock as the schedule() below will attempt
.wlock() // to acquire the lock for itself.
->registerSubscriber([self = shared_from_this()]() { {
self->schedule(); auto state = state_.wlock();
});
auto edenMount = state->edenMount.lock();
DCHECK(edenMount)
<< "we're called with the owner referenced, so this should always be valid";
state->subscriberId = edenMount->getJournal().wlock()->registerSubscriber(
[self = shared_from_this()]() { self->schedule(); });
}
// Suggest to the subscription that the journal has been updated so that // Suggest to the subscription that the journal has been updated so that
// it will compute initial delta information. // it will compute initial delta information.
@ -40,39 +79,51 @@ void StreamingSubscriber::subscribe() {
} }
void StreamingSubscriber::schedule() { void StreamingSubscriber::schedule() {
callback_->getEventBase() auto state = state_.rlock();
->runInEventBaseThread([self = shared_from_this()]() { if (state->callback) {
self->journalUpdated(); state->callback->getEventBase()->runInEventBaseThread(
}); [self = shared_from_this()]() { self->journalUpdated(); });
}
} }
void StreamingSubscriber::journalUpdated() { void StreamingSubscriber::journalUpdated() {
if (!callback_) { auto state = state_.wlock();
if (!state->callback) {
// We were cancelled while this callback was queued up. // We were cancelled while this callback was queued up.
// There's nothing for us to do now. // There's nothing for us to do now.
return; return;
} }
if (!callback_->isRequestActive()) { auto edenMount = state->edenMount.lock();
// Peer disconnected, so tear down the subscription bool tearDown = !edenMount || !state->callback->isRequestActive();
// TODO: is this the right way to detect this?
if (!tearDown &&
!edenMount->getJournal().rlock()->isSubscriberValid(
state->subscriberId)) {
tearDown = true;
}
if (tearDown) {
XLOG(DBG1) << "Subscription is no longer active"; XLOG(DBG1) << "Subscription is no longer active";
edenMount_->getJournal().wlock()->cancelSubscriber(subscriberId_); if (edenMount) {
callback_->done(); edenMount->getJournal().wlock()->cancelSubscriber(state->subscriberId);
callback_.reset(); }
state->callback->done();
state->callback.reset();
return; return;
} }
JournalPosition pos; JournalPosition pos;
auto delta = edenMount_->getJournal().rlock()->getLatest(); auto delta = edenMount->getJournal().rlock()->getLatest();
pos.sequenceNumber = delta->toSequence; pos.sequenceNumber = delta->toSequence;
pos.snapshotHash = StringPiece(delta->toHash.getBytes()).str(); pos.snapshotHash = StringPiece(delta->toHash.getBytes()).str();
pos.mountGeneration = edenMount_->getMountGeneration(); pos.mountGeneration = edenMount->getMountGeneration();
try { try {
// And send it // And send it
callback_->write(pos); state->callback->write(pos);
} catch (const std::exception& exc) { } catch (const std::exception& exc) {
XLOG(ERR) << "Error while sending subscription update: " << exc.what(); XLOG(ERR) << "Error while sending subscription update: " << exc.what();
} }

View File

@ -29,12 +29,12 @@ namespace eden {
*/ */
class StreamingSubscriber class StreamingSubscriber
: public std::enable_shared_from_this<StreamingSubscriber> { : public std::enable_shared_from_this<StreamingSubscriber>,
private folly::EventBase::LoopCallback {
public: public:
StreamingSubscriber( using Callback = std::unique_ptr<apache::thrift::StreamingHandlerCallback<
std::unique_ptr<apache::thrift::StreamingHandlerCallback< std::unique_ptr<JournalPosition>>>;
std::unique_ptr<JournalPosition>>> callback, StreamingSubscriber(Callback callback, std::shared_ptr<EdenMount> edenMount);
std::shared_ptr<EdenMount> edenMount);
~StreamingSubscriber(); ~StreamingSubscriber();
/** Establishes a subscription with the journal in the edenMount /** Establishes a subscription with the journal in the edenMount
@ -57,11 +57,24 @@ class StreamingSubscriber
* This is ensured by only ever calling it via the schedule() method. */ * This is ensured by only ever calling it via the schedule() method. */
void journalUpdated(); void journalUpdated();
std::unique_ptr<apache::thrift::StreamingHandlerCallback< /** We implement LoopCallback so that we can get notified when the
std::unique_ptr<JournalPosition>>> * eventBase is about to be destroyed. The other option for lifetime
callback_; * management is KeepAlive tokens but those are not suitable for us
std::shared_ptr<EdenMount> edenMount_; * because we rely on the thrift eventBase threads terminating their
uint64_t subscriberId_; * loops before we trigger our shutdown code. KeepAlive tokens block
* that from happening. The next best thing is to get notified of
* destruction and then atomically reconcile our state. */
void runLoopCallback() noexcept override;
struct State {
Callback callback;
std::weak_ptr<EdenMount> edenMount;
uint64_t subscriberId{0};
bool eventBaseAlive{true};
State(Callback callback, std::weak_ptr<EdenMount> edenMount);
};
folly::Synchronized<State> state_;
}; };
} }
} }