Asyncify backfill_derived_data::prefetch_content and callsites

Summary:
This diff - which asyncifies `prefetch_content` - isn't a big win on readability,
but by switching to async (taking references) and then changing upstream functions
to take references we're able to save two layers of cloning in the `warmup` functions,
which ought to reduce heap allocations.

Reviewed By: farnz

Differential Revision: D20612696

fbshipit-source-id: a52d6246789e964d3b02d0bdc1bfafbded8f25dd
This commit is contained in:
Steven Troxler 2020-03-25 19:54:18 -07:00 committed by Facebook GitHub Bot
parent 3b7acb2d48
commit 26b821e9b1

View File

@ -33,8 +33,8 @@ use fbinit::FacebookInit;
use fsnodes::RootFsnodeId;
use futures::{
compat::{Future01CompatExt, Stream01CompatExt},
future::{try_join, try_join3},
stream::{self, FuturesUnordered, StreamExt},
future::{self, ready, try_join, try_join3},
stream::{self, FuturesUnordered, StreamExt, TryStreamExt},
};
use futures_ext::{spawn_future, FutureExt};
use futures_old::{
@ -43,7 +43,6 @@ use futures_old::{
};
use futures_stats::Timed;
use futures_stats::TimedFutureExt;
use futures_util::{future::ready, stream::TryStreamExt};
use lock_ext::LockExt;
use manifest::find_intersection_of_diffs;
use mononoke_types::{ChangesetId, FileUnodeId, MPath, RepositoryId};
@ -431,13 +430,7 @@ async fn subcommand_backfill<P: AsRef<Path>>(
})
.and_then({
move |chunk| async move {
warmup(
ctx.clone(),
repo.clone(),
derived_data_type.clone(),
chunk.clone(),
)
.await?;
warmup(ctx, repo, derived_data_type, &chunk).await?;
Ok(chunk)
}
})
@ -477,10 +470,10 @@ async fn subcommand_backfill<P: AsRef<Path>>(
}
async fn warmup(
ctx: CoreContext,
repo: BlobRepo,
derived_data_type: String,
chunk: Vec<ChangesetId>,
ctx: &CoreContext,
repo: &BlobRepo,
derived_data_type: &String,
chunk: &Vec<ChangesetId>,
) -> Result<(), Error> {
// Warmup bonsai changesets unconditionally because
// most likely all derived data needs it. And they are cheap to warm up anyway
@ -502,14 +495,14 @@ async fn warmup(
let content_warmup = async {
if PREFETCH_CONTENT_TYPES.contains(&derived_data_type.as_ref()) {
content_warmup(ctx.clone(), repo.clone(), chunk.clone()).await?
content_warmup(ctx, repo, chunk).await?
}
Ok(())
};
let unode_warmup = async {
if PREFETCH_UNODE_TYPES.contains(&derived_data_type.as_ref()) {
unode_warmup(ctx.clone(), repo.clone(), &chunk).await?
unode_warmup(ctx, repo, chunk).await?
}
Ok(())
};
@ -520,24 +513,20 @@ async fn warmup(
}
async fn content_warmup(
ctx: CoreContext,
repo: BlobRepo,
chunk: Vec<ChangesetId>,
ctx: &CoreContext,
repo: &BlobRepo,
chunk: &Vec<ChangesetId>,
) -> Result<(), Error> {
old_stream::iter_ok(chunk)
.map({
cloned!(ctx, repo);
move |csid| prefetch_content(ctx.clone(), repo.clone(), csid)
})
stream::iter(chunk)
.map({ move |csid| prefetch_content(ctx, repo, csid) })
.buffered(CHUNK_SIZE)
.for_each(|_| Ok(()))
.compat()
.try_for_each(|_| async { Ok(()) })
.await
}
async fn unode_warmup(
ctx: CoreContext,
repo: BlobRepo,
ctx: &CoreContext,
repo: &BlobRepo,
chunk: &Vec<ChangesetId>,
) -> Result<(), Error> {
let futs = FuturesUnordered::new();
@ -704,11 +693,12 @@ async fn subcommand_single(
}
// Prefetch content of changed files between parents
fn prefetch_content(
ctx: CoreContext,
repo: BlobRepo,
csid: ChangesetId,
) -> impl OldFuture<Item = (), Error = Error> {
async fn prefetch_content(
ctx: &CoreContext,
repo: &BlobRepo,
csid: &ChangesetId,
) -> Result<(), Error> {
// TODO convert to new future when no longer using spawn_future
fn prefetch_content_unode(
ctx: CoreContext,
blobstore: Arc<dyn Blobstore>,
@ -744,45 +734,48 @@ fn prefetch_content(
.boxify()
}
csid.load(ctx.clone(), repo.blobstore())
let bonsai = csid.load(ctx.clone(), repo.blobstore()).compat().await?;
let root_manifest_fut = RootUnodeManifestId::derive(ctx.clone(), repo.clone(), csid.clone())
.from_err()
.and_then(move |bonsai| {
let root_manifest = RootUnodeManifestId::derive(ctx.clone(), repo.clone(), csid)
.map(|mf| mf.manifest_unode_id().clone())
.compat();
let parents_manifest_futs = bonsai.parents().collect::<Vec<_>>().into_iter().map({
move |csid| {
RootUnodeManifestId::derive(ctx.clone(), repo.clone(), csid)
.from_err()
.map(|mf| mf.manifest_unode_id().clone());
.map(|mf| mf.manifest_unode_id().clone())
.compat()
}
});
let (root_manifest, parents_manifests, renames) = try_join3(
root_manifest_fut,
future::try_join_all(parents_manifest_futs),
find_unode_renames(ctx.clone(), repo.clone(), &bonsai).compat(),
)
.await?;
let parents_manifest = bonsai.parents().collect::<Vec<_>>().into_iter().map({
cloned!(ctx, repo);
move |csid| {
RootUnodeManifestId::derive(ctx.clone(), repo.clone(), csid)
.from_err()
.map(|mf| mf.manifest_unode_id().clone())
}
});
(
root_manifest,
old_future::join_all(parents_manifest),
find_unode_renames(ctx.clone(), repo.clone(), &bonsai),
)
.into_future()
.and_then(move |(root_mf, parents_mf, renames)| {
let blobstore = repo.get_blobstore().boxed();
find_intersection_of_diffs(ctx.clone(), blobstore.clone(), root_mf, parents_mf)
.filter_map(|(path, entry)| Some((path?, entry.into_leaf()?)))
.map(move |(path, file)| {
spawn_future(prefetch_content_unode(
ctx.clone(),
blobstore.clone(),
&renames,
path,
file,
))
})
.buffered(256)
.for_each(|_| Ok(()))
})
})
let blobstore = repo.get_blobstore().boxed();
find_intersection_of_diffs(
ctx.clone(),
blobstore.clone(),
root_manifest,
parents_manifests,
)
.filter_map(|(path, entry)| Some((path?, entry.into_leaf()?)))
.map(move |(path, file)| {
spawn_future(prefetch_content_unode(
ctx.clone(),
blobstore.clone(),
&renames,
path,
file,
))
})
.buffered(256)
.for_each(|_| Ok(()))
.compat()
.await
}
#[cfg(test)]