mononoke/hook_tailer: asyncify run_with_limit

Summary:
I'd like to clean up this code a little bit since I'm going to make a few
changes and would like to avoid mixing too many old and new futures.

Reviewed By: farnz

Differential Revision: D21042081

fbshipit-source-id: d6a807ce9c60d09d82c6b8c6866ea23b8ef45f21
Author: Thomas Orozco
Date: 2020-04-16 02:12:29 -07:00
Committed by: Facebook GitHub Bot
parent d37d58b51d
commit 2ecf51e7af
2 changed files with 50 additions and 40 deletions

View File

@ -19,10 +19,9 @@ use context::CoreContext;
use fbinit::FacebookInit;
use futures::{
compat::Future01CompatExt,
future::FutureExt,
future::{Future, FutureExt},
stream::{self, StreamExt, TryStreamExt},
};
use futures_ext::BoxFuture;
use futures_old::Future as OldFuture;
use futures_stats::TimedFutureExt;
use hooks::HookOutcome;
@ -71,7 +70,7 @@ async fn run_hook_tailer<'a>(
let common_config = cmdlib::args::read_common_config(fb, &matches)?;
let init_revision = matches.value_of("init_revision").map(String::from);
let continuous = matches.is_present("continuous");
let limit = cmdlib::args::get_u64(&matches, "limit", 1000);
let limit = cmdlib::args::get_usize(&matches, "limit", 1000);
let changeset = matches.value_of("changeset");
let mut excludes = matches
@ -174,7 +173,7 @@ async fn run_hook_tailer<'a>(
.map(Ok)
.try_for_each({
move |()| async move {
process_hook_results(tail.run(), logger).await?;
process_hook_results(tail.run().compat(), logger).await?;
sleep(Duration::new(10, 0))
.map_err(|err| format_err!("Tokio timer error {:?}", err))
.compat()
@ -186,7 +185,7 @@ async fn run_hook_tailer<'a>(
.boxed()
}
(_, Some(changeset)) => {
process_hook_results(tail.run_single_changeset(changeset), logger).boxed()
process_hook_results(tail.run_single_changeset(changeset).compat(), logger).boxed()
}
_ => {
f.await?;
@ -196,11 +195,11 @@ async fn run_hook_tailer<'a>(
.await
}
async fn process_hook_results(
fut: BoxFuture<Vec<HookOutcome>, Error>,
async fn process_hook_results<F: Future<Output = Result<Vec<HookOutcome>, Error>>>(
fut: F,
logger: &Logger,
) -> Result<(), Error> {
let (stats, res) = fut.compat().timed().await;
let (stats, res) = fut.timed().await;
let res = res?;
let mut hooks_stat = HookExecutionStat::new();

View File

@ -13,9 +13,13 @@ use blobstore::Loadable;
use bookmarks::BookmarkName;
use cloned::cloned;
use context::CoreContext;
use futures::{FutureExt, TryFutureExt};
use futures::{
compat::{Future01CompatExt, Stream01CompatExt},
future::{self, FutureExt, TryFutureExt},
stream::{StreamExt, TryStreamExt},
};
use futures_ext::{spawn_future, BoxFuture, FutureExt as OldFutureExt};
use futures_old::{Future, Stream};
use futures_old::{Future as OldFuture, Stream as OldStream};
use hooks::{hook_loader::load_hooks, HookManager, HookOutcome};
use hooks_content_stores::blobrepo_text_only_fetcher;
use manifold::{ManifoldHttpClient, PayloadRange};
@ -28,6 +32,7 @@ use slog::{debug, info};
use std::collections::HashSet;
use std::sync::Arc;
use thiserror::Error;
use tokio::task;
pub struct Tailer {
ctx: CoreContext,
@ -88,35 +93,44 @@ impl Tailer {
.boxify()
}
pub fn run_with_limit(&self, limit: u64) -> BoxFuture<Vec<HookOutcome>, Error> {
let ctx = self.ctx.clone();
let bm = self.bookmark.clone();
let hm = self.hook_manager.clone();
let excludes = self.excludes.clone();
pub async fn run_with_limit(&self, limit: usize) -> Result<Vec<HookOutcome>, Error> {
let bm_rev = self
.repo
.get_bonsai_bookmark(self.ctx.clone(), &self.bookmark)
.compat()
.await?
.ok_or_else(|| ErrorKind::NoSuchBookmark(self.bookmark.clone()))?;
let bm_rev = self.repo.get_bonsai_bookmark(ctx.clone(), &bm).and_then({
cloned!(bm);
|opt| opt.ok_or(ErrorKind::NoSuchBookmark(bm).into())
});
let res =
AncestorsNodeStream::new(self.ctx.clone(), &self.repo.get_changeset_fetcher(), bm_rev)
.compat()
.take(limit)
.try_filter(|cs_id| future::ready(!self.excludes.contains(cs_id)))
.map(|cs_id| async move {
match cs_id {
Ok(cs_id) => {
let (_, outcomes) = task::spawn(
run_hooks_for_changeset(
self.ctx.clone(),
self.repo.clone(),
self.hook_manager.clone(),
self.bookmark.clone(),
cs_id,
)
.compat(),
)
.await??;
cloned!(self.ctx, self.repo);
bm_rev
.and_then(move |bm_rev| {
AncestorsNodeStream::new(ctx.clone(), &repo.get_changeset_fetcher(), bm_rev)
.take(limit)
.filter(move |cs| !excludes.contains(cs))
.map({
move |cs| {
cloned!(ctx, bm, hm, repo);
run_hooks_for_changeset(ctx, repo, hm, bm, cs)
Ok(outcomes)
}
})
.map(spawn_future)
.buffered(100)
.map(|(_, res)| res)
.concat2()
})
.boxify()
Err(e) => Err(e),
}
})
.buffered(100)
.try_concat()
.await?;
Ok(res)
}
pub fn run(&self) -> BoxFuture<Vec<HookOutcome>, Error> {
@ -200,14 +214,11 @@ fn run_hooks_for_changeset(
hm: Arc<HookManager>,
bm: BookmarkName,
cs_id: ChangesetId,
) -> impl Future<Item = (ChangesetId, Vec<HookOutcome>), Error = Error> {
) -> impl OldFuture<Item = (ChangesetId, Vec<HookOutcome>), Error = Error> {
cs_id
.load(ctx.clone(), repo.blobstore())
.from_err()
.and_then(move |cs| {
let ctx = ctx.clone();
let hm = hm.clone();
let bm = bm.clone();
async move {
debug!(ctx.logger(), "Running hooks for changeset {:?}", cs);
let hook_results = hm