sapling/eden/fs/service/StreamingSubscriber.cpp
Jon Maltiel Swenson 79b7db4d91 Make EventBase destruction callbacks safely cancellable
Summary: Currently, `runOnDestruction` aims to be thread-safe; new callbacks are added to the `onDestructionCallbacks_` list while the associated mutex is held. However, the caller may own the `LoopCallback` and wish to destroy/cancel it before the `EventBase` destructor runs, and this callback cancellation is not thread-safe, since unlinking does not happen under the lock protecting `onDestructionCallbacks_`. The primary motivation of this diff is to make on-destruction callback cancellation thread-safe; in particular, it is safe to cancel an on-destruction callback concurrently with `~EventBase()`.

Reviewed By: spalamarchuk

Differential Revision: D13440552

fbshipit-source-id: 65cee1e361d37647920baaad4490dd26b791315d
2019-01-24 15:57:39 -08:00

135 lines
3.9 KiB
C++

/*
* Copyright (c) 2017-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*
*/
#include "StreamingSubscriber.h"
#include <folly/logging/xlog.h>
using folly::StringPiece;
namespace facebook {
namespace eden {
StreamingSubscriber::State::State(StreamingSubscriber::Callback callback)
: callback(std::move(callback)) {}
void StreamingSubscriber::onEventBaseDestruction() noexcept {
auto state = state_.wlock();
if (state->callback) {
// We're called on the eventBase thread so we can call these
// methods directly and tear down the peer. Note that we
// should only get here in the case that the server is being
// shutdown. The individual unmount case is handled by the
// destructor.
state->callback->done();
state->callback.reset();
}
state->eventBaseAlive = false;
}
void StreamingSubscriber::subscribe(
Callback callback,
std::shared_ptr<EdenMount> edenMount) {
auto self =
std::make_shared<StreamingSubscriber>(std::move(callback), edenMount);
// Separately scope the lock as the schedule() below will attempt to acquire
// it for itself.
{
auto state = self->state_.wlock();
// Arrange to be told when the eventBase is about to be destroyed
state->callback->getEventBase()->runOnDestruction(*self);
state->subscriberId =
edenMount->getJournal().registerSubscriber([self] { schedule(self); });
}
// Suggest to the subscription that the journal has been updated so that
// it will compute initial delta information.
schedule(self);
}
StreamingSubscriber::StreamingSubscriber(
Callback callback,
std::shared_ptr<EdenMount> edenMount)
: edenMount_(std::move(edenMount)),
state_(folly::in_place, std::move(callback)) {}
StreamingSubscriber::~StreamingSubscriber() {
// Cancel the EventBase::OnDestructionCallback
cancel();
auto state = state_.wlock();
// If the eventBase is still live then we should tear down the peer
if (state->callback) {
CHECK(state->eventBaseAlive);
auto evb = state->callback->getEventBase();
// Move the callback away; we won't be able to use it
// via state-> again.
evb->runInEventBaseThread(
[callback = std::move(state->callback)]() mutable {
callback->done();
callback.reset();
});
}
}
void StreamingSubscriber::schedule(std::shared_ptr<StreamingSubscriber> self) {
auto state = self->state_.rlock();
if (state->callback) {
state->callback->getEventBase()->runInEventBaseThread(
[self] { self->journalUpdated(); });
}
}
void StreamingSubscriber::journalUpdated() {
auto edenMount = edenMount_.lock();
if (!edenMount) {
XLOG(DBG1) << "Mount is released: subscription is no longer active";
auto state = state_.wlock();
state->callback->done();
state->callback.reset();
return;
}
auto state = state_.wlock();
if (!state->callback) {
// We were cancelled while this callback was queued up.
// There's nothing for us to do now.
return;
}
auto& journal = edenMount->getJournal();
if (!state->callback->isRequestActive() ||
!journal.isSubscriberValid(state->subscriberId)) {
XLOG(DBG1) << "Subscription is no longer active";
journal.cancelSubscriber(state->subscriberId);
state->callback->done();
state->callback.reset();
return;
}
JournalPosition pos;
auto delta = journal.getLatest();
pos.sequenceNumber = delta->toSequence;
pos.snapshotHash = StringPiece(delta->toHash.getBytes()).str();
pos.mountGeneration = edenMount->getMountGeneration();
try {
// And send it
state->callback->write(pos);
} catch (const std::exception& exc) {
XLOG(ERR) << "Error while sending subscription update: " << exc.what();
}
}
} // namespace eden
} // namespace facebook