mononoke: add catchup delete head subcommand to megarepo

Reviewed By: ikostia

Differential Revision: D23597187

fbshipit-source-id: da4710aabfc161a69d80c361dd593a3e7749e941
This commit is contained in:
Stanislau Hlebik 2020-09-11 10:37:25 -07:00 committed by Facebook GitHub Bot
parent 4d76a4c241
commit d5cafbb432
4 changed files with 409 additions and 6 deletions
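For orientation: the new subcommand finds files under the head bookmark that match a path regex but are missing from (or differ in) the commit that will later be merged, and deletes them in chunks of deletion-chunk-size, pushrebasing one deletion commit per chunk onto the head bookmark. The snippet below only illustrates that chunking arithmetic in plain std Rust (it is not code from this commit): with five matching files and a chunk size of 2, the tool would create three deletion commits.

    // Illustration only: how deletion-chunk-size maps to the number of
    // deletion commits. Plain std Rust, not the Mononoke APIs used in the diff.
    fn main() {
        let files_to_delete = vec!["changed/a", "changed/b", "old/1", "old/2", "old/3"];
        let deletion_chunk_size = 2;
        let chunks: Vec<&[&str]> = files_to_delete.chunks(deletion_chunk_size).collect();
        assert_eq!(chunks.len(), 3); // ceil(5 / 2) deletion commits
        for (num, chunk) in chunks.iter().enumerate() {
            // One deletion commit per chunk, pushrebased onto the head bookmark.
            println!("deletion commit #{}: removes {:?}", num, chunk);
        }
    }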

View File

@@ -23,6 +23,8 @@ cmdlib = { path = "../../cmdlib" }
cmdlib_x_repo = { path = "../../cmdlib/x_repo" }
context = { path = "../../server/context" }
cross_repo_sync = { path = "../cross_repo_sync" }
derived_data = { path = "../../derived_data" }
fsnodes = { path = "../../derived_data/fsnodes" }
live_commit_sync_config = { path = "../live_commit_sync_config" }
manifest = { path = "../../manifest" }
mercurial_types = { path = "../../mercurial/types" }
@@ -43,6 +45,7 @@ futures = { version = "0.3.5", features = ["async-await", "compat"] }
futures-old = { package = "futures", version = "0.1" }
itertools = "0.8"
maplit = "1.0"
regex = "1.3.7"
slog = { version = "2.5", features = ["max_level_debug"] }
[dev-dependencies]

View File

@@ -0,0 +1,282 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* This software may be used and distributed according to the terms of the
* GNU General Public License version 2.
*/
use anyhow::{anyhow, Error};
use blobrepo::BlobRepo;
use blobrepo_hg::BlobRepoHg;
use blobstore::Loadable;
use bookmarks::BookmarkName;
use context::CoreContext;
use derived_data::BonsaiDerived;
use fsnodes::RootFsnodeId;
use futures::{
compat::{Future01CompatExt, Stream01CompatExt},
future::{self, try_join},
TryStreamExt,
};
use itertools::Itertools;
use manifest::{Diff, ManifestOps};
use maplit::hashset;
use megarepolib::common::{create_and_save_bonsai, ChangesetArgsFactory, StackPosition};
use metaconfig_types::PushrebaseFlags;
use mononoke_types::{ChangesetId, MPath};
use pushrebase::do_pushrebase_bonsai;
use regex::Regex;
use slog::info;

pub async fn create_deletion_head_commits<'a>(
ctx: &'a CoreContext,
repo: &'a BlobRepo,
head_bookmark: BookmarkName,
commit_to_merge: ChangesetId,
path_regex: Regex,
deletion_chunk_size: usize,
cs_args_factory: Box<dyn ChangesetArgsFactory>,
pushrebase_flags: &'a PushrebaseFlags,
) -> Result<(), Error> {
let files =
find_files_that_need_to_be_deleted(ctx, repo, &head_bookmark, commit_to_merge, path_regex)
.await?;
info!(ctx.logger(), "total files to delete is {}", files.len());
for (num, chunk) in files
.into_iter()
.chunks(deletion_chunk_size)
.into_iter()
.enumerate()
{
let files = chunk.into_iter().map(|path| (path, None)).collect();
let maybe_head_bookmark_val = repo
.get_bonsai_bookmark(ctx.clone(), &head_bookmark)
.compat()
.await?;
let head_bookmark_val =
maybe_head_bookmark_val.ok_or(anyhow!("{} not found", head_bookmark))?;
let bcs_id = create_and_save_bonsai(
&ctx,
&repo,
vec![head_bookmark_val],
files,
cs_args_factory(StackPosition(num)),
)
.await?;
info!(
ctx.logger(),
"created bonsai #{}. Deriving hg changeset for it to verify its correctness", num
);
let hg_cs_id = repo
.get_hg_from_bonsai_changeset(ctx.clone(), bcs_id)
.compat()
.await?;
info!(ctx.logger(), "derived {}, pushrebasing...", hg_cs_id);
let bcs = bcs_id.load(ctx.clone(), repo.blobstore()).await?;
let pushrebase_res = do_pushrebase_bonsai(
&ctx,
&repo,
pushrebase_flags,
&head_bookmark,
&hashset![bcs],
None,
&[],
)
.await?;
info!(ctx.logger(), "Pushrebased to {}", pushrebase_res.head);
}
Ok(())
}

// Returns paths of the files that:
// 1) Match `path_regex`
// 2) Either do not exist in `commit_to_merge` or have different content/filetype.
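// Illustrative example (file names mirror the test at the bottom of this file):
// if the head bookmark contains `changed/a` (old content), `toremove/file1` and
// `unchanged/a`, while `commit_to_merge` contains `changed/a` (new content) and an
// identical `unchanged/a`, then `changed/a` and `toremove/file1` are returned
// (provided they match `path_regex`) and `unchanged/a` is not. Files that exist
// only in `commit_to_merge` are ignored.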
async fn find_files_that_need_to_be_deleted(
ctx: &CoreContext,
repo: &BlobRepo,
head_bookmark: &BookmarkName,
commit_to_merge: ChangesetId,
path_regex: Regex,
) -> Result<Vec<MPath>, Error> {
let maybe_head_bookmark_val = repo
.get_bonsai_bookmark(ctx.clone(), head_bookmark)
.compat()
.await?;
let head_bookmark_val =
maybe_head_bookmark_val.ok_or(anyhow!("{} not found", head_bookmark))?;
let head_root_fsnode = RootFsnodeId::derive(ctx.clone(), repo.clone(), head_bookmark_val);
let commit_to_merge_root_fsnode =
RootFsnodeId::derive(ctx.clone(), repo.clone(), commit_to_merge);
let (head_root_fsnode, commit_to_merge_root_fsnode) = try_join(
head_root_fsnode.compat(),
commit_to_merge_root_fsnode.compat(),
)
.await?;
let paths = head_root_fsnode
.fsnode_id()
.diff(
ctx.clone(),
repo.get_blobstore(),
*commit_to_merge_root_fsnode.fsnode_id(),
)
.compat()
.try_filter_map(|diff| async move {
use Diff::*;
let maybe_path = match diff {
Added(_maybe_path, _entry) => None,
Removed(maybe_path, entry) => entry.into_leaf().and_then(|_| maybe_path),
Changed(maybe_path, _old_entry, new_entry) => {
new_entry.into_leaf().and_then(|_| maybe_path)
}
};
Ok(maybe_path)
})
.try_filter(|path| future::ready(path.matches_regex(&path_regex)))
.try_collect::<Vec<_>>()
.await?;
Ok(paths)
}

#[cfg(test)]
mod test {
use super::*;
use fbinit::FacebookInit;
use megarepolib::common::ChangesetArgs;
use mononoke_types::DateTime;
use revset::RangeNodeStream;
use tests_utils::{bookmark, resolve_cs_id, CreateCommitContext};
const PATH_REGEX: &'static str = "^(unchanged/.*|changed/.*|toremove/.*)";
#[fbinit::compat_test]
async fn test_find_files_that_needs_to_be_deleted(fb: FacebookInit) -> Result<(), Error> {
let ctx = CoreContext::test_mock(fb);
let repo = prepare_repo(&ctx).await?;
let commit_to_merge = resolve_cs_id(&ctx, &repo, "commit_to_merge").await?;
let book = BookmarkName::new("book")?;
let mut paths = find_files_that_need_to_be_deleted(
&ctx,
&repo,
&book,
commit_to_merge,
Regex::new(PATH_REGEX)?,
)
.await?;
paths.sort();
assert_eq!(
paths,
vec![
MPath::new("changed/a")?,
MPath::new("changed/b")?,
MPath::new("toremove/file1")?,
MPath::new("toremove/file2")?,
]
);
Ok(())
}

#[fbinit::compat_test]
async fn test_create_deletion_head_commits(fb: FacebookInit) -> Result<(), Error> {
let ctx = CoreContext::test_mock(fb);
let repo = prepare_repo(&ctx).await?;
let book = BookmarkName::new("book")?;
let commit_to_merge = resolve_cs_id(&ctx, &repo, "commit_to_merge").await?;
let args_factory = Box::new(|stack_pos: StackPosition| ChangesetArgs {
author: "author".to_string(),
message: format!("{}", stack_pos.0),
datetime: DateTime::now(),
bookmark: None,
mark_public: false,
});
let pushrebase_flags = {
let mut flags = PushrebaseFlags::default();
flags.rewritedates = true;
flags.forbid_p2_root_rebases = true;
flags.casefolding_check = true;
flags.recursion_limit = None;
flags
};
let commit_before_push = resolve_cs_id(&ctx, &repo, book.clone()).await?;
create_deletion_head_commits(
&ctx,
&repo,
book.clone(),
commit_to_merge,
Regex::new(PATH_REGEX)?,
1,
args_factory,
&pushrebase_flags,
)
.await?;
let commit_after_push = resolve_cs_id(&ctx, &repo, book.clone()).await?;
let range: Vec<_> = RangeNodeStream::new(
ctx.clone(),
repo.get_changeset_fetcher(),
commit_before_push,
commit_after_push,
)
.compat()
.try_collect()
.await?;
// 4 new deletion commits (deletion_chunk_size is 1 and 4 files need deleting) + commit_before_push
assert_eq!(range.len(), 4 + 1);
let paths = find_files_that_need_to_be_deleted(
&ctx,
&repo,
&book,
commit_to_merge,
Regex::new(PATH_REGEX)?,
)
.await?;
assert!(paths.is_empty());
Ok(())
}

async fn prepare_repo(ctx: &CoreContext) -> Result<BlobRepo, Error> {
let repo = blobrepo_factory::new_memblob_empty(None)?;
let head_commit = CreateCommitContext::new_root(ctx, &repo)
.add_file("unrelated_file", "a")
.add_file("unchanged/a", "a")
.add_file("changed/a", "oldcontent")
.add_file("changed/b", "oldcontent")
.add_file("toremove/file1", "content")
.add_file("toremove/file2", "content")
.commit()
.await?;
let commit_to_merge = CreateCommitContext::new_root(ctx, &repo)
.add_file("unchanged/a", "a")
.add_file("changed/a", "newcontent")
.add_file("changed/b", "newcontent")
.commit()
.await?;
bookmark(&ctx, &repo, "book").set_to(head_commit).await?;
bookmark(&ctx, &repo, "commit_to_merge")
.set_to(commit_to_merge)
.await?;
Ok(repo)
}
}

View File

@@ -38,10 +38,15 @@ pub const MAX_NUM_OF_MOVES_IN_COMMIT: &'static str = "max-num-of-moves-in-commit
pub const CHUNKING_HINT_FILE: &'static str = "chunking-hint-file";
pub const PARENTS: &'static str = "parents";
pub const PRE_MERGE_DELETE: &'static str = "pre-merge-delete";
pub const CATCHUP_DELETE_HEAD: &'static str = "create-catchup-head-deletion-commits";
pub const EVEN_CHUNK_SIZE: &'static str = "even-chunk-size";
pub const BONSAI_MERGE: &'static str = "bonsai-merge";
pub const BONSAI_MERGE_P1: &'static str = "bonsai-merge-p1";
pub const BONSAI_MERGE_P2: &'static str = "bonsai-merge-p2";
pub const HEAD_BOOKMARK: &'static str = "head-bookmark";
pub const TO_MERGE_CS_ID: &'static str = "to-merge-cs-id";
pub const PATH_REGEX: &'static str = "path-regex";
pub const DELETION_CHUNK_SIZE: &'static str = "deletion-chunk-size";

pub fn cs_args_from_matches<'a>(sub_m: &ArgMatches<'a>) -> BoxFuture<ChangesetArgs, Error> {
let message = try_boxfuture!(
@@ -94,6 +99,14 @@ pub fn get_delete_commits_cs_args_factory<'a>(
})
}

pub fn get_catchup_head_delete_commits_cs_args_factory<'a>(
sub_m: &ArgMatches<'a>,
) -> Result<Box<dyn ChangesetArgsFactory>, Error> {
get_commit_factory(sub_m, |s, num| -> String {
format!("[MEGAREPO CATCHUP DELETE] {} ({})", s, num)
})
}
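// Note on the factory above (illustrative; assumes get_commit_factory, not shown in
// this diff, passes the commit message and the commit's stack position): a message of
// "deleting files" would give the N-th deletion commit the message
// "[MEGAREPO CATCHUP DELETE] deleting files (N)".
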
pub fn get_gradual_merge_commits_cs_args_factory<'a>(
sub_m: &ArgMatches<'a>,
) -> Result<Box<dyn ChangesetArgsFactory>, Error> {
@@ -363,6 +376,55 @@ pub fn setup_app<'a, 'b>() -> App<'a, 'b> {
.required(true),
);
let catchup_delete_head_subcommand = SubCommand::with_name(CATCHUP_DELETE_HEAD)
.about("Create delete commits for 'catchup strategy. \
This is normally used after invisible merge is done, but small repo got a few new commits
that needs merging.
O <- head bookmark
|
O O <- new commits (we want to merge them in master)
| ...
IM | <- invisible merge commit
|\\ /
O O
This command creates deletion commits on top of the master bookmark for files that were changed in the new commits,
and pushrebases them.
After all of the commits are pushrebased, the paths that match --path-regex in the head bookmark should be a subset
of the paths that match --path-regex in the latest new commit we want to merge.
")
.arg(
Arg::with_name(HEAD_BOOKMARK)
.long(HEAD_BOOKMARK)
.help("commit to merge into")
.takes_value(true)
.required(true),
)
.arg(
Arg::with_name(TO_MERGE_CS_ID)
.long(TO_MERGE_CS_ID)
.help("commit to merge")
.takes_value(true)
.required(true),
)
.arg(
Arg::with_name(PATH_REGEX)
.long(PATH_REGEX)
.help("regex that matches all paths that should be merged in head commit")
.takes_value(true)
.required(true),
)
.arg(
Arg::with_name(DELETION_CHUNK_SIZE)
.long(DELETION_CHUNK_SIZE)
.help("how many files to delete in a single commit")
.default_value("10000")
.takes_value(true)
.required(false),
);
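// Example invocation (sketch only: the binary name, global repo/config flags and
// resulting-commit arguments are assumptions; the subcommand and flag names come
// from the constants above):
//
//   megarepo_tool <repo/config args> create-catchup-head-deletion-commits \
//     --head-bookmark master --to-merge-cs-id <hash> \
//     --path-regex '^subdir/.*' --deletion-chunk-size 10000 <resulting commit args>
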
args::MononokeApp::new("megarepo preparation tool")
.with_advanced_args_hidden()
.with_source_and_target_repos()
@@ -375,4 +437,7 @@ pub fn setup_app<'a, 'b>() -> App<'a, 'b> {
.subcommand(add_light_resulting_commit_args(gradual_merge_subcommand))
.subcommand(gradual_merge_progress_subcommand)
.subcommand(manual_commit_sync_subcommand)
.subcommand(add_light_resulting_commit_args(
catchup_delete_head_subcommand,
))
}

View File

@@ -23,12 +23,14 @@ use live_commit_sync_config::{CfgrLiveCommitSyncConfig, LiveCommitSyncConfig};
use metaconfig_types::RepoConfig;
use mononoke_types::RepositoryId;
use movers::get_small_to_large_mover;
use regex::Regex;
use skiplist::fetch_skiplist_index;
use slog::info;
use std::collections::BTreeMap;
use std::num::NonZeroU64;
use synced_commit_mapping::SqlSyncedCommitMapping;

mod catchup;
mod cli;
mod gradual_merge;
mod manual_commit_sync;
@@ -36,12 +38,14 @@ mod merging;
mod sync_diamond_merge;
use crate::cli::{
cs_args_from_matches, get_delete_commits_cs_args_factory,
get_gradual_merge_commits_cs_args_factory, setup_app, BONSAI_MERGE, BONSAI_MERGE_P1,
BONSAI_MERGE_P2, CHANGESET, CHUNKING_HINT_FILE, COMMIT_BOOKMARK, COMMIT_HASH, DRY_RUN,
EVEN_CHUNK_SIZE, FIRST_PARENT, GRADUAL_MERGE, GRADUAL_MERGE_PROGRESS, LAST_DELETION_COMMIT,
LIMIT, MANUAL_COMMIT_SYNC, MAX_NUM_OF_MOVES_IN_COMMIT, MERGE, MOVE, ORIGIN_REPO, PARENTS,
PRE_DELETION_COMMIT, PRE_MERGE_DELETE, SECOND_PARENT, SYNC_DIAMOND_MERGE,
cs_args_from_matches, get_catchup_head_delete_commits_cs_args_factory,
get_delete_commits_cs_args_factory, get_gradual_merge_commits_cs_args_factory, setup_app,
BONSAI_MERGE, BONSAI_MERGE_P1, BONSAI_MERGE_P2, CATCHUP_DELETE_HEAD, CHANGESET,
CHUNKING_HINT_FILE, COMMIT_BOOKMARK, COMMIT_HASH, DELETION_CHUNK_SIZE, DRY_RUN,
EVEN_CHUNK_SIZE, FIRST_PARENT, GRADUAL_MERGE, GRADUAL_MERGE_PROGRESS, HEAD_BOOKMARK,
LAST_DELETION_COMMIT, LIMIT, MANUAL_COMMIT_SYNC, MAX_NUM_OF_MOVES_IN_COMMIT, MERGE, MOVE,
ORIGIN_REPO, PARENTS, PATH_REGEX, PRE_DELETION_COMMIT, PRE_MERGE_DELETE, SECOND_PARENT,
SYNC_DIAMOND_MERGE, TO_MERGE_CS_ID,
};
use crate::merging::perform_merge;
use megarepolib::chunking::{
@@ -422,6 +426,52 @@ async fn run_manual_commit_sync<'a>(
Ok(())
}

async fn run_catchup_delete_head<'a>(
ctx: CoreContext,
matches: &ArgMatches<'a>,
sub_m: &ArgMatches<'a>,
) -> Result<(), Error> {
let repo = args::open_repo(ctx.fb, &ctx.logger().clone(), &matches)
.compat()
.await?;
let head_bookmark = sub_m
.value_of(HEAD_BOOKMARK)
.ok_or_else(|| format_err!("{} not set", HEAD_BOOKMARK))?;
let head_bookmark = BookmarkName::new(head_bookmark)?;
let to_merge_cs_id = sub_m
.value_of(TO_MERGE_CS_ID)
.ok_or_else(|| format_err!("{} not set", TO_MERGE_CS_ID))?;
let to_merge_cs_id = helpers::csid_resolve(ctx.clone(), repo.clone(), to_merge_cs_id)
.compat()
.await?;
let path_regex = sub_m
.value_of(PATH_REGEX)
.ok_or_else(|| format_err!("{} not set", PATH_REGEX))?;
let path_regex = Regex::new(path_regex)?;
let deletion_chunk_size = args::get_usize(&sub_m, DELETION_CHUNK_SIZE, 10000);
let cs_args_factory = get_catchup_head_delete_commits_cs_args_factory(&sub_m)?;
let (_, repo_config) = args::get_config(ctx.fb, &matches)?;
catchup::create_deletion_head_commits(
&ctx,
&repo,
head_bookmark,
to_merge_cs_id,
path_regex,
deletion_chunk_size,
cs_args_factory,
&repo_config.pushrebase.flags,
)
.await?;
Ok(())
}

fn get_and_verify_repo_config<'a>(
fb: FacebookInit,
matches: &ArgMatches<'a>,
@@ -468,6 +518,9 @@ fn main(fb: FacebookInit) -> Result<()> {
run_gradual_merge_progress(ctx, &matches, sub_m).await
}
(MANUAL_COMMIT_SYNC, Some(sub_m)) => run_manual_commit_sync(ctx, &matches, sub_m).await,
(CATCHUP_DELETE_HEAD, Some(sub_m)) => {
run_catchup_delete_head(ctx, &matches, sub_m).await
}
_ => bail!("oh no, wrong arguments provided!"),
}
};