Move dump_public_changeset_entries into mononoke_newadmin

Summary:
This moves the tool into `mononoke_newadmin` and switches it to the new clap (derive-based parsing), which makes the argument-parsing code much simpler.

Since this is an internal-only tool, I deleted the old binary, moved all the code, and changed all the test usages. Everything should work the same, just via `mononoke_newadmin dump-public-changesets` instead of `dump_public_changeset_entries`, and as a bonus the admin tool is already deployed automatically :)
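To illustrate the parsing simplification (a minimal sketch, not taken from the diff below; names are simplified from the real code), the old binary built its CLI with builder-style `Arg::with_name(...)` calls and then read untyped values back out of the matches, while the new subcommand just declares a struct and lets clap derive the parser:

  use clap::Parser;

  /// Dump all public changeset entries to a file.
  #[derive(Parser)]
  struct CommandArgs {
      /// File name where commits will be saved.
      #[clap(long)]
      out_filename: String,
      /// Start fetching from this commit rather than the beginning of time.
      #[clap(long)]
      start_commit: Option<String>,
  }

  fn main() {
      // clap generates the parser from the struct and type-checks the values,
      // so there is no hand-written Arg::with_name(...) / value_of(...) plumbing.
      let args = CommandArgs::parse();
      println!("writing to {}", args.out_filename);
  }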

Reviewed By: kris1319

Differential Revision: D35578563

fbshipit-source-id: b7cc70d77862cd12a70849b1ffefc9ed65c5ab7c
Yan Soares Couto 2022-04-21 10:42:28 -07:00 committed by Facebook GitHub Bot
parent eb80ece5a8
commit 59c396f572
10 changed files with 141 additions and 180 deletions

View File

@@ -32,11 +32,6 @@ path = "cmds/compute_commit_stats/src/main.rs"
name = "configlint"
path = "cmds/configlint.rs"
[[bin]]
name = "dump_public_changeset_entries"
path = "cmds/dump_public_changeset_entries.rs"
test = false
[[bin]]
name = "dumprev"
path = "cmds/dumprev.rs"

View File

@@ -1,154 +0,0 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This software may be used and distributed according to the terms of the
* GNU General Public License version 2.
*/
use anyhow::{anyhow, Result};
use blobrepo::BlobRepo;
use bulkops::{Direction, PublicChangesetBulkFetch};
use bytes::Bytes;
use changesets::{deserialize_cs_entries, serialize_cs_entries, ChangesetEntry};
use clap_old::{Arg, ArgGroup};
use cmdlib::args::{self, RepoRequirement};
use cmdlib::helpers::csid_resolve;
use context::CoreContext;
use fbinit::FacebookInit;
use futures::{future, stream, StreamExt, TryStreamExt};
use mononoke_types::ChangesetId;
use phases::PhasesArc;
use std::path::Path;
const ARG_OUT_FILENAME: &str = "out-filename";
const ARG_START_COMMIT: &str = "start-commit";
const ARG_START_FROM_FILE_END: &str = "start-from-file-end";
const ARG_MERGE_FILE: &str = "merge-file";
const ARG_LIMIT: &str = "limit";
#[fbinit::main]
fn main(fb: FacebookInit) -> Result<()> {
let app = args::MononokeAppBuilder::new("Dump all public changeset entries to a file")
.with_advanced_args_hidden()
.with_fb303_args()
.with_repo_required(RepoRequirement::AtLeastOne)
.build()
.about(
"Utility to write public changeset for a given repo to a file. \
It can be used by other tools that want to avoid an expensive prefetching.",
)
.arg(
Arg::with_name(ARG_OUT_FILENAME)
.long(ARG_OUT_FILENAME)
.takes_value(true)
.required(true)
.help("file name where commits will be saved"),
)
.arg(
Arg::with_name(ARG_START_COMMIT)
.long(ARG_START_COMMIT)
.takes_value(true)
.help("start fetching from this commit rather than the beginning of time"),
)
.arg(
Arg::with_name(ARG_START_FROM_FILE_END)
.long(ARG_START_FROM_FILE_END)
.takes_value(true)
.help("start fetching from the last commit in this file, for incremental updates"),
)
.arg(
Arg::with_name(ARG_MERGE_FILE)
.long(ARG_MERGE_FILE)
.takes_value(true)
.multiple(true)
.help(
"Merge commits from this file into the final output. User is responsible for \
avoiding duplicate commits between files and database fetch. Can be repeated",
),
)
.arg(
Arg::with_name(ARG_LIMIT)
.long(ARG_LIMIT)
.takes_value(true)
.help("Only look at this many commits. Notice that this may output less than LIMIT \
commits if there are non-public commits, but it's a good way to do this command \
incrementally."),
)
.group(
ArgGroup::with_name("starting-commit")
.args(&[ARG_START_COMMIT, ARG_START_FROM_FILE_END]),
);
let matches = app.get_matches(fb)?;
let runtime = matches.runtime();
let logger = matches.logger();
let ctx = CoreContext::new_with_logger(fb, logger.clone());
let out_filename = matches
.value_of(ARG_OUT_FILENAME)
.ok_or_else(|| anyhow!("missing required argument: {}", ARG_OUT_FILENAME))?
.to_string();
let opt_start_file = matches.value_of_os(ARG_START_FROM_FILE_END);
let opt_start_commit = matches.value_of(ARG_START_COMMIT);
let opt_merge_files = matches.values_of_os(ARG_MERGE_FILE);
let merge_files = opt_merge_files
.into_iter()
.flatten()
.map(|path| load_file_contents(path.as_ref()));
let limit = matches
.value_of(ARG_LIMIT)
.map(|limit| limit.parse::<u64>())
.transpose()?;
let blob_repo_fut = args::open_repo(fb, &logger, &matches);
runtime.block_on(async move {
let repo: BlobRepo = blob_repo_fut.await?;
let fetcher =
PublicChangesetBulkFetch::new(repo.get_changesets_object(), repo.phases_arc());
let start_commit = {
if let Some(path) = opt_start_file {
load_last_commit(path.as_ref()).await?
} else if let Some(start_commit) = opt_start_commit {
Some(csid_resolve(&ctx, &repo, start_commit).await?)
} else {
None
}
};
let mut bounds = fetcher
.get_repo_bounds_after_commits(&ctx, start_commit.into_iter().collect())
.await?;
if let Some(limit) = limit {
bounds.1 = bounds.1.min(bounds.0 + limit);
}
let css = {
let (mut file_css, db_css): (Vec<_>, Vec<_>) = future::try_join(
stream::iter(merge_files).buffered(2).try_concat(),
fetcher
.fetch_bounded(&ctx, Direction::OldestFirst, Some(bounds))
.try_collect::<Vec<_>>(),
)
.await?;
file_css.extend(db_css.into_iter());
file_css
};
let serialized = serialize_cs_entries(css);
tokio::fs::write(out_filename, serialized).await?;
Ok(())
})
}
async fn load_file_contents(filename: &Path) -> Result<Vec<ChangesetEntry>> {
let file_contents = Bytes::from(tokio::fs::read(filename).await?);
deserialize_cs_entries(&file_contents)
}
async fn load_last_commit(filename: &Path) -> Result<Option<ChangesetId>> {
Ok(load_file_contents(filename).await?.last().map(|e| e.cs_id))
}

View File

@@ -1902,15 +1902,6 @@ function backfill_derived_data() {
"$@"
}
function dump_public_changeset_entries() {
"$MONONOKE_DUMP_PUBLIC_CHANGESET_ENTRIES" \
--debug \
"${COMMON_ARGS[@]}" \
--repo-id "$REPOID" \
--mononoke-config-path "${TESTTMP}/mononoke-config" \
"$@"
}
function backfill_derived_data_multiple_repos() {
IFS=':' read -r -a ids <<< "${REPOS[*]}"
"$MONONOKE_BACKFILL_DERIVED_DATA" \

View File

@@ -21,7 +21,6 @@ MONONOKE_BINS = {
"MONONOKE_BLOBSTORE_HEALER": "blobstore_healer",
"MONONOKE_BONSAI_VERIFY": "bonsai_verify",
"MONONOKE_CHECK_GIT_WC": "check_git_wc",
"MONONOKE_DUMP_PUBLIC_CHANGESET_ENTRIES": "dump_public_changeset_entries",
"MONONOKE_FASTREPLAY": "fastreplay",
"MONONOKE_GITIMPORT": "gitimport",
"MONONOKE_HG_SYNC": "mononoke_hg_sync_job",

View File

@@ -22,11 +22,7 @@ setup configuration
backfill derived data
$ DERIVED_DATA_TYPE="fsnodes"
$ dump_public_changeset_entries --out-filename "$TESTTMP/prefetched_commits"
*] enabled stdlog with level: Error (set RUST_LOG to configure) (glob)
*] Initializing tunables: * (glob)
* using repo "repo" repoid RepositoryId(0) (glob)
*Reloading redacted config from configerator* (glob)
$ quiet mononoke_newadmin dump-public-changesets -R repo --out-filename "$TESTTMP/prefetched_commits"
$ backfill_derived_data backfill --prefetched-commits-path "$TESTTMP/prefetched_commits" "$DERIVED_DATA_TYPE" --limit 1
*] enabled stdlog with level: Error (set RUST_LOG to configure) (glob)

View File

@@ -36,7 +36,7 @@ Regenerate microwave snapshot. This will fail because we have no derived data:
Derive data, then regenerate microwave snapshot:
$ quiet dump_public_changeset_entries --out-filename "$TESTTMP/prefetched_commits"
$ quiet mononoke_newadmin dump-public-changesets -R repo --out-filename "$TESTTMP/prefetched_commits"
$ quiet backfill_derived_data backfill --prefetched-commits-path "$TESTTMP/prefetched_commits" filenodes
$ quiet microwave_builder --debug blobstore

View File

@@ -62,7 +62,7 @@ Now test without head option (tailer will fetch it from config) and with prefetc
> [segmented_changelog_config]
> master_bookmark="master_bookmark"
> CONFIG
$ dump_public_changeset_entries --out-filename "$TESTTMP/prefetched_commits" &> /dev/null
$ quiet mononoke_newadmin dump-public-changesets -R repo --out-filename "$TESTTMP/prefetched_commits"
$ quiet segmented_changelog_tailer_reseed --repo repo --prefetched-commits-path "$TESTTMP/prefetched_commits"
$ grep -e "repo_id: 0" -e "segmented_changelog_tailer" "$TESTTMP/quiet.last.log"
* reading prefetched commits from $TESTTMP/prefetched_commits (glob)

View File

@@ -21,7 +21,7 @@ setup configuration
cloning repo in hg client 'repo2'
Dump current entries
$ quiet dump_public_changeset_entries --out-filename "$TESTTMP/init-dump"
$ quiet mononoke_newadmin dump-public-changesets -R repo --out-filename "$TESTTMP/init-dump"
$ stat -c '%s %N' "$TESTTMP/init-dump"
200 '$TESTTMP/init-dump'
@@ -32,7 +32,7 @@ Add a new commit
$ hgmn push -r . --to master_bookmark -q
Dump the extra entry only
$ quiet dump_public_changeset_entries --out-filename "$TESTTMP/incr-dump" --start-from-file-end "$TESTTMP/init-dump"
$ quiet mononoke_newadmin dump-public-changesets -R repo --out-filename "$TESTTMP/incr-dump" --start-from-file-end "$TESTTMP/init-dump"
$ stat -c '%s %N' "$TESTTMP/incr-dump"
79 '$TESTTMP/incr-dump'
@@ -43,8 +43,8 @@ Add a new commit
$ hgmn push -r . --to master_bookmark -q
Merge commit files, and compare to a straight dump
$ quiet dump_public_changeset_entries --out-filename "$TESTTMP/merge-dump" --start-from-file-end "$TESTTMP/incr-dump" --merge-file "$TESTTMP/init-dump" --merge-file "$TESTTMP/incr-dump"
$ quiet dump_public_changeset_entries --out-filename "$TESTTMP/full-dump"
$ quiet mononoke_newadmin dump-public-changesets -R repo --out-filename "$TESTTMP/merge-dump" --start-from-file-end "$TESTTMP/incr-dump" --merge-file "$TESTTMP/init-dump" --merge-file "$TESTTMP/incr-dump"
$ quiet mononoke_newadmin dump-public-changesets -R repo --out-filename "$TESTTMP/full-dump"
$ cmp "$TESTTMP/merge-dump" "$TESTTMP/full-dump"
$ stat -c '%s %N' "$TESTTMP/merge-dump" "$TESTTMP/full-dump"
356 '$TESTTMP/merge-dump'

View File

@@ -20,4 +20,5 @@ mononoke_app::subcommands! {
mod repo_info;
mod skiplist;
mod ephemeral_store;
mod dump_public_changesets;
}

View File

@@ -0,0 +1,133 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This software may be used and distributed according to the terms of the
* GNU General Public License version 2.
*/
use anyhow::Result;
use bonsai_git_mapping::BonsaiGitMapping;
use bonsai_globalrev_mapping::BonsaiGlobalrevMapping;
use bonsai_hg_mapping::BonsaiHgMapping;
use bonsai_svnrev_mapping::BonsaiSvnrevMapping;
use bulkops::{Direction, PublicChangesetBulkFetch};
use bytes::Bytes;
use changesets::{
deserialize_cs_entries, serialize_cs_entries, ChangesetEntry, Changesets, ChangesetsArc,
};
use clap::Parser;
use futures::{future, stream, StreamExt, TryStreamExt};
use mononoke_app::args::RepoArgs;
use mononoke_app::MononokeApp;
use mononoke_types::ChangesetId;
use phases::{Phases, PhasesArc};
use std::num::NonZeroU64;
use std::path::Path;
use crate::commit_id::parse_commit_id;
/// Dump all public changeset entries to a file.
#[derive(Parser)]
pub struct CommandArgs {
/// Which repo to dump changesets from.
#[clap(flatten)]
repo: RepoArgs,
/// File name where commits will be saved.
#[clap(long)]
out_filename: String,
/// Start fetching from this commit rather than the beginning of time.
#[clap(long)]
start_commit: Option<String>,
/// Start fetching from the last commit in this file, for incremental updates.
#[clap(long)]
start_from_file_end: Option<String>,
/// Merge commits from this file into the final output. User is responsible for
/// avoiding duplicate commits between files and database fetch. Can be repeated.
#[clap(long)]
merge_file: Vec<String>,
/// Only look at this many commits. Notice that this may output less than LIMIT
/// commits if there are non-public commits, but it's a good way to do this command
/// incrementally.
#[clap(long)]
limit: Option<NonZeroU64>,
}
#[facet::container]
pub struct Repo {
#[facet]
bonsai_hg_mapping: dyn BonsaiHgMapping,
#[facet]
bonsai_git_mapping: dyn BonsaiGitMapping,
#[facet]
bonsai_globalrev_mapping: dyn BonsaiGlobalrevMapping,
#[facet]
bonsai_svnrev_mapping: dyn BonsaiSvnrevMapping,
#[facet]
changesets: dyn Changesets,
#[facet]
phases: dyn Phases,
}
pub async fn run(app: MononokeApp, args: CommandArgs) -> Result<()> {
let ctx = app.new_context();
let repo: Repo = app.open_repo(&args.repo).await?;
let fetcher = PublicChangesetBulkFetch::new(repo.changesets_arc(), repo.phases_arc());
let start_commit = {
if let Some(path) = args.start_from_file_end {
load_last_commit(path.as_ref()).await?
} else if let Some(start_commit) = args.start_commit {
Some(parse_commit_id(&ctx, &repo, &start_commit).await?)
} else {
None
}
};
let mut bounds = fetcher
.get_repo_bounds_after_commits(&ctx, start_commit.into_iter().collect())
.await?;
if let Some(limit) = args.limit {
bounds.1 = bounds.1.min(bounds.0 + limit.get());
}
let css = {
let (mut file_css, db_css): (Vec<_>, Vec<_>) = future::try_join(
stream::iter(
args.merge_file
.iter()
.map(|path| load_file_contents(path.as_ref()))
// prevent compiler bug
.collect::<Vec<_>>(),
)
.buffered(2)
.try_concat(),
fetcher
.fetch_bounded(&ctx, Direction::OldestFirst, Some(bounds))
.try_collect::<Vec<_>>(),
)
.await?;
file_css.extend(db_css.into_iter());
file_css
};
let serialized = serialize_cs_entries(css);
tokio::fs::write(args.out_filename, serialized).await?;
Ok(())
}
async fn load_file_contents(filename: &Path) -> Result<Vec<ChangesetEntry>> {
let file_contents = Bytes::from(tokio::fs::read(filename).await?);
deserialize_cs_entries(&file_contents)
}
async fn load_last_commit(filename: &Path) -> Result<Option<ChangesetId>> {
Ok(load_file_contents(filename).await?.last().map(|e| e.cs_id))
}