From a76550859d675a84db6d9c7699fa28e611e8288b Mon Sep 17 00:00:00 2001
From: Kostia Balytskyi
Date: Thu, 10 Dec 2020 11:43:52 -0800
Subject: [PATCH] admin: make rsync capable of working across repos

Summary:
This adds support for replacing the contents of a directory in the small repo
with the contents of a directory from the large repo. This is useful when
changing the bind.

This diff slightly changes how `rsync` is called: the single `--csid` argument
is replaced by `--source-csid` and `--target-csid`, which now sit alongside
`--from-dir` and `--to-dir` as arguments common to both the `copy` and the
`remove-excessive-files` subcommands.

The command can still be used within a single repo: just pass identical source
and target repos.
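To illustrate the new calling convention, here is a hypothetical cross-repo
invocation (binary name, repo ids, bookmarks and paths are made up; the
`--source-repo-id`/`--target-repo-id` flags are assumed to be the standard
admin-tool repo selectors backing the `args::get_source_repo_id` and
`args::get_target_repo_id` calls below):

  admin --source-repo-id 0 --target-repo-id 1 \
      rsync copy \
      --source-csid master --target-csid master \
      --from-dir dir_from --to-dir dir_to \
      --commit-author author --commit-message "copy dir_from into dir_to" \
      --overwrite

  admin --source-repo-id 0 --target-repo-id 1 \
      rsync remove-excessive-files \
      --source-csid master --target-csid master \
      --from-dir dir_from --to-dir dir_to \
      --commit-author author --commit-message "remove excessive files"

When the two repos differ, `copy` first copies the file contents into the
target repo's blobstore and deliberately skips recording copy-from
information; within a single repo, copy-from is recorded as before.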
Reviewed By: StanislavGlebik

Differential Revision: D25087893

fbshipit-source-id: 6e5881f80d91ef4b794a967cf9f26dd3af7f56c9
---
 eden/mononoke/cmds/admin/common.rs            |  24 +-
 eden/mononoke/cmds/admin/crossrepo.rs         |  20 +-
 eden/mononoke/cmds/admin/rsync.rs             | 413 ++++++++++++++----
 .../cross_repo_sync/src/lib.rs                |  48 +-
 4 files changed, 369 insertions(+), 136 deletions(-)

diff --git a/eden/mononoke/cmds/admin/common.rs b/eden/mononoke/cmds/admin/common.rs
index 810336e1f9..3576168a00 100644
--- a/eden/mononoke/cmds/admin/common.rs
+++ b/eden/mononoke/cmds/admin/common.rs
@@ -10,8 +10,10 @@ use blobrepo::BlobRepo;
 use blobstore::Loadable;
 use bookmarks::{BookmarkName, BookmarkUpdateReason};
 use cloned::cloned;
-use cmdlib::helpers;
+use cmdlib::{args, helpers};
 use context::CoreContext;
+use fbinit::FacebookInit;
+use futures::try_join;
 use futures::{FutureExt, TryFutureExt, TryStreamExt};
 use futures_ext::{FutureExt as _, StreamExt};
 use futures_old::{
@@ -24,6 +26,7 @@ use mononoke_types::{BonsaiChangeset, DateTime, Timestamp};
 use serde_json::{json, to_string_pretty};
 use slog::{debug, Logger};
 use std::collections::HashMap;
+use synced_commit_mapping::SqlSyncedCommitMapping;
 
 pub const LATEST_REPLAYED_REQUEST_KEY: &str = "latest-replayed-request";
 
@@ -150,3 +153,22 @@ pub fn get_file_nodes(
         }
     })
 }
+
+pub async fn get_source_target_repos_and_mapping<'a>(
+    fb: FacebookInit,
+    logger: Logger,
+    matches: &'a args::MononokeMatches<'_>,
+) -> Result<(BlobRepo, BlobRepo, SqlSyncedCommitMapping), Error> {
+    let config_store = args::init_config_store(fb, &logger, matches)?;
+
+    let source_repo_id = args::get_source_repo_id(config_store, matches)?;
+    let target_repo_id = args::get_target_repo_id(config_store, matches)?;
+
+    let source_repo = args::open_repo_with_repo_id(fb, &logger, source_repo_id, matches);
+    let target_repo = args::open_repo_with_repo_id(fb, &logger, target_repo_id, matches);
+    // TODO(stash): in reality both source and target should point to the same mapping
+    // It would be nice to verify that
+    let mapping = args::open_source_sql::<SqlSyncedCommitMapping>(fb, config_store, &matches);
+
+    try_join!(source_repo, target_repo, mapping)
+}
diff --git a/eden/mononoke/cmds/admin/crossrepo.rs b/eden/mononoke/cmds/admin/crossrepo.rs
index 04fc095ea3..b2b7ddcaf7 100644
--- a/eden/mononoke/cmds/admin/crossrepo.rs
+++ b/eden/mononoke/cmds/admin/crossrepo.rs
@@ -38,6 +38,7 @@ use std::convert::TryInto;
 use std::sync::Arc;
 use synced_commit_mapping::{SqlSyncedCommitMapping, SyncedCommitMapping};
 
+use crate::common::get_source_target_repos_and_mapping;
 use crate::error::SubcommandError;
 
 pub const CROSSREPO: &str = "crossrepo";
@@ -430,25 +431,6 @@ async fn move_bookmark(
     }
 }
 
-async fn get_source_target_repos_and_mapping<'a>(
-    fb: FacebookInit,
-    logger: Logger,
-    matches: &'a MononokeMatches<'_>,
-) -> Result<(BlobRepo, BlobRepo, SqlSyncedCommitMapping), Error> {
-    let config_store = args::init_config_store(fb, &logger, matches)?;
-
-    let source_repo_id = args::get_source_repo_id(config_store, matches)?;
-    let target_repo_id = args::get_target_repo_id(config_store, matches)?;
-
-    let source_repo = args::open_repo_with_repo_id(fb, &logger, source_repo_id, matches);
-    let target_repo = args::open_repo_with_repo_id(fb, &logger, target_repo_id, matches);
-    // TODO(stash): in reality both source and target should point to the same mapping
-    // It'll be nice to verify it
-    let mapping = args::open_source_sql::<SqlSyncedCommitMapping>(fb, config_store, &matches);
-
-    try_join!(source_repo, target_repo, mapping)
-}
-
 fn print_commit_sync_config(csc: CommitSyncConfig, line_prefix: &str) {
     println!("{}large repo: {}", line_prefix, csc.large_repo_id);
     println!(
diff --git a/eden/mononoke/cmds/admin/rsync.rs b/eden/mononoke/cmds/admin/rsync.rs
index 34810508cc..055230e7e8 100644
--- a/eden/mononoke/cmds/admin/rsync.rs
+++ b/eden/mononoke/cmds/admin/rsync.rs
@@ -5,7 +5,7 @@
  * GNU General Public License version 2.
  */
 
-use anyhow::{anyhow, Error};
+use anyhow::{anyhow, Context, Error};
 use blobrepo::save_bonsai_changesets;
 use clap::{App, Arg, ArgMatches, SubCommand};
 use derived_data::BonsaiDerived;
@@ -19,6 +19,7 @@ use cmdlib::{
     helpers,
 };
 use context::CoreContext;
+use cross_repo_sync::copy_file_contents;
 use manifest::{Entry, ManifestOps};
 use mononoke_types::{
     fsnode::FsnodeFile, BonsaiChangeset, BonsaiChangesetMut, ChangesetId, DateTime, FileChange,
@@ -28,11 +29,13 @@ use regex::Regex;
 use slog::{debug, info, Logger};
 use std::{collections::BTreeMap, num::NonZeroU64};
 
+use crate::common::get_source_target_repos_and_mapping;
 use crate::error::SubcommandError;
 
 pub const ARG_COMMIT_AUTHOR: &str = "commit-author";
 pub const ARG_COMMIT_MESSAGE: &str = "commit-message";
-pub const ARG_CSID: &str = "csid";
+pub const ARG_SOURCE_CSID: &str = "source-csid";
+pub const ARG_TARGET_CSID: &str = "target-csid";
 pub const ARG_EXCLUDE_FILE_REGEX: &str = "exclude-file-regex";
 pub const ARG_FILE_NUM_LIMIT: &str = "file-num-limit";
 pub const ARG_TOTAL_SIZE_LIMIT: &str = "total-size-limit";
@@ -47,10 +50,8 @@ pub const SUBCOMMAND_REMOVE_EXCESSIVE_FILES: &str = "remove-excessive-files";
 pub fn build_subcommand<'a, 'b>() -> App<'a, 'b> {
     SubCommand::with_name(RSYNC)
         .subcommand(
-            add_common_args(
-                SubCommand::with_name(SUBCOMMAND_COPY)
-                    .about("creates commits that copy content of one directory into another")
-            )
+            add_common_args(SubCommand::with_name(SUBCOMMAND_COPY)
+                .about("creates commits that copy content of one directory into another")
                 .arg(
                     Arg::with_name(ARG_EXCLUDE_FILE_REGEX)
                         .long(ARG_EXCLUDE_FILE_REGEX)
                         .takes_value(true)
@@ -81,24 +82,15 @@ pub fn build_subcommand<'a, 'b>() -> App<'a, 'b> {
                         .takes_value(false)
                         .required(false),
                 )
-        )
+        ))
         .subcommand(
-            add_common_args(
-                SubCommand::with_name(SUBCOMMAND_REMOVE_EXCESSIVE_FILES)
-                    .about("remove files from --to directory that are not present in --from directory")
-            )
+            add_common_args(SubCommand::with_name(SUBCOMMAND_REMOVE_EXCESSIVE_FILES)
+                .about("remove files from --to directory that are not present in --from directory"))
         )
 }
 
 pub fn add_common_args<'a, 'b>(sub_m: App<'a, 'b>) -> App<'a, 'b> {
     sub_m
-        .arg(
-            Arg::with_name(ARG_CSID)
-                .long(ARG_CSID)
-                .takes_value(true)
-                .required(true)
-                .help("{hg|bonsai} changeset id or bookmark name"),
-        )
         .arg(
             Arg::with_name(ARG_FROM_DIR)
                 .long(ARG_FROM_DIR)
@@ -140,6 +132,80 @@ pub fn add_common_args<'a, 'b>(sub_m: App<'a, 'b>) -> App<'a, 'b> {
                 .takes_value(true)
                 .required(false),
         )
+        .arg(
+            Arg::with_name(ARG_SOURCE_CSID)
+                .long(ARG_SOURCE_CSID)
+                .takes_value(true)
+                .required(true)
+                .help("source {hg|bonsai} changeset id or bookmark name"),
+        )
+        .arg(
+            Arg::with_name(ARG_TARGET_CSID)
+                .long(ARG_TARGET_CSID)
+                .takes_value(true)
+                .required(true)
+                .help("target {hg|bonsai} changeset id or bookmark name"),
+        )
+}
+
+/// Get source_cs_id, target_cs_id, from_dir, to_dir, author and commit_msg
+/// from subcommand matches
+async fn parse_common_args<'a>(
+    ctx: &'a CoreContext,
+    matches: &'a ArgMatches<'_>,
+    source_repo: &'a BlobRepo,
+    target_repo: &'a BlobRepo,
+) -> Result<(ChangesetId, ChangesetId, MPath, MPath, String, String), Error> {
+    let source_cs_id = matches
+        .value_of(ARG_SOURCE_CSID)
+        .ok_or_else(|| anyhow!("{} arg is not specified", ARG_SOURCE_CSID))?;
+
+    let target_cs_id = matches
+        .value_of(ARG_TARGET_CSID)
+        .ok_or_else(|| anyhow!("{} arg is not specified", ARG_TARGET_CSID))?;
+
+    let (source_cs_id, target_cs_id) = try_join(
+        async {
+            helpers::csid_resolve(ctx.clone(), source_repo.clone(), source_cs_id)
+                .compat()
+                .await
+                .context("failed resolving source_cs_id")
+        },
+        async {
+            helpers::csid_resolve(ctx.clone(), target_repo.clone(), target_cs_id)
+                .compat()
+                .await
+                .context("failed resolving target_cs_id")
+        },
+    )
+    .await?;
+
+    let from_dir = matches
+        .value_of(ARG_FROM_DIR)
+        .ok_or_else(|| anyhow!("{} arg is not specified", ARG_FROM_DIR))?;
+    let from_dir = MPath::new(from_dir)?;
+
+    let to_dir = matches
+        .value_of(ARG_TO_DIR)
+        .ok_or_else(|| anyhow!("{} arg is not specified", ARG_TO_DIR))?;
+    let to_dir = MPath::new(to_dir)?;
+
+    let author = matches
+        .value_of(ARG_COMMIT_AUTHOR)
+        .ok_or_else(|| anyhow!("{} arg is not specified", ARG_COMMIT_AUTHOR))?;
+
+    let msg = matches
+        .value_of(ARG_COMMIT_MESSAGE)
+        .ok_or_else(|| anyhow!("{} arg is not specified", ARG_COMMIT_MESSAGE))?;
+
+    Ok((
+        source_cs_id,
+        target_cs_id,
+        from_dir,
+        to_dir,
+        author.to_string(),
+        msg.to_string(),
+    ))
 }
 
 pub async fn subcommand_rsync<'a>(
@@ -150,44 +216,23 @@
 ) -> Result<(), SubcommandError> {
     args::init_cachelib(fb, &matches);
     let ctx = CoreContext::new_with_logger(fb, logger.clone());
-    let repo = args::open_repo(fb, &logger, &matches).await?;
+    let (source_repo, target_repo, _) =
+        get_source_target_repos_and_mapping(fb, logger, matches).await?;
 
     match sub_matches.subcommand() {
         (SUBCOMMAND_COPY, Some(sub_matches)) => {
-            let cs_id = sub_matches
-                .value_of(ARG_CSID)
-                .ok_or_else(|| anyhow!("{} arg is not specified", ARG_CSID))?;
-
-            let cs_id = helpers::csid_resolve(ctx.clone(), repo.clone(), cs_id)
-                .compat()
-                .await?;
-
-            let from_dir = sub_matches
-                .value_of(ARG_FROM_DIR)
-                .ok_or_else(|| anyhow!("{} arg is not specified", ARG_FROM_DIR))?;
-            let from_dir = MPath::new(from_dir)?;
-
-            let to_dir = sub_matches
-                .value_of(ARG_TO_DIR)
-                .ok_or_else(|| anyhow!("{} arg is not specified", ARG_TO_DIR))?;
-            let to_dir = MPath::new(to_dir)?;
-
-            let author = sub_matches
-                .value_of(ARG_COMMIT_AUTHOR)
-                .ok_or_else(|| anyhow!("{} arg is not specified", ARG_COMMIT_AUTHOR))?;
-
-            let msg = sub_matches
-                .value_of(ARG_COMMIT_MESSAGE)
-                .ok_or_else(|| anyhow!("{} arg is not specified", ARG_COMMIT_MESSAGE))?;
-
+            let (source_cs_id, target_cs_id, from_dir, to_dir, author, msg) =
+                parse_common_args(&ctx, sub_matches, &source_repo, &target_repo).await?;
             let cs_ids = copy(
                 &ctx,
-                &repo,
-                cs_id,
+                &source_repo,
+                &target_repo,
+                source_cs_id,
+                target_cs_id,
                 from_dir,
                 to_dir,
-                author.to_string(),
-                msg.to_string(),
+                author,
+                msg,
                 Limits::new(sub_matches),
                 Options::new(sub_matches)?,
             )
             .await
@@ -201,43 +246,22 @@
             println!("{}", result_cs_id);
         }
         (SUBCOMMAND_REMOVE_EXCESSIVE_FILES, Some(sub_matches)) => {
-            let cs_id = sub_matches
-                .value_of(ARG_CSID)
-                .ok_or_else(|| anyhow!("{} arg is not specified", ARG_CSID))?;
-
-            let cs_id = helpers::csid_resolve(ctx.clone(), repo.clone(), cs_id)
-                .compat()
-                .await?;
-
-            let from_dir = sub_matches
-                .value_of(ARG_FROM_DIR)
-                .ok_or_else(|| anyhow!("{} arg is not specified", ARG_FROM_DIR))?;
-            let from_dir = MPath::new(from_dir)?;
-
-            let to_dir = sub_matches
-                .value_of(ARG_TO_DIR)
-                .ok_or_else(|| anyhow!("{} arg is not specified", ARG_TO_DIR))?;
-            let to_dir = MPath::new(to_dir)?;
-
-            let author = sub_matches
-                .value_of(ARG_COMMIT_AUTHOR)
-                .ok_or_else(|| anyhow!("{} arg is not specified", ARG_COMMIT_AUTHOR))?;
-
-            let msg = sub_matches
-                .value_of(ARG_COMMIT_MESSAGE)
-                .ok_or_else(|| anyhow!("{} arg is not specified", ARG_COMMIT_MESSAGE))?;
+            let (source_cs_id, target_cs_id, from_dir, to_dir, author, msg) =
+                parse_common_args(&ctx, sub_matches, &source_repo, &target_repo).await?;
 
             let maybe_file_num_limit: Option<NonZeroU64> =
                 args::get_and_parse_opt(sub_matches, ARG_FILE_NUM_LIMIT);
 
             let result_cs_id = remove_excessive_files(
                 &ctx,
-                &repo,
-                cs_id,
+                &source_repo,
+                &target_repo,
+                source_cs_id,
+                target_cs_id,
                 from_dir,
                 to_dir,
-                author.to_string(),
-                msg.to_string(),
+                author,
+                msg,
                 maybe_file_num_limit,
             )
             .await?;
@@ -299,8 +323,10 @@ impl Options {
 
 async fn copy(
     ctx: &CoreContext,
-    repo: &BlobRepo,
-    cs_id: ChangesetId,
+    source_repo: &BlobRepo,
+    target_repo: &BlobRepo,
+    source_cs_id: ChangesetId,
+    target_cs_id: ChangesetId,
     from_dir: MPath,
     to_dir: MPath,
     author: String,
@@ -309,8 +335,8 @@ async fn copy(
     options: Options,
 ) -> Result<Vec<ChangesetId>, Error> {
     let (from_entries, to_entries) = try_join(
-        list_directory(&ctx, &repo, cs_id, &from_dir),
-        list_directory(&ctx, &repo, cs_id, &to_dir),
+        list_directory(&ctx, &source_repo, source_cs_id, &from_dir),
+        list_directory(&ctx, &target_repo, target_cs_id, &to_dir),
     )
     .await?;
     let from_entries = from_entries.ok_or_else(|| Error::msg("from directory does not exist!"))?;
@@ -321,6 +347,9 @@
     // These are the file changes that have to be copied
     let mut file_changes = BTreeMap::new();
     let mut total_file_size = 0;
+    let mut contents_to_upload = vec![];
+    let same_repo = source_repo.get_repoid() == target_repo.get_repoid();
+
     for (from_suffix, fsnode_file) in from_entries {
         if let Some(ref regex) = options.maybe_exclude_file_regex {
             if from_suffix.matches_regex(&regex) {
@@ -351,6 +380,11 @@
             fsnode_file.size()
         );
         file_changes.insert(to_path, Some((from_path, fsnode_file)));
+
+        if !same_repo {
+            contents_to_upload.push(fsnode_file.content_id().clone());
+        }
+
         if let Some(lfs_threshold) = limits.lfs_threshold {
             if fsnode_file.size() < lfs_threshold.get() {
                 total_file_size += fsnode_file.size();
@@ -376,13 +410,25 @@
         }
     }
 
+    if !same_repo {
+        debug!(
+            ctx.logger(),
+            "Copying contents of {} files from {} to {}",
+            contents_to_upload.len(),
+            source_repo.name(),
+            target_repo.name()
+        );
+        copy_file_contents(ctx, source_repo, target_repo, contents_to_upload).await?;
+    }
+
     create_changesets(
         ctx,
-        repo,
+        target_repo,
         vec![remove_file_changes, file_changes],
-        cs_id,
+        target_cs_id,
         author,
         msg,
+        same_repo, /* record_copy_from */
     )
     .await
 }
@@ -394,6 +440,7 @@ async fn create_changesets(
     mut parent: ChangesetId,
     author: String,
     msg: String,
+    record_copy_from: bool,
 ) -> Result<Vec<ChangesetId>, Error> {
     let mut changesets = vec![];
     let mut cs_ids = vec![];
@@ -405,12 +452,19 @@
         let mut fc = BTreeMap::new();
         for (to_path, maybe_fsnode) in path_to_maybe_fsnodes {
             let maybe_file_change = match maybe_fsnode {
-                Some((from_path, fsnode_file)) => Some(FileChange::new(
-                    *fsnode_file.content_id(),
-                    *fsnode_file.file_type(),
-                    fsnode_file.size(),
-                    Some((from_path, parent)),
-                )),
+                Some((from_path, fsnode_file)) => {
+                    let copy_from = if record_copy_from {
+                        Some((from_path, parent))
+                    } else {
+                        None
+                    };
+                    Some(FileChange::new(
+                        *fsnode_file.content_id(),
+                        *fsnode_file.file_type(),
+                        fsnode_file.size(),
+                        copy_from,
+                    ))
+                }
                 None => None,
             };
 
@@ -433,8 +487,10 @@
 
 async fn remove_excessive_files(
     ctx: &CoreContext,
-    repo: &BlobRepo,
-    cs_id: ChangesetId,
+    source_repo: &BlobRepo,
+    target_repo: &BlobRepo,
+    source_cs_id: ChangesetId,
+    target_cs_id: ChangesetId,
     from_dir: MPath,
     to_dir: MPath,
     author: String,
@@ -442,8 +498,8 @@
     maybe_file_num_limit: Option<NonZeroU64>,
 ) -> Result<ChangesetId, Error> {
     let (from_entries, to_entries) = try_join(
-        list_directory(&ctx, &repo, cs_id, &from_dir),
-        list_directory(&ctx, &repo, cs_id, &to_dir),
+        list_directory(&ctx, &source_repo, source_cs_id, &from_dir),
+        list_directory(&ctx, &target_repo, target_cs_id, &to_dir),
     )
     .await?;
     let from_entries = from_entries.ok_or_else(|| Error::msg("from directory does not exist!"))?;
@@ -462,7 +518,16 @@
         }
     }
 
-    let cs_ids = create_changesets(ctx, repo, vec![to_delete], cs_id, author, msg).await?;
+    let cs_ids = create_changesets(
+        ctx,
+        target_repo,
+        vec![to_delete],
+        target_cs_id,
+        author,
+        msg,
+        false, /* record_copy_from */
+    )
+    .await?;
 
     cs_ids
         .last()
@@ -532,8 +597,10 @@ fn create_bonsai_changeset(
 #[cfg(test)]
 mod test {
     use super::*;
-    use blobrepo_factory::new_memblob_empty;
+    use blobrepo_factory::{new_memblob_empty, new_memblob_empty_with_id};
+    use blobstore::StoreLoadable;
     use maplit::hashmap;
+    use mononoke_types::RepositoryId;
     use tests_utils::{list_working_copy_utf8, CreateCommitContext};
 
     #[fbinit::compat_test]
@@ -572,6 +639,8 @@ mod test {
         let new_cs_id = copy(
             &ctx,
             &repo,
+            &repo,
+            cs_id,
             cs_id,
             MPath::new("dir_from")?,
             MPath::new("dir_to")?,
@@ -619,6 +688,8 @@ mod test {
         let first_cs_id = copy(
             &ctx,
             &repo,
+            &repo,
+            cs_id,
             cs_id,
             MPath::new("dir_from")?,
             MPath::new("dir_to")?,
@@ -646,6 +717,8 @@ mod test {
         let second_cs_id = copy(
             &ctx,
             &repo,
+            &repo,
+            first_cs_id,
             first_cs_id,
             MPath::new("dir_from")?,
             MPath::new("dir_to")?,
@@ -690,6 +763,8 @@ mod test {
         let cs_id = copy(
             &ctx,
             &repo,
+            &repo,
+            cs_id,
             cs_id,
             MPath::new("dir_from")?,
             MPath::new("dir_to")?,
@@ -736,6 +811,8 @@ mod test {
         let first_cs_id = copy(
             &ctx,
             &repo,
+            &repo,
+            cs_id,
             cs_id,
             MPath::new("dir_from")?,
             MPath::new("dir_to")?,
@@ -766,6 +843,8 @@ mod test {
         let second_cs_id = copy(
             &ctx,
             &repo,
+            &repo,
+            first_cs_id,
             first_cs_id,
             MPath::new("dir_from")?,
             MPath::new("dir_to")?,
@@ -814,6 +893,8 @@ mod test {
         let cs_ids = copy(
             &ctx,
             &repo,
+            &repo,
+            cs_id,
             cs_id,
             MPath::new("dir_from")?,
             MPath::new("dir_to")?,
@@ -861,6 +942,8 @@ mod test {
         let cs_ids = copy(
             &ctx,
             &repo,
+            &repo,
+            cs_id,
             cs_id,
             MPath::new("dir_from")?,
             MPath::new("dir_to")?,
@@ -880,6 +963,8 @@ mod test {
         let cs_ids = copy(
             &ctx,
             &repo,
+            &repo,
+            cs_id,
             cs_id,
             MPath::new("dir_from")?,
             MPath::new("dir_to")?,
@@ -912,6 +997,20 @@ mod test {
             }
         );
 
+        let copy_bcs = cs_ids
+            .last()
+            .expect("changeset is expected to exist")
+            .load(&ctx, &repo.get_blobstore())
+            .await?;
+        let file_changes = copy_bcs.file_changes_map();
+        let a_change = file_changes
+            .get(&MPath::new("dir_to/a")?)
+            .expect("change to dir_to/a expected to be present in the map")
+            .clone()
+            .expect("change to dir_to/a expected to not be None");
+        // Ensure that copy-from information is recorded when copying within the repo
+        assert!(a_change.copy_from().is_some());
+
         Ok(())
     }
 
@@ -930,6 +1029,8 @@ mod test {
         let cs_id = remove_excessive_files(
             &ctx,
             &repo,
+            &repo,
+            cs_id,
             cs_id,
             MPath::new("dir_from")?,
             MPath::new("dir_to")?,
@@ -949,4 +1050,124 @@ mod test {
 
         Ok(())
     }
+
+    #[fbinit::compat_test]
+    async fn test_delete_excessive_files_xrepo(fb: FacebookInit) -> Result<(), Error> {
+        let ctx = CoreContext::test_mock(fb);
+        let source_repo = new_memblob_empty_with_id(None, RepositoryId::new(0))?;
+        let target_repo = new_memblob_empty_with_id(None, RepositoryId::new(1))?;
+
+        let source_cs_id = CreateCommitContext::new_root(&ctx, &source_repo)
+            .add_file("dir_from/a", "a")
+            .commit()
+            .await?;
+
+        let target_cs_id = CreateCommitContext::new_root(&ctx, &target_repo)
+            .add_file("dir_to/a", "a")
+            .add_file("dir_to/b", "b")
+            .add_file("dir_to/c/d", "c/d")
+            .commit()
+            .await?;
+
+        let cs_id = remove_excessive_files(
+            &ctx,
+            &source_repo,
+            &target_repo,
+            source_cs_id,
+            target_cs_id,
+            MPath::new("dir_from")?,
+            MPath::new("dir_to")?,
+            "author".to_string(),
+            "msg".to_string(),
+            None,
+        )
+        .await?;
+
+        assert_eq!(
+            list_working_copy_utf8(&ctx, &target_repo, cs_id).await?,
+            hashmap! {
+                MPath::new("dir_to/a")? => "a".to_string(),
+            }
+        );
+
+        Ok(())
+    }
+
+    #[fbinit::compat_test]
+    async fn test_xrepo_rsync_with_overwrite(fb: FacebookInit) -> Result<(), Error> {
+        let ctx = CoreContext::test_mock(fb);
+        let source_repo = new_memblob_empty_with_id(None, RepositoryId::new(0))?;
+        let target_repo = new_memblob_empty_with_id(None, RepositoryId::new(1))?;
+
+        let source_cs_id = CreateCommitContext::new_root(&ctx, &source_repo)
+            .add_file("dir_from/a", "aa")
+            .add_file("dir_from/b", "b")
+            .add_file("source_random_file", "sr")
+            .commit()
+            .await?;
+
+        let target_cs_id = CreateCommitContext::new_root(&ctx, &target_repo)
+            .add_file("dir_to/a", "different")
+            .add_file("target_random_file", "tr")
+            .commit()
+            .await?;
+
+
+        let cs_ids = copy(
+            &ctx,
+            &source_repo,
+            &target_repo,
+            source_cs_id,
+            target_cs_id,
+            MPath::new("dir_from")?,
+            MPath::new("dir_to")?,
+            "author".to_string(),
+            "msg".to_string(),
+            Limits::default(),
+            Options {
+                overwrite: true,
+                ..Default::default()
+            },
+        )
+        .await?;
+
+        assert_eq!(
+            list_working_copy_utf8(&ctx, &target_repo, *cs_ids.get(0).unwrap()).await?,
+            hashmap! {
+                MPath::new("target_random_file")? => "tr".to_string(),
+            }
+        );
+
+        assert_eq!(
+            list_working_copy_utf8(&ctx, &target_repo, *cs_ids.get(1).unwrap()).await?,
+            hashmap! {
+                MPath::new("dir_to/a")? => "aa".to_string(),
+                MPath::new("dir_to/b")? => "b".to_string(),
+                MPath::new("target_random_file")? => "tr".to_string(),
+            }
+        );
+
+        assert_eq!(
+            target_repo
+                .get_changeset_parents_by_bonsai(ctx.clone(), *cs_ids.get(0).unwrap())
+                .await?,
+            vec![target_cs_id],
+        );
+
+        let copy_bcs = cs_ids
+            .get(1)
+            .expect("changeset is expected to exist")
+            .load(&ctx, &target_repo.get_blobstore())
+            .await?;
+        let file_changes = copy_bcs.file_changes_map();
+        let a_change = file_changes
+            .get(&MPath::new("dir_to/a")?)
+            .expect("change to dir_to/a expected to be present in the map")
+            .clone()
+            .expect("change to dir_to/a expected to not be None");
+        // Ensure that no copy-from information is recorded when copying from another repo
+        assert!(a_change.copy_from().is_none());
+
+        Ok(())
+    }
 }
diff --git a/eden/mononoke/commit_rewriting/cross_repo_sync/src/lib.rs b/eden/mononoke/commit_rewriting/cross_repo_sync/src/lib.rs
index d077ebbad7..0b1908ab8b 100644
--- a/eden/mononoke/commit_rewriting/cross_repo_sync/src/lib.rs
+++ b/eden/mononoke/commit_rewriting/cross_repo_sync/src/lib.rs
@@ -30,7 +30,7 @@ use maplit::{hashmap, hashset};
 use mercurial_types::HgManifestId;
 use metaconfig_types::{CommitSyncConfig, CommitSyncConfigVersion, PushrebaseFlags};
 use mononoke_types::{
-    BonsaiChangeset, BonsaiChangesetMut, ChangesetId, FileChange, MPath, RepositoryId,
+    BonsaiChangeset, BonsaiChangesetMut, ChangesetId, ContentId, FileChange, MPath, RepositoryId,
 };
 use movers::Mover;
 use pushrebase::{do_pushrebase_bonsai, PushrebaseError};
@@ -1645,6 +1645,32 @@ impl CommitSyncRepos {
     }
 }
 
+pub async fn copy_file_contents<'a>(
+    ctx: &'a CoreContext,
+    source_repo: &'a BlobRepo,
+    target_repo: &'a BlobRepo,
+    content_ids: impl IntoIterator<Item = ContentId>,
+) -> Result<(), Error> {
+    let source_blobstore = source_repo.get_blobstore();
+    let target_blobstore = target_repo.get_blobstore();
+    let target_filestore_config = target_repo.filestore_config();
+    let uploader: FuturesUnordered<_> = content_ids
+        .into_iter()
+        .map({
+            |content_id| {
+                copy_content(
+                    ctx,
+                    &source_blobstore,
+                    &target_blobstore,
+                    target_filestore_config.clone(),
+                    content_id,
+                )
+            }
+        })
+        .collect();
+    uploader.try_for_each_concurrent(100, identity).await
+}
+
 pub async fn upload_commits<'a>(
     ctx: &'a CoreContext,
     rewritten_list: Vec<BonsaiChangeset>,
@@ -1660,25 +1686,7 @@
             .filter_map(|opt_change| opt_change.as_ref().map(|change| change.content_id()));
         files_to_sync.extend(new_files_to_sync);
     }
-
-    let source_blobstore = source_repo.get_blobstore();
-    let target_blobstore = target_repo.get_blobstore();
-    let target_filestore_config = target_repo.filestore_config();
-    let uploader: FuturesUnordered<_> = files_to_sync
-        .into_iter()
-        .map({
-            |content_id| {
-                copy_content(
-                    ctx,
-                    &source_blobstore,
-                    &target_blobstore,
-                    target_filestore_config.clone(),
-                    content_id,
-                )
-            }
-        })
-        .collect();
-    uploader.try_for_each_concurrent(100, identity).await?;
+    copy_file_contents(ctx, source_repo, target_repo, files_to_sync).await?;
 
     save_bonsai_changesets(rewritten_list.clone(), ctx.clone(), target_repo.clone()).await?;
     Ok(())
 }