Run Bonsai hooks as well as old-style hooks

Summary: Running hooks on Mercurial changesets isn't scalable long term - move the consumers of hooks to run both forms of hooks for a transition period

Reviewed By: krallin

Differential Revision: D20879136

fbshipit-source-id: 4630cafaebbf6a26aa6ba92bd8d53794a1d1c058
Simon Farnsworth authored on 2020-04-08 08:56:17 -07:00; committed by Facebook GitHub Bot
parent c59ae3274b
commit da7cbd7f36
5 changed files with 181 additions and 67 deletions
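
The pattern this commit applies at each hook call site is: run the new Bonsai hooks, derive the Mercurial changeset, run the old-style hooks on it, and combine the outcomes (as `run_hooks_for_changeset` in the tailer does below). Here is a minimal sketch of that shape; every name in it is an illustrative stand-in, not the real Mononoke API:

```rust
// All types below are stand-ins for the real Mononoke types
// (ChangesetId, HgChangesetId, HookManager, HookOutcome).
#[derive(Clone, Copy)]
pub struct BonsaiId(u64);

#[derive(Clone, Copy)]
pub struct HgId(u64);

#[derive(Debug)]
pub enum Outcome {
    Accepted,
    Rejected(String),
}

pub struct Hooks;

impl Hooks {
    // New-style hooks operate directly on the Bonsai changeset.
    pub async fn run_bonsai_hooks(&self, _cs: BonsaiId) -> Vec<Outcome> {
        vec![Outcome::Accepted]
    }

    // Old-style hooks still need the derived Mercurial changeset.
    pub async fn run_hg_hooks(&self, _cs: HgId) -> Vec<Outcome> {
        vec![Outcome::Accepted]
    }

    // Stub for deriving the Mercurial form of a Bonsai changeset.
    pub async fn derive_hg(&self, cs: BonsaiId) -> HgId {
        HgId(cs.0)
    }
}

// The transition shape: run both hook forms and report the combined
// outcomes, so a changeset passes only if every hook in both sets accepts.
pub async fn run_both_hook_forms(hooks: &Hooks, cs: BonsaiId) -> Vec<Outcome> {
    let mut outcomes = hooks.run_bonsai_hooks(cs).await;
    let hg_cs = hooks.derive_hg(cs).await;
    outcomes.extend(hooks.run_hg_hooks(hg_cs).await);
    outcomes
}
```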

View File

@@ -9,10 +9,11 @@
 use anyhow::{format_err, Error, Result};
 use blobrepo::BlobRepo;
+use blobstore::Loadable;
 use bookmarks::BookmarkName;
 use cloned::cloned;
 use context::CoreContext;
-use futures::{FutureExt, TryFutureExt};
+use futures::{compat::Future01CompatExt, FutureExt, TryFutureExt};
 use futures_ext::{spawn_future, BoxFuture, FutureExt as OldFutureExt};
 use futures_old::{Future, Stream};
 use hooks::{hook_loader::load_hooks, HookManager, HookOutcome};
@@ -87,38 +88,35 @@ impl Tailer {
         ctx: CoreContext,
         repo: BlobRepo,
         hm: Arc<HookManager>,
-        last_rev: HgChangesetId,
-        end_rev: HgChangesetId,
+        last_rev: ChangesetId,
+        end_rev: ChangesetId,
         bm: BookmarkName,
         excludes: HashSet<ChangesetId>,
     ) -> BoxFuture<Vec<HookOutcome>, Error> {
         debug!(ctx.logger(), "Running in range {} to {}", last_rev, end_rev);
-        nodehash_to_bonsai(ctx.clone(), &repo, end_rev)
-            .and_then(move |end_rev| {
-                AncestorsNodeStream::new(ctx.clone(), &repo.get_changeset_fetcher(), end_rev)
-                    .take(1000) // Limit number so we don't process too many
-                    .filter(move |cs| !excludes.contains(cs))
-                    .map({
-                        move |cs| {
-                            cloned!(ctx, bm, hm, repo);
-                            run_hooks_for_changeset(ctx, repo, hm, bm, cs)
-                        }
-                    })
-                    .map(spawn_future)
-                    .buffered(100)
-                    .take_while(move |(hg_cs, _)| {
-                        Ok(*hg_cs != last_rev)
-                    })
-                    .map(|(_, res)| res)
-                    .concat2()
-            })
+        AncestorsNodeStream::new(ctx.clone(), &repo.get_changeset_fetcher(), end_rev)
+            .take(1000) // Limit number so we don't process too many
+            .filter(move |cs| !excludes.contains(cs))
+            .map({
+                move |cs| {
+                    cloned!(ctx, bm, hm, repo);
+                    run_hooks_for_changeset(ctx, repo, hm, bm, cs)
+                }
+            })
+            .map(spawn_future)
+            .buffered(100)
+            .take_while(move |(cs, _)| {
+                Ok(*cs != last_rev)
+            })
+            .map(|(_, res)| res)
+            .concat2()
             .boxify()
     }

     pub fn run_in_range(
         &self,
-        last_rev: HgChangesetId,
-        end_rev: HgChangesetId,
+        last_rev: ChangesetId,
+        end_rev: ChangesetId,
     ) -> BoxFuture<Vec<HookOutcome>, Error> {
         cloned!(
             self.ctx,
@@ -204,7 +202,7 @@ impl Tailer {
         );

         self.repo
-            .get_bookmark(self.ctx.clone(), &self.bookmark.clone())
+            .get_bonsai_bookmark(self.ctx.clone(), &self.bookmark.clone())
             .and_then({
                 cloned!(self.bookmark);
                 |opt| opt.ok_or(ErrorKind::NoSuchBookmark(bookmark).into())
@@ -222,7 +220,7 @@
                 None => Err(ErrorKind::NoLastRevision.into()),
             })
             .and_then(|(current_bm_cs, last_rev_bytes)| {
-                let node_hash = HgChangesetId::from_bytes(&*last_rev_bytes.payload.payload)?;
+                let node_hash = ChangesetId::from_bytes(&*last_rev_bytes.payload.payload)?;
                 Ok((current_bm_cs, node_hash))
             })
             .and_then({
.and_then({
@@ -263,7 +261,7 @@ impl Tailer {
                         ctx.logger(),
                         "Setting last processed revision to {:?}", end_rev
                     );
-                    let bytes = end_rev.as_bytes().into();
+                    let bytes = end_rev.as_ref().into();
                     manifold_client.write(last_rev_key, bytes).map(|()| res)
                 }
             })
@@ -285,19 +283,29 @@ fn run_hooks_for_changeset(
     repo: BlobRepo,
     hm: Arc<HookManager>,
     bm: BookmarkName,
-    cs: ChangesetId,
-) -> impl Future<Item = (HgChangesetId, Vec<HookOutcome>), Error = Error> {
-    repo.get_hg_from_bonsai_changeset(ctx.clone(), cs)
-        .and_then(move |hg_cs| {
+    cs_id: ChangesetId,
+) -> impl Future<Item = (ChangesetId, Vec<HookOutcome>), Error = Error> {
+    cs_id
+        .load(ctx.clone(), repo.blobstore())
+        .from_err()
+        .and_then(move |cs| {
             let ctx = ctx.clone();
             let hm = hm.clone();
             let bm = bm.clone();
             async move {
-                debug!(ctx.logger(), "Running hooks for changeset {:?}", hg_cs);
-                let hook_results = hm
-                    .run_hooks_for_bookmark(&ctx, vec![hg_cs], &bm, None)
+                debug!(ctx.logger(), "Running hooks for changeset {:?}", cs);
+                let mut hook_results = hm
+                    .run_hooks_for_bookmark_bonsai(&ctx, vec![cs].iter(), &bm, None)
                     .await?;
-                Ok((hg_cs, hook_results))
+                let hg_cs = repo
+                    .get_hg_from_bonsai_changeset(ctx.clone(), cs_id)
+                    .compat()
+                    .await?;
+                let old_hook_results = hm
+                    .run_hooks_for_bookmark(&ctx, vec![hg_cs], &bm, None)
+                    .await?;
+                hook_results.extend(old_hook_results);
+                Ok((cs_id, hook_results))
             }
             .boxed()
             .compat()

View File

@@ -1431,9 +1431,9 @@ impl HgCommands for RepoClient {
             .boxed()
             .compat()
             .and_then({
-                cloned!(ctx);
+                cloned!(ctx, blobrepo);
                 move |action| {
-                    run_hooks(ctx, hook_manager, &action)
+                    run_hooks(ctx, blobrepo, hook_manager, &action)
                         .map(move |_| action)
                 }
             }).and_then({
@@ -1508,7 +1508,7 @@ impl HgCommands for RepoClient {
                 use unbundle::BundleResolverError::*;
                 match err {
                     HookError(hooks) => {
-                        let failed_hooks: HashSet<String> = hooks.into_iter().filter_map(|res| if res.is_rejection() { Some(res.get_hook_name().to_string())} else {None}).collect();
+                        let failed_hooks: HashSet<String> = hooks.iter().map(|fail| fail.get_hook_name().to_string()).collect();

                         for failed_hook in failed_hooks {
                             STATS::push_hook_failure.add_value(

View File

@@ -7,16 +7,27 @@
 #![deny(warnings)]

-use crate::{BundleResolverError, PostResolveAction, PostResolvePushRebase};
+use crate::{resolver::HookFailure, BundleResolverError, PostResolveAction, PostResolvePushRebase};
+use blobrepo::BlobRepo;
 use bookmarks::BookmarkName;
 use bytes::Bytes;
 use context::CoreContext;
-use futures::{FutureExt, TryFutureExt};
+use futures::{
+    compat::Future01CompatExt,
+    future::try_join,
+    stream::{self, TryStreamExt},
+    FutureExt, TryFutureExt,
+};
 use futures_ext::{BoxFuture, FutureExt as _};
 use futures_old::future::ok;
-use hooks::{HookManager, HookOutcome};
-use std::sync::Arc;
+use hooks::{HookExecution, HookManager, HookOutcome};
+use mercurial_types::HgChangesetId;
+use mononoke_types::BonsaiChangeset;
+use std::{collections::HashMap, sync::Arc};

 pub fn run_hooks(
     ctx: CoreContext,
+    repo: BlobRepo,
     hook_manager: Arc<HookManager>,
     action: &PostResolveAction,
 ) -> BoxFuture<(), BundleResolverError> {
@@ -24,35 +35,124 @@ pub fn run_hooks(
         // TODO: Need to run hooks on Push, not just PushRebase
         PostResolveAction::Push(_) => ok(()).boxify(),
         PostResolveAction::InfinitePush(_) => ok(()).boxify(),
-        PostResolveAction::PushRebase(action) => run_pushrebase_hooks(ctx, action, hook_manager),
+        PostResolveAction::PushRebase(action) => {
+            run_pushrebase_hooks(ctx, repo, action, hook_manager)
+        }
         PostResolveAction::BookmarkOnlyPushRebase(_) => ok(()).boxify(),
     }
 }

 fn run_pushrebase_hooks(
     ctx: CoreContext,
+    repo: BlobRepo,
     action: &PostResolvePushRebase,
     hook_manager: Arc<HookManager>,
 ) -> BoxFuture<(), BundleResolverError> {
-    let changesets = action.uploaded_hg_changeset_ids.clone();
+    // The changesets that will be pushed
+    let changesets = action.uploaded_bonsais.clone();
+    let hg = action.uploaded_hg_changeset_ids.clone();
     let maybe_pushvars = action.maybe_pushvars.clone();
     // FIXME: stop cloning when this fn is async
     let bookmark = action.bookmark_spec.get_bookmark_name().clone();

     async move {
-        let hook_failures: Vec<_> = hook_manager
-            .run_hooks_for_bookmark(&ctx, changesets, &bookmark, maybe_pushvars.as_ref())
-            .await?
-            .into_iter()
-            .filter(HookOutcome::is_rejection)
-            .collect();
-        if hook_failures.is_empty() {
-            Ok(())
-        } else {
-            Err(BundleResolverError::HookError(hook_failures))
-        }
+        let ((), ()) = try_join(
+            run_hooks_on_changesets(
+                &ctx,
+                &repo,
+                &*hook_manager,
+                changesets.iter(),
+                bookmark.clone(),
+                maybe_pushvars.clone(),
+            ),
+            run_hooks_on_changesets_hg(&ctx, &*hook_manager, hg, bookmark, maybe_pushvars),
+        )
+        .await?;
+        Ok(())
     }
     .boxed()
     .compat()
     .boxify()
 }
+
+async fn run_hooks_on_changesets(
+    ctx: &CoreContext,
+    repo: &BlobRepo,
+    hook_manager: &HookManager,
+    changesets: impl Iterator<Item = &BonsaiChangeset> + Clone + itertools::Itertools,
+    bookmark: BookmarkName,
+    maybe_pushvars: Option<HashMap<String, Bytes>>,
+) -> Result<(), BundleResolverError> {
+    let hook_outcomes = hook_manager
+        .run_hooks_for_bookmark_bonsai(&ctx, changesets, &bookmark, maybe_pushvars.as_ref())
+        .await?;
+    if hook_outcomes.iter().all(HookOutcome::is_accept) {
+        Ok(())
+    } else {
+        let hook_failures = hook_outcomes
+            .into_iter()
+            .filter_map(|outcome| {
+                let hook_name = outcome.get_hook_name().to_string();
+                let cs_id = outcome.get_changeset_id();
+                let info = match outcome.into() {
+                    HookExecution::Accepted => None,
+                    HookExecution::Rejected(info) => Some(info),
+                }?;
+                Some(async move {
+                    let cs_id = repo
+                        .get_hg_from_bonsai_changeset(ctx.clone(), cs_id)
+                        .compat()
+                        .await?;
+                    Result::<_, anyhow::Error>::Ok(HookFailure {
+                        hook_name,
+                        cs_id,
+                        info,
+                    })
+                })
+            })
+            .collect::<futures::stream::FuturesUnordered<_>>()
+            .try_collect()
+            .await?;
+        Err(BundleResolverError::HookError(hook_failures))
+    }
+}
+
+async fn run_hooks_on_changesets_hg(
+    ctx: &CoreContext,
+    hook_manager: &HookManager,
+    changesets: impl IntoIterator<Item = HgChangesetId>,
+    bookmark: BookmarkName,
+    maybe_pushvars: Option<HashMap<String, Bytes>>,
+) -> Result<(), BundleResolverError> {
+    let hook_outcomes = hook_manager
+        .run_hooks_for_bookmark(&ctx, changesets, &bookmark, maybe_pushvars.as_ref())
+        .await?;
+    if hook_outcomes.iter().all(HookOutcome::is_accept) {
+        Ok(())
+    } else {
+        let hook_failures = stream::iter(
+            hook_outcomes
+                .into_iter()
+                .map(|o| -> Result<_, BundleResolverError> { Ok(o) }),
+        )
+        .try_filter_map(|outcome| async move {
+            let hook_name = outcome.get_hook_name().to_string();
+            let cs_id = outcome.get_cs_id();
+            match outcome.into() {
+                HookExecution::Accepted => Ok(None),
+                HookExecution::Rejected(info) => Ok(Some(HookFailure {
+                    hook_name,
+                    cs_id,
+                    info,
+                })),
+            }
+        })
+        .try_collect()
+        .await?;
+        Err(BundleResolverError::HookError(hook_failures))
+    }
+}

View File

@@ -34,7 +34,7 @@ use futures_old::{Future as OldFuture, Stream as OldStream};
 use futures_util::{
     compat::Future01CompatExt, try_join, FutureExt, StreamExt, TryFutureExt, TryStreamExt,
 };
-use hooks::{HookExecution, HookOutcome};
+use hooks::HookRejectionInfo;
 use lazy_static::lazy_static;
 use limits::types::RateLimit;
 use mercurial_bundles::{Bundle2Item, PartHeader, PartHeaderInner, PartHeaderType, PartId};
@@ -101,8 +101,20 @@ impl From<bool> for NonFastForwardPolicy {
     }
 }

+pub struct HookFailure {
+    pub(crate) hook_name: String,
+    pub(crate) cs_id: HgChangesetId,
+    pub(crate) info: HookRejectionInfo,
+}
+
+impl HookFailure {
+    pub fn get_hook_name(&self) -> &str {
+        &self.hook_name
+    }
+}
+
 pub enum BundleResolverError {
-    HookError(Vec<HookOutcome>),
+    HookError(Vec<HookFailure>),
     PushrebaseConflicts(Vec<pushrebase::PushrebaseConflict>),
     Error(Error),
     RateLimitExceeded {
@@ -126,17 +138,11 @@ impl From<BundleResolverError> for Error {
             HookError(hook_outcomes) => {
                 let err_msgs: Vec<_> = hook_outcomes
                     .into_iter()
-                    .filter_map(|outcome| {
-                        let exec = outcome.get_execution();
-                        match exec {
-                            HookExecution::Accepted => None,
-                            HookExecution::Rejected(info) => Some(format!(
-                                "{} for {}: {}",
-                                outcome.get_hook_name(),
-                                outcome.get_cs_id(),
-                                info.long_description
-                            )),
-                        }
+                    .map(|failure| {
+                        format!(
+                            "{} for {}: {}",
+                            failure.hook_name, failure.cs_id, failure.info.long_description
+                        )
                     })
                     .collect();
                 format_err!("hooks failed:\n{}", err_msgs.join("\n"))

View File

@@ -290,7 +290,7 @@ async fn maybe_unbundle(
     let hooks_outcome = match hook_manager {
         Some(hook_manager) => {
             let (hook_stats, hooks_outcome) =
-                run_hooks(ctx.clone(), hook_manager.clone(), &resolution)
+                run_hooks(ctx.clone(), repo.clone(), hook_manager.clone(), &resolution)
                     .compat()
                     .timed()
                     .await;