megarepotool: extract chunked deletion from pre-merge-delete

Summary: We will use this chunked-deletion helper in a separate subcommand later.

Reviewed By: StanislavGlebik

Differential Revision: D25944075

fbshipit-source-id: 211063ef74b192c18b5d006e9e0592085fdfc811
Author: Kostia Balytskyi (committed by Facebook GitHub Bot)
Date: 2021-01-18 04:39:28 -08:00
Parent: 395ec9dbeb
Commit: e74174283d
5 changed files with 139 additions and 105 deletions


@@ -78,8 +78,8 @@ pub fn path_chunker_from_hint(prefix_lists: Vec<Vec<MPath>>) -> Result<Chunker<M
 }
 
 pub fn even_chunker_with_max_size<T: Clone>(max_chunk_size: usize) -> Result<Chunker<T>, Error> {
-    Ok(Box::new(move |mpaths| {
-        let res: Vec<Vec<T>> = mpaths
+    Ok(Box::new(move |items| {
+        let res: Vec<Vec<T>> = items
             .chunks(max_chunk_size)
             .map(|chunk| chunk.to_vec())
             .collect();

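The chunker returned by even_chunker_with_max_size simply splits its input into runs of at most max_chunk_size, with any remainder forming a shorter final chunk; the change above only renames the closure argument from mpaths to items, since the function is generic rather than path-specific. A minimal standalone sketch of the chunking semantics, using a hypothetical even_chunks helper that is not part of this diff:

fn even_chunks<T: Clone>(items: &[T], max_chunk_size: usize) -> Vec<Vec<T>> {
    // Runs of at most `max_chunk_size`; the remainder, if any, forms a
    // shorter final chunk. Note that `chunks` panics on a size of zero.
    items.chunks(max_chunk_size).map(|chunk| chunk.to_vec()).collect()
}

fn main() {
    let paths = ["a/b", "a/c", "d/e", "f/g", "h"];
    // Prints [["a/b", "a/c"], ["d/e", "f/g"], ["h"]]
    println!("{:?}", even_chunks(&paths, 2));
}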

@@ -15,6 +15,8 @@ use mononoke_types::{BonsaiChangeset, BonsaiChangesetMut, ChangesetId, DateTime,
 use slog::info;
 use std::collections::BTreeMap;
 
+use crate::chunking::Chunker;
+
 #[derive(Clone, Debug)]
 pub struct ChangesetArgs {
     pub author: String,
@@ -142,3 +144,46 @@ fn create_bonsai_changeset_only(
     }
     .freeze()
 }
+
+pub async fn delete_files_in_chunks<'a>(
+    ctx: &'a CoreContext,
+    repo: &'a BlobRepo,
+    parent_bcs_id: ChangesetId,
+    mpaths: Vec<MPath>,
+    chunker: Chunker<MPath>,
+    delete_commits_changeset_args_factory: impl ChangesetArgsFactory,
+    skip_last_chunk: bool,
+) -> Result<Vec<ChangesetId>, Error> {
+    info!(ctx.logger(), "Chunking mpaths");
+    let mpath_chunks: Vec<Vec<MPath>> = chunker(mpaths);
+    info!(ctx.logger(), "Done chunking working copy contents");
+
+    let mut delete_commits: Vec<ChangesetId> = Vec::new();
+    let mut parent = parent_bcs_id;
+    let chunk_num = mpath_chunks.len();
+    for (i, mpath_chunk) in mpath_chunks.into_iter().enumerate() {
+        if i == chunk_num - 1 && skip_last_chunk {
+            break;
+        }
+
+        let changeset_args = delete_commits_changeset_args_factory(StackPosition(i));
+        let file_changes: BTreeMap<MPath, _> =
+            mpath_chunk.into_iter().map(|mp| (mp, None)).collect();
+        info!(
+            ctx.logger(),
+            "Creating delete commit #{} with {:?} (deleting {} files)",
+            i,
+            changeset_args,
+            file_changes.len()
+        );
+
+        let delete_cs_id =
+            create_and_save_bonsai(ctx, repo, vec![parent], file_changes, changeset_args).await?;
+        info!(ctx.logger(), "Done creating delete commit #{}", i);
+        delete_commits.push(delete_cs_id);
+
+        // move one step forward
+        parent = delete_cs_id;
+    }
+
+    Ok(delete_commits)
+}

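The extracted delete_files_in_chunks builds a stack of delete commits: one commit per chunk, each parented on the previous delete commit, with the option of leaving the final chunk undeleted. A standalone sketch of that control flow, with plain types standing in for the Mononoke ones (all names here are illustrative):

type Chunker<T> = Box<dyn Fn(Vec<T>) -> Vec<Vec<T>>>;

fn delete_in_chunks(paths: Vec<String>, chunker: Chunker<String>, skip_last_chunk: bool) -> Vec<Vec<String>> {
    let chunks = chunker(paths);
    let chunk_num = chunks.len();
    let mut stack = Vec::new();
    for (i, chunk) in chunks.into_iter().enumerate() {
        // Optionally leave the last chunk alive; in the real code this is how
        // the caller avoids merging against a completely empty working copy.
        if i == chunk_num - 1 && skip_last_chunk {
            break;
        }
        // Stands in for creating and saving one bonsai changeset that deletes
        // every path in `chunk`, parented on the previous delete commit.
        stack.push(chunk);
    }
    stack
}

fn main() {
    let chunker: Chunker<String> = Box::new(|v| v.chunks(2).map(|c| c.to_vec()).collect());
    let paths = vec!["a".into(), "b".into(), "c".into(), "d".into(), "e".into()];
    // With skip_last_chunk = true, the trailing ["e"] chunk is left in place.
    println!("{:?}", delete_in_chunks(paths, chunker, true));
}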

@@ -29,6 +29,7 @@ use std::num::NonZeroU64;
 pub mod chunking;
 pub mod common;
 pub mod pre_merge_delete;
+pub mod working_copy;
 
 use crate::common::{
     create_save_and_generate_hg_changeset, ChangesetArgs, ChangesetArgsFactory, StackPosition,


@@ -7,20 +7,13 @@
 use anyhow::Error;
 use blobrepo::BlobRepo;
-use blobrepo_hg::BlobRepoHg;
-use blobstore::Loadable;
 use context::CoreContext;
-use derived_data::BonsaiDerived;
-use futures::{future::try_join, TryStreamExt};
-use manifest::{Diff, ManifestOps};
 use mercurial_types::MPath;
 use mononoke_types::ChangesetId;
-use slog::info;
-use std::collections::BTreeMap;
-use unodes::RootUnodeManifestId;
 
 use crate::chunking::Chunker;
-use crate::common::{create_and_save_bonsai, ChangesetArgsFactory, StackPosition};
+use crate::common::{delete_files_in_chunks, ChangesetArgsFactory};
+use crate::working_copy::{get_changed_working_copy_paths, get_working_copy_paths};
 
 /// A struct containing pre-merge delete information
 /// Pre-merge delete commits look like this:
@@ -44,65 +37,6 @@ pub struct PreMergeDelete {
     pub delete_commits: Vec<ChangesetId>,
 }
 
-async fn get_working_copy_paths(
-    ctx: &CoreContext,
-    repo: &BlobRepo,
-    bcs_id: ChangesetId,
-) -> Result<Vec<MPath>, Error> {
-    let hg_cs_id = repo
-        .get_hg_from_bonsai_changeset(ctx.clone(), bcs_id)
-        .await?;
-
-    let hg_cs = hg_cs_id.load(ctx, repo.blobstore()).await?;
-    info!(ctx.logger(), "Getting working copy contents");
-    let mut paths: Vec<_> = hg_cs
-        .manifestid()
-        .list_leaf_entries(ctx.clone(), repo.get_blobstore())
-        .map_ok(|(path, (_file_type, _filenode_id))| path)
-        .try_collect()
-        .await?;
-    paths.sort();
-    info!(ctx.logger(), "Done getting working copy contents");
-    Ok(paths)
-}
-
-async fn get_changed_working_copy_paths(
-    ctx: &CoreContext,
-    repo: &BlobRepo,
-    bcs_id: ChangesetId,
-    base_cs_id: ChangesetId,
-) -> Result<Vec<MPath>, Error> {
-    let unode_id = RootUnodeManifestId::derive(ctx, repo, bcs_id);
-    let base_unode_id = RootUnodeManifestId::derive(ctx, repo, base_cs_id);
-    let (unode_id, base_unode_id) = try_join(unode_id, base_unode_id).await?;
-
-    let mut paths = base_unode_id
-        .manifest_unode_id()
-        .diff(
-            ctx.clone(),
-            repo.get_blobstore(),
-            *unode_id.manifest_unode_id(),
-        )
-        .try_filter_map(|diff| async move {
-            use Diff::*;
-            let maybe_path = match diff {
-                Added(maybe_path, entry) => entry.into_leaf().and_then(|_| maybe_path),
-                Removed(_maybe_path, _entry) => None,
-                Changed(maybe_path, _old_entry, new_entry) => {
-                    new_entry.into_leaf().and_then(|_| maybe_path)
-                }
-            };
-            Ok(maybe_path)
-        })
-        .try_collect::<Vec<_>>()
-        .await?;
-
-    paths.sort();
-    Ok(paths)
-}
 
 /// Create `PreMergeDelete` struct, implementing gradual delete strategy
 /// See the struct's docstring for more details about the end state
 /// See also https://fb.quip.com/jPbqA3kK3qCi for strategy and discussion
@@ -120,40 +54,16 @@ pub async fn create_pre_merge_delete<'a>(
         }
         None => get_working_copy_paths(ctx, repo, parent_bcs_id).await?,
     };
 
-    info!(ctx.logger(), "Chunking working copy contents");
-    let mpath_chunks: Vec<Vec<MPath>> = chunker(mpaths);
-    info!(ctx.logger(), "Done chunking working copy contents");
-
-    let mut delete_commits: Vec<ChangesetId> = Vec::new();
-    let mut parent = parent_bcs_id;
-    let chunk_num = mpath_chunks.len();
-    for (i, mpath_chunk) in mpath_chunks.into_iter().enumerate() {
-        if i == chunk_num - 1 {
-            // This is last chunk
-            // we do not actually have to delete these files, as
-            // our very first merge should not be with an empty
-            // working copy
-            break;
-        }
-
-        let changeset_args = delete_commits_changeset_args_factory(StackPosition(i));
-        let file_changes: BTreeMap<MPath, _> =
-            mpath_chunk.into_iter().map(|mp| (mp, None)).collect();
-        info!(
-            ctx.logger(),
-            "Creating delete commit #{} with {:?} (deleting {} files)",
-            i,
-            changeset_args,
-            file_changes.len()
-        );
-
-        let delete_cs_id =
-            create_and_save_bonsai(ctx, repo, vec![parent], file_changes, changeset_args).await?;
-        info!(ctx.logger(), "Done creating delete commit #{}", i);
-        delete_commits.push(delete_cs_id);
-
-        // move one step forward
-        parent = delete_cs_id;
-    }
+    let delete_commits = delete_files_in_chunks(
+        ctx,
+        repo,
+        parent_bcs_id,
+        mpaths,
+        chunker,
+        delete_commits_changeset_args_factory,
+        true, /* skip_last_chunk */
+    )
+    .await?;
 
     Ok(PreMergeDelete { delete_commits })
 }
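With the loop gone, create_pre_merge_delete only computes the path list and delegates to delete_files_in_chunks, passing true for skip_last_chunk so that the first merge is not made against an empty working copy. The factory argument maps a position in the delete stack to commit metadata; a simplified sketch of that pattern (DeleteCommitArgs and the message format are hypothetical, and the real ChangesetArgs has more fields than this diff shows):

#[derive(Clone, Debug)]
struct DeleteCommitArgs {
    author: String,
    message: String,
}

struct StackPosition(usize);

// One commit's worth of metadata per position in the delete stack.
fn args_factory(num: StackPosition) -> DeleteCommitArgs {
    DeleteCommitArgs {
        author: "megarepo tool".to_string(),
        message: format!("chunked delete, commit #{}", num.0),
    }
}

fn main() {
    println!("{:?}", args_factory(StackPosition(3)));
}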
@@ -161,7 +71,7 @@ pub async fn create_pre_merge_delete<'a>(
 #[cfg(test)]
 mod test {
     use super::*;
-    use crate::common::ChangesetArgs;
+    use crate::common::{ChangesetArgs, StackPosition};
     use cloned::cloned;
     use fbinit::FacebookInit;
     use fixtures::linear;


@@ -0,0 +1,78 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * This software may be used and distributed according to the terms of the
+ * GNU General Public License version 2.
+ */
+
+use anyhow::Error;
+use blobrepo::BlobRepo;
+use blobrepo_hg::BlobRepoHg;
+use blobstore::Loadable;
+use context::CoreContext;
+use derived_data::BonsaiDerived;
+use futures::{future::try_join, TryStreamExt};
+use manifest::{Diff, ManifestOps};
+use mercurial_types::MPath;
+use mononoke_types::ChangesetId;
+use slog::info;
+use unodes::RootUnodeManifestId;
+
+pub async fn get_working_copy_paths(
+    ctx: &CoreContext,
+    repo: &BlobRepo,
+    bcs_id: ChangesetId,
+) -> Result<Vec<MPath>, Error> {
+    let hg_cs_id = repo
+        .get_hg_from_bonsai_changeset(ctx.clone(), bcs_id)
+        .await?;
+
+    let hg_cs = hg_cs_id.load(ctx, repo.blobstore()).await?;
+    info!(ctx.logger(), "Getting working copy contents");
+    let mut paths: Vec<_> = hg_cs
+        .manifestid()
+        .list_leaf_entries(ctx.clone(), repo.get_blobstore())
+        .map_ok(|(path, (_file_type, _filenode_id))| path)
+        .try_collect()
+        .await?;
+    paths.sort();
+    info!(ctx.logger(), "Done getting working copy contents");
+    Ok(paths)
+}
+
+pub async fn get_changed_working_copy_paths(
+    ctx: &CoreContext,
+    repo: &BlobRepo,
+    bcs_id: ChangesetId,
+    base_cs_id: ChangesetId,
+) -> Result<Vec<MPath>, Error> {
+    let unode_id = RootUnodeManifestId::derive(ctx, repo, bcs_id);
+    let base_unode_id = RootUnodeManifestId::derive(ctx, repo, base_cs_id);
+    let (unode_id, base_unode_id) = try_join(unode_id, base_unode_id).await?;
+
+    let mut paths = base_unode_id
+        .manifest_unode_id()
+        .diff(
+            ctx.clone(),
+            repo.get_blobstore(),
+            *unode_id.manifest_unode_id(),
+        )
+        .try_filter_map(|diff| async move {
+            use Diff::*;
+            let maybe_path = match diff {
+                Added(maybe_path, entry) => entry.into_leaf().and_then(|_| maybe_path),
+                Removed(_maybe_path, _entry) => None,
+                Changed(maybe_path, _old_entry, new_entry) => {
+                    new_entry.into_leaf().and_then(|_| maybe_path)
+                }
+            };
+            Ok(maybe_path)
+        })
+        .try_collect::<Vec<_>>()
+        .await?;
+
+    paths.sort();
+    Ok(paths)
+}
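The try_filter_map above encodes a simple rule: keep the path of any entry whose new side is a leaf (a file, whether added or changed) and drop removals and trees. A standalone restatement of that rule with simplified, hypothetical types:

// Simplified stand-in for manifest::Diff; the bools say whether the entry on
// the relevant side is a leaf (a file) rather than a tree.
enum DiffEntry<P> {
    Added(Option<P>, bool),
    Removed(Option<P>, bool),
    Changed(Option<P>, bool /* old is leaf */, bool /* new is leaf */),
}

// Keep files present on the "new" side; drop removals and trees entirely.
fn changed_path<P>(diff: DiffEntry<P>) -> Option<P> {
    match diff {
        DiffEntry::Added(maybe_path, is_leaf) => if is_leaf { maybe_path } else { None },
        DiffEntry::Removed(..) => None,
        DiffEntry::Changed(maybe_path, _old_is_leaf, new_is_leaf) => {
            if new_is_leaf { maybe_path } else { None }
        }
    }
}

fn main() {
    let diff = DiffEntry::Changed(Some("a/b.txt"), true, true);
    assert_eq!(changed_path(diff), Some("a/b.txt"));
}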