mirror of
https://github.com/facebook/sapling.git
synced 2024-10-08 07:49:11 +03:00
edf20155c9
Summary: There was a lock ordering violation in the combination of StreamingSubscriber and Journal. I changed StreamingSubscriber to always acquire the Journal lock before StreamingSubscriber's State lock. I also simplified StreamingSubscriber's API to a single static function to isolate all of the lifetime and thread safety concerns in one implementation file. This fixes a regression caused by D6162717. Reviewed By: simpkins. Differential Revision: D6202770. fbshipit-source-id: 326269db15bf3200bd6321edf372daf286784fb5
135 lines
4.0 KiB
C++
135 lines
4.0 KiB
C++
/*
|
|
* Copyright (c) 2017-present, Facebook, Inc.
|
|
* All rights reserved.
|
|
*
|
|
* This source code is licensed under the BSD-style license found in the
|
|
* LICENSE file in the root directory of this source tree. An additional grant
|
|
* of patent rights can be found in the PATENTS file in the same directory.
|
|
*
|
|
*/
|
|
#include "StreamingSubscriber.h"
|
|
|
|
#include <folly/experimental/logging/xlog.h>
|
|
|
|
using folly::StringPiece;
|
|
|
|
namespace facebook {
|
|
namespace eden {
|
|
|
|
/**
 * Construct the lock-protected state, taking ownership of the callback
 * used to talk to the subscribed peer.
 */
StreamingSubscriber::State::State(StreamingSubscriber::Callback cb)
    : callback(std::move(cb)) {}
|
|
|
|
void StreamingSubscriber::runLoopCallback() noexcept {
|
|
auto state = state_.wlock();
|
|
if (state->callback) {
|
|
// We're called on the eventBase thread so we can call these
|
|
// methods directly and tear down the peer. Note that we
|
|
// should only get here in the case that the server is being
|
|
// shutdown. The individual unmount case is handled by the
|
|
// destructor.
|
|
state->callback->done();
|
|
state->callback.reset();
|
|
}
|
|
state->eventBaseAlive = false;
|
|
}
|
|
|
|
/**
 * Create a StreamingSubscriber for `callback`, register it with the
 * journal of `edenMount`, and queue an initial update.
 *
 * @param callback   callback used to send updates to the peer; ownership
 *                   is transferred to the new subscriber.
 * @param edenMount  mount whose journal is being subscribed to.
 */
void StreamingSubscriber::subscribe(
    Callback callback,
    std::shared_ptr<EdenMount> edenMount) {
  auto subscriber =
      std::make_shared<StreamingSubscriber>(std::move(callback), edenMount);

  // Both locks live in their own scope: the schedule() call at the bottom
  // takes the state lock itself, so they must be released before it runs.
  {
    // Lock order: Journal first, then the subscriber's State.
    auto journalLock = edenMount->getJournal().wlock();
    auto stateLock = subscriber->state_.wlock();

    // Ask the eventBase to notify us when it is about to be destroyed.
    auto eventBase = stateLock->callback->getEventBase();
    eventBase->runOnDestruction(subscriber.get());

    // The registered closure holds a shared reference to the subscriber.
    stateLock->subscriberId = journalLock->registerSubscriber(
        [subscriber] { schedule(subscriber); });
  }

  // Pretend the journal just changed so that the subscription computes
  // its initial delta information.
  schedule(subscriber);
}
|
|
|
|
/**
 * Store the mount reference and build the synchronized State in place
 * around the supplied callback.
 */
StreamingSubscriber::StreamingSubscriber(
    Callback cb,
    std::shared_ptr<EdenMount> mount)
    : edenMount_(std::move(mount)), state_(folly::in_place, std::move(cb)) {}
|
|
|
|
/**
 * If the callback has not been released yet, hand it to its eventBase
 * thread so the peer is completed and torn down there.
 */
StreamingSubscriber::~StreamingSubscriber() {
  auto locked = state_.wlock();

  // Nothing to tear down if the callback is already gone.
  if (!locked->callback) {
    return;
  }

  // A callback that is still present implies the eventBase is still live.
  CHECK(locked->eventBaseAlive);

  // Fetch the eventBase before the callback is moved out of the state;
  // afterwards it can no longer be reached through locked->.
  auto eventBase = locked->callback->getEventBase();
  eventBase->runInEventBaseThread(
      [peer = std::move(locked->callback)]() mutable {
        peer->done();
        peer.reset();
      });
}
|
|
|
|
void StreamingSubscriber::schedule(std::shared_ptr<StreamingSubscriber> self) {
|
|
auto state = self->state_.rlock();
|
|
if (state->callback) {
|
|
state->callback->getEventBase()->runInEventBaseThread(
|
|
[self] { self->journalUpdated(); });
|
|
}
|
|
}
|
|
|
|
void StreamingSubscriber::journalUpdated() {
|
|
auto edenMount = edenMount_.lock();
|
|
if (!edenMount) {
|
|
XLOG(DBG1) << "Mount is released: subscription is no longer active";
|
|
auto state = state_.wlock();
|
|
state->callback->done();
|
|
state->callback.reset();
|
|
return;
|
|
}
|
|
|
|
// The Journal lock must always be acquired before state_'s lock.
|
|
auto journal = edenMount->getJournal().ulock();
|
|
|
|
auto state = state_.wlock();
|
|
if (!state->callback) {
|
|
// We were cancelled while this callback was queued up.
|
|
// There's nothing for us to do now.
|
|
return;
|
|
}
|
|
|
|
if (!state->callback->isRequestActive() ||
|
|
!journal->isSubscriberValid(state->subscriberId)) {
|
|
XLOG(DBG1) << "Subscription is no longer active";
|
|
journal.moveFromUpgradeToWrite()->cancelSubscriber(state->subscriberId);
|
|
state->callback->done();
|
|
state->callback.reset();
|
|
return;
|
|
}
|
|
|
|
JournalPosition pos;
|
|
|
|
auto delta = journal->getLatest();
|
|
pos.sequenceNumber = delta->toSequence;
|
|
pos.snapshotHash = StringPiece(delta->toHash.getBytes()).str();
|
|
pos.mountGeneration = edenMount->getMountGeneration();
|
|
|
|
try {
|
|
// And send it
|
|
state->callback->write(pos);
|
|
} catch (const std::exception& exc) {
|
|
XLOG(ERR) << "Error while sending subscription update: " << exc.what();
|
|
}
|
|
}
|
|
} // namespace eden
|
|
} // namespace facebook
|