mononoke: support merges in FastlogBatch

Summary:
Finally support merges. Whenever we encounter a merge, a completely new batch is
created.

The one thing that surprised me was that ParentOffsets can actually be negative.
For example:
```
     o
  /    \
 o       o
 |       |
 o       |
 |       |
 o <---  | --- ParentOffset of this commit can be negative!
   \   /
     o
```

This happens because of the BFS order of traversal - a parent can be visited before
its child. Note that negative offsets shouldn't cause problems.
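
To illustrate, here is a minimal sketch, with hypothetical changeset ids
(m, l1, r1, l2, d, l3), of the raw list a BFS traversal of the graph above
would produce. The last entry's parent appears earlier in the list, so its
stored offset comes out negative:
```
// BFS order: m, l1, r1, l2, d, l3 (m is the merge at the top, l1..l3 the
// long left branch, r1 the short right branch, d the common ancestor).
let raw_list = vec![
    (m,  vec![ParentOffset::new(1), ParentOffset::new(2)]), // parents: l1, r1
    (l1, vec![ParentOffset::new(2)]),                       // parent: l2 at index 3
    (r1, vec![ParentOffset::new(2)]),                       // parent: d at index 4
    (l2, vec![ParentOffset::new(2)]),                       // parent: l3 at index 5
    (d,  vec![]),
    (l3, vec![ParentOffset::new(-1)]),                      // parent: d at index 4, offset 4 - 5 = -1
];
```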

Reviewed By: aslpavel

Differential Revision: D17183355

fbshipit-source-id: b5165ffef7212ce220dd338079db9e26a3030f58
Author: Stanislau Hlebik, 2019-09-09 09:52:33 -07:00 (committed by Facebook Github Bot)
parent 740d4276aa
commit 609e01e237
4 changed files with 294 additions and 27 deletions


@@ -11,9 +11,14 @@ use context::CoreContext;
use failure_ext::Error;
use futures::{future, Future};
use futures_ext::{BoxFuture, FutureExt};
use itertools::Itertools;
use manifest::Entry;
use mononoke_types::{fastlog_batch::FastlogBatch, ChangesetId, FileUnodeId, ManifestUnodeId};
use std::collections::VecDeque;
use mononoke_types::{
fastlog_batch::{FastlogBatch, ParentOffset},
ChangesetId, FileUnodeId, ManifestUnodeId,
};
use std::collections::{HashMap, HashSet, VecDeque};
use std::iter::once;
use std::sync::Arc;
pub(crate) fn create_new_batch(
@@ -35,20 +40,116 @@ pub(crate) fn create_new_batch(
match parent_batches.get(0) {
Some(parent_batch) => parent_batch
.prepend_child_with_single_parent(ctx, blobstore, linknode)
.left_future(),
.boxify(),
None => {
let mut d = VecDeque::new();
d.push_back((linknode, vec![]));
FastlogBatch::new_from_raw_list(ctx, blobstore, d).right_future()
FastlogBatch::new_from_raw_list(ctx, blobstore, d).boxify()
}
}
} else {
// TODO(stash): handle merges as well
unimplemented!()
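// Merge case: flatten each parent's batch, interleave the flattened
// lists in BFS order behind the merge commit itself, then convert the
// result back to ParentOffsets and store it as a completely new batch.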
future::join_all(parent_batches.into_iter().map({
cloned!(ctx, blobstore);
move |batch| fetch_flattened(&batch, ctx.clone(), blobstore.clone())
}))
.map(move |parents_flattened| create_merged_list(linknode, parents_flattened))
// Converts FastlogParent to ParentOffset
.map(convert_to_raw_list)
.and_then(move |raw_list| {
FastlogBatch::new_from_raw_list(ctx, blobstore, raw_list)
})
.boxify()
}
})
}
// This function creates a FastlogBatch list for a merge unode.
// It does so by taking a merge_cs_id (i.e. a linknode of this merge unode) and
// the FastlogBatches of its parents, and merging them together.
//
// For example, let's say we have a unode whose history graph is the following:
//
//       o  <- commit A
//      / \
//     o   \   <- commit B
//      \   o  <- commit C
//       \ /
//        o   <- commit D
//
// create_merged_list() accepts commit A as merge_cs_id, [B, D] as the first parent's list
// and [C, D] as the second parent's list. The expected output is [A, B, C, D].
fn create_merged_list(
merge_cs_id: ChangesetId,
parents_lists: Vec<Vec<(ChangesetId, Vec<FastlogParent>)>>,
) -> Vec<(ChangesetId, Vec<FastlogParent>)> {
// parents_of_merge_commit preserves the order of `parents_lists`
let mut parents_of_merge_commit = vec![];
for list in parents_lists.iter() {
if let Some((p, _)) = list.get(0) {
parents_of_merge_commit.push(FastlogParent::Known(*p));
}
}
{
// Make sure we have unique parents
let mut used = HashSet::new();
parents_of_merge_commit.retain(move |p| used.insert(p.clone()));
}
let mut used = HashSet::new();
// Merge the parent lists in BFS order. To do that we assign an index to each
// entry inside a list and an index to each parent list.
// So if we have parents_lists = [[B, D], [C, D]], then after the indices are
// assigned the entries will look like [(0, 0, B), (0, 1, C), (1, 0, D), (1, 1, D)],
// where the first element of each tuple is the position inside a parent list
// and the second one is the index of the parent list itself.
let merged_parents_lists = parents_lists
.into_iter()
.enumerate() // Enumerate parents
.map(|(parent_order, parent_list)| {
// Enumerate entries inside single parent
parent_list.into_iter().enumerate().map(move |(idx, hash)| (idx, parent_order, hash))
})
.kmerge_by(|l, r| (l.0, l.1) < (r.0, r.1))
.map(|(_, _, cs_and_parents)| cs_and_parents)
.filter(|(cs_id, _)| used.insert(*cs_id));
once((merge_cs_id, parents_of_merge_commit))
.chain(merged_parents_lists)
.collect()
}
// Converts from an "external" representation (i.e. the one used by users of this library)
// to an "internal" representation (i.e. the one that we store in the blobstore).
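// For example (hypothetical ids), the list
//   [(A, [Known(B)]), (B, [Known(C), Unknown]), (C, [])]
// converts to
//   [(A, [+1]), (B, [+1, +4]), (C, [])],
// where 4 = list.len() + 1 is the special out-of-range offset used for
// unknown parents.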
fn convert_to_raw_list(
list: Vec<(ChangesetId, Vec<FastlogParent>)>,
) -> Vec<(ChangesetId, Vec<ParentOffset>)> {
let cs_to_idx: HashMap<_, _> = list
.iter()
.enumerate()
.map(|(idx, (cs_id, _))| (*cs_id, idx as i32))
.collect();
// Special offset that points outside of the list.
// It's used for unknown parents
let max_idx = (list.len() + 1) as i32;
let mut res = vec![];
for (current_idx, (cs_id, fastlog_parents)) in list.into_iter().enumerate() {
let current_idx = current_idx as i32;
let mut parent_offsets = vec![];
for p in fastlog_parents {
let maybe_idx = match p {
FastlogParent::Known(cs_id) => {
cs_to_idx.get(&cs_id).cloned().map(|idx| idx - current_idx)
}
FastlogParent::Unknown => None,
};
parent_offsets.push(ParentOffset::new(maybe_idx.unwrap_or(max_idx)))
}
res.push((cs_id, parent_offsets));
}
res
}
pub(crate) fn fetch_fastlog_batch_by_unode_id(
ctx: CoreContext,
blobstore: Arc<dyn Blobstore>,
@@ -93,24 +194,33 @@ pub(crate) fn fetch_flattened(
ctx: CoreContext,
blobstore: Arc<dyn Blobstore>,
) -> impl Future<Item = Vec<(ChangesetId, Vec<FastlogParent>)>, Error = Error> {
batch.fetch_raw_list(ctx, blobstore).map(|full_batch| {
let mut res = vec![];
for (index, (cs_id, parent_offsets)) in full_batch.iter().enumerate() {
let mut batch_parents = vec![];
for offset in parent_offsets {
let parent_index = index + offset.num() as usize;
let batch_parent = match full_batch.get(parent_index) {
batch.fetch_raw_list(ctx, blobstore).map(flatten_raw_list)
}
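// Resolves every ParentOffset back to a FastlogParent. For example
// (hypothetical ids), [(A, [+2]), (B, [-1]), (C, [])] flattens to
// [(A, [Known(C)]), (B, [Known(A)]), (C, [])]; any offset that lands
// outside the list becomes FastlogParent::Unknown.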
fn flatten_raw_list(
raw_list: Vec<(ChangesetId, Vec<ParentOffset>)>,
) -> Vec<(ChangesetId, Vec<FastlogParent>)> {
let mut res = vec![];
for (index, (cs_id, parent_offsets)) in raw_list.iter().enumerate() {
let mut batch_parents = vec![];
for offset in parent_offsets {
// NOTE: Offset can be negative!
let parent_index = index as i32 + offset.num();
let batch_parent = if parent_index >= 0 {
match raw_list.get(parent_index as usize) {
Some((p_cs_id, _)) => FastlogParent::Known(*p_cs_id),
None => FastlogParent::Unknown,
};
batch_parents.push(batch_parent);
}
res.push((*cs_id, batch_parents));
}
} else {
FastlogParent::Unknown
};
batch_parents.push(batch_parent);
}
res
})
res.push((*cs_id, batch_parents));
}
res
}
#[cfg(test)]
@@ -197,4 +307,90 @@ mod test {
Ok(())
}
#[test]
fn test_create_merged_list() -> Result<(), Error> {
assert_eq!(
create_merged_list(ONES_CSID, vec![]),
vec![(ONES_CSID, vec![])]
);
let first_parent = vec![(TWOS_CSID, vec![])];
let second_parent = vec![(THREES_CSID, vec![])];
assert_eq!(
create_merged_list(ONES_CSID, vec![first_parent, second_parent]),
vec![
(
ONES_CSID,
vec![
FastlogParent::Known(TWOS_CSID),
FastlogParent::Known(THREES_CSID)
]
),
(TWOS_CSID, vec![]),
(THREES_CSID, vec![]),
]
);
Ok(())
}
#[test]
fn test_create_merged_list_same_commit() -> Result<(), Error> {
assert_eq!(
create_merged_list(ONES_CSID, vec![]),
vec![(ONES_CSID, vec![])]
);
let first_parent = vec![(TWOS_CSID, vec![])];
let second_parent = vec![(TWOS_CSID, vec![])];
assert_eq!(
create_merged_list(ONES_CSID, vec![first_parent, second_parent]),
vec![
(ONES_CSID, vec![FastlogParent::Known(TWOS_CSID),]),
(TWOS_CSID, vec![]),
]
);
Ok(())
}
#[test]
fn test_convert_to_raw_list_simple() -> Result<(), Error> {
let list = vec![
(
ONES_CSID,
vec![
FastlogParent::Known(TWOS_CSID),
FastlogParent::Known(THREES_CSID),
],
),
(TWOS_CSID, vec![]),
(THREES_CSID, vec![]),
];
let raw_list = Vec::from(convert_to_raw_list(list.clone()));
let expected = vec![
(ONES_CSID, vec![ParentOffset::new(1), ParentOffset::new(2)]),
(TWOS_CSID, vec![]),
(THREES_CSID, vec![]),
];
assert_eq!(raw_list, expected);
assert_eq!(flatten_raw_list(raw_list), list);
let list = vec![
(ONES_CSID, vec![FastlogParent::Known(TWOS_CSID)]),
(TWOS_CSID, vec![FastlogParent::Known(THREES_CSID)]),
(THREES_CSID, vec![]),
];
let raw_list = Vec::from(convert_to_raw_list(list.clone()));
let expected = vec![
(ONES_CSID, vec![ParentOffset::new(1)]),
(TWOS_CSID, vec![ParentOffset::new(1)]),
(THREES_CSID, vec![]),
];
assert_eq!(raw_list, expected);
assert_eq!(flatten_raw_list(raw_list), list);
Ok(())
}
}


@@ -67,7 +67,7 @@ pub fn prefetch_history(
)
}
#[derive(Debug, PartialEq, Eq)]
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub enum FastlogParent {
/// Parent exists and it's stored in the batch
Known(ChangesetId),
@@ -291,9 +291,11 @@ mod tests {
use super::*;
use benchmark_lib::{GenManifest, GenSettings};
use blobrepo::save_bonsai_changesets;
use bookmarks::BookmarkName;
use context::CoreContext;
use fixtures::{
create_bonsai_changeset, create_bonsai_changeset_with_files, linear, store_files,
create_bonsai_changeset, create_bonsai_changeset_with_files, linear, merge_even,
merge_uneven, store_files, unshared_merge_even, unshared_merge_uneven,
};
use maplit::btreemap;
use mercurial_types::HgChangesetId;
@@ -301,6 +303,7 @@ mod tests {
use pretty_assertions::assert_eq;
use rand::SeedableRng;
use rand_xorshift::XorShiftRng;
use revset::AncestorsNodeStream;
use std::collections::{BTreeMap, HashSet, VecDeque};
use std::str::FromStr;
use tokio::runtime::Runtime;
@@ -665,6 +668,68 @@ mod tests {
)?;
Ok(())
}
#[test]
fn test_derive_merges() -> Result<(), Error> {
let mut rt = Runtime::new().unwrap();
let ctx = CoreContext::test_mock();
{
let repo = merge_uneven::getrepo();
let all_commits = rt.block_on(all_commits(ctx.clone(), repo.clone()).collect())?;
for (bcs_id, _hg_cs_id) in all_commits {
verify_all_entries_for_commit(&mut rt, ctx.clone(), repo.clone(), bcs_id);
}
}
{
let repo = merge_even::getrepo();
let all_commits = rt.block_on(all_commits(ctx.clone(), repo.clone()).collect())?;
for (bcs_id, _hg_cs_id) in all_commits {
verify_all_entries_for_commit(&mut rt, ctx.clone(), repo.clone(), bcs_id);
}
}
{
let repo = unshared_merge_even::getrepo();
let all_commits = rt.block_on(all_commits(ctx.clone(), repo.clone()).collect())?;
for (bcs_id, _hg_cs_id) in all_commits {
verify_all_entries_for_commit(&mut rt, ctx.clone(), repo.clone(), bcs_id);
}
}
{
let repo = unshared_merge_uneven::getrepo();
let all_commits = rt.block_on(all_commits(ctx.clone(), repo.clone()).collect())?;
for (bcs_id, _hg_cs_id) in all_commits {
verify_all_entries_for_commit(&mut rt, ctx.clone(), repo.clone(), bcs_id);
}
}
Ok(())
}
fn all_commits(
ctx: CoreContext,
repo: BlobRepo,
) -> impl Stream<Item = (ChangesetId, HgChangesetId), Error = Error> {
let master_book = BookmarkName::new("master").unwrap();
repo.get_bonsai_bookmark(ctx.clone(), &master_book)
.map(move |maybe_bcs_id| {
let bcs_id = maybe_bcs_id.unwrap();
AncestorsNodeStream::new(ctx.clone(), &repo.get_changeset_fetcher(), bcs_id.clone())
.and_then(move |new_bcs_id| {
repo.get_hg_from_bonsai_changeset(ctx.clone(), new_bcs_id)
.map(move |hg_cs_id| (new_bcs_id, hg_cs_id))
})
})
.flatten_stream()
}
fn verify_all_entries_for_commit(
rt: &mut Runtime,
ctx: CoreContext,


@@ -269,5 +269,6 @@ typedef i32 ParentOffset (hs.newtype)
struct CompressedHashAndParents {
1: ChangesetId cs_id,
# Offsets can be negative!
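# For example, an offset of 1 points at the next entry in the list,
# an offset of -1 at the previous one (parent_index = index + offset).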
2: list<ParentOffset> parent_offsets,
}


@@ -28,6 +28,10 @@ use std::sync::Arc;
pub struct ParentOffset(i32);
impl ParentOffset {
pub fn new(offset: i32) -> Self {
Self(offset)
}
pub fn num(&self) -> i32 {
self.0
}
@@ -47,14 +51,15 @@ fn max_entries_in_fastlog_batch() -> usize {
}
impl FastlogBatch {
pub fn new_from_raw_list(
pub fn new_from_raw_list<I: IntoIterator<Item = (ChangesetId, Vec<ParentOffset>)>>(
ctx: CoreContext,
blobstore: Arc<dyn Blobstore>,
mut raw_list: VecDeque<(ChangesetId, Vec<ParentOffset>)>,
raw_list: I,
) -> impl Future<Item = FastlogBatch, Error = Error> {
raw_list.truncate(max_entries_in_fastlog_batch());
let chunks = raw_list.into_iter().chunks(MAX_LATEST_LEN);
let chunks = raw_list
.into_iter()
.take(max_entries_in_fastlog_batch())
.chunks(MAX_LATEST_LEN);
let chunks: Vec<_> = chunks.into_iter().map(VecDeque::from_iter).collect();
let mut chunks = chunks.into_iter();
let latest = chunks.next().unwrap_or(VecDeque::new());