sapling/eden/fs/service/StreamingSubscriber.cpp
Adam Simpkins 8e3c09a99a move folly/experimental/logging to folly/logging/
Summary:
Promote the folly logging code out of the experimental subdirectory.
We have been using this for several months in a few projects and are pretty
happy with it so far.

After moving it out of the experimental/ subdirectory I plan to update
folly::Init() to automatically support configuring it via a `--logging` command
line flag (similar to the initialization it already does today for glog).

Reviewed By: yfeldblum, chadaustin

Differential Revision: D7755455

fbshipit-source-id: 052db34c97f7516728f7cbb1a5ad959def2f6efb
2018-04-30 21:29:29 -07:00

132 lines
3.9 KiB
C++

/*
* Copyright (c) 2017-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*
*/
#include "StreamingSubscriber.h"
#include <folly/logging/xlog.h>
using folly::StringPiece;
namespace facebook {
namespace eden {
StreamingSubscriber::State::State(StreamingSubscriber::Callback callback)
: callback(std::move(callback)) {}
void StreamingSubscriber::runLoopCallback() noexcept {
auto state = state_.wlock();
if (state->callback) {
// We're called on the eventBase thread so we can call these
// methods directly and tear down the peer. Note that we
// should only get here in the case that the server is being
// shutdown. The individual unmount case is handled by the
// destructor.
state->callback->done();
state->callback.reset();
}
state->eventBaseAlive = false;
}
void StreamingSubscriber::subscribe(
Callback callback,
std::shared_ptr<EdenMount> edenMount) {
auto self =
std::make_shared<StreamingSubscriber>(std::move(callback), edenMount);
// Separately scope the lock as the schedule() below will attempt to acquire
// it for itself.
{
auto state = self->state_.wlock();
// Arrange to be told when the eventBase is about to be destroyed
state->callback->getEventBase()->runOnDestruction(self.get());
state->subscriberId =
edenMount->getJournal().registerSubscriber([self] { schedule(self); });
}
// Suggest to the subscription that the journal has been updated so that
// it will compute initial delta information.
schedule(self);
}
StreamingSubscriber::StreamingSubscriber(
Callback callback,
std::shared_ptr<EdenMount> edenMount)
: edenMount_(std::move(edenMount)),
state_(folly::in_place, std::move(callback)) {}
StreamingSubscriber::~StreamingSubscriber() {
auto state = state_.wlock();
// If the eventBase is still live then we should tear down the peer
if (state->callback) {
CHECK(state->eventBaseAlive);
auto evb = state->callback->getEventBase();
// Move the callback away; we won't be able to use it
// via state-> again.
evb->runInEventBaseThread(
[callback = std::move(state->callback)]() mutable {
callback->done();
callback.reset();
});
}
}
void StreamingSubscriber::schedule(std::shared_ptr<StreamingSubscriber> self) {
auto state = self->state_.rlock();
if (state->callback) {
state->callback->getEventBase()->runInEventBaseThread(
[self] { self->journalUpdated(); });
}
}
void StreamingSubscriber::journalUpdated() {
auto edenMount = edenMount_.lock();
if (!edenMount) {
XLOG(DBG1) << "Mount is released: subscription is no longer active";
auto state = state_.wlock();
state->callback->done();
state->callback.reset();
return;
}
auto state = state_.wlock();
if (!state->callback) {
// We were cancelled while this callback was queued up.
// There's nothing for us to do now.
return;
}
auto& journal = edenMount->getJournal();
if (!state->callback->isRequestActive() ||
!journal.isSubscriberValid(state->subscriberId)) {
XLOG(DBG1) << "Subscription is no longer active";
journal.cancelSubscriber(state->subscriberId);
state->callback->done();
state->callback.reset();
return;
}
JournalPosition pos;
auto delta = journal.getLatest();
pos.sequenceNumber = delta->toSequence;
pos.snapshotHash = StringPiece(delta->toHash.getBytes()).str();
pos.mountGeneration = edenMount->getMountGeneration();
try {
// And send it
state->callback->write(pos);
} catch (const std::exception& exc) {
XLOG(ERR) << "Error while sending subscription update: " << exc.what();
}
}
} // namespace eden
} // namespace facebook