mononoke: walker: switch to new bounded_traversal crate

Summary: switch to new bounded_traversal crate prior to futures 0.3 uprade

Reviewed By: farnz

Differential Revision: D19804893

fbshipit-source-id: 3af88792397bdc84227b10f6d490fced5e4764c9
This commit is contained in:
Alex Hornby 2020-02-11 09:16:35 -08:00 committed by Facebook Github Bot
parent f52ee41ae9
commit dff471bcbb

View File

@ -12,15 +12,13 @@ use anyhow::{format_err, Error};
use blobrepo::BlobRepo;
use blobstore::Loadable;
use bookmarks::{BookmarkName, BookmarkPrefix, Freshness};
use bounded_traversal::bounded_traversal_stream;
use cloned::cloned;
use context::CoreContext;
use failure_ext::chain::ChainExt;
use filestore::{self, Alias};
use futures::{Future as Future01, Stream as Stream01};
use futures_ext::{
bounded_traversal::bounded_traversal_stream, BoxStream as BoxStream01,
FutureExt as Future01Ext, StreamExt as Stream01Ext,
};
use futures_ext::{FutureExt as Future01Ext, StreamExt as Stream01Ext};
use futures_preview::{
compat::{Future01CompatExt, Stream01CompatExt},
future::{self, Future, FutureExt},
@ -480,7 +478,7 @@ pub fn expand_checked_nodes(children: &mut Vec<OutgoingEdge>) -> () {
}
/// Walk the graph from one or more starting points, providing stream of data for later reduction
fn walk_exact_compat<V, VOut>(
pub fn walk_exact<V, VOut>(
ctx: CoreContext,
repo: BlobRepo,
enable_derive: bool,
@ -490,7 +488,7 @@ fn walk_exact_compat<V, VOut>(
error_as_data_node_types: HashSet<NodeType>,
error_as_data_edge_types: HashSet<EdgeType>,
scuba: ScubaSampleBuilder,
) -> BoxStream01<VOut, Error>
) -> BoxStream<'static, Result<VOut, Error>>
where
V: 'static + Clone + WalkVisitor<VOut> + Send,
VOut: 'static + Send,
@ -512,10 +510,11 @@ where
Freshness::MostRecent,
)
.map(|(book, csid)| (book.name, csid))
.collect_to::<HashMap<BookmarkName, ChangesetId>>();
.collect_to::<HashMap<BookmarkName, ChangesetId>>()
.compat();
public_heads
.map(move |public_heads| {
.map_ok(move |public_heads| {
let public_heads = Arc::new(public_heads);
bounded_traversal_stream(scheduled_max, walk_roots, {
move |walk_item| {
@ -552,13 +551,11 @@ where
handle.await?
}
.boxed()
.compat()
.boxify()
}
})
})
.flatten_stream()
.boxify()
.try_flatten_stream()
.boxed()
}
async fn walk_one<V, VOut>(
@ -695,33 +692,3 @@ where
}
}
}
pub fn walk_exact<V, VOut>(
ctx: CoreContext,
repo: BlobRepo,
enable_derive: bool,
walk_roots: Vec<OutgoingEdge>,
visitor: V,
scheduled_max: usize,
error_as_data_node_types: HashSet<NodeType>,
error_as_data_edge_types: HashSet<EdgeType>,
scuba: ScubaSampleBuilder,
) -> BoxStream<'static, Result<VOut, Error>>
where
V: 'static + Clone + WalkVisitor<VOut> + Send,
VOut: 'static + Send,
{
walk_exact_compat(
ctx,
repo,
enable_derive,
walk_roots,
visitor,
scheduled_max,
error_as_data_node_types,
error_as_data_edge_types,
scuba,
)
.compat()
.boxed()
}