migrated segmented_changelog_tailer to new CLI framework

Summary:
1. Migrated CLI flags and application setup to new framework.
2. Deprecated `--repo` flag in favor of `MultiRepoArgs` (`--repo-id` and `--repo-name` flags).
3. Refactored clone hints future stream so it can be used with `tokio::spawn()` which requires static lifetime of variables used in async scopes.

Reviewed By: mitrandir77

Differential Revision: D38282271

fbshipit-source-id: 77fc50662669d537059cb2297d0f4bfc27d8a07c
This commit is contained in:
Pavel Golubev 2022-08-03 01:55:05 -07:00 committed by Facebook GitHub Bot
parent af61d688e2
commit 59b5ae7e1f
2 changed files with 107 additions and 153 deletions

View File

@ -7,158 +7,110 @@
use std::time::Duration; use std::time::Duration;
use anyhow::format_err;
use anyhow::Context; use anyhow::Context;
use anyhow::Error; use anyhow::Error;
use blobrepo::BlobRepo; use blobrepo::BlobRepo;
use bytes::Bytes; use bytes::Bytes;
use changesets::deserialize_cs_entries; use changesets::deserialize_cs_entries;
use clap_old::Arg; use clap::Parser;
use cmdlib::args;
use cmdlib::args::MononokeMatches;
use cmdlib::helpers; use cmdlib::helpers;
use context::CoreContext;
use context::SessionContainer;
use fbinit::FacebookInit; use fbinit::FacebookInit;
use futures::future::join_all; use futures::future::join_all;
use futures::stream; use futures::stream;
use mononoke_app::args::MultiRepoArgs;
use mononoke_app::args::RepoArg;
use mononoke_app::fb303::Fb303AppExtension;
use mononoke_app::MononokeApp;
use mononoke_app::MononokeAppBuilder;
use segmented_changelog::seedheads_from_config; use segmented_changelog::seedheads_from_config;
use segmented_changelog::SegmentedChangelogTailer; use segmented_changelog::SegmentedChangelogTailer;
use slog::error;
use slog::info; use slog::info;
use slog::o; use slog::o;
const ONCE_ARG: &str = "once"; /// Updates segmented changelog assets
const REPO_ARG: &str = "repo"; #[derive(Parser)]
const HEAD_ARG: &str = "head"; struct SegmentedChangelogTailerArgs {
const CONFIG_HEADS_ARG: &str = "include-config-heads"; /// Repository to warm-up
const FORCE_RESEED_ARG: &str = "force-reseed"; #[clap(flatten)]
const ARG_PREFETCHED_COMMITS_PATH: &str = "prefetched-commits-path"; repos: MultiRepoArgs,
/// Repository name to warm-up. Deprecated, use --repo-name/--repo-id instead
// Deprecated, use repos instead
#[clap(long = "repo")]
repo_names: Vec<String>,
/// When set, the tailer will perform a single incremental build run. If no previous version exists it will perform full reseed instead
#[clap(long)]
once: bool,
/// A file with a serialized list of ChangesetEntry, which can be used to speed up rebuilding of segmented changelog
#[clap(long)]
prefetched_commits_path: Option<String>,
/// What heads to use for Segmented Changelog. If not provided, tailer will use the config to obtain heads
#[clap(long)]
head: Vec<String>,
/// Force use of the configured heads, as well as any specified on the command line
#[clap(long)]
include_config_heads: bool,
/// When set, the tailer will perform a single full reseed run
#[clap(long, conflicts_with = "once")]
force_reseed: bool,
}
#[fbinit::main] #[fbinit::main]
fn main(fb: FacebookInit) -> Result<(), Error> { fn main(fb: FacebookInit) -> Result<(), Error> {
let app = args::MononokeAppBuilder::new("Updates segmented changelog assets.") let app = MononokeAppBuilder::new(fb)
.with_scuba_logging_args() .with_app_extension(Fb303AppExtension {})
.with_advanced_args_hidden() .build::<SegmentedChangelogTailerArgs>()?;
.with_fb303_args()
.build()
.about("Builds a new version of segmented changelog.")
.arg(
Arg::with_name(REPO_ARG)
.long(REPO_ARG)
.takes_value(true)
.required(true)
.multiple(true)
.help("Repository name to warm-up"),
)
.arg(
Arg::with_name(ONCE_ARG)
.long(ONCE_ARG)
.takes_value(false)
.required(false)
.help(
"When set, the tailer will perform a single incremental build run. \
If no previous version exists it will perform full reseed instead",
),
)
.arg(
Arg::with_name(ARG_PREFETCHED_COMMITS_PATH)
.long(ARG_PREFETCHED_COMMITS_PATH)
.takes_value(true)
.required(false)
.help(
"a file with a serialized list of ChangesetEntry, \
which can be used to speed up rebuilding of segmented changelog",
),
)
.arg(
Arg::with_name(HEAD_ARG)
.long(HEAD_ARG)
.takes_value(true)
.multiple(true)
.help(
"What heads to use for Segmented Changelog. If not provided, \
tailer will use the config to obtain heads.",
),
)
.arg(
Arg::with_name(CONFIG_HEADS_ARG)
.long(CONFIG_HEADS_ARG)
.takes_value(false)
.help(
"Force use of the configured heads, as well as any \
specified on the command line",
),
)
.arg(
Arg::with_name(FORCE_RESEED_ARG)
.long(FORCE_RESEED_ARG)
.takes_value(false)
.conflicts_with(ONCE_ARG)
.help("When set, the tailer will perform a single full reseed run."),
);
let matches = app.get_matches(fb)?;
let logger = matches.logger(); let fb303_args = app.extension_args::<Fb303AppExtension>()?;
let session = SessionContainer::new_with_defaults(fb); fb303_args.start_fb303_server(
let ctx = session.new_context(logger.clone(), matches.scuba_sample_builder());
helpers::block_execute(
run(ctx, &matches),
fb, fb,
&std::env::var("TW_JOB_NAME").unwrap_or_else(|_| "segmented_changelog_tailer".to_string()), "segmented_changelog_tailer",
logger, app.logger(),
&matches,
cmdlib::monitoring::AliveService, cmdlib::monitoring::AliveService,
) )?;
app.run(async_main)
} }
async fn run<'a>(ctx: CoreContext, matches: &'a MononokeMatches<'a>) -> Result<(), Error> { async fn async_main(app: MononokeApp) -> Result<(), Error> {
let reponames: Vec<_> = matches let args: SegmentedChangelogTailerArgs = app.args()?;
.values_of(REPO_ARG)
.ok_or_else(|| format_err!("--{} argument is required", REPO_ARG))?
.map(ToString::to_string)
.collect();
if reponames.is_empty() {
error!(ctx.logger(), "At least one repo had to be specified");
return Ok(());
}
let prefetched_commits = match matches.value_of(ARG_PREFETCHED_COMMITS_PATH) { let repos = MultiRepoArgs {
repo_id: args.repos.repo_id,
repo_name: args
.repos
.repo_name
.into_iter()
.chain(args.repo_names.into_iter())
.collect(),
};
// This is a bit weird from the dependency point of view but I think that it is best. The
// BlobRepo may have a SegmentedChangelog attached to it but that doesn't hurt us in any
// way. On the other hand reconstructing the dependencies for SegmentedChangelog without
// BlobRepo is probably prone to more problems from the maintenance perspective.
let blobrepos: Vec<BlobRepo> = app.open_repos(&repos).await?;
let prefetched_commits = match args.prefetched_commits_path {
Some(path) => { Some(path) => {
info!(ctx.logger(), "reading prefetched commits from {}", path); info!(app.logger(), "reading prefetched commits from {}", path);
let data = tokio::fs::read(path).await?; let data = tokio::fs::read(&path).await?;
deserialize_cs_entries(&Bytes::from(data)) deserialize_cs_entries(&Bytes::from(data))
.with_context(|| format!("failed to parse serialized cs entries from {}", path))? .with_context(|| format!("failed to parse serialized cs entries from {}", path))?
} }
None => vec![], None => vec![],
}; };
let config_store = matches.config_store(); let ctx = app.new_context();
let mysql_options = matches.mysql_options();
let configs = args::load_repo_configs(config_store, matches)?;
let mut tasks = Vec::new(); let mut tasks = Vec::new();
for (index, reponame) in reponames.into_iter().enumerate() { for (index, blobrepo) in blobrepos.into_iter().enumerate() {
let config = configs let repo_id = blobrepo.get_repoid();
.repos let (repo_name, config) = app.repo_config(RepoArg::Id(repo_id))?;
.get(&reponame)
.ok_or_else(|| format_err!("unknown repository: {}", reponame))?;
let repo_id = config.repoid;
info!( info!(
ctx.logger(), ctx.logger(),
"repo name '{}' translates to id {}", reponame, repo_id "repo name '{}' translates to id {}", repo_name, repo_id
); );
// This is a bit weird from the dependency point of view but I think that it is best. The
// BlobRepo may have a SegmentedChangelog attached to it but that doesn't hurt us in any
// way. On the other hand reconstructing the dependencies for SegmentedChangelog without
// BlobRepo is probably prone to more problems from the maintenance perspective.
let blobrepo: BlobRepo =
args::open_repo_with_repo_id(ctx.fb, ctx.logger(), repo_id, matches).await?;
let ctx = ctx.clone_with_logger(ctx.logger().new(o!("repo_id" => repo_id.to_string())));
let prefetched_commits = stream::iter(prefetched_commits.iter().filter_map(|entry| { let prefetched_commits = stream::iter(prefetched_commits.iter().filter_map(|entry| {
if entry.repo_id == repo_id { if entry.repo_id == repo_id {
Some(Ok(entry.clone())) Some(Ok(entry.clone()))
@ -167,30 +119,29 @@ async fn run<'a>(ctx: CoreContext, matches: &'a MononokeMatches<'a>) -> Result<(
} }
})); }));
let ctx = ctx.clone_with_logger(ctx.logger().new(o!("repo_id" => repo_id.to_string())));
let seed_heads = { let seed_heads = {
let head_args = matches.values_of(HEAD_ARG); let mut heads = if args.head.is_empty() || args.include_config_heads {
let head_args_len = head_args.as_ref().map_or(0, |a| a.len());
let mut heads = if head_args.is_none() || matches.is_present(CONFIG_HEADS_ARG) {
let mut heads = seedheads_from_config( let mut heads = seedheads_from_config(
&ctx, &ctx,
&config.segmented_changelog_config, &config.segmented_changelog_config,
segmented_changelog::JobType::Background, segmented_changelog::JobType::Background,
)?; )?;
heads.reserve(head_args_len); heads.reserve(args.head.len());
heads heads
} else { } else {
Vec::with_capacity(head_args_len) Vec::with_capacity(args.head.len())
}; };
if let Some(head_args) = head_args {
for head_arg in head_args { for head_arg in &args.head {
let head = helpers::csid_resolve(&ctx, blobrepo.clone(), head_arg) let head = helpers::csid_resolve(&ctx, blobrepo.clone(), head_arg)
.await .await
.with_context(|| { .with_context(|| {
format!("resolving head csid '{}' for repo {}", head_arg, repo_id) format!("resolving head csid '{}' for repo {}", head_arg, repo_id)
})?; })?;
info!(ctx.logger(), "using '{}' for head", head); info!(ctx.logger(), "using '{}' for head", head);
heads.push(head.into()); heads.push(head.into());
}
} }
heads heads
}; };
@ -199,7 +150,7 @@ async fn run<'a>(ctx: CoreContext, matches: &'a MononokeMatches<'a>) -> Result<(
&ctx, &ctx,
&blobrepo, &blobrepo,
&config.storage_config.metadata, &config.storage_config.metadata,
mysql_options, app.mysql_options(),
seed_heads, seed_heads,
prefetched_commits, prefetched_commits,
None, None,
@ -208,9 +159,9 @@ async fn run<'a>(ctx: CoreContext, matches: &'a MononokeMatches<'a>) -> Result<(
info!(ctx.logger(), "SegmentedChangelogTailer initialized",); info!(ctx.logger(), "SegmentedChangelogTailer initialized",);
if matches.is_present(ONCE_ARG) || matches.is_present(FORCE_RESEED_ARG) { if args.once || args.force_reseed {
segmented_changelog_tailer segmented_changelog_tailer
.once(&ctx, matches.is_present(FORCE_RESEED_ARG)) .once(&ctx, args.force_reseed)
.await .await
.with_context(|| format!("incrementally building repo {}", repo_id))?; .with_context(|| format!("incrementally building repo {}", repo_id))?;
info!(ctx.logger(), "SegmentedChangelogTailer is done",); info!(ctx.logger(), "SegmentedChangelogTailer is done",);

View File

@ -203,26 +203,29 @@ impl CloneHints {
debug!(ctx.logger(), "Uploading {} hint entries", new_hints.len()); debug!(ctx.logger(), "Uploading {} hint entries", new_hints.len());
let hint_blob_keys: Vec<_> = stream::iter(new_hints.chunks_exact(HINTS_PER_CHUNK).map( let chunks = new_hints
|chunk| async move { .chunks_exact(HINTS_PER_CHUNK)
.map(|chunk| {
let chunk: Vec<_> = chunk.iter().collect(); let chunk: Vec<_> = chunk.iter().collect();
let chunk = mincode::serialize(&chunk)?; mincode::serialize(&chunk)
let chunk_hash = { })
let mut context = hash::Context::new(b"segmented_clone"); .collect::<Result<Vec<_>, _>>()?;
context.update(&chunk);
context.finish() let hint_blob_keys: Vec<_> = stream::iter(chunks.into_iter().map(|chunk| async move {
}; let chunk_hash = {
let chunk_key = let mut context = hash::Context::new(b"segmented_clone");
format!("segmented_clone_v1_idmapv{}.{}", idmap_version, chunk_hash); context.update(&chunk);
let blob = BlobstoreBytes::from_bytes(chunk); context.finish()
self.inner };
.blobstore let chunk_key = format!("segmented_clone_v1_idmapv{}.{}", idmap_version, chunk_hash);
.put(ctx, chunk_key.clone(), blob) let blob = BlobstoreBytes::from_bytes(chunk);
.await?; self.inner
debug!(ctx.logger(), "Uploaded hint entry {}", &chunk_key); .blobstore
Ok::<_, Error>(chunk_key) .put(ctx, chunk_key.clone(), blob)
}, .await?;
)) debug!(ctx.logger(), "Uploaded hint entry {}", &chunk_key);
Ok::<_, Error>(chunk_key)
}))
.buffer_unordered(100) .buffer_unordered(100)
.try_collect() .try_collect()
.await?; .await?;