sapling/eden/mononoke/derived_data/test_utils/lib.rs
Mark Juggurnauth-Thomas 33ec4db653 bounded_traversal: require futures to be boxed
Summary:
Bounded traversal's internal book-keeping moves the futures returned from fold and unfold callbacks around while they are being queued to be scheduled.  If these futures are large, then this can result in a significant portion of bounded traversal's CPU time being spent on `memcpy`ing these futures around.

This can be prevented by always boxing the futures that are returned to bounded traversal.  Make this a requirement by changing the type from `impl Future<...>` to `BoxFuture<...>`.

Reviewed By: mitrandir77

Differential Revision: D26997706

fbshipit-source-id: 23a3583adc23c4e7d3607a78e82fc9d1056691c3
2021-03-12 08:12:57 -08:00

67 lines
2.2 KiB
Rust

/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* This software may be used and distributed according to the terms of the
* GNU General Public License version 2.
*/
use anyhow::{anyhow, Error, Result};
use blobrepo::BlobRepo;
use blobrepo_hg::BlobRepoHg;
use blobstore::Loadable;
use bounded_traversal::bounded_traversal_stream;
use context::CoreContext;
use futures::future::FutureExt;
use futures::stream::{self, Stream, TryStreamExt};
use manifest::{Entry, Manifest};
use mercurial_types::HgChangesetId;
use mononoke_types::{BonsaiChangeset, ChangesetId, MPath};
/// Resolves a Mercurial changeset hash string to its bonsai changeset.
///
/// `s` is parsed as an [`HgChangesetId`], mapped to a bonsai [`ChangesetId`]
/// through `repo`, and the corresponding [`BonsaiChangeset`] is then loaded
/// from the repo's blobstore.
///
/// Returns the bonsai changeset id together with the loaded changeset.
///
/// # Errors
///
/// Fails if `s` does not parse as an `HgChangesetId`, if the lookup itself
/// fails, if the repo has no bonsai mapping for the hg changeset, or if the
/// bonsai changeset cannot be loaded.
pub async fn bonsai_changeset_from_hg(
    ctx: &CoreContext,
    repo: &BlobRepo,
    s: &str,
) -> Result<(ChangesetId, BonsaiChangeset)> {
    let hg_cs_id: HgChangesetId = s.parse()?;

    let maybe_bcs_id = repo.get_bonsai_from_hg(ctx.clone(), hg_cs_id).await?;
    let bcs_id = match maybe_bcs_id {
        Some(found) => found,
        // The lookup succeeded but there is no mapping for this hg changeset.
        None => {
            return Err(anyhow!(
                "Failed to find bonsai changeset id for {}",
                hg_cs_id
            ));
        }
    };

    let bcs = bcs_id.load(ctx, repo.blobstore()).await?;
    Ok((bcs_id, bcs))
}
/// Streams `entry` and every entry found beneath it, each paired with its path.
///
/// The supplied root `entry` is emitted with a path of `None`. Whenever a tree
/// entry is visited, its manifest is loaded from `repo`'s blobstore and each
/// child listed by the manifest is queued for traversal with its basename
/// joined onto the parent's path. Both tree and leaf entries are emitted.
///
/// # Errors
///
/// A failure to load a tree's manifest is surfaced as an `Err` item of the
/// returned stream.
pub fn iterate_all_manifest_entries<'a, MfId, LId>(
    ctx: &'a CoreContext,
    repo: &'a BlobRepo,
    entry: Entry<MfId, LId>,
) -> impl Stream<Item = Result<(Option<MPath>, Entry<MfId, LId>)>> + 'a
where
    MfId: Loadable + Send + Sync + Clone + 'a,
    LId: Send + Clone + 'static,
    <MfId as Loadable>::Value: Manifest<TreeId = MfId, LeafId = LId>,
{
    // NOTE(review): 256 is presumably the cap on concurrently scheduled
    // unfold futures -- confirm against `bounded_traversal_stream`.
    bounded_traversal_stream(256, Some((None, entry)), move |(path, entry)| {
        async move {
            match entry {
                // A leaf has no children: emit it and recurse into nothing.
                // The leaf id is moved into the output rather than cloned,
                // since the entry is owned here and not used again.
                Entry::Leaf(leaf) => Ok((vec![(path, Entry::Leaf(leaf))], vec![])),
                Entry::Tree(tree) => {
                    let mf = tree.load(ctx, repo.blobstore()).await?;
                    // Queue every child of this tree under its full path.
                    let recurse = mf
                        .list()
                        .map(|(basename, new_entry)| {
                            let path = MPath::join_opt_element(path.as_ref(), &basename);
                            (Some(path), new_entry)
                        })
                        .collect();
                    Ok::<_, Error>((vec![(path, Entry::Tree(tree))], recurse))
                }
            }
        }
        // The unfold callback hands its future to the traversal boxed.
        .boxed()
    })
    // Each traversal step yields a `Vec` of entries; flatten those batches
    // into a stream of individual `(path, entry)` items.
    .map_ok(|entries| stream::iter(entries.into_iter().map(Ok::<_, Error>)))
    .try_flatten()
}