mononoke/unbundle_replay: run hooks

Summary:
As the name indicates, this updates unbundle_replay to run hooks. Hook failures
don't block the replay, but they're logged to Scuba.

Differential Revision: D20693851

fbshipit-source-id: 4357bb0d6869a658026dbc5421a694bc4b39816f
This commit is contained in:
Thomas Orozco 2020-03-30 06:12:14 -07:00 committed by Facebook GitHub Bot
parent fd546edbad
commit 8315336b2c
2 changed files with 81 additions and 7 deletions

View File

@@ -81,7 +81,7 @@ Insert the entry. Note that in tests, the commit timestamp will always be zero.
Replay the push. It will succeed now
$ quiet unbundle_replay --scuba-log-file "$TESTTMP/scuba.json" hg-recording "$BUNDLE_HELPER" 1
$ quiet unbundle_replay --run-hooks --scuba-log-file "$TESTTMP/scuba.json" hg-recording "$BUNDLE_HELPER" 1
Check history again. We're back to where we were:
@@ -94,6 +94,7 @@ Check history again. We're back to where we were:
{
"int": {
"age_s": *, (glob)
"hooks_execution_time_us": *, (glob)
"pushrebase_completion_time_us": *, (glob)
"pushrebase_recorded_time_us": 123000,
"time": *, (glob)

View File

@@ -11,6 +11,7 @@ mod hg_recording;
mod hooks;
mod replay_spec;
use ::hooks::{hook_loader::load_hooks, HookManager};
use anyhow::{format_err, Error};
use blobrepo::BlobRepo;
use blobrepo_factory::BlobrepoBuilder;
@@ -28,10 +29,12 @@ use futures::{
};
use futures_old::stream::Stream as OldStream;
use futures_stats::{FutureStats, TimedFutureExt};
use hooks_content_stores::{blobrepo_text_only_store, BlobRepoChangesetStore};
use mercurial_bundles::bundle2::{Bundle2Stream, StreamEvent};
use metaconfig_types::{RepoConfig, RepoReadOnly};
use mononoke_types::{BonsaiChangeset, ChangesetId, Timestamp};
use pushrebase::OntoBookmarkParams;
use scuba_ext::ScubaSampleBuilder;
use slog::{info, warn, Logger};
use std::collections::HashMap;
use std::collections::HashSet;
@@ -42,7 +45,8 @@ use std::time::Duration;
use time_ext::DurationExt;
use tokio::{task, time};
use unbundle::{
self, get_pushrebase_hooks, PostResolveAction, PostResolvePushRebase, PushrebaseBookmarkSpec,
self, get_pushrebase_hooks, run_hooks, PostResolveAction, PostResolvePushRebase,
PushrebaseBookmarkSpec,
};
use crate::hg_recording::HgRecordingClient;
@@ -50,6 +54,7 @@ use crate::hooks::{Target, UnbundleReplayHook};
use crate::replay_spec::{OntoRev, PushrebaseSpec, ReplaySpec};
const ARG_UNBUNDLE_CONCURRENCY: &str = "unbundle-concurrency";
const ARG_RUN_HOOKS: &str = "run-hooks";
const SUBCOMMAND_HG_RECORDING: &str = "hg-recording";
const ARG_HG_RECORDING_ID: &str = "hg-recording-id";
@@ -219,6 +224,7 @@ struct UnbundleComplete {
timestamps: HashMap<ChangesetId, Timestamp>,
changesets: HashSet<BonsaiChangeset>,
unbundle_stats: FutureStats,
hooks_outcome: Option<(FutureStats, Option<Error>)>,
recorded_duration: Option<Duration>,
}
@@ -234,6 +240,7 @@ async fn maybe_unbundle(
ctx: &CoreContext,
repo: &BlobRepo,
repo_config: &RepoConfig,
hook_manager: &Option<Arc<HookManager>>,
bundle: Bytes,
pushrebase_spec: PushrebaseSpec,
) -> Result<UnbundleOutcome, Error> {
@@ -270,6 +277,19 @@ async fn maybe_unbundle(
Err(e) => return Err(e.into()),
};
let hooks_outcome = match hook_manager {
Some(hook_manager) => {
let (hook_stats, hooks_outcome) =
run_hooks(ctx.clone(), hook_manager.clone(), &resolution)
.compat()
.timed()
.await;
Some((hook_stats, hooks_outcome.err().map(Error::from)))
}
None => None,
};
let PushrebaseSpec {
onto,
onto_rev,
@@ -339,6 +359,7 @@ async fn maybe_unbundle(
timestamps,
changesets,
unbundle_stats,
hooks_outcome,
recorded_duration,
}))
}
@@ -349,6 +370,9 @@ async fn do_main(
logger: &Logger,
service: &ReadyFlagService,
) -> Result<(), Error> {
// TODO: Would want Scuba and such here.
let ctx = CoreContext::new_with_logger(fb, logger.clone());
let unbundle_concurrency = matches
.value_of(ARG_UNBUNDLE_CONCURRENCY)
.map(|s| s.parse())
@@ -382,18 +406,33 @@ async fn do_main(
.build()
.await?;
let hook_manager = if matches.is_present(ARG_RUN_HOOKS) {
info!(logger, "Creating HookManager");
let mut hook_manager = HookManager::new(
ctx.fb,
Box::new(BlobRepoChangesetStore::new(repo.clone())),
blobrepo_text_only_store(repo.clone(), repo_config.hook_max_file_size),
repo_config.hook_manager_params.clone().unwrap_or_default(),
ScubaSampleBuilder::with_discard(),
);
info!(logger, "Loading hooks");
load_hooks(fb, &mut hook_manager, repo_config.clone(), &HashSet::new())?;
Some(Arc::new(hook_manager))
} else {
None
};
service.set_ready();
let mut scuba = args::get_scuba_sample_builder(fb, &matches)?;
scuba.add_common_server_data();
// TODO: Would want Scuba and such here.
let ctx = CoreContext::new_with_logger(fb, logger.clone());
let ctx = &ctx;
let scuba = &scuba;
let repo = &repo;
let repo_config = &repo_config;
let hook_manager = &hook_manager;
get_replay_stream(&ctx, &repo, matches)
.await?
@@ -407,7 +446,15 @@ async fn do_main(
let bundle = bundle.load(ctx, repo).await?;
maybe_unbundle(ctx, repo, repo_config, bundle, pushrebase_spec).await
maybe_unbundle(
ctx,
repo,
repo_config,
hook_manager,
bundle,
pushrebase_spec,
)
.await
}
Err(e) => Err(e),
}
@@ -428,7 +475,16 @@ async fn do_main(
pushrebase_spec.target
);
match maybe_unbundle(ctx, repo, repo_config, bundle, pushrebase_spec).await? {
match maybe_unbundle(
ctx,
repo,
repo_config,
hook_manager,
bundle,
pushrebase_spec,
)
.await?
{
UnbundleOutcome::Complete(c) => c,
UnbundleOutcome::Deferred(_, _, err) => {
return Err(err);
@@ -444,6 +500,7 @@ async fn do_main(
timestamps,
changesets,
unbundle_stats,
hooks_outcome,
recorded_duration,
} = unbundle_complete;
@@ -518,6 +575,15 @@ async fn do_main(
recorded_duration.as_micros_unchecked(),
);
}
if let Some((hooks_stats, maybe_hooks_err)) = hooks_outcome {
scuba.add(
"hooks_execution_time_us",
hooks_stats.completion_time.as_micros_unchecked(),
);
if let Some(hooks_err) = maybe_hooks_err {
scuba.add("hooks_error", hooks_err.to_string());
}
}
scuba.log();
info!(
@@ -551,6 +617,13 @@ fn main(fb: FacebookInit) -> Result<(), Error> {
.takes_value(true)
.required(false),
)
.arg(
Arg::with_name(ARG_RUN_HOOKS)
.help("Whether to run hooks")
.long(ARG_RUN_HOOKS)
.takes_value(false)
.required(false),
)
.subcommand(
SubCommand::with_name(SUBCOMMAND_HG_RECORDING)
.about("Replay a single bundle, from hg")