diff --git a/eden/fs/inodes/EdenMount.cpp b/eden/fs/inodes/EdenMount.cpp index d9f5c24aa8..1e4ae844d3 100644 --- a/eden/fs/inodes/EdenMount.cpp +++ b/eden/fs/inodes/EdenMount.cpp @@ -363,6 +363,7 @@ Future EdenMount::shutdown() { } Future EdenMount::shutdownImpl() { + journal_.wlock()->cancelAllSubscribers(); XLOG(DBG1) << "beginning shutdown for EdenMount " << getPath(); return inodeMap_->shutdown().then([this] { auto oldState = state_.exchange(State::SHUT_DOWN); diff --git a/eden/fs/journal/Journal.cpp b/eden/fs/journal/Journal.cpp index 1488b06955..f0d43d7ca7 100644 --- a/eden/fs/journal/Journal.cpp +++ b/eden/fs/journal/Journal.cpp @@ -53,5 +53,13 @@ uint64_t Journal::registerSubscriber(folly::Function&& callback) { void Journal::cancelSubscriber(uint64_t id) { subscribers_.erase(id); } + +void Journal::cancelAllSubscribers() { + subscribers_.clear(); +} + +bool Journal::isSubscriberValid(uint64_t id) const { + return subscribers_.find(id) != subscribers_.end(); } } +} // namespace facebook diff --git a/eden/fs/journal/Journal.h b/eden/fs/journal/Journal.h index ba0f1e0df2..96e7d1be93 100644 --- a/eden/fs/journal/Journal.h +++ b/eden/fs/journal/Journal.h @@ -68,6 +68,9 @@ class Journal { uint64_t registerSubscriber(folly::Function&& callback); void cancelSubscriber(uint64_t id); + void cancelAllSubscribers(); + bool isSubscriberValid(uint64_t id) const; + private: /** The sequence number that we'll use for the next entry * that we link into the chain */ diff --git a/eden/fs/service/StreamingSubscriber.cpp b/eden/fs/service/StreamingSubscriber.cpp index 0db423970d..faaad02bee 100644 --- a/eden/fs/service/StreamingSubscriber.cpp +++ b/eden/fs/service/StreamingSubscriber.cpp @@ -16,23 +16,62 @@ using folly::StringPiece; namespace facebook { namespace eden { +StreamingSubscriber::State::State( + StreamingSubscriber::Callback callback, + std::weak_ptr edenMount) + : callback(std::move(callback)), edenMount(edenMount) {} + StreamingSubscriber::StreamingSubscriber( - std::unique_ptr>> callback, + Callback callback, std::shared_ptr edenMount) - : callback_(std::move(callback)), edenMount_(std::move(edenMount)) {} + : state_(folly::in_place, std::move(callback), std::move(edenMount)) { + auto state = state_.wlock(); + // Arrange to be told when the eventBase is about to be destroyed + state->callback->getEventBase()->runOnDestruction(this); +} + +void StreamingSubscriber::runLoopCallback() noexcept { + auto state = state_.wlock(); + if (state->callback) { + // We're called on the eventBase thread so we can call these + // methods directly and tear down the peer. Note that we + // should only get here in the case that the server is being + // shutdown. The individual unmount case is handled by the + // destructor. + state->callback->done(); + state->callback.reset(); + } + state->eventBaseAlive = false; +} StreamingSubscriber::~StreamingSubscriber() { - // NOTE: we can't call callback_->done() directly from here as there is no - // guarantee that we'd be destroyed on the correct thread! + auto state = state_.wlock(); + // If the eventBase is still live then we should tear down the peer + if (state->callback && state->eventBaseAlive) { + auto evb = state->callback->getEventBase(); + + // Move the callback away; we won't be able to use it + // via state-> again. + evb->runInEventBaseThread( + [callback = std::move(state->callback)]() mutable { + callback->done(); + callback.reset(); + }); + } } void StreamingSubscriber::subscribe() { - subscriberId_ = edenMount_->getJournal() - .wlock() - ->registerSubscriber([self = shared_from_this()]() { - self->schedule(); - }); + // Separately scope the wlock as the schedule() below will attempt + // to acquire the lock for itself. + { + auto state = state_.wlock(); + + auto edenMount = state->edenMount.lock(); + DCHECK(edenMount) + << "we're called with the owner referenced, so this should always be valid"; + state->subscriberId = edenMount->getJournal().wlock()->registerSubscriber( + [self = shared_from_this()]() { self->schedule(); }); + } // Suggest to the subscription that the journal has been updated so that // it will compute initial delta information. @@ -40,39 +79,51 @@ void StreamingSubscriber::subscribe() { } void StreamingSubscriber::schedule() { - callback_->getEventBase() - ->runInEventBaseThread([self = shared_from_this()]() { - self->journalUpdated(); - }); + auto state = state_.rlock(); + if (state->callback) { + state->callback->getEventBase()->runInEventBaseThread( + [self = shared_from_this()]() { self->journalUpdated(); }); + } } void StreamingSubscriber::journalUpdated() { - if (!callback_) { + auto state = state_.wlock(); + + if (!state->callback) { // We were cancelled while this callback was queued up. // There's nothing for us to do now. return; } - if (!callback_->isRequestActive()) { - // Peer disconnected, so tear down the subscription - // TODO: is this the right way to detect this? + auto edenMount = state->edenMount.lock(); + bool tearDown = !edenMount || !state->callback->isRequestActive(); + + if (!tearDown && + !edenMount->getJournal().rlock()->isSubscriberValid( + state->subscriberId)) { + tearDown = true; + } + + if (tearDown) { XLOG(DBG1) << "Subscription is no longer active"; - edenMount_->getJournal().wlock()->cancelSubscriber(subscriberId_); - callback_->done(); - callback_.reset(); + if (edenMount) { + edenMount->getJournal().wlock()->cancelSubscriber(state->subscriberId); + } + state->callback->done(); + state->callback.reset(); return; } JournalPosition pos; - auto delta = edenMount_->getJournal().rlock()->getLatest(); + auto delta = edenMount->getJournal().rlock()->getLatest(); pos.sequenceNumber = delta->toSequence; pos.snapshotHash = StringPiece(delta->toHash.getBytes()).str(); - pos.mountGeneration = edenMount_->getMountGeneration(); + pos.mountGeneration = edenMount->getMountGeneration(); try { // And send it - callback_->write(pos); + state->callback->write(pos); } catch (const std::exception& exc) { XLOG(ERR) << "Error while sending subscription update: " << exc.what(); } diff --git a/eden/fs/service/StreamingSubscriber.h b/eden/fs/service/StreamingSubscriber.h index 16051ac802..19eca4966d 100644 --- a/eden/fs/service/StreamingSubscriber.h +++ b/eden/fs/service/StreamingSubscriber.h @@ -29,12 +29,12 @@ namespace eden { */ class StreamingSubscriber - : public std::enable_shared_from_this { + : public std::enable_shared_from_this, + private folly::EventBase::LoopCallback { public: - StreamingSubscriber( - std::unique_ptr>> callback, - std::shared_ptr edenMount); + using Callback = std::unique_ptr>>; + StreamingSubscriber(Callback callback, std::shared_ptr edenMount); ~StreamingSubscriber(); /** Establishes a subscription with the journal in the edenMount @@ -57,11 +57,24 @@ class StreamingSubscriber * This is ensured by only ever calling it via the schedule() method. */ void journalUpdated(); - std::unique_ptr>> - callback_; - std::shared_ptr edenMount_; - uint64_t subscriberId_; + /** We implement LoopCallback so that we can get notified when the + * eventBase is about to be destroyed. The other option for lifetime + * management is KeepAlive tokens but those are not suitable for us + * because we rely on the thrift eventBase threads terminating their + * loops before we trigger our shutdown code. KeepAlive tokens block + * that from happening. The next best thing is to get notified of + * destruction and then atomically reconcile our state. */ + void runLoopCallback() noexcept override; + + struct State { + Callback callback; + std::weak_ptr edenMount; + uint64_t subscriberId{0}; + bool eventBaseAlive{true}; + + State(Callback callback, std::weak_ptr edenMount); + }; + folly::Synchronized state_; }; } }