mononoke: initial diff for history batching

Summary:
In order to answer file history requests quickly, we need to avoid doing serial
fetches from a blobstore.

This diff is a first attempt at implementing it. The general idea is to store precalculated history in the blobstore.
Note that at the moment it covers only the most basic case - file history for a commit with no parents - so there are quite a few "TODO"s and "unimplemented!()"s in the codebase. This functionality will be extended in subsequent diffs.
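
The stored structure is conceptually a list of (commit, parent offsets) pairs keyed
by unode, where an offset points at a later entry in the same list. For a two-commit
history where A is the parent of B (this mirrors the example in the doc comments
below):

    B, vec![ParentOffset(1)]   // offset 1 = the next entry, i.e. A
    A, vec![]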

Reviewed By: krallin

Differential Revision: D17070901

fbshipit-source-id: 8150afb2509fcd2a428d2369bab58468ab774d72
Stanislau Hlebik 2019-09-03 04:28:19 -07:00 committed by Facebook Github Bot
parent 90a4d40551
commit 00f4165a22
5 changed files with 616 additions and 2 deletions

View File

@@ -0,0 +1,240 @@
// Copyright (c) 2018-present, Facebook, Inc.
// All Rights Reserved.
//
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2 or any later version.

use crate::{thrift, ErrorKind, FastlogParent};
use blobstore::{Blobstore, BlobstoreBytes};
use cloned::cloned;
use context::CoreContext;
use failure_ext::{bail_err, Error};
use futures::{future, Future};
use futures_ext::BoxFuture;
use manifest::Entry;
use mononoke_types::{hash::Blake2, ChangesetId, FileUnodeId, ManifestUnodeId};
use rust_thrift::compact_protocol;
use std::collections::VecDeque;
use std::sync::Arc;

pub(crate) fn create_new_batch(
    ctx: CoreContext,
    blobstore: Arc<dyn Blobstore>,
    unode_parents: Vec<Entry<ManifestUnodeId, FileUnodeId>>,
    linknode: ChangesetId,
) -> impl Future<Item = FastlogBatch, Error = Error> {
    let f = future::join_all(unode_parents.clone().into_iter().map({
        cloned!(ctx, blobstore);
        move |entry| {
            fetch_fastlog_batch(ctx.clone(), blobstore.clone(), entry)
                .and_then(move |maybe_batch| maybe_batch.ok_or(ErrorKind::NotFound(entry).into()))
        }
    }));

    f.and_then(move |parent_batches| {
        match parent_batches.get(0) {
            None => future::ok(FastlogBatch::new(linknode)),
            _ => {
                // TODO(stash): handle other cases as well i.e. linear history and history with
                // merges
                unimplemented!()
            }
        }
    })
}

pub(crate) fn fetch_fastlog_batch(
    ctx: CoreContext,
    blobstore: Arc<dyn Blobstore>,
    unode_entry: Entry<ManifestUnodeId, FileUnodeId>,
) -> impl Future<Item = Option<FastlogBatch>, Error = Error> {
    let fastlog_batch_key = generate_fastlog_batch_key(unode_entry);
    blobstore
        .get(ctx, fastlog_batch_key.clone())
        .and_then(move |maybe_bytes| match maybe_bytes {
            Some(serialized) => {
                let thrift_entry: ::std::result::Result<thrift::FastlogBatch, Error> =
                    compact_protocol::deserialize(serialized.as_bytes()).map_err(|err| {
                        ErrorKind::DeserializationError(fastlog_batch_key, format!("{}", err))
                            .into()
                    });
                thrift_entry.and_then(FastlogBatch::from_thrift).map(Some)
            }
            None => Ok(None),
        })
}

pub(crate) fn save_fastlog_batch(
    ctx: CoreContext,
    blobstore: Arc<dyn Blobstore>,
    unode_entry: Entry<ManifestUnodeId, FileUnodeId>,
    batch: FastlogBatch,
) -> BoxFuture<(), Error> {
    let fastlog_batch_key = generate_fastlog_batch_key(unode_entry);
    let serialized = compact_protocol::serialize(&batch.into_thrift());
    blobstore.put(
        ctx,
        fastlog_batch_key,
        BlobstoreBytes::from_bytes(serialized),
    )
}
fn generate_fastlog_batch_key(unode_entry: Entry<ManifestUnodeId, FileUnodeId>) -> String {
    let key_part = match unode_entry {
        Entry::Leaf(file_unode_id) => format!("fileunode.{}", file_unode_id),
        Entry::Tree(mf_unode_id) => format!("manifestunode.{}", mf_unode_id),
    };
    format!("fastlogbatch.{}", key_part)
}
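
// A ParentOffset is relative to the position of the current commit in `latest`:
// an offset of 1 means "the next entry in the list". See the Thrift definitions
// in this diff for a worked example.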
#[derive(Clone)]
struct ParentOffset(i32);

#[derive(Clone)]
pub struct FastlogBatch {
    latest: VecDeque<(ChangesetId, Vec<ParentOffset>)>,
    previous_batches: VecDeque<FastlogBatchId>,
}

impl FastlogBatch {
    pub(crate) fn new(cs_id: ChangesetId) -> Self {
        let mut latest = VecDeque::new();
        latest.push_front((cs_id, vec![]));
        FastlogBatch {
            latest,
            previous_batches: VecDeque::new(),
        }
    }

    // Converts FastlogBatch into a list of (ChangesetId, Vec<FastlogParent>)
    // TODO(stash): at the moment it doesn't fetch `previous_batches`
    pub(crate) fn convert_to_list(
        &self,
        _ctx: CoreContext,
        _blobstore: Arc<dyn Blobstore>,
    ) -> impl Future<Item = Vec<(ChangesetId, Vec<FastlogParent>)>, Error = Error> {
        let mut res = vec![];
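        // Resolve each stored offset against `latest` itself: for a commit at
        // position `index`, an offset N points at latest[index + N]. E.g. with
        // latest = [B, A] and B storing [ParentOffset(1)], B's parent is A.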
        for (index, (cs_id, parent_offsets)) in self.latest.iter().enumerate() {
            let mut batch_parents = vec![];
            for offset in parent_offsets {
                let parent_index = index + offset.0 as usize;
                let batch_parent = match self.latest.get(parent_index) {
                    Some((p_cs_id, _)) => FastlogParent::Known(*p_cs_id),
                    None => FastlogParent::Unknown,
                };
                batch_parents.push(batch_parent);
            }
            res.push((*cs_id, batch_parents));
        }

        if !self.previous_batches.is_empty() {
            // TODO(stash): handle previous_batches correctly
            unimplemented!()
        }

        future::ok(res)
    }

    fn from_thrift(th: thrift::FastlogBatch) -> Result<FastlogBatch, Error> {
        let latest: Result<VecDeque<_>, Error> = th
            .latest
            .into_iter()
            .map(|hash_and_parents| {
                let cs_id = ChangesetId::from_thrift(hash_and_parents.cs_id);
                let offsets = hash_and_parents
                    .parent_offsets
                    .into_iter()
                    .map(|p| ParentOffset(p.0))
                    .collect();
                cs_id.map(|cs_id| (cs_id, offsets))
            })
            .collect();
        let latest = latest?;

        let previous_batches: Result<VecDeque<_>, _> = th
            .previous_batches
            .into_iter()
            .map(FastlogBatchId::from_thrift)
            .collect();
        let previous_batches = previous_batches?;

        Ok(FastlogBatch {
            latest,
            previous_batches,
        })
    }

    fn into_thrift(self) -> thrift::FastlogBatch {
        let latest_thrift = self
            .latest
            .into_iter()
            .map(|(cs_id, offsets)| {
                let parent_offsets = offsets
                    .into_iter()
                    .map(|offset| thrift::ParentOffset(offset.0))
                    .collect();

                thrift::CompressedHashAndParents {
                    cs_id: cs_id.into_thrift(),
                    parent_offsets,
                }
            })
            .collect();

        let previous_batches = self
            .previous_batches
            .into_iter()
            .map(|previous_batch| previous_batch.into_thrift())
            .collect();

        thrift::FastlogBatch {
            latest: latest_thrift,
            previous_batches,
        }
    }
}

#[derive(Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Debug, Hash)]
pub struct FastlogBatchId(Blake2);

impl FastlogBatchId {
    pub fn into_thrift(self) -> thrift::FastlogBatchId {
        thrift::FastlogBatchId(thrift::IdType::Blake2(self.0.into_thrift()))
    }

    pub fn from_thrift(h: thrift::FastlogBatchId) -> Result<Self, Error> {
        // This assumes that a null hash is never serialized. This should always be the
        // case.
        match h.0 {
            thrift::IdType::Blake2(blake2) => Ok(FastlogBatchId(Blake2::from_thrift(blake2)?)),
            thrift::IdType::UnknownField(x) => bail_err!(ErrorKind::InvalidThrift(
                "FastlogBatchId".into(),
                format!("unknown id type field: {}", x)
            )),
        }
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use fixtures::linear;
    use mononoke_types_mocks::changesetid::ONES_CSID;
    use tokio::runtime::Runtime;

    #[test]
    fn convert_to_list_simple() {
        let ctx = CoreContext::test_mock();
        let repo = linear::getrepo();
        let mut rt = Runtime::new().unwrap();

        let batch = FastlogBatch::new(ONES_CSID);
        let blobstore = Arc::new(repo.get_blobstore());
        assert_eq!(
            vec![(ONES_CSID, vec![])],
            rt.block_on(batch.convert_to_list(ctx, blobstore)).unwrap()
        );
    }
}

View File

@@ -0,0 +1,298 @@
// Copyright (c) 2018-present, Facebook, Inc.
// All Rights Reserved.
//
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2 or any later version.

/// This library is used to efficiently store file and directory history.
/// For each unode we store a FastlogBatch - a Thrift structure that stores the
/// latest commits that modified this file or directory, together with their
/// parents. Commits are stored in BFS order. All FastlogBatches are stored in
/// the blobstore.
///
/// Commits also store pointers to their parents; however, they are stored as
/// offsets to the commit hashes in the batch. I.e. if we have two commits A and B,
/// and A is an ancestor of B, then the batch will look like:
/// B, vec![ParentOffset(1)]
/// A, vec![]
///
/// Note that commits where a file was deleted are not stored in a FastlogBatch.
/// It also doesn't store history across deletions, i.e. if a file was added, then
/// deleted, then added again in commit A, the FastlogBatch in commit A will contain
/// only one entry.
///
/// RootFastlog is derived data that derives a FastlogBatch for each unode
/// that was created or modified in this commit.
use blobrepo::BlobRepo;
use blobstore::{Blobstore, BlobstoreBytes};
use bytes::Bytes;
use cloned::cloned;
use context::CoreContext;
use derive_unode_manifest::derived_data_unodes::{RootUnodeManifestId, RootUnodeManifestMapping};
use derived_data::{BonsaiDerived, BonsaiDerivedMapping};
use failure_ext::{Error, Fail};
use futures::{future, stream::FuturesUnordered, Future, Stream};
use futures_ext::{BoxFuture, FutureExt, StreamExt};
use manifest::{Entry, ManifestOps};
use mononoke_types::{BonsaiChangeset, ChangesetId, FileUnodeId, ManifestUnodeId};
use std::collections::HashMap;
use std::iter::FromIterator;
use std::sync::Arc;

mod fastlog_batch;

mod thrift {
    pub use fastlog_thrift::*;
    pub use mononoke_types_thrift::*;
}

use fastlog_batch::{create_new_batch, fetch_fastlog_batch, save_fastlog_batch};

/// Returns history for a given unode if it exists.
/// This is the public API of this crate, i.e. what clients should use if they want
/// to fetch the history.
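///
/// A minimal usage sketch (an assumption, not part of this diff: the future is
/// driven on a tokio 0.1 Runtime, as in the tests below; `rt`, `ctx`, `repo` and
/// `unode_entry` are placeholders):
///
///     let maybe_history = rt.block_on(prefetch_history(ctx, repo, unode_entry)).unwrap();
///     // `None` means no FastlogBatch has been derived for this unode yet.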
pub fn prefetch_history(
    ctx: CoreContext,
    repo: BlobRepo,
    unode_entry: Entry<ManifestUnodeId, FileUnodeId>,
) -> impl Future<Item = Option<Vec<(ChangesetId, Vec<FastlogParent>)>>, Error = Error> {
    let blobstore = Arc::new(repo.get_blobstore());
    fetch_fastlog_batch(ctx.clone(), blobstore.clone(), unode_entry).and_then(
        move |maybe_fastlog_batch| match maybe_fastlog_batch {
            Some(fastlog_batch) => fastlog_batch
                .convert_to_list(ctx, blobstore)
                .map(Some)
                .left_future(),
            None => future::ok(None).right_future(),
        },
    )
}

#[derive(Debug, PartialEq, Eq)]
pub enum FastlogParent {
    /// Parent exists and it's stored in the batch
    Known(ChangesetId),
    /// Parent exists, but it's not stored in the batch (including previous_batches).
    /// It needs to be fetched separately
    Unknown,
}

#[derive(Debug, Fail)]
pub enum ErrorKind {
    #[fail(display = "invalid Thrift structure '{}': {}", _0, _1)]
    InvalidThrift(String, String),
    #[fail(display = "Fastlog batch for {:?} unode not found", _0)]
    NotFound(Entry<ManifestUnodeId, FileUnodeId>),
    #[fail(display = "Failed to deserialize FastlogBatch for {}: {}", _0, _1)]
    DeserializationError(String, String),
}

#[derive(Clone)]
pub struct RootFastlog(ChangesetId);

impl BonsaiDerived for RootFastlog {
    const NAME: &'static str = "rootfastlog";

    fn derive_from_parents(
        ctx: CoreContext,
        repo: BlobRepo,
        bonsai: BonsaiChangeset,
        parents: Vec<Self>,
    ) -> BoxFuture<Self, Error> {
        // TODO(stash): we shouldn't create a RootUnodeManifestMapping mapping here -
        // ideally we should create it once when Mononoke is initialized.
        // But for now this is a limitation of the derived data trait - it requires
        // explicit passing of RootUnodeManifestMapping
        let unode_mapping = RootUnodeManifestMapping::new(repo.get_blobstore());
        let bcs_id = bonsai.get_changeset_id();

        RootUnodeManifestId::derive(ctx.clone(), repo.clone(), Arc::new(unode_mapping), bcs_id)
            .and_then(move |root_unode_mf_id| {
                if parents.is_empty() {
                    let blobstore = Arc::new(repo.get_blobstore());
                    let unode_mf_id = root_unode_mf_id.manifest_unode_id().clone();

                    unode_mf_id
                        .list_all_entries(ctx.clone(), blobstore.clone())
                        .map(move |(_, entry)| {
                            create_new_batch(ctx.clone(), blobstore.clone(), vec![], bcs_id)
                                .and_then({
                                    cloned!(ctx, blobstore);
                                    move |fastlog_batch| {
                                        save_fastlog_batch(ctx, blobstore, entry, fastlog_batch)
                                    }
                                })
                        })
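                        // Cap the number of in-flight create/save futures so we don't
                        // issue an unbounded number of concurrent blobstore writes.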
                        .buffered(100)
                        .collect()
                        .map(move |_| RootFastlog(bcs_id))
                } else {
                    // TODO(stash): handle other cases as well i.e. linear history and
                    // history with merges
                    unimplemented!()
                }
            })
            .boxify()
    }
}

#[derive(Clone)]
pub struct RootFastlogMapping {
    blobstore: Arc<dyn Blobstore>,
}

impl RootFastlogMapping {
    pub fn new(blobstore: Arc<dyn Blobstore>) -> Self {
        Self { blobstore }
    }
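
    // Mapping entries live at "derived_rootfastlog.<changeset id>" (see the format
    // string below); only the presence of the key matters, the value is empty.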
    fn format_key(&self, cs_id: &ChangesetId) -> String {
        format!("derived_rootfastlog.{}", cs_id)
    }
}

impl BonsaiDerivedMapping for RootFastlogMapping {
    type Value = RootFastlog;

    fn get(
        &self,
        ctx: CoreContext,
        csids: Vec<ChangesetId>,
    ) -> BoxFuture<HashMap<ChangesetId, Self::Value>, Error> {
        let gets = csids.into_iter().map(|cs_id| {
            self.blobstore
                .get(ctx.clone(), self.format_key(&cs_id))
                .map(move |maybe_val| maybe_val.map(|_| (cs_id.clone(), RootFastlog(cs_id))))
        });
        FuturesUnordered::from_iter(gets)
            .filter_map(|x| x) // Remove None
            .collect_to()
            .boxify()
    }

    fn put(&self, ctx: CoreContext, csid: ChangesetId, _id: Self::Value) -> BoxFuture<(), Error> {
        self.blobstore.put(
            ctx,
            self.format_key(&csid),
            // Value doesn't matter here, so just put empty Value
            BlobstoreBytes::from_bytes(Bytes::new()),
        )
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use blobrepo::save_bonsai_changesets;
    use context::CoreContext;
    use fixtures::{create_bonsai_changeset, linear};
    use mercurial_types::HgChangesetId;
    use mononoke_types::{MPath, ManifestUnodeId};
    use std::str::FromStr;
    use tokio::runtime::Runtime;

    #[test]
    fn test_derive_single_empty_commit_no_parents() {
        let mut rt = Runtime::new().unwrap();
        let repo = linear::getrepo();
        let ctx = CoreContext::test_mock();

        let bcs = create_bonsai_changeset(vec![]);
        let bcs_id = bcs.get_changeset_id();
        rt.block_on(save_bonsai_changesets(vec![bcs], ctx.clone(), repo.clone()))
            .unwrap();

        let root_unode_mf_id =
            derive_fastlog_batch_and_unode(&mut rt, ctx.clone(), bcs_id.clone(), repo.clone());
        let list = fetch_list(
            &mut rt,
            ctx.clone(),
            repo.clone(),
            Entry::Tree(root_unode_mf_id),
        );
        assert_eq!(list, vec![(bcs_id, vec![])]);
    }

    #[test]
    fn test_derive_single_commit_no_parents() {
        let mut rt = Runtime::new().unwrap();
        let repo = linear::getrepo();
        let ctx = CoreContext::test_mock();

        // This is the initial diff with no parents
        // See tests/fixtures/src/lib.rs
        let hg_cs_id = HgChangesetId::from_str("2d7d4ba9ce0a6ffd222de7785b249ead9c51c536").unwrap();
        let bcs_id = rt
            .block_on(repo.get_bonsai_from_hg(ctx.clone(), hg_cs_id))
            .unwrap()
            .unwrap();

        let root_unode_mf_id =
            derive_fastlog_batch_and_unode(&mut rt, ctx.clone(), bcs_id.clone(), repo.clone());
        let list = fetch_list(
            &mut rt,
            ctx.clone(),
            repo.clone(),
            Entry::Tree(root_unode_mf_id.clone()),
        );
        assert_eq!(list, vec![(bcs_id, vec![])]);

        let blobstore = Arc::new(repo.get_blobstore());
        let path_1 = MPath::new(&"1").unwrap();
        let path_files = MPath::new(&"files").unwrap();
        let entries = rt
            .block_on(
                root_unode_mf_id
                    .find_entries(ctx.clone(), blobstore.clone(), vec![path_1, path_files])
                    .collect(),
            )
            .unwrap();

        let list = fetch_list(
            &mut rt,
            ctx.clone(),
            repo.clone(),
            entries.get(0).unwrap().1.clone(),
        );
        assert_eq!(list, vec![(bcs_id, vec![])]);

        let list = fetch_list(
            &mut rt,
            ctx.clone(),
            repo.clone(),
            entries.get(1).unwrap().1.clone(),
        );
        assert_eq!(list, vec![(bcs_id, vec![])]);
    }

    fn derive_fastlog_batch_and_unode(
        rt: &mut Runtime,
        ctx: CoreContext,
        bcs_id: ChangesetId,
        repo: BlobRepo,
    ) -> ManifestUnodeId {
        let blobstore = Arc::new(repo.get_blobstore());
        let mapping = RootFastlogMapping::new(blobstore.clone());
        rt.block_on(RootFastlog::derive(
            ctx.clone(),
            repo.clone(),
            mapping,
            bcs_id,
        ))
        .unwrap();

        let unode_mapping = RootUnodeManifestMapping::new(repo.get_blobstore());
        let root_unode =
            RootUnodeManifestId::derive(ctx.clone(), repo.clone(), Arc::new(unode_mapping), bcs_id);
        let root_unode = rt.block_on(root_unode).unwrap();
        root_unode.manifest_unode_id().clone()
    }

    fn fetch_list(
        rt: &mut Runtime,
        ctx: CoreContext,
        repo: BlobRepo,
        entry: Entry<ManifestUnodeId, FileUnodeId>,
    ) -> Vec<(ChangesetId, Vec<FastlogParent>)> {
        let blobstore = Arc::new(repo.get_blobstore());
        let batch = rt
            .block_on(fetch_fastlog_batch(ctx.clone(), blobstore.clone(), entry))
            .unwrap()
            .expect("batch hasn't been generated yet");
        rt.block_on(batch.convert_to_list(ctx, blobstore)).unwrap()
    }
}

View File

@@ -0,0 +1,58 @@
// Copyright (c) 2018-present, Facebook, Inc.
// All Rights Reserved.
//
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2 or any later version.
include "scm/mononoke/mononoke_types/if/mononoke_types_thrift.thrift"
typedef mononoke_types_thrift.IdType FastlogBatchId (hs.newtype)

// Structure that holds a commit graph, usually the history of a file
// or a directory, hence the name. Semantically it stores a list of
// (commit hash, [parent commit hashes]); however, it's stored in the compressed
// form described below in order to save space.
//
// FastlogBatch has two parts: `latest` and `previous_batches`.
// The `previous_batches` field points to other FastlogBatch structures, so
// FastlogBatch is a recursive structure. However, normally `previous_batches`
// points to a degenerate version of FastlogBatch with an empty `previous_batches`,
// i.e. we have only one level of nesting.
//
// In order to get the full list we need to take the latest commits and
// concatenate them with the lists from `previous_batches`.
//
// `latest` stores commit hashes and offsets to commit parents,
// i.e. if the offset is 1, then the next commit is a parent of the current commit.
// For example, a list like
//
//   (HASH_A, [HASH_B])
//   (HASH_B, [])
//
// will be encoded as
//
//   (HASH_A, [1])  # offset is 1, meaning the next hash
//   (HASH_B, [])
//
// A list with a merge
//
//   (HASH_A, [HASH_B, HASH_C])
//   (HASH_B, [])
//   (HASH_C, [])
//
// will be encoded differently:
//
//   (HASH_A, [1, 2])
//   (HASH_B, [])
//   (HASH_C, [])
//
// Note that an offset might point to a commit in the next FastlogBatch, or even
// to a batch outside of all `previous_batches`.
struct FastlogBatch {
  1: list<CompressedHashAndParents> latest,
  2: list<FastlogBatchId> previous_batches,
}

typedef i32 ParentOffset (hs.newtype)

struct CompressedHashAndParents {
  1: mononoke_types_thrift.ChangesetId cs_id,
  2: list<ParentOffset> parent_offsets,
}

View File

@@ -172,6 +172,24 @@ where
        .boxify()
    }

    fn list_all_entries(
        &self,
        ctx: CoreContext,
        blobstore: impl Blobstore + Clone,
    ) -> BoxStream<
        (
            Option<MPath>,
            Entry<Self, <<Self as Loadable>::Value as Manifest>::LeafId>,
        ),
        Error,
    > {
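        // A prefix of `None` matches every path, so this streams the entire
        // manifest starting from the root.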
        self.find_entries(
            ctx.clone(),
            blobstore.clone(),
            vec![PathOrPrefix::Prefix(None)],
        )
    }

    fn diff(
        &self,
        ctx: CoreContext,

View File

@@ -73,7 +73,7 @@ impl Blake2 {
     }
 
     #[inline]
-    pub(crate) fn from_thrift(b: thrift::Blake2) -> Result<Self> {
+    pub fn from_thrift(b: thrift::Blake2) -> Result<Self> {
         // Currently this doesn't require consuming b, but hopefully with T26959816 this
         // code will be able to convert a SmallVec directly into an array.
         if b.0.len() != 32 {
@@ -107,7 +107,7 @@ impl Blake2 {
         }
     }
 
-    pub(crate) fn into_thrift(self) -> thrift::Blake2 {
+    pub fn into_thrift(self) -> thrift::Blake2 {
         // This doesn't need to consume self today, but once T26959816 is implemented it
         // should be possible to do that without copying.
         thrift::Blake2(self.0.to_vec())