Remove deprecated method createStreamPublisher in EdenServiceHandler

Summary: Remove deprecated method createStreamPublisher with apache::thrift::ServerStream<T>::createPublisher()

Reviewed By: iahs

Differential Revision: D19961754

fbshipit-source-id: 915645f1aff648d65f54246e008fbc4454b71684
This commit is contained in:
James Zuo 2020-02-20 09:31:04 -08:00 committed by Facebook Github Bot
parent ef1ffa31e8
commit 73e1521a4e

View File

@ -551,19 +551,22 @@ EdenServiceHandler::subscribeStreamTemporary(
// and change callbacks. We can't know the id until after we've
// created them both, so we need to share an optional id between them.
auto handle = std::make_shared<std::optional<Journal::SubscriberId>>();
auto disconnected = std::make_shared<std::atomic<bool>>(false);
// This is called when the subscription channel is torn down
auto onDisconnect = [weakMount, handle] {
auto onDisconnect = [weakMount, handle, disconnected] {
XLOG(ERR) << "streaming client disconnected";
auto mount = weakMount.lock();
if (mount) {
disconnected->store(true);
mount->getJournal().cancelSubscriber(handle->value());
}
};
// Set up the actual publishing instance
auto [reader, writer] =
createStreamPublisher<JournalPosition>(std::move(onDisconnect));
auto streamAndPublisher =
apache::thrift::ServerStream<JournalPosition>::createPublisher(
std::move(onDisconnect));
// A little wrapper around the StreamPublisher.
// This is needed because the destructor for StreamPublisherState
@ -571,25 +574,31 @@ EdenServiceHandler::subscribeStreamTemporary(
// We don't have an easy way to trigger this outside of just calling
// it in a destructor, so that's what we do here.
struct Publisher {
apache::thrift::StreamPublisher<JournalPosition> publisher;
apache::thrift::ServerStreamPublisher<JournalPosition> publisher;
std::shared_ptr<std::atomic<bool>> disconnected;
explicit Publisher(
apache::thrift::StreamPublisher<JournalPosition> publisher)
: publisher(std::move(publisher)) {}
apache::thrift::ServerStreamPublisher<JournalPosition> publisher,
std::shared_ptr<std::atomic<bool>> disconnected)
: publisher(std::move(publisher)),
disconnected(std::move(disconnected)) {}
~Publisher() {
// We have to send an exception as part of the completion, otherwise
// thrift doesn't seem to notify the peer of the shutdown
std::move(publisher).complete(
folly::make_exception_wrapper<std::runtime_error>(
"subscriber terminated"));
if (!disconnected->load()) {
std::move(publisher).complete(
folly::make_exception_wrapper<std::runtime_error>(
"subscriber terminated"));
}
}
};
auto stream = std::make_shared<Publisher>(std::move(writer));
auto stream = std::make_shared<Publisher>(
std::move(streamAndPublisher.second), std::move(disconnected));
// This is called each time the journal is updated
auto onJournalChange = [weakMount, stream]() mutable {
auto onJournalChange = [weakMount, stream = std::move(stream)]() mutable {
auto mount = weakMount.lock();
if (mount) {
auto& journal = mount->getJournal();
@ -613,7 +622,7 @@ EdenServiceHandler::subscribeStreamTemporary(
handle->emplace(
edenMount->getJournal().registerSubscriber(std::move(onJournalChange)));
return std::move(reader);
return std::move(streamAndPublisher.first);
}
#endif // !_WIN32