admin: make rsync capable of working across repos

Summary:
This adds support for replacing files in the small repo with the corresponding
files from the large repo. This is useful when changing the bind.

This diff slightly changes how `rsync` is called: now `--source-csid`,
`--target-csid`, `--from-dir` and `--to-dir` are all specified before the
`copy`/`remove-excessive-files` subcommands.

It can still be used within a single repo, by passing identical source and
target repos.

Reviewed By: StanislavGlebik

Differential Revision: D25087893

fbshipit-source-id: 6e5881f80d91ef4b794a967cf9f26dd3af7f56c9
Kostia Balytskyi, 2020-12-10 11:43:52 -08:00; committed by Facebook GitHub Bot
parent 52e7d42458
commit a76550859d
4 changed files with 369 additions and 136 deletions

View File

@@ -10,8 +10,10 @@ use blobrepo::BlobRepo;
use blobstore::Loadable;
use bookmarks::{BookmarkName, BookmarkUpdateReason};
use cloned::cloned;
use cmdlib::helpers;
use cmdlib::{args, helpers};
use context::CoreContext;
use fbinit::FacebookInit;
use futures::try_join;
use futures::{FutureExt, TryFutureExt, TryStreamExt};
use futures_ext::{FutureExt as _, StreamExt};
use futures_old::{
@@ -24,6 +26,7 @@ use mononoke_types::{BonsaiChangeset, DateTime, Timestamp};
use serde_json::{json, to_string_pretty};
use slog::{debug, Logger};
use std::collections::HashMap;
use synced_commit_mapping::SqlSyncedCommitMapping;
pub const LATEST_REPLAYED_REQUEST_KEY: &str = "latest-replayed-request";
@@ -150,3 +153,22 @@ pub fn get_file_nodes(
}
})
}
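/// Open the source and target repos named by the command-line arguments,
/// along with the `SqlSyncedCommitMapping` opened from the source repo's config.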
pub async fn get_source_target_repos_and_mapping<'a>(
fb: FacebookInit,
logger: Logger,
matches: &'a args::MononokeMatches<'_>,
) -> Result<(BlobRepo, BlobRepo, SqlSyncedCommitMapping), Error> {
let config_store = args::init_config_store(fb, &logger, matches)?;
let source_repo_id = args::get_source_repo_id(config_store, matches)?;
let target_repo_id = args::get_target_repo_id(config_store, matches)?;
let source_repo = args::open_repo_with_repo_id(fb, &logger, source_repo_id, matches);
let target_repo = args::open_repo_with_repo_id(fb, &logger, target_repo_id, matches);
// TODO(stash): in reality both source and target should point to the same mapping
// It'd be nice to verify that
let mapping = args::open_source_sql::<SqlSyncedCommitMapping>(fb, config_store, &matches);
try_join!(source_repo, target_repo, mapping)
}

View File

@@ -38,6 +38,7 @@ use std::convert::TryInto;
use std::sync::Arc;
use synced_commit_mapping::{SqlSyncedCommitMapping, SyncedCommitMapping};
use crate::common::get_source_target_repos_and_mapping;
use crate::error::SubcommandError;
pub const CROSSREPO: &str = "crossrepo";
@@ -430,25 +431,6 @@ async fn move_bookmark(
}
}
async fn get_source_target_repos_and_mapping<'a>(
fb: FacebookInit,
logger: Logger,
matches: &'a MononokeMatches<'_>,
) -> Result<(BlobRepo, BlobRepo, SqlSyncedCommitMapping), Error> {
let config_store = args::init_config_store(fb, &logger, matches)?;
let source_repo_id = args::get_source_repo_id(config_store, matches)?;
let target_repo_id = args::get_target_repo_id(config_store, matches)?;
let source_repo = args::open_repo_with_repo_id(fb, &logger, source_repo_id, matches);
let target_repo = args::open_repo_with_repo_id(fb, &logger, target_repo_id, matches);
// TODO(stash): in reality both source and target should point to the same mapping
// It'd be nice to verify that
let mapping = args::open_source_sql::<SqlSyncedCommitMapping>(fb, config_store, &matches);
try_join!(source_repo, target_repo, mapping)
}
fn print_commit_sync_config(csc: CommitSyncConfig, line_prefix: &str) {
println!("{}large repo: {}", line_prefix, csc.large_repo_id);
println!(

View File

@@ -5,7 +5,7 @@
* GNU General Public License version 2.
*/
use anyhow::{anyhow, Error};
use anyhow::{anyhow, Context, Error};
use blobrepo::save_bonsai_changesets;
use clap::{App, Arg, ArgMatches, SubCommand};
use derived_data::BonsaiDerived;
@@ -19,6 +19,7 @@ use cmdlib::{
helpers,
};
use context::CoreContext;
use cross_repo_sync::copy_file_contents;
use manifest::{Entry, ManifestOps};
use mononoke_types::{
fsnode::FsnodeFile, BonsaiChangeset, BonsaiChangesetMut, ChangesetId, DateTime, FileChange,
@@ -28,11 +29,13 @@ use regex::Regex;
use slog::{debug, info, Logger};
use std::{collections::BTreeMap, num::NonZeroU64};
use crate::common::get_source_target_repos_and_mapping;
use crate::error::SubcommandError;
pub const ARG_COMMIT_AUTHOR: &str = "commit-author";
pub const ARG_COMMIT_MESSAGE: &str = "commit-message";
pub const ARG_CSID: &str = "csid";
pub const ARG_SOURCE_CSID: &str = "source-csid";
pub const ARG_TARGET_CSID: &str = "target-csid";
pub const ARG_EXCLUDE_FILE_REGEX: &str = "exclude-file-regex";
pub const ARG_FILE_NUM_LIMIT: &str = "file-num-limit";
pub const ARG_TOTAL_SIZE_LIMIT: &str = "total-size-limit";
@@ -47,10 +50,8 @@ pub const SUBCOMMAND_REMOVE_EXCESSIVE_FILES: &str = "remove-excessive-files";
pub fn build_subcommand<'a, 'b>() -> App<'a, 'b> {
SubCommand::with_name(RSYNC)
.subcommand(
add_common_args(
SubCommand::with_name(SUBCOMMAND_COPY)
.about("creates commits that copy content of one directory into another")
)
add_common_args(SubCommand::with_name(SUBCOMMAND_COPY)
.about("creates commits that copy content of one directory into another")
.arg(
Arg::with_name(ARG_EXCLUDE_FILE_REGEX)
.long(ARG_EXCLUDE_FILE_REGEX)
@@ -81,24 +82,15 @@ pub fn build_subcommand<'a, 'b>() -> App<'a, 'b> {
.takes_value(false)
.required(false),
)
)
))
.subcommand(
add_common_args(
SubCommand::with_name(SUBCOMMAND_REMOVE_EXCESSIVE_FILES)
.about("remove files from --to directory that are not present in --from directory")
)
add_common_args(SubCommand::with_name(SUBCOMMAND_REMOVE_EXCESSIVE_FILES)
.about("remove files from --to directory that are not present in --from directory"))
)
}
pub fn add_common_args<'a, 'b>(sub_m: App<'a, 'b>) -> App<'a, 'b> {
sub_m
.arg(
Arg::with_name(ARG_CSID)
.long(ARG_CSID)
.takes_value(true)
.required(true)
.help("{hg|bonsai} changeset id or bookmark name"),
)
.arg(
Arg::with_name(ARG_FROM_DIR)
.long(ARG_FROM_DIR)
@@ -140,6 +132,80 @@ pub fn add_common_args<'a, 'b>(sub_m: App<'a, 'b>) -> App<'a, 'b> {
.takes_value(true)
.required(false),
)
.arg(
Arg::with_name(ARG_SOURCE_CSID)
.long(ARG_SOURCE_CSID)
.takes_value(true)
.required(true)
.help("source {hg|bonsai} changeset id or bookmark name"),
)
.arg(
Arg::with_name(ARG_TARGET_CSID)
.long(ARG_TARGET_CSID)
.takes_value(true)
.required(true)
.help("target {hg|bonsai} changeset id or bookmark name"),
)
}
/// Get source_cs_id, target_cs_id, from_dir, to_dir, author and commit_msg
/// from subcommand matches
async fn parse_common_args<'a>(
ctx: &'a CoreContext,
matches: &'a ArgMatches<'_>,
source_repo: &'a BlobRepo,
target_repo: &'a BlobRepo,
) -> Result<(ChangesetId, ChangesetId, MPath, MPath, String, String), Error> {
let source_cs_id = matches
.value_of(ARG_SOURCE_CSID)
.ok_or_else(|| anyhow!("{} arg is not specified", ARG_SOURCE_CSID))?;
let target_cs_id = matches
.value_of(ARG_TARGET_CSID)
.ok_or_else(|| anyhow!("{} arg is not specified", ARG_TARGET_CSID))?;
let (source_cs_id, target_cs_id) = try_join(
async {
helpers::csid_resolve(ctx.clone(), source_repo.clone(), source_cs_id)
.compat()
.await
.context("failed resolving source_cs_id")
},
async {
helpers::csid_resolve(ctx.clone(), target_repo.clone(), target_cs_id)
.compat()
.await
.context("failed resolving target_cs_id")
},
)
.await?;
let from_dir = matches
.value_of(ARG_FROM_DIR)
.ok_or_else(|| anyhow!("{} arg is not specified", ARG_FROM_DIR))?;
let from_dir = MPath::new(from_dir)?;
let to_dir = matches
.value_of(ARG_TO_DIR)
.ok_or_else(|| anyhow!("{} arg is not specified", ARG_TO_DIR))?;
let to_dir = MPath::new(to_dir)?;
let author = matches
.value_of(ARG_COMMIT_AUTHOR)
.ok_or_else(|| anyhow!("{} arg is not specified", ARG_COMMIT_AUTHOR))?;
let msg = matches
.value_of(ARG_COMMIT_MESSAGE)
.ok_or_else(|| anyhow!("{} arg is not specified", ARG_COMMIT_MESSAGE))?;
Ok((
source_cs_id,
target_cs_id,
from_dir,
to_dir,
author.to_string(),
msg.to_string(),
))
}
pub async fn subcommand_rsync<'a>(
@@ -150,44 +216,23 @@ pub async fn subcommand_rsync<'a>(
) -> Result<(), SubcommandError> {
args::init_cachelib(fb, &matches);
let ctx = CoreContext::new_with_logger(fb, logger.clone());
let repo = args::open_repo(fb, &logger, &matches).await?;
let (source_repo, target_repo, _) =
get_source_target_repos_and_mapping(fb, logger, matches).await?;
match sub_matches.subcommand() {
(SUBCOMMAND_COPY, Some(sub_matches)) => {
let cs_id = sub_matches
.value_of(ARG_CSID)
.ok_or_else(|| anyhow!("{} arg is not specified", ARG_CSID))?;
let cs_id = helpers::csid_resolve(ctx.clone(), repo.clone(), cs_id)
.compat()
.await?;
let from_dir = sub_matches
.value_of(ARG_FROM_DIR)
.ok_or_else(|| anyhow!("{} arg is not specified", ARG_FROM_DIR))?;
let from_dir = MPath::new(from_dir)?;
let to_dir = sub_matches
.value_of(ARG_TO_DIR)
.ok_or_else(|| anyhow!("{} arg is not specified", ARG_TO_DIR))?;
let to_dir = MPath::new(to_dir)?;
let author = sub_matches
.value_of(ARG_COMMIT_AUTHOR)
.ok_or_else(|| anyhow!("{} arg is not specified", ARG_COMMIT_AUTHOR))?;
let msg = sub_matches
.value_of(ARG_COMMIT_MESSAGE)
.ok_or_else(|| anyhow!("{} arg is not specified", ARG_COMMIT_MESSAGE))?;
let (source_cs_id, target_cs_id, from_dir, to_dir, author, msg) =
parse_common_args(&ctx, sub_matches, &source_repo, &target_repo).await?;
let cs_ids = copy(
&ctx,
&repo,
cs_id,
&source_repo,
&target_repo,
source_cs_id,
target_cs_id,
from_dir,
to_dir,
author.to_string(),
msg.to_string(),
author,
msg,
Limits::new(sub_matches),
Options::new(sub_matches)?,
)
@@ -201,43 +246,22 @@ pub async fn subcommand_rsync<'a>(
println!("{}", result_cs_id);
}
(SUBCOMMAND_REMOVE_EXCESSIVE_FILES, Some(sub_matches)) => {
let cs_id = sub_matches
.value_of(ARG_CSID)
.ok_or_else(|| anyhow!("{} arg is not specified", ARG_CSID))?;
let cs_id = helpers::csid_resolve(ctx.clone(), repo.clone(), cs_id)
.compat()
.await?;
let from_dir = sub_matches
.value_of(ARG_FROM_DIR)
.ok_or_else(|| anyhow!("{} arg is not specified", ARG_FROM_DIR))?;
let from_dir = MPath::new(from_dir)?;
let to_dir = sub_matches
.value_of(ARG_TO_DIR)
.ok_or_else(|| anyhow!("{} arg is not specified", ARG_TO_DIR))?;
let to_dir = MPath::new(to_dir)?;
let author = sub_matches
.value_of(ARG_COMMIT_AUTHOR)
.ok_or_else(|| anyhow!("{} arg is not specified", ARG_COMMIT_AUTHOR))?;
let msg = sub_matches
.value_of(ARG_COMMIT_MESSAGE)
.ok_or_else(|| anyhow!("{} arg is not specified", ARG_COMMIT_MESSAGE))?;
let (source_cs_id, target_cs_id, from_dir, to_dir, author, msg) =
parse_common_args(&ctx, sub_matches, &source_repo, &target_repo).await?;
let maybe_file_num_limit: Option<NonZeroU64> =
args::get_and_parse_opt(sub_matches, ARG_FILE_NUM_LIMIT);
let result_cs_id = remove_excessive_files(
&ctx,
&repo,
cs_id,
&source_repo,
&target_repo,
source_cs_id,
target_cs_id,
from_dir,
to_dir,
author.to_string(),
msg.to_string(),
author,
msg,
maybe_file_num_limit,
)
.await?;
@@ -299,8 +323,10 @@ impl Options {
async fn copy(
ctx: &CoreContext,
repo: &BlobRepo,
cs_id: ChangesetId,
source_repo: &BlobRepo,
target_repo: &BlobRepo,
source_cs_id: ChangesetId,
target_cs_id: ChangesetId,
from_dir: MPath,
to_dir: MPath,
author: String,
@@ -309,8 +335,8 @@ async fn copy(
options: Options,
) -> Result<Vec<ChangesetId>, Error> {
let (from_entries, to_entries) = try_join(
list_directory(&ctx, &repo, cs_id, &from_dir),
list_directory(&ctx, &repo, cs_id, &to_dir),
list_directory(&ctx, &source_repo, source_cs_id, &from_dir),
list_directory(&ctx, &target_repo, target_cs_id, &to_dir),
)
.await?;
let from_entries = from_entries.ok_or_else(|| Error::msg("from directory does not exist!"))?;
@@ -321,6 +347,9 @@ async fn copy(
// These are the file changes that have to be copied
let mut file_changes = BTreeMap::new();
let mut total_file_size = 0;
let mut contents_to_upload = vec![];
let same_repo = source_repo.get_repoid() == target_repo.get_repoid();
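// When the source and target are different repos, the file contents
// themselves also have to be copied into the target repo's blobstore;
// collect their content ids while walking the entries.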
for (from_suffix, fsnode_file) in from_entries {
if let Some(ref regex) = options.maybe_exclude_file_regex {
if from_suffix.matches_regex(&regex) {
@@ -351,6 +380,11 @@ async fn copy(
fsnode_file.size()
);
file_changes.insert(to_path, Some((from_path, fsnode_file)));
if !same_repo {
contents_to_upload.push(fsnode_file.content_id().clone());
}
if let Some(lfs_threshold) = limits.lfs_threshold {
if fsnode_file.size() < lfs_threshold.get() {
total_file_size += fsnode_file.size();
@@ -376,13 +410,25 @@ }
}
}
if !same_repo {
debug!(
ctx.logger(),
"Copying {} files contents from {} to {}",
contents_to_upload.len(),
source_repo.name(),
target_repo.name()
);
copy_file_contents(ctx, source_repo, target_repo, contents_to_upload).await?;
}
create_changesets(
ctx,
repo,
target_repo,
vec![remove_file_changes, file_changes],
cs_id,
target_cs_id,
author,
msg,
same_repo, /* record_copy_from */
)
.await
}
@@ -394,6 +440,7 @@ async fn create_changesets(
mut parent: ChangesetId,
author: String,
msg: String,
record_copy_from: bool,
) -> Result<Vec<ChangesetId>, Error> {
let mut changesets = vec![];
let mut cs_ids = vec![];
@@ -405,12 +452,19 @@ async fn create_changesets(
let mut fc = BTreeMap::new();
for (to_path, maybe_fsnode) in path_to_maybe_fsnodes {
let maybe_file_change = match maybe_fsnode {
Some((from_path, fsnode_file)) => Some(FileChange::new(
*fsnode_file.content_id(),
*fsnode_file.file_type(),
fsnode_file.size(),
Some((from_path, parent)),
)),
Some((from_path, fsnode_file)) => {
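// Copy-from info can only reference a commit in the same repo, so it
// is recorded for within-repo copies only.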
let copy_from = if record_copy_from {
Some((from_path, parent))
} else {
None
};
Some(FileChange::new(
*fsnode_file.content_id(),
*fsnode_file.file_type(),
fsnode_file.size(),
copy_from,
))
}
None => None,
};
@@ -433,8 +487,10 @@ async fn create_changesets(
async fn remove_excessive_files(
ctx: &CoreContext,
repo: &BlobRepo,
cs_id: ChangesetId,
source_repo: &BlobRepo,
target_repo: &BlobRepo,
source_cs_id: ChangesetId,
target_cs_id: ChangesetId,
from_dir: MPath,
to_dir: MPath,
author: String,
@@ -442,8 +498,8 @@ async fn remove_excessive_files(
maybe_file_num_limit: Option<NonZeroU64>,
) -> Result<ChangesetId, Error> {
let (from_entries, to_entries) = try_join(
list_directory(&ctx, &repo, cs_id, &from_dir),
list_directory(&ctx, &repo, cs_id, &to_dir),
list_directory(&ctx, &source_repo, source_cs_id, &from_dir),
list_directory(&ctx, &target_repo, target_cs_id, &to_dir),
)
.await?;
let from_entries = from_entries.ok_or_else(|| Error::msg("from directory does not exist!"))?;
@@ -462,7 +518,16 @@ async fn remove_excessive_files(
}
}
let cs_ids = create_changesets(ctx, repo, vec![to_delete], cs_id, author, msg).await?;
let cs_ids = create_changesets(
ctx,
target_repo,
vec![to_delete],
target_cs_id,
author,
msg,
false, /* record_copy_from */
)
.await?;
cs_ids
.last()
@@ -532,8 +597,10 @@ fn create_bonsai_changeset(
#[cfg(test)]
mod test {
use super::*;
use blobrepo_factory::new_memblob_empty;
use blobrepo_factory::{new_memblob_empty, new_memblob_empty_with_id};
use blobstore::StoreLoadable;
use maplit::hashmap;
use mononoke_types::RepositoryId;
use tests_utils::{list_working_copy_utf8, CreateCommitContext};
#[fbinit::compat_test]
@@ -572,6 +639,8 @@ mod test {
let new_cs_id = copy(
&ctx,
&repo,
&repo,
cs_id,
cs_id,
MPath::new("dir_from")?,
MPath::new("dir_to")?,
@@ -619,6 +688,8 @@ mod test {
let first_cs_id = copy(
&ctx,
&repo,
&repo,
cs_id,
cs_id,
MPath::new("dir_from")?,
MPath::new("dir_to")?,
@@ -646,6 +717,8 @@ mod test {
let second_cs_id = copy(
&ctx,
&repo,
&repo,
first_cs_id,
first_cs_id,
MPath::new("dir_from")?,
MPath::new("dir_to")?,
@@ -690,6 +763,8 @@ mod test {
let cs_id = copy(
&ctx,
&repo,
&repo,
cs_id,
cs_id,
MPath::new("dir_from")?,
MPath::new("dir_to")?,
@@ -736,6 +811,8 @@ mod test {
let first_cs_id = copy(
&ctx,
&repo,
&repo,
cs_id,
cs_id,
MPath::new("dir_from")?,
MPath::new("dir_to")?,
@@ -766,6 +843,8 @@ mod test {
let second_cs_id = copy(
&ctx,
&repo,
&repo,
first_cs_id,
first_cs_id,
MPath::new("dir_from")?,
MPath::new("dir_to")?,
@@ -814,6 +893,8 @@ mod test {
let cs_ids = copy(
&ctx,
&repo,
&repo,
cs_id,
cs_id,
MPath::new("dir_from")?,
MPath::new("dir_to")?,
@@ -861,6 +942,8 @@ mod test {
let cs_ids = copy(
&ctx,
&repo,
&repo,
cs_id,
cs_id,
MPath::new("dir_from")?,
MPath::new("dir_to")?,
@@ -880,6 +963,8 @@ mod test {
let cs_ids = copy(
&ctx,
&repo,
&repo,
cs_id,
cs_id,
MPath::new("dir_from")?,
MPath::new("dir_to")?,
@@ -912,6 +997,20 @@ mod test {
}
);
let copy_bcs = cs_ids
.last()
.expect("changeset is expected to exist")
.load(&ctx, &repo.get_blobstore())
.await?;
let file_changes = copy_bcs.file_changes_map();
let a_change = file_changes
.get(&MPath::new("dir_to/a")?)
.expect("change to dir_to/a expected to be present in the map")
.clone()
.expect("change to dir_to/a expected to not be None");
// Ensure that there is copy-from inserted when copying within the repo
assert!(a_change.copy_from().is_some());
Ok(())
}
@@ -930,6 +1029,8 @@ mod test {
let cs_id = remove_excessive_files(
&ctx,
&repo,
&repo,
cs_id,
cs_id,
MPath::new("dir_from")?,
MPath::new("dir_to")?,
@@ -949,4 +1050,124 @@ mod test {
Ok(())
}
#[fbinit::compat_test]
async fn test_delete_excessive_files_xrepo(fb: FacebookInit) -> Result<(), Error> {
let ctx = CoreContext::test_mock(fb);
let source_repo = new_memblob_empty_with_id(None, RepositoryId::new(0))?;
let target_repo = new_memblob_empty_with_id(None, RepositoryId::new(1))?;
let source_cs_id = CreateCommitContext::new_root(&ctx, &source_repo)
.add_file("dir_from/a", "a")
.commit()
.await?;
let target_cs_id = CreateCommitContext::new_root(&ctx, &target_repo)
.add_file("dir_to/a", "a")
.add_file("dir_to/b", "b")
.add_file("dir_to/c/d", "c/d")
.commit()
.await?;
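// dir_to/b and dir_to/c/d have no counterparts under dir_from in the
// source repo, so remove_excessive_files is expected to delete them.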
let cs_id = remove_excessive_files(
&ctx,
&source_repo,
&target_repo,
source_cs_id,
target_cs_id,
MPath::new("dir_from")?,
MPath::new("dir_to")?,
"author".to_string(),
"msg".to_string(),
None,
)
.await?;
assert_eq!(
list_working_copy_utf8(&ctx, &target_repo, cs_id).await?,
hashmap! {
MPath::new("dir_to/a")? => "a".to_string(),
}
);
Ok(())
}
#[fbinit::compat_test]
async fn test_xrepo_rsync_with_overwrite(fb: FacebookInit) -> Result<(), Error> {
let ctx = CoreContext::test_mock(fb);
let source_repo = new_memblob_empty_with_id(None, RepositoryId::new(0))?;
let target_repo = new_memblob_empty_with_id(None, RepositoryId::new(1))?;
let source_cs_id = CreateCommitContext::new_root(&ctx, &source_repo)
.add_file("dir_from/a", "aa")
.add_file("dir_from/b", "b")
.add_file("source_random_file", "sr")
.commit()
.await?;
let target_cs_id = CreateCommitContext::new_root(&ctx, &target_repo)
.add_file("dir_to/a", "different")
.add_file("target_random_file", "tr")
.commit()
.await?;
let cs_ids = copy(
&ctx,
&source_repo,
&target_repo,
source_cs_id,
target_cs_id,
MPath::new("dir_from")?,
MPath::new("dir_to")?,
"author".to_string(),
"msg".to_string(),
Limits::default(),
Options {
overwrite: true,
..Default::default()
},
)
.await?;
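// With `overwrite: true`, copy() creates two commits: the first deletes
// the files about to be overwritten, the second adds the copied files.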
assert_eq!(
list_working_copy_utf8(&ctx, &target_repo, *cs_ids.get(0).unwrap()).await?,
hashmap! {
MPath::new("target_random_file")? => "tr".to_string(),
}
);
assert_eq!(
list_working_copy_utf8(&ctx, &target_repo, *cs_ids.get(1).unwrap()).await?,
hashmap! {
MPath::new("dir_to/a")? => "aa".to_string(),
MPath::new("dir_to/b")? => "b".to_string(),
MPath::new("target_random_file")? => "tr".to_string(),
}
);
assert_eq!(
target_repo
.get_changeset_parents_by_bonsai(ctx.clone(), *cs_ids.get(0).unwrap())
.await?,
vec![target_cs_id],
);
let copy_bcs = cs_ids
.get(1)
.expect("changeset is expected to exist")
.load(&ctx, &target_repo.get_blobstore())
.await?;
let file_changes = copy_bcs.file_changes_map();
let a_change = file_changes
.get(&MPath::new("dir_to/a")?)
.expect("change to dir_to/a expected to be present in the map")
.clone()
.expect("change to dir_to/a expected to not be None");
// Ensure that there's no copy-from inserted when copying from another repo
assert!(a_change.copy_from().is_none());
Ok(())
}
}

View File

@@ -30,7 +30,7 @@ use maplit::{hashmap, hashset};
use mercurial_types::HgManifestId;
use metaconfig_types::{CommitSyncConfig, CommitSyncConfigVersion, PushrebaseFlags};
use mononoke_types::{
BonsaiChangeset, BonsaiChangesetMut, ChangesetId, FileChange, MPath, RepositoryId,
BonsaiChangeset, BonsaiChangesetMut, ChangesetId, ContentId, FileChange, MPath, RepositoryId,
};
use movers::Mover;
use pushrebase::{do_pushrebase_bonsai, PushrebaseError};
@@ -1645,6 +1645,32 @@ impl CommitSyncRepos {
}
}
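/// Copy the given file contents from the source repo's blobstore into the
/// target repo's blobstore, respecting the target repo's filestore config.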
pub async fn copy_file_contents<'a>(
ctx: &'a CoreContext,
source_repo: &'a BlobRepo,
target_repo: &'a BlobRepo,
content_ids: impl IntoIterator<Item = ContentId>,
) -> Result<(), Error> {
let source_blobstore = source_repo.get_blobstore();
let target_blobstore = target_repo.get_blobstore();
let target_filestore_config = target_repo.filestore_config();
let uploader: FuturesUnordered<_> = content_ids
.into_iter()
.map({
|content_id| {
copy_content(
ctx,
&source_blobstore,
&target_blobstore,
target_filestore_config.clone(),
content_id,
)
}
})
.collect();
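// Run the copies with at most 100 uploads in flight at any time.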
uploader.try_for_each_concurrent(100, identity).await
}
pub async fn upload_commits<'a>(
ctx: &'a CoreContext,
rewritten_list: Vec<BonsaiChangeset>,
@@ -1660,25 +1686,7 @@ pub async fn upload_commits<'a>(
.filter_map(|opt_change| opt_change.as_ref().map(|change| change.content_id()));
files_to_sync.extend(new_files_to_sync);
}
let source_blobstore = source_repo.get_blobstore();
let target_blobstore = target_repo.get_blobstore();
let target_filestore_config = target_repo.filestore_config();
let uploader: FuturesUnordered<_> = files_to_sync
.into_iter()
.map({
|content_id| {
copy_content(
ctx,
&source_blobstore,
&target_blobstore,
target_filestore_config.clone(),
content_id,
)
}
})
.collect();
uploader.try_for_each_concurrent(100, identity).await?;
copy_file_contents(ctx, source_repo, target_repo, files_to_sync).await?;
save_bonsai_changesets(rewritten_list.clone(), ctx.clone(), target_repo.clone()).await?;
Ok(())
}