sapling/eden/fs/service/StreamingSubscriber.h
Wez Furlong 5ca8bcdc60 implement eden->watchman subscriptions
Summary:
This tweaks the definition of the subscribe method in the
streamingeden thrift file and implements the server side of the
thrift service, and the client side to consume it in watchman.

The definition is now a bit simpler than it was previously; we're
now just sending a stream of the updated JournalPosition objects
to the client rather than computing and sending FileDelta objects.

This is cheaper for Eden to deal with because it is very cheap to take
the current journal position and pass that over the wire.  This is
important because a burst of mutations will likely queue up a bunch
of these in quick succession.

In a future diff I'm considering adding a latency parameter for
the subscription so that we can constrain the number of updates
over a certain time period (likely in the 10's of milliseconds range).

For now I just want to prove that the concept works.

On the watchman side we just need to pull these off the wire as they are sent
by eden, then wait for the subscription stream to settle before internally
broadcasting to any connected subscribers.

Reviewed By: bolinfest

Differential Revision: D4647259

fbshipit-source-id: 03aa16e59a43195a2505a8d03bce1ccf88a8d42f
2017-03-21 13:35:20 -07:00

68 lines
2.3 KiB
C++

/*
* Copyright (c) 2017-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*
*/
#pragma once
#include <memory>
#include "eden/fs/inodes/EdenMount.h"
#include "eden/fs/service/gen-cpp2/StreamingEdenService.h"
namespace facebook {
namespace eden {
/** StreamingSubscriber is used to implement pushing updates to
* connected subscribers so that they can take action as files
* are modified in the eden mount.
*
* This initial implementation is relatively dumb in that it
* will immediately try to send a notification to the subscriber.
*
* Future iterations will add the ability to rate control these
* updates (no more than 1 update per specified time interval)
* and potentially also add a predicate so that we only notify
* for updates that match certain criteria.
*/
class StreamingSubscriber
: public std::enable_shared_from_this<StreamingSubscriber> {
public:
StreamingSubscriber(
std::unique_ptr<apache::thrift::StreamingHandlerCallback<
std::unique_ptr<JournalPosition>>> callback,
std::shared_ptr<EdenMount> edenMount);
~StreamingSubscriber();
/** Establishes a subscription with the journal in the edenMount
* that was passed in during construction.
* While the subscription is active, the journal holds a reference
* to this StreamingSubscriber and keeps it alive.
* As part of setting this up, pushes the initial subscription information
* to the client.
*/
void subscribe();
private:
/** Schedule a call to journalUpdated.
* The journalUpdated method will be called in the context of the
* eventBase thread that is associated with the connected client */
void schedule();
/** Compute information to send to the connected subscriber.
* This must only be called on the thread associated with the client.
* This is ensured by only ever calling it via the schedule() method. */
void journalUpdated();
std::unique_ptr<apache::thrift::StreamingHandlerCallback<
std::unique_ptr<JournalPosition>>>
callback_;
std::shared_ptr<EdenMount> edenMount_;
uint64_t subscriberId_;
};
}
}