From 2ecf51e7afebb886f5aa74c96eef2b2459858086 Mon Sep 17 00:00:00 2001 From: Thomas Orozco Date: Thu, 16 Apr 2020 02:12:29 -0700 Subject: [PATCH] mononoke/hook_tailer: asyncify run_with_limit Summary: I'd like to clean up this code a little bit since I'm going to make a few changes and would like to avoid mixing too many old and new futures. Reviewed By: farnz Differential Revision: D21042081 fbshipit-source-id: d6a807ce9c60d09d82c6b8c6866ea23b8ef45f21 --- eden/mononoke/hook_tailer/main.rs | 15 +++--- eden/mononoke/hook_tailer/tailer.rs | 75 +++++++++++++++++------------ 2 files changed, 50 insertions(+), 40 deletions(-) diff --git a/eden/mononoke/hook_tailer/main.rs b/eden/mononoke/hook_tailer/main.rs index 15dd98ce48..1a13e1fcc0 100644 --- a/eden/mononoke/hook_tailer/main.rs +++ b/eden/mononoke/hook_tailer/main.rs @@ -19,10 +19,9 @@ use context::CoreContext; use fbinit::FacebookInit; use futures::{ compat::Future01CompatExt, - future::FutureExt, + future::{Future, FutureExt}, stream::{self, StreamExt, TryStreamExt}, }; -use futures_ext::BoxFuture; use futures_old::Future as OldFuture; use futures_stats::TimedFutureExt; use hooks::HookOutcome; @@ -71,7 +70,7 @@ async fn run_hook_tailer<'a>( let common_config = cmdlib::args::read_common_config(fb, &matches)?; let init_revision = matches.value_of("init_revision").map(String::from); let continuous = matches.is_present("continuous"); - let limit = cmdlib::args::get_u64(&matches, "limit", 1000); + let limit = cmdlib::args::get_usize(&matches, "limit", 1000); let changeset = matches.value_of("changeset"); let mut excludes = matches @@ -174,7 +173,7 @@ async fn run_hook_tailer<'a>( .map(Ok) .try_for_each({ move |()| async move { - process_hook_results(tail.run(), logger).await?; + process_hook_results(tail.run().compat(), logger).await?; sleep(Duration::new(10, 0)) .map_err(|err| format_err!("Tokio timer error {:?}", err)) .compat() @@ -186,7 +185,7 @@ async fn run_hook_tailer<'a>( .boxed() } (_, Some(changeset)) => { - process_hook_results(tail.run_single_changeset(changeset), logger).boxed() + process_hook_results(tail.run_single_changeset(changeset).compat(), logger).boxed() } _ => { f.await?; @@ -196,11 +195,11 @@ async fn run_hook_tailer<'a>( .await } -async fn process_hook_results( - fut: BoxFuture, Error>, +async fn process_hook_results, Error>>>( + fut: F, logger: &Logger, ) -> Result<(), Error> { - let (stats, res) = fut.compat().timed().await; + let (stats, res) = fut.timed().await; let res = res?; let mut hooks_stat = HookExecutionStat::new(); diff --git a/eden/mononoke/hook_tailer/tailer.rs b/eden/mononoke/hook_tailer/tailer.rs index 14d38d584e..19f844a246 100644 --- a/eden/mononoke/hook_tailer/tailer.rs +++ b/eden/mononoke/hook_tailer/tailer.rs @@ -13,9 +13,13 @@ use blobstore::Loadable; use bookmarks::BookmarkName; use cloned::cloned; use context::CoreContext; -use futures::{FutureExt, TryFutureExt}; +use futures::{ + compat::{Future01CompatExt, Stream01CompatExt}, + future::{self, FutureExt, TryFutureExt}, + stream::{StreamExt, TryStreamExt}, +}; use futures_ext::{spawn_future, BoxFuture, FutureExt as OldFutureExt}; -use futures_old::{Future, Stream}; +use futures_old::{Future as OldFuture, Stream as OldStream}; use hooks::{hook_loader::load_hooks, HookManager, HookOutcome}; use hooks_content_stores::blobrepo_text_only_fetcher; use manifold::{ManifoldHttpClient, PayloadRange}; @@ -28,6 +32,7 @@ use slog::{debug, info}; use std::collections::HashSet; use std::sync::Arc; use thiserror::Error; +use tokio::task; pub struct Tailer { ctx: CoreContext, @@ -88,35 +93,44 @@ impl Tailer { .boxify() } - pub fn run_with_limit(&self, limit: u64) -> BoxFuture, Error> { - let ctx = self.ctx.clone(); - let bm = self.bookmark.clone(); - let hm = self.hook_manager.clone(); - let excludes = self.excludes.clone(); + pub async fn run_with_limit(&self, limit: usize) -> Result, Error> { + let bm_rev = self + .repo + .get_bonsai_bookmark(self.ctx.clone(), &self.bookmark) + .compat() + .await? + .ok_or_else(|| ErrorKind::NoSuchBookmark(self.bookmark.clone()))?; - let bm_rev = self.repo.get_bonsai_bookmark(ctx.clone(), &bm).and_then({ - cloned!(bm); - |opt| opt.ok_or(ErrorKind::NoSuchBookmark(bm).into()) - }); + let res = + AncestorsNodeStream::new(self.ctx.clone(), &self.repo.get_changeset_fetcher(), bm_rev) + .compat() + .take(limit) + .try_filter(|cs_id| future::ready(!self.excludes.contains(cs_id))) + .map(|cs_id| async move { + match cs_id { + Ok(cs_id) => { + let (_, outcomes) = task::spawn( + run_hooks_for_changeset( + self.ctx.clone(), + self.repo.clone(), + self.hook_manager.clone(), + self.bookmark.clone(), + cs_id, + ) + .compat(), + ) + .await??; - cloned!(self.ctx, self.repo); - bm_rev - .and_then(move |bm_rev| { - AncestorsNodeStream::new(ctx.clone(), &repo.get_changeset_fetcher(), bm_rev) - .take(limit) - .filter(move |cs| !excludes.contains(cs)) - .map({ - move |cs| { - cloned!(ctx, bm, hm, repo); - run_hooks_for_changeset(ctx, repo, hm, bm, cs) + Ok(outcomes) } - }) - .map(spawn_future) - .buffered(100) - .map(|(_, res)| res) - .concat2() - }) - .boxify() + Err(e) => Err(e), + } + }) + .buffered(100) + .try_concat() + .await?; + + Ok(res) } pub fn run(&self) -> BoxFuture, Error> { @@ -200,14 +214,11 @@ fn run_hooks_for_changeset( hm: Arc, bm: BookmarkName, cs_id: ChangesetId, -) -> impl Future), Error = Error> { +) -> impl OldFuture), Error = Error> { cs_id .load(ctx.clone(), repo.blobstore()) .from_err() .and_then(move |cs| { - let ctx = ctx.clone(); - let hm = hm.clone(); - let bm = bm.clone(); async move { debug!(ctx.logger(), "Running hooks for changeset {:?}", cs); let hook_results = hm