diff --git a/eden/fs/service/EdenServiceHandler.cpp b/eden/fs/service/EdenServiceHandler.cpp index 23fd60ad2f..83cba28eea 100644 --- a/eden/fs/service/EdenServiceHandler.cpp +++ b/eden/fs/service/EdenServiceHandler.cpp @@ -551,19 +551,22 @@ EdenServiceHandler::subscribeStreamTemporary( // and change callbacks. We can't know the id until after we've // created them both, so we need to share an optional id between them. auto handle = std::make_shared>(); + auto disconnected = std::make_shared>(false); // This is called when the subscription channel is torn down - auto onDisconnect = [weakMount, handle] { + auto onDisconnect = [weakMount, handle, disconnected] { XLOG(ERR) << "streaming client disconnected"; auto mount = weakMount.lock(); if (mount) { + disconnected->store(true); mount->getJournal().cancelSubscriber(handle->value()); } }; // Set up the actual publishing instance - auto [reader, writer] = - createStreamPublisher(std::move(onDisconnect)); + auto streamAndPublisher = + apache::thrift::ServerStream::createPublisher( + std::move(onDisconnect)); // A little wrapper around the StreamPublisher. // This is needed because the destructor for StreamPublisherState @@ -571,25 +574,31 @@ EdenServiceHandler::subscribeStreamTemporary( // We don't have an easy way to trigger this outside of just calling // it in a destructor, so that's what we do here. struct Publisher { - apache::thrift::StreamPublisher publisher; + apache::thrift::ServerStreamPublisher publisher; + std::shared_ptr> disconnected; explicit Publisher( - apache::thrift::StreamPublisher publisher) - : publisher(std::move(publisher)) {} + apache::thrift::ServerStreamPublisher publisher, + std::shared_ptr> disconnected) + : publisher(std::move(publisher)), + disconnected(std::move(disconnected)) {} ~Publisher() { // We have to send an exception as part of the completion, otherwise // thrift doesn't seem to notify the peer of the shutdown - std::move(publisher).complete( - folly::make_exception_wrapper( - "subscriber terminated")); + if (!disconnected->load()) { + std::move(publisher).complete( + folly::make_exception_wrapper( + "subscriber terminated")); + } } }; - auto stream = std::make_shared(std::move(writer)); + auto stream = std::make_shared( + std::move(streamAndPublisher.second), std::move(disconnected)); // This is called each time the journal is updated - auto onJournalChange = [weakMount, stream]() mutable { + auto onJournalChange = [weakMount, stream = std::move(stream)]() mutable { auto mount = weakMount.lock(); if (mount) { auto& journal = mount->getJournal(); @@ -613,7 +622,7 @@ EdenServiceHandler::subscribeStreamTemporary( handle->emplace( edenMount->getJournal().registerSubscriber(std::move(onJournalChange))); - return std::move(reader); + return std::move(streamAndPublisher.first); } #endif // !_WIN32