megarepolib: impl create delete commits

Summary:
This adds `megarepolib` support for creating pre-merge "delete" commits.
Please see the `create_sharded_delete_commits` docstring for an explanation of
what these "delete" commits are.

This functionality is not used anywhere yet (I intend to use it from
`megarepotool`), so I've added what I think is reasonable test coverage.

Reviewed By: StanislavGlebik

Differential Revision: D22724946

fbshipit-source-id: a8144c47b92cb209bb1d0799f8df93450c3ef29f
Kostia Balytskyi 2020-07-26 05:14:31 -07:00 committed by Facebook GitHub Bot
parent 1a376252e9
commit 24b4b02df6
3 changed files with 571 additions and 12 deletions
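
A minimal usage sketch of the pieces introduced below, e.g. as they might be
wired up from `megarepotool`. It assumes the crate is importable as
`megarepolib` with the module layout shown in the hunks (`megarepolib::chunking`,
`megarepolib::common`), and that the import paths for `CoreContext`, `BlobRepo`
and the other types match the ones used in this diff; `ctx`, `repo` and
`parent_bcs_id` are assumed to come from the caller, and the helper name
`create_delete_commits_sketch` is made up for illustration:

use anyhow::Error;
use blobrepo::BlobRepo;
use context::CoreContext;
use megarepolib::chunking::{gradually_increasing_chunker, Chunker};
use megarepolib::common::ChangesetArgs;
use megarepolib::create_sharded_delete_commits;
use mercurial_types::MPath;
use mononoke_types::{ChangesetId, DateTime};

async fn create_delete_commits_sketch(
    ctx: &CoreContext,
    repo: &BlobRepo,
    parent_bcs_id: ChangesetId,
) -> Result<Vec<ChangesetId>, Error> {
    // Shard the working copy into at most 100 pieces whose sizes first grow
    // gradually (1, 10, 100, 1000) and then even out.
    let chunker: Chunker<MPath> = gradually_increasing_chunker(100);
    // One `ChangesetArgs` per delete commit, numbered by chunk index;
    // this closure satisfies the `ChangesetArgsFactory` trait alias.
    let changeset_args_factory = |num| ChangesetArgs {
        author: "user".to_string(),
        message: format!("[megarepo] sharded delete commit {}", num),
        datetime: DateTime::from_rfc3339("1985-04-12T23:20:50.52Z").unwrap(),
        bookmark: None,
        mark_public: false,
    };
    create_sharded_delete_commits(ctx, repo, parent_bcs_id, chunker, changeset_args_factory)
        .await
}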


@@ -0,0 +1,152 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* This software may be used and distributed according to the terms of the
* GNU General Public License version 2.
*/
pub type Chunker<T> = Box<dyn Fn(Vec<T>) -> Vec<Vec<T>>>;
/// Produce a chunker fn that breaks a vector into at most `num_chunks` pieces,
/// such that piece sizes grow gradually at first, but after that are as
/// even as possible
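///
/// For example, the chunker produced by `gradually_increasing_chunker(3)`
/// splits `vec![1, 2, 3, 4]` into `vec![vec![1], vec![2, 3, 4]]`
/// (see `test_gradually_increasing_chunker` below).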
pub fn gradually_increasing_chunker<T: Clone>(num_chunks: usize) -> Chunker<T> {
Box::new(move |items: Vec<T>| {
let sizes = get_gradual_chunk_sizes(num_chunks, items.len());
let mut remainder = items.as_slice();
let mut chunks: Vec<Vec<T>> = Vec::new();
for current_size in sizes {
let (iteration_slice, new_remainder) = remainder.split_at(current_size);
let new_vec = iteration_slice.to_vec();
chunks.push(new_vec);
remainder = new_remainder;
}
chunks
})
}
/// Chunk `items` elements into at most `chunks` roughly even pieces
/// Return the number of items in each chunk
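///
/// For example, `fill_evenly(3, 10)` returns `vec![4, 4, 2]`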
fn fill_evenly(chunks: usize, mut items: usize) -> Vec<usize> {
let mut sizes = vec![];
let fill_in = (items / chunks) + if items % chunks == 0 { 0 } else { 1 };
while items > 0 {
if items > fill_in {
sizes.push(fill_in);
items -= fill_in;
} else {
sizes.push(items);
items = 0;
}
}
sizes
}
/// Chunk `num_items` elements into at most `num_chunks` pieces, where
/// the piece sizes first grow gradually and are then roughly even
/// Return the number of elements in each chunk
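///
/// For example, `get_gradual_chunk_sizes(5, 10000)` returns
/// `vec![1, 10, 100, 1000, 8889]` (see the tests below).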
fn get_gradual_chunk_sizes(num_chunks: usize, num_items: usize) -> Vec<usize> {
let prefix_sizes = [1, 10, 100, 1000];
let mut remaining_items = num_items;
let mut sizes = vec![];
let gradually_growing_prefixes = std::cmp::min(prefix_sizes.len(), num_chunks - 1);
for i in 0..gradually_growing_prefixes {
if remaining_items > prefix_sizes[i] {
sizes.push(prefix_sizes[i]);
remaining_items -= prefix_sizes[i];
} else {
sizes.push(remaining_items);
remaining_items = 0;
break;
}
}
let remaining_chunks = num_chunks - sizes.len();
if remaining_items == 0 {
return sizes;
}
// `remaining_chunks` cannot be 0 here, as
// `gradually_growing_prefixes` was bound by `num_chunks - 1`.
// Still, assert so that any future change breaking this invariant
// is easy to debug
assert_ne!(
remaining_chunks, 0,
"Logic error: filled all the chunks ({}) with less than all items ({})",
num_chunks, num_items,
);
sizes.extend(fill_evenly(remaining_chunks, remaining_items));
sizes
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_get_gradual_chunk_sizes() {
assert_eq!(get_gradual_chunk_sizes(1, 1), vec![1]);
assert_eq!(get_gradual_chunk_sizes(2, 1), vec![1]);
assert_eq!(get_gradual_chunk_sizes(2, 2), vec![1, 1]);
assert_eq!(get_gradual_chunk_sizes(3, 10), vec![1, 9]);
assert_eq!(get_gradual_chunk_sizes(4, 10), vec![1, 9]);
assert_eq!(get_gradual_chunk_sizes(1, 10), vec![10]);
assert_eq!(get_gradual_chunk_sizes(2, 10), vec![1, 9]);
assert_eq!(get_gradual_chunk_sizes(2, 11), vec![1, 10]);
assert_eq!(get_gradual_chunk_sizes(2, 12), vec![1, 11]);
assert_eq!(get_gradual_chunk_sizes(3, 12), vec![1, 10, 1]);
assert_eq!(get_gradual_chunk_sizes(3, 110), vec![1, 10, 99]);
assert_eq!(get_gradual_chunk_sizes(3, 111), vec![1, 10, 100]);
assert_eq!(get_gradual_chunk_sizes(3, 112), vec![1, 10, 101]);
assert_eq!(get_gradual_chunk_sizes(4, 1110), vec![1, 10, 100, 999]);
assert_eq!(get_gradual_chunk_sizes(4, 1111), vec![1, 10, 100, 1000]);
assert_eq!(get_gradual_chunk_sizes(4, 1112), vec![1, 10, 100, 1001]);
assert_eq!(get_gradual_chunk_sizes(5, 1112), vec![1, 10, 100, 1000, 1]);
assert_eq!(
get_gradual_chunk_sizes(5, 10000),
vec![1, 10, 100, 1000, 8889]
);
assert_eq!(
get_gradual_chunk_sizes(6, 10000),
vec![1, 10, 100, 1000, 4445, 4444]
);
let chunked = get_gradual_chunk_sizes(100, 1_000_000);
assert_eq!(chunked.len(), 100);
assert_eq!(chunked.iter().sum::<usize>(), 1_000_000);
assert_eq!(chunked[0], 1);
assert_eq!(chunked[1], 10);
assert_eq!(chunked[2], 100);
assert_eq!(chunked[3], 1000);
assert_eq!(chunked[4], (1_000_000 - 1111) / 96 + 1);
assert_eq!(chunked[98], (1_000_000 - 1111) / 96 + 1);
}
#[test]
fn test_gradually_increasing_chunker() {
let chunker = gradually_increasing_chunker(3);
assert_eq!(chunker(vec![1]), vec![vec![1]]);
assert_eq!(chunker(vec![1, 2]), vec![vec![1], vec![2]]);
assert_eq!(chunker(vec![1, 2, 3, 4]), vec![vec![1], vec![2, 3, 4]]);
let chunker = gradually_increasing_chunker(100);
let v: Vec<u8> = vec![0; 1_000_000];
let chunks = chunker(v);
assert_eq!(chunks.len(), 100);
assert_eq!(
chunks.iter().map(|chunk| chunk.len()).sum::<usize>(),
1_000_000
);
assert_eq!(chunks[0].len(), 1);
assert_eq!(chunks[1].len(), 10);
assert_eq!(chunks[2].len(), 100);
assert_eq!(chunks[3].len(), 1000);
assert_eq!(chunks[4].len(), (1_000_000 - 1111) / 96 + 1);
assert_eq!(chunks[98].len(), (1_000_000 - 1111) / 96 + 1);
}
}


@@ -17,7 +17,7 @@ use mercurial_types::{HgChangesetId, MPath};
use mononoke_types::{BonsaiChangeset, BonsaiChangesetMut, ChangesetId, DateTime, FileChange};
use std::collections::BTreeMap;
#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct ChangesetArgs {
pub author: String,
pub message: String,
@@ -26,13 +26,26 @@ pub struct ChangesetArgs {
pub mark_public: bool,
}
pub async fn create_and_save_changeset(
pub trait ChangesetArgsFactory = Fn(usize) -> ChangesetArgs;
pub async fn create_save_and_generate_hg_changeset(
ctx: &CoreContext,
repo: &BlobRepo,
parents: Vec<ChangesetId>,
file_changes: BTreeMap<MPath, Option<FileChange>>,
changeset_args: ChangesetArgs,
) -> Result<HgChangesetId, Error> {
let bcs_id = create_and_save_bonsai(ctx, repo, parents, file_changes, changeset_args).await?;
generate_hg_changeset(ctx, repo, bcs_id).await
}
pub async fn create_and_save_bonsai(
ctx: &CoreContext,
repo: &BlobRepo,
parents: Vec<ChangesetId>,
file_changes: BTreeMap<MPath, Option<FileChange>>,
changeset_args: ChangesetArgs,
) -> Result<ChangesetId, Error> {
let ChangesetArgs {
author,
message,
@@ -46,7 +59,8 @@ pub async fn create_and_save_changeset(
if let Some(bookmark) = maybe_bookmark {
create_bookmark(ctx, repo, bookmark, bcs_id).await?;
}
generate_hg_changeset(ctx, repo, bcs_id).await
Ok(bcs_id)
}
async fn save_and_maybe_mark_public(


@@ -5,6 +5,9 @@
* GNU General Public License version 2.
*/
#![feature(trait_alias)]
#![deny(warnings)]
use anyhow::{anyhow, Error};
use blobrepo::BlobRepo;
use blobrepo_hg::BlobRepoHg;
@@ -23,11 +26,16 @@ use mercurial_types::{
use mononoke_types::{ChangesetId, ContentId, FileChange, FileType};
use movers::Mover;
use slog::info;
use std::collections::BTreeMap;
use std::collections::{BTreeMap, HashSet};
use std::num::NonZeroU64;
pub mod chunking;
pub mod common;
use crate::common::{create_and_save_changeset, ChangesetArgs};
use crate::chunking::Chunker;
use crate::common::{
create_and_save_bonsai, create_save_and_generate_hg_changeset, ChangesetArgs,
ChangesetArgsFactory,
};
const BUFFER_SIZE: usize = 100;
const REPORTING_INTERVAL_FILES: usize = 10000;
@@ -124,7 +132,7 @@ pub async fn perform_stack_move<'a>(
parent_bcs_id: ChangesetId,
path_converter: Mover,
max_num_of_moves_in_commit: NonZeroU64,
resulting_changeset_args: impl Fn(usize) -> ChangesetArgs,
resulting_changeset_args: impl ChangesetArgsFactory,
) -> Result<Vec<HgChangesetId>, Error> {
perform_stack_move_impl(
ctx,
@@ -150,7 +158,7 @@ async fn perform_stack_move_impl<'a, Chunker>(
mut parent_bcs_id: ChangesetId,
path_converter: Mover,
chunker: Chunker,
resulting_changeset_args: impl Fn(usize) -> ChangesetArgs,
resulting_changeset_args: impl ChangesetArgsFactory,
) -> Result<Vec<HgChangesetId>, Error>
where
Chunker: Fn(Vec<FileMove>) -> Vec<Vec<FileMove>>,
@@ -193,7 +201,7 @@ where
}
}
let hg_cs_id = create_and_save_changeset(
let hg_cs_id = create_save_and_generate_hg_changeset(
&ctx,
&repo,
vec![parent_bcs_id],
@@ -213,6 +221,160 @@ where
Ok(res)
}
async fn get_all_working_copy_paths(
ctx: &CoreContext,
repo: &BlobRepo,
cs_id: ChangesetId,
) -> Result<Vec<MPath>, Error> {
let hg_cs_id = repo
.get_hg_from_bonsai_changeset(ctx.clone(), cs_id)
.compat()
.await?;
let hg_cs = hg_cs_id.load(ctx.clone(), repo.blobstore()).await?;
let paths_sorted = {
let mut paths = hg_cs
.manifestid()
.list_leaf_entries(ctx.clone(), repo.blobstore().clone())
.compat()
.map_ok(|(mpath, _)| mpath)
.try_collect::<Vec<MPath>>()
.await?;
paths.sort();
paths
};
Ok(paths_sorted)
}
/// Create pre-merge delete commits
/// Our gradual merge approach is like this:
/// ```text
/// M1
/// . \
/// . D1
/// M2 \
/// . \ |
/// . D2 |
/// o \|
/// | |
/// o PM
///
/// ^ ^
/// | \
/// main DAG merged repo's DAG
/// ```
/// Where:
/// - `M1`, `M2` - merge commits, each of which merges only a chunk
/// of the merged repo's DAG
/// - `PM` is a pre-merge master of the merged repo's DAG
/// - `D1` and `D2` are commits that each delete everything except
/// for one chunk of the working copy. These commits are needed
/// to make partial merges possible. The union of the `D1`, `D2`, ...
/// working copies must equal the whole `PM` working copy. These
/// deletion commits do not form a stack; they are all parented
/// by `PM`
///
/// This function creates a set of such commits, parented
/// by `parent_bcs_id`. Files in the working copy are sharded
/// according to the `chunker` fn.
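///
/// For example, a chunker that splits the working copy into two pieces
/// produces two delete commits, each of which keeps only its own piece
/// (see `test_create_delete_commits_two_commits` below).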
pub async fn create_sharded_delete_commits<'a>(
ctx: &'a CoreContext,
repo: &'a BlobRepo,
parent_bcs_id: ChangesetId,
chunker: Chunker<MPath>,
resulting_changeset_args: impl ChangesetArgsFactory,
) -> Result<Vec<ChangesetId>, Error> {
let all_mpaths: Vec<MPath> = get_all_working_copy_paths(ctx, repo, parent_bcs_id).await?;
let chunked_mpaths = {
let chunked_mpaths = chunker(all_mpaths.clone());
// Sanity check: the total number of files before and after chunking is the same
// (together with the check below, this also ensures that we didn't duplicate
// any file)
let before_count = all_mpaths.len();
let after_count = chunked_mpaths
.iter()
.map(|chunk| chunk.len())
.sum::<usize>();
if before_count != after_count {
return Err(anyhow!(
"File counts before ({}) and after ({}) chunking are different",
before_count,
after_count,
));
}
// Sanity check that we have not dropped any file
let before: HashSet<&MPath> = all_mpaths.iter().collect();
let after: HashSet<&MPath> = chunked_mpaths
.iter()
.map(|chunk| chunk.iter())
.flatten()
.collect();
if before != after {
let lost_paths: Vec<&MPath> = before.difference(&after).take(5).map(|mp| *mp).collect();
return Err(anyhow!(
"Chunker lost some paths, for example: {:?}",
lost_paths
));
}
chunked_mpaths
};
let delete_commit_creation_futs = chunked_mpaths.into_iter().enumerate().map(|(i, chunk)| {
let changeset_args = resulting_changeset_args(i);
create_delete_commit(
ctx,
repo,
&parent_bcs_id,
&all_mpaths,
chunk.into_iter().collect(),
changeset_args,
)
});
future::try_join_all(delete_commit_creation_futs).await
}
async fn create_delete_commit(
ctx: &CoreContext,
repo: &BlobRepo,
parent_bcs_id: &ChangesetId,
all_files: &Vec<MPath>,
files_to_keep: HashSet<MPath>,
changeset_args: ChangesetArgs,
) -> Result<ChangesetId, Error> {
let file_changes: BTreeMap<MPath, Option<FileChange>> = all_files
.iter()
.filter_map(|mpath| {
if files_to_keep.contains(mpath) {
None
} else {
Some((mpath.clone(), None))
}
})
.collect();
info!(
ctx.logger(),
"Creating a delete commit for {} files with {:?}",
file_changes.len(),
changeset_args
);
create_and_save_bonsai(
ctx,
repo,
vec![parent_bcs_id.clone()],
file_changes,
changeset_args,
)
.await
}
#[cfg(test)]
mod test {
use super::*;
@@ -223,7 +385,7 @@ mod test {
use fixtures::{linear, many_files_dirs};
use futures::{compat::Future01CompatExt, future::TryFutureExt};
use futures_old::{stream::Stream, Future};
use maplit::btreemap;
use maplit::{btreemap, hashset};
use mercurial_types::HgChangesetId;
use mononoke_types::{BonsaiChangeset, BonsaiChangesetMut, DateTime};
use std::str::FromStr;
@@ -458,14 +620,12 @@ mod test {
let old_bcs_id = resolve_cs_id(&ctx, &repo, "master").await?;
let create_cs_args = |num| ChangesetArgs {
author: "user".to_string(),
message: format!("I like to move it: {}", num),
message: format!("I like to delete it: {}", num),
datetime: DateTime::from_rfc3339("1985-04-12T23:20:50.52Z").unwrap(),
bookmark: None,
mark_public: false,
};
let repo = linear::getrepo(fb).await;
let stack = perform_stack_move(
&ctx,
&repo,
@@ -508,4 +668,237 @@ mod test {
Ok(())
}
#[fbinit::compat_test]
async fn test_create_delete_commit(fb: FacebookInit) -> Result<(), Error> {
let repo = linear::getrepo(fb).await;
let ctx = CoreContext::test_mock(fb);
let master_bcs_id = resolve_cs_id(&ctx, &repo, "master").await?;
let changeset_args = ChangesetArgs {
author: "user".to_string(),
message: "I like to delete it".to_string(),
datetime: DateTime::from_rfc3339("1985-04-12T23:20:50.52Z").unwrap(),
bookmark: None,
mark_public: false,
};
let all_mpaths = get_all_working_copy_paths(&ctx, &repo, master_bcs_id).await?;
let files_to_keep = hashset!(MPath::new("6")?);
let deletion_cs_id = create_delete_commit(
&ctx,
&repo,
&master_bcs_id,
&all_mpaths,
files_to_keep.clone(),
changeset_args,
)
.await?;
let new_all_mpaths: HashSet<_> = get_all_working_copy_paths(&ctx, &repo, deletion_cs_id)
.await?
.into_iter()
.collect();
assert_eq!(files_to_keep, new_all_mpaths);
Ok(())
}
#[fbinit::compat_test]
async fn test_create_delete_commits_one_per_file(fb: FacebookInit) -> Result<(), Error> {
let repo = linear::getrepo(fb).await;
let ctx = CoreContext::test_mock(fb);
let master_bcs_id = resolve_cs_id(&ctx, &repo, "master").await?;
let changeset_args_factory = |num| ChangesetArgs {
author: "user".to_string(),
message: format!("I like to delete it: {}", num),
datetime: DateTime::from_rfc3339("1985-04-12T23:20:50.52Z").unwrap(),
bookmark: None,
mark_public: false,
};
let chunker: Chunker<MPath> = Box::new(|files: Vec<MPath>| {
files
.into_iter()
.map(|file| vec![file])
.collect::<Vec<Vec<MPath>>>()
});
let commits = create_sharded_delete_commits(
&ctx,
&repo,
master_bcs_id,
chunker,
changeset_args_factory,
)
.await?;
let all_mpaths_at_master: HashSet<_> =
get_all_working_copy_paths(&ctx, &repo, master_bcs_id)
.await?
.into_iter()
.collect();
let all_mpaths_at_commits = future::try_join_all(
commits
.iter()
.map(|cs_id| get_all_working_copy_paths(&ctx, &repo, *cs_id)),
)
.await?;
for mpaths_at_commit in &all_mpaths_at_commits {
assert_eq!(mpaths_at_commit.len(), 1);
}
let all_mpaths_at_commits: HashSet<MPath> = all_mpaths_at_commits
.into_iter()
.map(|mpaths_at_commit| mpaths_at_commit.into_iter())
.flatten()
.collect();
assert_eq!(all_mpaths_at_commits, all_mpaths_at_master);
Ok(())
}
#[fbinit::compat_test]
async fn test_create_delete_commits_two_commits(fb: FacebookInit) -> Result<(), Error> {
let repo = linear::getrepo(fb).await;
let ctx = CoreContext::test_mock(fb);
let master_bcs_id = resolve_cs_id(&ctx, &repo, "master").await?;
let changeset_args_factory = |num| ChangesetArgs {
author: "user".to_string(),
message: format!("I like to delete it: {}", num),
datetime: DateTime::from_rfc3339("1985-04-12T23:20:50.52Z").unwrap(),
bookmark: None,
mark_public: false,
};
let chunker: Chunker<MPath> = Box::new(|files: Vec<MPath>| {
let (v1, v2) = files.split_at(1);
vec![v1.to_vec(), v2.to_vec()]
});
let commits = create_sharded_delete_commits(
&ctx,
&repo,
master_bcs_id,
chunker,
changeset_args_factory,
)
.await?;
let all_mpaths_at_master: HashSet<_> =
get_all_working_copy_paths(&ctx, &repo, master_bcs_id)
.await?
.into_iter()
.collect();
let all_mpaths_at_commits = future::try_join_all(
commits
.iter()
.map(|cs_id| get_all_working_copy_paths(&ctx, &repo, *cs_id)),
)
.await?;
assert_eq!(all_mpaths_at_commits[0].len(), 1);
assert_eq!(
all_mpaths_at_commits[1].len(),
all_mpaths_at_master.len() - 1
);
let all_mpaths_at_commits: HashSet<MPath> = all_mpaths_at_commits
.into_iter()
.map(|mpaths_at_commit| mpaths_at_commit.into_iter())
.flatten()
.collect();
assert_eq!(all_mpaths_at_commits, all_mpaths_at_master);
Ok(())
}
#[fbinit::compat_test]
async fn test_create_delete_commits_invalid_chunker(fb: FacebookInit) -> Result<(), Error> {
let repo = linear::getrepo(fb).await;
let ctx = CoreContext::test_mock(fb);
let master_bcs_id = resolve_cs_id(&ctx, &repo, "master").await?;
let changeset_args_factory = |num| ChangesetArgs {
author: "user".to_string(),
message: format!("I like to delete it: {}", num),
datetime: DateTime::from_rfc3339("1985-04-12T23:20:50.52Z").unwrap(),
bookmark: None,
mark_public: false,
};
// fewer files
let chunker: Chunker<MPath> = Box::new(|files: Vec<MPath>| {
let (_, v2) = files.split_at(1);
vec![v2.to_vec()]
});
let commits_res = create_sharded_delete_commits(
&ctx,
&repo,
master_bcs_id,
chunker,
changeset_args_factory,
)
.await;
assert!(commits_res.is_err());
// more files
let chunker: Chunker<MPath> = Box::new(|files: Vec<MPath>| {
let (_, v2) = files.split_at(1);
vec![v2.to_vec(), v2.to_vec()]
});
let commits_res = create_sharded_delete_commits(
&ctx,
&repo,
master_bcs_id,
chunker,
changeset_args_factory,
)
.await;
assert!(commits_res.is_err());
// correct number, but unrelated files
let chunker: Chunker<MPath> = Box::new(|files: Vec<MPath>| {
let (_, v2) = files.split_at(1);
vec![vec![MPath::new("ababagalamaga").unwrap()], v2.to_vec()]
});
let commits_res = create_sharded_delete_commits(
&ctx,
&repo,
master_bcs_id,
chunker,
changeset_args_factory,
)
.await;
assert!(commits_res.is_err());
// duplicated files
let chunker: Chunker<MPath> = Box::new(|files: Vec<MPath>| {
let (_, v2) = files.split_at(1);
vec![v2.to_vec(), files]
});
let commits_res = create_sharded_delete_commits(
&ctx,
&repo,
master_bcs_id,
chunker,
changeset_args_factory,
)
.await;
assert!(commits_res.is_err());
Ok(())
}
}