mononoke: support multiple directories in mononoke_admin rsync

Summary: It's useful to be able to copy multiple directories at once. This adds a --from-to-dirs argument that takes comma-separated from_dir=to_dir pairs (mutually exclusive with --from-dir/--to-dir) and threads a Vec<(MPath, MPath)> through copy and remove_excessive_files.

Reviewed By: markbt

Differential Revision: D29358375

fbshipit-source-id: f1cc351195cc2c19de36a1b6936b598e314848c3
Stanislau Hlebik, 2021-06-24 11:43:36 -07:00 (committed by Facebook GitHub Bot)
parent 1044dd545d, commit 129d4fa88f
2 changed files with 242 additions and 118 deletions

Changed file 1 of 2: rsync subcommand arguments and dispatch. In the hunks below, "-" marks removed lines and "+" marks added lines; unmarked lines are context.

@@ -33,6 +33,7 @@ pub const ARG_EXCLUDE_FILE_REGEX: &str = "exclude-file-regex";
pub const ARG_TOTAL_FILE_NUM_LIMIT: &str = "total-file-num-limit";
pub const ARG_TOTAL_SIZE_LIMIT: &str = "total-size-limit";
pub const ARG_FROM_DIR: &str = "from-dir";
+ pub const ARG_FROM_TO_DIRS: &str = "from-to-dirs";
pub const ARG_LFS_THRESHOLD: &str = "lfs-threshold";
pub const ARG_OVERWRITE: &str = "overwrite";
pub const ARG_TO_DIR: &str = "to-dir";
@@ -88,7 +89,7 @@ pub fn add_common_args<'a, 'b>(sub_m: App<'a, 'b>) -> App<'a, 'b> {
Arg::with_name(ARG_FROM_DIR)
.long(ARG_FROM_DIR)
.takes_value(true)
- .required(true)
+ .required(false)
.help(
"name of the directory to copy from. \
Error is returned if this path doesn't exist or if it's a file",
@@ -98,12 +99,21 @@ pub fn add_common_args<'a, 'b>(sub_m: App<'a, 'b>) -> App<'a, 'b> {
Arg::with_name(ARG_TO_DIR)
.long(ARG_TO_DIR)
.takes_value(true)
- .required(true)
+ .required(false)
.help(
"name of the directory to copy to. \
Error is returned if this path is a file",
),
)
+ .arg(
+ Arg::with_name(ARG_FROM_TO_DIRS)
+ .long(ARG_FROM_TO_DIRS)
+ .multiple(true)
+ .takes_value(true)
+ .required(false)
+ .help("'from_dir=to_dir' directories that need copying")
+ .conflicts_with_all(&[ARG_FROM_DIR, ARG_TO_DIR]),
+ )
.arg(
Arg::with_name(ARG_COMMIT_MESSAGE)
.long(ARG_COMMIT_MESSAGE)
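
For context, here is a minimal standalone sketch of how the three flags interact, assuming clap 2.x (which the App/Arg::with_name style above suggests); the binary name and demo invocation are illustrative:

    use clap::{App, Arg};

    fn main() {
        let matches = App::new("rsync-demo")
            .arg(Arg::with_name("from-dir").long("from-dir").takes_value(true))
            .arg(Arg::with_name("to-dir").long("to-dir").takes_value(true))
            .arg(
                Arg::with_name("from-to-dirs")
                    .long("from-to-dirs")
                    .takes_value(true)
                    .multiple(true)
                    // clap rejects --from-to-dirs combined with --from-dir/--to-dir.
                    .conflicts_with_all(&["from-dir", "to-dir"]),
            )
            .get_matches_from(vec!["rsync-demo", "--from-to-dirs", "a=b,c=d"]);
        assert_eq!(matches.value_of("from-to-dirs"), Some("a=b,c=d"));
    }

Since all three arguments are now required(false), clap itself no longer guarantees that any of them is present; the missing-argument error moves into parse_common_args below.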
@@ -148,7 +158,16 @@ async fn parse_common_args<'a>(
matches: &'a ArgMatches<'_>,
source_repo: &'a BlobRepo,
target_repo: &'a BlobRepo,
- ) -> Result<(ChangesetId, ChangesetId, MPath, MPath, String, String), Error> {
+ ) -> Result<
+ (
+ ChangesetId,
+ ChangesetId,
+ Vec<(MPath, MPath)>,
+ String,
+ String,
+ ),
+ Error,
+ > {
let source_cs_id = matches
.value_of(ARG_SOURCE_CSID)
.ok_or_else(|| anyhow!("{} arg is not specified", ARG_SOURCE_CSID))?;
@@ -173,15 +192,30 @@ async fn parse_common_args<'a>(
)
.await?;
- let from_dir = matches
- .value_of(ARG_FROM_DIR)
- .ok_or_else(|| anyhow!("{} arg is not specified", ARG_FROM_DIR))?;
- let from_dir = MPath::new(from_dir)?;
+ let from_to_dirs = if let Some(from_to_dirs) = matches.value_of(ARG_FROM_TO_DIRS) {
+ let mut res = vec![];
+ for from_to in from_to_dirs.split(',') {
+ let dirs = from_to.split('=').collect::<Vec<_>>();
+ if dirs.len() != 2 {
+ return Err(anyhow!("invalid format of {}", ARG_FROM_TO_DIRS));
+ }
+ res.push((MPath::new(dirs[0])?, MPath::new(dirs[1])?));
+ }
- let to_dir = matches
- .value_of(ARG_TO_DIR)
- .ok_or_else(|| anyhow!("{} arg is not specified", ARG_TO_DIR))?;
- let to_dir = MPath::new(to_dir)?;
+ res
+ } else {
+ let from_dir = matches
+ .value_of(ARG_FROM_DIR)
+ .ok_or_else(|| anyhow!("{} arg is not specified", ARG_FROM_DIR))?;
+ let from_dir = MPath::new(from_dir)?;
+ let to_dir = matches
+ .value_of(ARG_TO_DIR)
+ .ok_or_else(|| anyhow!("{} arg is not specified", ARG_TO_DIR))?;
+ let to_dir = MPath::new(to_dir)?;
+ vec![(from_dir, to_dir)]
+ };
let author = matches
.value_of(ARG_COMMIT_AUTHOR)
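
The from_dir=to_dir parsing above, extracted into a minimal sketch (plain Strings instead of MPath; the function name is illustrative):

    use anyhow::{anyhow, Error};

    // Parse a comma-separated list of pairs, e.g. "dir_a=dir_b,dir_c=dir_d".
    fn parse_from_to_dirs(s: &str) -> Result<Vec<(String, String)>, Error> {
        let mut res = vec![];
        for from_to in s.split(',') {
            let dirs = from_to.split('=').collect::<Vec<_>>();
            // Exactly one '=' per pair: "a=b=c" and "a" are both rejected.
            if dirs.len() != 2 {
                return Err(anyhow!("invalid format of from-to-dirs"));
            }
            res.push((dirs[0].to_string(), dirs[1].to_string()));
        }
        Ok(res)
    }

Note that the code reads the flag with value_of, which returns only the first occurrence even though the arg is declared multiple(true); multiple pairs are expected to arrive comma-separated in a single value.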
@@ -194,8 +228,7 @@ async fn parse_common_args<'a>(
Ok((
source_cs_id,
target_cs_id,
- from_dir,
- to_dir,
+ from_to_dirs,
author.to_string(),
msg.to_string(),
))
@@ -213,7 +246,7 @@ pub async fn subcommand_rsync<'a>(
match sub_matches.subcommand() {
(SUBCOMMAND_COPY, Some(sub_matches)) => {
- let (source_cs_id, target_cs_id, from_dir, to_dir, author, msg) =
+ let (source_cs_id, target_cs_id, from_to_dirs, author, msg) =
parse_common_args(&ctx, sub_matches, &source_repo, &target_repo).await?;
let cs_ids = copy(
&ctx,
@@ -221,8 +254,7 @@ pub async fn subcommand_rsync<'a>(
&target_repo,
source_cs_id,
target_cs_id,
- from_dir,
- to_dir,
+ from_to_dirs,
author,
msg,
limits_from_matches(sub_matches),
@@ -238,7 +270,7 @@ pub async fn subcommand_rsync<'a>(
println!("{}", result_cs_id);
}
(SUBCOMMAND_REMOVE_EXCESSIVE_FILES, Some(sub_matches)) => {
- let (source_cs_id, target_cs_id, from_dir, to_dir, author, msg) =
+ let (source_cs_id, target_cs_id, from_to_dirs, author, msg) =
parse_common_args(&ctx, sub_matches, &source_repo, &target_repo).await?;
let maybe_total_file_num_limit: Option<NonZeroU64> =
@@ -250,8 +282,7 @@ pub async fn subcommand_rsync<'a>(
&target_repo,
source_cs_id,
target_cs_id,
- from_dir,
- to_dir,
+ from_to_dirs,
author,
msg,
maybe_total_file_num_limit,

Changed file 2 of 2: the rsync implementation (copy, remove_excessive_files) and its tests.

@@ -43,21 +43,12 @@ pub async fn copy(
target_repo: &BlobRepo,
source_cs_id: ChangesetId,
target_cs_id: ChangesetId,
- from_dir: MPath,
- to_dir: MPath,
+ from_to_dirs: Vec<(MPath, MPath)>,
author: String,
msg: String,
limits: Limits,
options: Options,
) -> Result<Vec<ChangesetId>, Error> {
- let (from_entries, to_entries) = try_join(
- list_directory(&ctx, &source_repo, source_cs_id, &from_dir),
- list_directory(&ctx, &target_repo, target_cs_id, &to_dir),
- )
- .await?;
- let from_entries = from_entries.ok_or_else(|| Error::msg("from directory does not exist!"))?;
- let to_entries = to_entries.unwrap_or_else(BTreeMap::new);
// These are the file changes that have to be removed first
let mut remove_file_changes = BTreeMap::new();
// These are the file changes that have to be copied
@@ -66,62 +57,73 @@ pub async fn copy(
let mut contents_to_upload = vec![];
let same_repo = source_repo.get_repoid() == target_repo.get_repoid();
- for (from_suffix, fsnode_file) in from_entries {
- if let Some(ref regex) = options.maybe_exclude_file_regex {
- if from_suffix.matches_regex(&regex) {
- continue;
- }
- }
+ for (from_dir, to_dir) in from_to_dirs {
+ let (from_entries, to_entries) = try_join(
+ list_directory(&ctx, &source_repo, source_cs_id, &from_dir),
+ list_directory(&ctx, &target_repo, target_cs_id, &to_dir),
+ )
+ .await?;
+ let from_entries =
+ from_entries.ok_or_else(|| Error::msg("from directory does not exist!"))?;
+ let to_entries = to_entries.unwrap_or_else(BTreeMap::new);
- let from_path = from_dir.join(&from_suffix);
- let to_path = to_dir.join(&from_suffix);
- if let Some(to_fsnode) = to_entries.get(&from_suffix) {
- if to_fsnode == &fsnode_file {
- continue;
+ for (from_suffix, fsnode_file) in from_entries {
+ if let Some(ref regex) = options.maybe_exclude_file_regex {
+ if from_suffix.matches_regex(&regex) {
+ continue;
+ }
+ }
- if options.overwrite {
- remove_file_changes.insert(to_path.clone(), None);
+ let from_path = from_dir.join(&from_suffix);
+ let to_path = to_dir.join(&from_suffix);
+ if let Some(to_fsnode) = to_entries.get(&from_suffix) {
+ if to_fsnode == &fsnode_file {
+ continue;
+ }
+ if options.overwrite {
+ remove_file_changes.insert(to_path.clone(), None);
+ } else {
+ continue;
+ }
+ }
+ debug!(
+ ctx.logger(),
+ "from {}, to {}, size: {}",
+ from_path,
+ to_path,
+ fsnode_file.size()
+ );
+ file_changes.insert(to_path, Some((from_path, fsnode_file)));
+ if !same_repo {
+ contents_to_upload.push(fsnode_file.content_id().clone());
+ }
+ if let Some(lfs_threshold) = limits.lfs_threshold {
+ if fsnode_file.size() < lfs_threshold.get() {
+ total_file_size += fsnode_file.size();
+ } else {
+ debug!(
+ ctx.logger(),
+ "size is not accounted because of lfs threshold"
+ );
+ }
- } else {
- continue;
- }
- }
- debug!(
- ctx.logger(),
- "from {}, to {}, size: {}",
- from_path,
- to_path,
- fsnode_file.size()
- );
- file_changes.insert(to_path, Some((from_path, fsnode_file)));
- if !same_repo {
- contents_to_upload.push(fsnode_file.content_id().clone());
- }
- if let Some(lfs_threshold) = limits.lfs_threshold {
- if fsnode_file.size() < lfs_threshold.get() {
- total_file_size += fsnode_file.size();
- } else {
- debug!(
- ctx.logger(),
- "size is not accounted because of lfs threshold"
- );
- }
} else {
total_file_size += fsnode_file.size();
}
- if let Some(limit) = limits.total_file_num_limit {
- if file_changes.len() as u64 >= limit.get() {
- break;
+ if let Some(limit) = limits.total_file_num_limit {
+ if file_changes.len() as u64 >= limit.get() {
+ break;
+ }
+ }
}
- if let Some(limit) = limits.total_size_limit {
- if total_file_size as u64 > limit.get() {
- break;
+ if let Some(limit) = limits.total_size_limit {
+ if total_file_size as u64 > limit.get() {
+ break;
+ }
+ }
}
}
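
One consequence of the restructuring worth noting: remove_file_changes, file_changes, and total_file_size are declared outside the new outer loop, so the limits apply across all directory pairs, but the plain break in the limit checks only leaves the inner per-directory loop. A minimal illustrative sketch of the difference (not from the diff):

    fn main() {
        let dirs = ["d1", "d2", "d3"];
        let limit = 2;
        let mut copied = 0;
        'outer: for _dir in &dirs {
            for _file in 0..10 {
                copied += 1;
                if copied >= limit {
                    // A plain `break` here, as in the diff, resumes with the
                    // next dir and copies one more file per remaining pair
                    // before re-hitting the limit; the labeled break stops
                    // the whole traversal.
                    break 'outer;
                }
            }
        }
        assert_eq!(copied, limit);
    }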
@@ -207,28 +209,31 @@ pub async fn remove_excessive_files(
target_repo: &BlobRepo,
source_cs_id: ChangesetId,
target_cs_id: ChangesetId,
- from_dir: MPath,
- to_dir: MPath,
+ from_to_dirs: Vec<(MPath, MPath)>,
author: String,
msg: String,
maybe_total_file_num_limit: Option<NonZeroU64>,
) -> Result<ChangesetId, Error> {
- let (from_entries, to_entries) = try_join(
- list_directory(&ctx, &source_repo, source_cs_id, &from_dir),
- list_directory(&ctx, &target_repo, target_cs_id, &to_dir),
- )
- .await?;
- let from_entries = from_entries.ok_or_else(|| Error::msg("from directory does not exist!"))?;
- let to_entries = to_entries.unwrap_or_else(BTreeMap::new);
let mut to_delete = BTreeMap::new();
- for to_suffix in to_entries.keys() {
- if !from_entries.contains_key(to_suffix) {
- let to_path = to_dir.join(to_suffix);
- to_delete.insert(to_path, None);
- if let Some(limit) = maybe_total_file_num_limit {
- if to_delete.len() as u64 >= limit.get() {
- break;
+ for (from_dir, to_dir) in from_to_dirs {
+ let (from_entries, to_entries) = try_join(
+ list_directory(&ctx, &source_repo, source_cs_id, &from_dir),
+ list_directory(&ctx, &target_repo, target_cs_id, &to_dir),
+ )
+ .await?;
+ let from_entries =
+ from_entries.ok_or_else(|| Error::msg("from directory does not exist!"))?;
+ let to_entries = to_entries.unwrap_or_else(BTreeMap::new);
+ for to_suffix in to_entries.keys() {
+ if !from_entries.contains_key(to_suffix) {
+ let to_path = to_dir.join(to_suffix);
+ to_delete.insert(to_path, None);
+ if let Some(limit) = maybe_total_file_num_limit {
+ if to_delete.len() as u64 >= limit.get() {
+ break;
}
}
}
}
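
The per-pair selection logic reduces to: files present under to_dir but absent under from_dir are the "excessive" ones to delete. A minimal sketch with plain strings instead of MPath/fsnode entries (names illustrative):

    use std::collections::BTreeMap;

    fn excessive_files(from: &BTreeMap<String, u64>, to: &BTreeMap<String, u64>) -> Vec<String> {
        // Keep keys of `to` that have no counterpart in `from`.
        to.keys()
            .filter(|k| !from.contains_key(*k))
            .cloned()
            .collect()
    }

    fn main() {
        let from: BTreeMap<String, u64> = [("a".to_string(), 1)].into_iter().collect();
        let to: BTreeMap<String, u64> =
            [("a".to_string(), 1), ("b".to_string(), 2)].into_iter().collect();
        assert_eq!(excessive_files(&from, &to), vec!["b".to_string()]);
    }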
@@ -359,8 +364,7 @@ mod test {
&repo,
cs_id,
cs_id,
MPath::new("dir_from")?,
MPath::new("dir_to")?,
vec![(MPath::new("dir_from")?, MPath::new("dir_to")?)],
"author".to_string(),
"msg".to_string(),
Limits::default(),
@@ -385,6 +389,62 @@ mod test {
Ok(())
}
+ #[fbinit::test]
+ async fn test_rsync_multiple(fb: FacebookInit) -> Result<(), Error> {
+ let ctx = CoreContext::test_mock(fb);
+ let repo: BlobRepo = test_repo_factory::build_empty()?;
+ let cs_id = CreateCommitContext::new_root(&ctx, &repo)
+ .add_file("dir_from_1/a", "a")
+ .add_file("dir_from_1/b", "b")
+ .add_file("dir_from_1/c", "c")
+ .add_file("dir_to_1/a", "dontoverwrite")
+ .add_file("dir_from_2/aa", "aa")
+ .add_file("dir_from_2/bb", "bb")
+ .add_file("dir_from_2/cc", "cc")
+ .commit()
+ .await?;
+ let new_cs_id = copy(
+ &ctx,
+ &repo,
+ &repo,
+ cs_id,
+ cs_id,
+ vec![
+ (MPath::new("dir_from_1")?, MPath::new("dir_to_1")?),
+ (MPath::new("dir_from_2")?, MPath::new("dir_to_2")?),
+ ],
+ "author".to_string(),
+ "msg".to_string(),
+ Limits::default(),
+ Options::default(),
+ )
+ .await?
+ .last()
+ .copied()
+ .unwrap();
+ assert_eq!(
+ list_working_copy_utf8(&ctx, &repo, new_cs_id,).await?,
+ hashmap! {
+ MPath::new("dir_from_1/a")? => "a".to_string(),
+ MPath::new("dir_from_1/b")? => "b".to_string(),
+ MPath::new("dir_from_1/c")? => "c".to_string(),
+ MPath::new("dir_to_1/a")? => "dontoverwrite".to_string(),
+ MPath::new("dir_to_1/b")? => "b".to_string(),
+ MPath::new("dir_to_1/c")? => "c".to_string(),
+ MPath::new("dir_from_2/aa")? => "aa".to_string(),
+ MPath::new("dir_from_2/bb")? => "bb".to_string(),
+ MPath::new("dir_from_2/cc")? => "cc".to_string(),
+ MPath::new("dir_to_2/aa")? => "aa".to_string(),
+ MPath::new("dir_to_2/bb")? => "bb".to_string(),
+ MPath::new("dir_to_2/cc")? => "cc".to_string(),
+ }
+ );
+ Ok(())
+ }
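
Note the dir_to_1/a => "dontoverwrite" assertion: with the default Options (overwrite off), a file that already exists in the destination with different contents is skipped rather than replaced, matching the options.overwrite branch in copy above.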
#[fbinit::test]
async fn test_rsync_with_limit(fb: FacebookInit) -> Result<(), Error> {
let ctx = CoreContext::test_mock(fb);
@@ -408,8 +468,7 @@ mod test {
&repo,
cs_id,
cs_id,
MPath::new("dir_from")?,
MPath::new("dir_to")?,
vec![(MPath::new("dir_from")?, MPath::new("dir_to")?)],
"author".to_string(),
"msg".to_string(),
limit.clone(),
@@ -437,8 +496,7 @@ mod test {
&repo,
first_cs_id,
first_cs_id,
MPath::new("dir_from")?,
MPath::new("dir_to")?,
vec![(MPath::new("dir_from")?, MPath::new("dir_to")?)],
"author".to_string(),
"msg".to_string(),
limit,
@@ -483,8 +541,7 @@ mod test {
&repo,
cs_id,
cs_id,
MPath::new("dir_from")?,
MPath::new("dir_to")?,
vec![(MPath::new("dir_from")?, MPath::new("dir_to")?)],
"author".to_string(),
"msg".to_string(),
Limits::default(),
@@ -531,8 +588,7 @@ mod test {
&repo,
cs_id,
cs_id,
MPath::new("dir_from")?,
MPath::new("dir_to")?,
vec![(MPath::new("dir_from")?, MPath::new("dir_to")?)],
"author".to_string(),
"msg".to_string(),
Limits {
@@ -563,8 +619,7 @@ mod test {
&repo,
first_cs_id,
first_cs_id,
MPath::new("dir_from")?,
MPath::new("dir_to")?,
vec![(MPath::new("dir_from")?, MPath::new("dir_to")?)],
"author".to_string(),
"msg".to_string(),
Limits {
@@ -613,8 +668,7 @@ mod test {
&repo,
cs_id,
cs_id,
MPath::new("dir_from")?,
MPath::new("dir_to")?,
vec![(MPath::new("dir_from")?, MPath::new("dir_to")?)],
"author".to_string(),
"msg".to_string(),
Limits {
@@ -662,8 +716,7 @@ mod test {
&repo,
cs_id,
cs_id,
MPath::new("dir_from")?,
MPath::new("dir_to")?,
vec![(MPath::new("dir_from")?, MPath::new("dir_to")?)],
"author".to_string(),
"msg".to_string(),
Limits::default(),
@@ -683,8 +736,7 @@ mod test {
&repo,
cs_id,
cs_id,
MPath::new("dir_from")?,
MPath::new("dir_to")?,
vec![(MPath::new("dir_from")?, MPath::new("dir_to")?)],
"author".to_string(),
"msg".to_string(),
Limits::default(),
@@ -749,8 +801,7 @@ mod test {
&repo,
cs_id,
cs_id,
MPath::new("dir_from")?,
MPath::new("dir_to")?,
vec![(MPath::new("dir_from")?, MPath::new("dir_to")?)],
"author".to_string(),
"msg".to_string(),
None,
@@ -768,6 +819,50 @@ mod test {
Ok(())
}
+ #[fbinit::test]
+ async fn test_delete_excessive_files_multiple_dirs(fb: FacebookInit) -> Result<(), Error> {
+ let ctx = CoreContext::test_mock(fb);
+ let repo: BlobRepo = test_repo_factory::build_empty()?;
+ let cs_id = CreateCommitContext::new_root(&ctx, &repo)
+ .add_file("dir_from_1/a", "a")
+ .add_file("dir_to_1/a", "a")
+ .add_file("dir_to_1/b", "b")
+ .add_file("dir_to_1/c/d", "c/d")
+ .add_file("dir_from_2/a", "a")
+ .add_file("dir_to_2/a", "a")
+ .add_file("dir_to_2/b", "b")
+ .commit()
+ .await?;
+ let cs_id = remove_excessive_files(
+ &ctx,
+ &repo,
+ &repo,
+ cs_id,
+ cs_id,
+ vec![
+ (MPath::new("dir_from_1")?, MPath::new("dir_to_1")?),
+ (MPath::new("dir_from_2")?, MPath::new("dir_to_2")?),
+ ],
+ "author".to_string(),
+ "msg".to_string(),
+ None,
+ )
+ .await?;
+ assert_eq!(
+ list_working_copy_utf8(&ctx, &repo, cs_id,).await?,
+ hashmap! {
+ MPath::new("dir_from_1/a")? => "a".to_string(),
+ MPath::new("dir_to_1/a")? => "a".to_string(),
+ MPath::new("dir_from_2/a")? => "a".to_string(),
+ MPath::new("dir_to_2/a")? => "a".to_string(),
+ }
+ );
+ Ok(())
+ }
#[fbinit::test]
async fn test_delete_excessive_files_xrepo(fb: FacebookInit) -> Result<(), Error> {
let ctx = CoreContext::test_mock(fb);
@@ -793,8 +888,7 @@ mod test {
&target_repo,
source_cs_id,
target_cs_id,
MPath::new("dir_from")?,
MPath::new("dir_to")?,
vec![(MPath::new("dir_from")?, MPath::new("dir_to")?)],
"author".to_string(),
"msg".to_string(),
None,
@@ -838,8 +932,7 @@ mod test {
&target_repo,
source_cs_id,
target_cs_id,
MPath::new("dir_from")?,
MPath::new("dir_to")?,
vec![(MPath::new("dir_from")?, MPath::new("dir_to")?)],
"author".to_string(),
"msg".to_string(),
Limits::default(),