sapling/eden/fs/journal/Journal.h
Wez Furlong 5ca8bcdc60 implement eden->watchman subscriptions
Summary:
This tweaks the definition of the subscribe method in the
streamingeden thrift file and implements the server side of the
thrift service, and the client side to consume it in watchman.

The definition is now a bit simpler than it was previously; we're
now just sending a stream of the updated JournalPosition objects
to the client rather than computing and sending FileDelta objects.

This is cheaper for Eden to deal with because it is very cheap to take
the current journal position and pass that over the wire.  This is
important because a burst of mutations will likely queue up a bunch
of these in quick succession.

In a future diff I'm considering adding a latency parameter for
the subscription so that we can constrain the number of updates
over a certain time period (likely in the 10's of milliseconds range).

For now I just want to prove that the concept works.

On the watchman side we just need to pull these off the wire as they are sent
by eden, then wait for the subscription stream to settle before internally
broadcasting to any connected subscribers.

Reviewed By: bolinfest

Differential Revision: D4647259

fbshipit-source-id: 03aa16e59a43195a2505a8d03bce1ccf88a8d42f
2017-03-21 13:35:20 -07:00

84 lines
3.0 KiB
C++

/*
* Copyright (c) 2016-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*
*/
#pragma once
#include <folly/Function.h>
#include <cstdint>
#include <memory>
#include <unordered_map>
namespace facebook {
namespace eden {
class JournalDelta;
/** The Journal exists to answer questions about how files are changing
* over time.
*
* It contains metadata only; it is not a full snapshot of the state of
* the filesystem at a particular point in time.
* The intent is to be able query things like "which set of files changed
* between time A and time B?".
*
* In the initial implementation we are recording file names from the overlay
* but will expand this to record things like checking out different
* revisions (the prior and new revision hash) from which we can derive
* the larger list of files.
*
* The Journal class is not internally threadsafe; we make it safe
* through the use of folly::Synchronized in the EdenMount class
* that owns the Journal.
*/
class Journal {
public:
using SequenceNumber = uint64_t;
/** Add a delta to the journal
* The delta will have a new sequence number and timestamp
* applied. */
void addDelta(std::unique_ptr<JournalDelta>&& delta);
/** Get a shared, immutable reference to the tip of the journal.
* May return nullptr if there have been no changes */
std::shared_ptr<const JournalDelta> getLatest() const;
/** Replace the journal with a new delta.
* The new delta will typically be the result of JournalDelta::merge().
* No sanity checking is performed inside this function; the
* supplied delta is moved in and replaces current tip. */
void replaceJournal(std::unique_ptr<JournalDelta>&& delta);
/** Register a subscriber.
* A subscriber is just a callback that is called whenever the
* journal has changed.
* It is recommended that the subscriber callback do the minimal
* amount of work needed to schedule the real work to happen in
* some other context because journal updates are likely to happen
* in awkward contexts or in the middle of some batch of mutations
* where it is not appropriate to do any heavy lifting.
* The return value of registerSubscriber is an identifier than
* can be passed to cancelSubscriber to later remove the registration.
*/
uint64_t registerSubscriber(folly::Function<void()>&& callback);
void cancelSubscriber(uint64_t id);
private:
/** The sequence number that we'll use for the next entry
* that we link into the chain */
SequenceNumber nextSequence_{1};
/** The most recently recorded entry */
std::shared_ptr<const JournalDelta> latest_;
/** The next id to assign to subscribers */
uint64_t nextSubscriberId_{1};
/** The subscribers */
std::unordered_map<uint64_t, folly::Function<void()>> subscribers_;
};
}
}