Extract async fn tail_one_iteration

Summary:
This asyncifies the internals of `subcommand_tail`, which
loops over a stream, by extracting the operation performed in
each iteration into an async function.

The resulting code saves a few heap allocations by reducing
clones, and is also *much* less indented, which helps with
readability.

Reviewed By: krallin

Differential Revision: D20664511

fbshipit-source-id: 8e81a1507e37ad2cc59e616c739e19574252e72c
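
As a rough illustration of the refactor the summary describes (not the
Mononoke code itself), here is a minimal, self-contained sketch in the same
spirit; `Source`, `fetch`, and `handle` are hypothetical stand-ins for the
real bookmark and derivation machinery:

use anyhow::Error;

struct Source;

impl Source {
    // Hypothetical stand-in for a real query, for illustration only.
    async fn fetch(&self) -> Result<u64, Error> {
        Ok(42)
    }
}

async fn handle(item: u64) -> Result<(), Error> {
    println!("handled {}", item);
    Ok(())
}

// The per-iteration work, extracted into a flat async fn.
async fn one_iteration(source: &Source) -> Result<(), Error> {
    let item = source.fetch().await?;
    handle(item).await
}

// The caller shrinks to a plain loop, mirroring the new subcommand_tail.
async fn tail(source: &Source) -> Result<(), Error> {
    loop {
        one_iteration(source).await?;
    }
}

The payoff is the same as in the diff below: the per-iteration logic reads
top to bottom with `?` for errors, instead of nesting inside combinator
closures.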
Authored by Steven Troxler on 2020-04-08 11:17:09 -07:00; committed by Facebook GitHub Bot
parent c7d12b648f
commit 10bf48e871


@@ -32,10 +32,9 @@ use fastlog::{fetch_parent_root_unodes, RootFastlog};
 use fbinit::FacebookInit;
 use fsnodes::RootFsnodeId;
 use futures::{
-    compat::{Future01CompatExt, Stream01CompatExt},
-    future::{self, ready, try_join, try_join3},
+    compat::Future01CompatExt,
+    future::{self, ready, try_join, try_join3, FutureExt, TryFutureExt},
     stream::{self, FuturesUnordered, StreamExt, TryStreamExt},
-    FutureExt, TryFutureExt,
 };
 use futures_ext::{spawn_future, FutureExt as OldFutureExt};
 use futures_old::{
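
The import shuffle above is the usual futures 0.1 to 0.3 migration: the
`FutureExt`/`TryFutureExt` extension traits now come from `futures::future`,
and `Future01CompatExt` provides the `.compat()` bridge the new code leans
on. A minimal sketch of that bridge, assuming only the `futures_old` alias
for futures 0.1 that this file already uses:

use futures::compat::Future01CompatExt;
use futures_old::future as old_future; // futures 0.1, under this crate's alias

async fn bridged() -> Result<u64, ()> {
    // .compat() turns a 0.1 Future<Item = T, Error = E> into a std future
    // with Output = Result<T, E>, which can then be .await-ed directly.
    old_future::ok::<u64, ()>(42).compat().await
}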
@@ -585,88 +584,85 @@ async fn subcommand_tail(
         })
         .collect::<Result<_, Error>>()?;
     let derive_utils = Arc::new(derive_utils);
-    old_stream::repeat::<_, Error>(())
-        .and_then(move |_| {
-            bookmarks
-                .list_publishing_by_prefix(
-                    ctx.clone(),
-                    &BookmarkPrefix::empty(),
-                    repo.get_repoid(),
-                    Freshness::MostRecent,
-                )
-                .map(|(_name, csid)| csid)
-                .collect()
-                .and_then({
-                    cloned!(ctx, derive_utils);
-                    move |heads| {
-                        let pending: Vec<_> = derive_utils
-                            .iter()
-                            .map({
-                                cloned!(ctx);
-                                move |(derive, maybe_unredacted_repo, stats)| {
-                                    // create new context so each derivation would have its own trace
-                                    let ctx =
-                                        CoreContext::new_with_logger(ctx.fb, ctx.logger().clone());
-                                    derive
-                                        .pending(
-                                            ctx.clone(),
-                                            maybe_unredacted_repo.clone(),
-                                            heads.clone(),
-                                        )
-                                        .map({
-                                            cloned!(ctx, maybe_unredacted_repo, derive, stats);
-                                            move |pending| {
-                                                stats.pending_heads.add_value(pending.len() as i64);
-                                                pending
-                                                    .into_iter()
-                                                    .map(|csid| {
-                                                        derive.derive(
-                                                            ctx.clone(),
-                                                            maybe_unredacted_repo.clone(),
-                                                            csid,
-                                                        )
-                                                    })
-                                                    .collect::<Vec<_>>()
-                                            }
-                                        })
-                                }
-                            })
-                            .collect();
+    loop {
+        tail_one_iteration(ctx, repo, bookmarks, &derive_utils).await?;
+    }
+}
-                        old_future::join_all(pending).and_then(move |pending| {
-                            let pending: Vec<_> = pending.into_iter().flatten().collect();
-                            if pending.is_empty() {
-                                tokio_timer::sleep(Duration::from_millis(250))
-                                    .from_err()
-                                    .left_future()
-                            } else {
-                                let count = pending.len();
-                                info!(ctx.logger(), "found {} outdated heads", count);
-                                old_stream::iter_ok(pending)
-                                    .buffered(1024)
-                                    .for_each(|_| Ok(()))
-                                    .timed({
-                                        cloned!(ctx);
-                                        move |stats, _| {
-                                            info!(
-                                                ctx.logger(),
-                                                "derived data for {} heads in {:?}",
-                                                count,
-                                                stats.completion_time
-                                            );
-                                            Ok(())
-                                        }
-                                    })
-                                    .right_future()
-                            }
-                        })
-                    }
-                })
-        })
+
+async fn tail_one_iteration(
+    ctx: &CoreContext,
+    repo: &BlobRepo,
+    bookmarks: &SqlBookmarks,
+    derive_utils: &[(Arc<dyn DerivedUtils>, BlobRepo, Arc<DerivedDataStats>)],
+) -> Result<(), Error> {
+    let heads = bookmarks
+        .list_publishing_by_prefix(
+            ctx.clone(),
+            &BookmarkPrefix::empty(),
+            repo.get_repoid(),
+            Freshness::MostRecent,
+        )
+        .map(|(_name, csid)| csid)
+        .collect()
         .compat()
-        .try_for_each(|_| async { Ok(()) })
-        .await
+        .await?;
+    let pending_nested_futs: Vec<_> = derive_utils
+        .iter()
+        .map({
+            |(derive, maybe_unredacted_repo, stats)| {
+                let heads = heads.clone();
+                async move {
+                    // create new context so each derivation would have its own trace
+                    let ctx = CoreContext::new_with_logger(ctx.fb, ctx.logger().clone());
+                    let pending = derive
+                        .pending(ctx.clone(), maybe_unredacted_repo.clone(), heads)
+                        .compat()
+                        .await?;
+                    stats.pending_heads.add_value(pending.len() as i64);
+                    let derived = pending
+                        .into_iter()
+                        .map(|csid| {
+                            derive
+                                .derive(ctx.clone(), maybe_unredacted_repo.clone(), csid)
+                                .compat()
+                        })
+                        .collect::<Vec<_>>();
+                    let res: Result<_, Error> = Ok(derived);
+                    res
+                }
+            }
+        })
+        .collect();
+    let pending_futs: Vec<_> = future::try_join_all(pending_nested_futs)
+        .await?
+        .into_iter()
+        .flatten()
+        .collect();
+    if pending_futs.is_empty() {
+        tokio::time::delay_for(Duration::from_millis(250)).await;
+        Ok(())
+    } else {
+        let count = pending_futs.len();
+        info!(ctx.logger(), "found {} outdated heads", count);
+        let (stats, res) = stream::iter(pending_futs)
+            .buffered(1024)
+            .try_for_each(|_: String| async { Ok(()) })
+            .timed()
+            .await;
+        res?;
+        info!(
+            ctx.logger(),
+            "derived data for {} heads in {:?}", count, stats.completion_time
+        );
+        Ok(())
+    }
 }
 
 async fn subcommand_single(
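
The new `tail_one_iteration` fans out in two stages: `future::try_join_all`
resolves every derivation's pending-heads lookup concurrently, and the
flattened per-changeset futures then run with bounded concurrency through
`buffered(1024)`. A self-contained sketch of that shape, with plain `u64`
items standing in for changesets and the stats/logging omitted:

use anyhow::Error;
use futures::{future, stream, StreamExt, TryStreamExt};

async fn run(batches: Vec<Vec<u64>>) -> Result<(), Error> {
    // Stage 1: resolve all "what is pending?" lookups concurrently.
    let nested = batches
        .into_iter()
        .map(|batch| async move { Ok::<_, Error>(batch) });
    let pending: Vec<u64> = future::try_join_all(nested)
        .await?
        .into_iter()
        .flatten()
        .collect();

    // Stage 2: run the per-item work, at most 1024 futures at a time.
    stream::iter(pending.into_iter().map(|item| async move {
        let _ = item; // stand-in for derive(...)
        Ok::<_, Error>(())
    }))
    .buffered(1024)
    .try_for_each(|()| async { Ok(()) })
    .await
}

Bounding the fan-out with `buffered` keeps at most 1024 derivations in
flight while still draining them as a single stream.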
@@ -786,6 +782,7 @@ mod tests {
     use super::*;
     use blobstore::BlobstoreBytes;
     use fixtures::linear;
+    use futures::future::FutureExt;
     use futures_ext::BoxFuture;
     use mercurial_types::HgChangesetId;
     use std::str::FromStr;
@@ -900,9 +897,9 @@ mod tests {
         value: BlobstoreBytes,
     ) -> BoxFuture<(), Error> {
         if key.find(&self.bad_key_substring).is_some() {
-            tokio_timer::sleep(Duration::new(1, 0))
-                .from_err()
-                .and_then(|_| old_future::err(format_err!("failed")))
+            tokio::time::delay_for(Duration::from_millis(250))
+                .map(|()| Err(format_err!("failed")))
+                .compat()
                 .boxify()
         } else {
             self.inner.put(ctx, key, value).boxify()
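
Together with the `futures::future::FutureExt` import added to the test
module above, this hunk runs the compat bridge in the opposite direction:
the delay and the injected failure are built as a std future, then converted
back to a futures 0.1 future because the blobstore trait still returns the
old `BoxFuture`. A sketch of that shape, assuming tokio 0.2's `delay_for`,
`anyhow::format_err`, and the `futures_old` alias as used in this file:

use anyhow::{format_err, Error};
use futures::future::{FutureExt, TryFutureExt};
use std::time::Duration;

// Returns a futures 0.1 future that waits, then fails. This is the reverse
// of Future01CompatExt: TryFutureExt::compat goes from std back to 0.1.
fn failing_put() -> impl futures_old::Future<Item = (), Error = Error> {
    tokio::time::delay_for(Duration::from_millis(250))
        // Mapping the timer's () into an Err makes this a TryFuture,
        // which is what TryFutureExt::compat requires.
        .map(|()| Err::<(), _>(format_err!("failed")))
        .compat()
}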