mononoke: walker: update the step methods to new futures

Summary:
Update the walker step methods to use new (futures 0.3) futures, and combine them with an async fn
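
The shape of the change, as a minimal sketch (hypothetical step_old/step_new and a stand-in StepOutput type; it assumes the same futures / futures_preview crates this file imports, with futures_preview's compat feature enabled — the real step methods are in the diff below):

    use futures::{future as future01, Future as Future01};            // futures 0.1
    use futures_preview::{compat::Future01CompatExt, future::Future}; // futures 0.3

    struct StepOutput(u64);

    // Before: each step returned a boxed 0.1 future.
    fn step_old() -> Box<dyn Future01<Item = StepOutput, Error = ()>> {
        Box::new(future01::ok::<_, ()>(1u64).map(StepOutput))
    }

    // After: the same 0.1 combinator body, bridged with .compat() into a
    // std future that an async fn can .await.
    fn step_new() -> impl Future<Output = Result<StepOutput, ()>> {
        future01::ok::<_, ()>(1u64).map(StepOutput).compat()
    }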

Later in stack planning to:
  * remove the use of spawn_future and replace it with tokio 0.2 join handles
  * port bounded_traversal_stream to new futures so that these 0.3 futures don't immediately get compat'd back to 0.1 (the current bridge is sketched below)
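
For context, the bridge the second bullet refers to currently looks roughly like this; a minimal sketch with a hypothetical walk_one_stub standing in for the real walk_one, and with the spawn_future / bounded_traversal_stream plumbing from the diff omitted:

    use futures::Future as Future01;                        // futures 0.1
    use futures_preview::{future::FutureExt, TryFutureExt}; // futures 0.3

    async fn walk_one_stub() -> Result<u64, ()> {
        Ok(1)
    }

    // The async step (0.3) is boxed, then converted back into a 0.1 future so
    // that spawn_future / bounded_traversal_stream, which still speak
    // futures 0.1, can drive it. Porting those would remove this round trip.
    fn bridged_step() -> impl Future01<Item = u64, Error = ()> {
        async move { walk_one_stub().await }.boxed().compat()
    }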

Reviewed By: farnz

Differential Revision: D19767902

fbshipit-source-id: 10fd6236a064efbb7d0815fadbdd32036bcafead
Alex Hornby, 2020-02-10 05:58:31 -08:00, committed by Facebook Github Bot
parent 3d6d9754b5
commit 0105ad492b

@@ -17,15 +17,16 @@ use cloned::cloned;
use context::CoreContext;
use failure_ext::chain::ChainExt;
use filestore::{self, Alias};
use futures::{future, Future as OldFuture, Stream as OldStream};
use futures::{Future as Future01, Stream as Stream01};
use futures_ext::{
bounded_traversal::bounded_traversal_stream, spawn_future, BoxFuture, BoxStream, FutureExt,
StreamExt,
bounded_traversal::bounded_traversal_stream, spawn_future, BoxStream as BoxStream01,
FutureExt as Future01Ext, StreamExt as Stream01Ext,
};
use futures_preview::{
compat::{Future01CompatExt, Stream01CompatExt},
future::FutureExt as NewFutureExt,
stream::{BoxStream as NewBoxStream, StreamExt as NewStreamExt},
future::{self, Future, FutureExt},
stream::{BoxStream, StreamExt},
TryFutureExt,
};
use itertools::{Either, Itertools};
use mercurial_types::{
@@ -88,11 +89,11 @@ fn bookmark_step(
repo: BlobRepo,
b: BookmarkName,
public_heads: Arc<HashMap<BookmarkName, ChangesetId>>,
) -> BoxFuture<StepOutput, Error> {
) -> impl Future<Output = Result<StepOutput, Error>> {
match public_heads.get(&b) {
Some(csid) => future::ok::<_, Error>(Some(csid.clone())).left_future(),
Some(csid) => future::ok(Some(csid.clone())).left_future(),
// Just in case we have non-public bookmarks
None => repo.get_bonsai_bookmark(ctx, &b).right_future(),
None => repo.get_bonsai_bookmark(ctx, &b).compat().right_future(),
}
.and_then(move |bcs_opt| match bcs_opt {
Some(bcs_id) => {
@@ -106,18 +107,17 @@ fn bookmark_step(
Node::BonsaiHgMapping(bcs_id),
),
];
Ok(StepOutput(NodeData::Bookmark(bcs_id), recurse))
future::ok(StepOutput(NodeData::Bookmark(bcs_id), recurse))
}
None => Err(format_err!("Unknown Bookmark {}", b)),
None => future::err(format_err!("Unknown Bookmark {}", b)),
})
.boxify()
}
fn bonsai_phase_step(
ctx: CoreContext,
phases_store: Arc<dyn Phases>,
bcs_id: ChangesetId,
) -> BoxFuture<StepOutput, Error> {
) -> impl Future<Output = Result<StepOutput, Error>> {
phases_store
.get_public(ctx, vec![bcs_id], true)
.map(move |public| public.contains(&bcs_id))
@@ -125,14 +125,14 @@ fn bonsai_phase_step(
let phase = if is_public { Some(Phase::Public) } else { None };
StepOutput(NodeData::BonsaiPhaseMapping(phase), vec![])
})
.boxify()
.compat()
}
fn bonsai_changeset_step(
ctx: CoreContext,
repo: &BlobRepo,
bcs_id: ChangesetId,
) -> BoxFuture<StepOutput, Error> {
) -> impl Future<Output = Result<StepOutput, Error>> {
// Get the data, and add direct file data for this bonsai changeset
bcs_id
.load(ctx.clone(), repo.blobstore())
@@ -174,23 +174,22 @@ fn bonsai_changeset_step(
);
StepOutput(NodeData::BonsaiChangeset(bcs), recurse)
})
.boxify()
.compat()
}
fn file_content_step(
ctx: CoreContext,
repo: &BlobRepo,
id: ContentId,
) -> BoxFuture<StepOutput, Error> {
) -> Result<StepOutput, Error> {
let s = filestore::fetch_stream(repo.blobstore(), ctx, id)
.map(FileBytes)
.compat();
// We don't force file loading here, content may not be needed
future::ok(StepOutput(
Ok(StepOutput(
NodeData::FileContent(FileContentData::ContentStream(Box::pin(s))),
vec![],
))
.boxify()
}
fn file_content_metadata_step(
@@ -198,7 +197,7 @@ fn file_content_metadata_step(
repo: &BlobRepo,
id: ContentId,
enable_derive: bool,
) -> BoxFuture<StepOutput, Error> {
) -> impl Future<Output = Result<StepOutput, Error>> {
let loader = if enable_derive {
filestore::get_metadata(repo.blobstore(), ctx, &id.into())
.map(Some)
@@ -228,7 +227,7 @@ fn file_content_metadata_step(
}
Some(None) | None => StepOutput(NodeData::FileContentMetadata(None), vec![]),
})
.boxify()
.compat()
}
fn bonsai_to_hg_mapping_step(
@@ -236,7 +235,7 @@ fn bonsai_to_hg_mapping_step(
repo: &BlobRepo,
bcs_id: ChangesetId,
enable_derive: bool,
) -> BoxFuture<StepOutput, Error> {
) -> impl Future<Output = Result<StepOutput, Error>> {
let hg_cs_id = if enable_derive {
repo.get_hg_from_bonsai_changeset(ctx, bcs_id)
.map(|hg_cs_id| Some(hg_cs_id))
@@ -258,14 +257,14 @@ fn bonsai_to_hg_mapping_step(
),
None => StepOutput(NodeData::BonsaiHgMapping(None), vec![]),
})
.boxify()
.compat()
}
fn hg_to_bonsai_mapping_step(
ctx: CoreContext,
repo: &BlobRepo,
id: HgChangesetId,
) -> BoxFuture<StepOutput, Error> {
) -> impl Future<Output = Result<StepOutput, Error>> {
repo.get_bonsai_from_hg(ctx, id)
.map(move |maybe_bcs_id| match maybe_bcs_id {
Some(bcs_id) => {
@@ -277,14 +276,14 @@ fn hg_to_bonsai_mapping_step(
}
None => StepOutput(NodeData::HgBonsaiMapping(None), vec![]),
})
.boxify()
.compat()
}
fn hg_changeset_step(
ctx: CoreContext,
repo: &BlobRepo,
id: HgChangesetId,
) -> BoxFuture<StepOutput, Error> {
) -> impl Future<Output = Result<StepOutput, Error>> {
id.load(ctx, repo.blobstore())
.from_err()
.map(|hgchangeset| {
@@ -302,15 +301,14 @@ fn hg_changeset_step(
}
StepOutput(NodeData::HgChangeset(hgchangeset), recurse)
})
.boxify()
.compat()
}
fn hg_file_envelope_step(
ctx: CoreContext,
repo: &BlobRepo,
hg_file_node_id: HgFileNodeId,
) -> BoxFuture<StepOutput, Error>
where {
) -> impl Future<Output = Result<StepOutput, Error>> {
hg_file_node_id
.load(ctx, repo.blobstore())
.from_err()
@@ -324,7 +322,7 @@ where {
StepOutput(NodeData::HgFileEnvelope(envelope), vec![fnode])
}
})
.boxify()
.compat()
}
fn hg_file_node_step(
@@ -332,7 +330,7 @@ fn hg_file_node_step(
repo: &BlobRepo,
path: Option<MPath>,
hg_file_node_id: HgFileNodeId,
) -> BoxFuture<StepOutput, Error> {
) -> impl Future<Output = Result<StepOutput, Error>> {
let repo_path = match path.clone() {
None => RepoPath::RootPath,
Some(mpath) => RepoPath::FilePath(mpath),
@@ -379,7 +377,7 @@ fn hg_file_node_step(
}
None => StepOutput(NodeData::HgFileNode(None), vec![]),
})
.boxify()
.compat()
}
fn hg_manifest_step(
@@ -387,7 +385,7 @@ fn hg_manifest_step(
repo: &BlobRepo,
path: Option<MPath>,
hg_manifest_id: HgManifestId,
) -> BoxFuture<StepOutput, Error> {
) -> impl Future<Output = Result<StepOutput, Error>> {
hg_manifest_id
.load(ctx, repo.blobstore())
.from_err()
@@ -438,14 +436,14 @@ fn hg_manifest_step(
StepOutput(NodeData::HgManifest(hgmanifest), children)
}
})
.boxify()
.compat()
}
fn alias_content_mapping_step(
ctx: CoreContext,
repo: &BlobRepo,
alias: Alias,
) -> BoxFuture<StepOutput, Error> {
) -> impl Future<Output = Result<StepOutput, Error>> {
alias
.load(ctx, &repo.get_blobstore())
.map(|content_id| {
@@ -456,7 +454,7 @@ fn alias_content_mapping_step(
StepOutput(NodeData::AliasContentMapping(content_id), recurse)
})
.map_err(Error::from)
.boxify()
.compat()
}
/// Expand nodes where check for a type is used as a check for other types.
@@ -493,7 +491,7 @@ fn walk_exact_compat<V, VOut>(
error_as_data_node_types: HashSet<NodeType>,
error_as_data_edge_types: HashSet<EdgeType>,
scuba: ScubaSampleBuilder,
) -> BoxStream<VOut, Error>
) -> BoxStream01<VOut, Error>
where
V: 'static + Clone + WalkVisitor<VOut> + Send,
VOut: 'static + Send,
@@ -520,173 +518,182 @@ where
public_heads
.map(move |public_heads| {
let public_heads = Arc::new(public_heads);
walk_exact_impl(
ctx,
repo,
enable_derive,
walk_roots,
visitor,
scheduled_max,
error_as_data_node_types,
error_as_data_edge_types,
scuba,
public_heads.clone(),
Arc::new(move |_ctx: &CoreContext| {
future::ok(public_heads.iter().map(|(_, csid)| csid).cloned().collect())
.compat()
.boxed()
}),
)
bounded_traversal_stream(scheduled_max, walk_roots, {
move |walk_item| {
cloned!(
ctx,
error_as_data_node_types,
error_as_data_edge_types,
public_heads,
repo,
scuba,
visitor
);
// Each step returns the walk result, and next steps
let next = async move {
walk_one(
ctx,
walk_item,
repo,
enable_derive,
visitor,
error_as_data_node_types,
error_as_data_edge_types,
scuba,
public_heads.clone(),
Arc::new(move |_ctx: &CoreContext| {
future::ok(
public_heads.iter().map(|(_, csid)| csid).cloned().collect(),
)
.boxed()
}),
)
.await
}
.boxed()
.compat()
.boxify();
spawn_future(next)
}
})
})
.flatten_stream()
.boxify()
}
fn walk_exact_impl<V, VOut>(
async fn walk_one<V, VOut>(
ctx: CoreContext,
walk_item: OutgoingEdge,
repo: BlobRepo,
enable_derive: bool,
walk_roots: Vec<OutgoingEdge>,
visitor: V,
scheduled_max: usize,
error_as_data_node_types: HashSet<NodeType>,
error_as_data_edge_types: HashSet<EdgeType>,
scuba: ScubaSampleBuilder,
mut scuba: ScubaSampleBuilder,
public_heads: Arc<HashMap<BookmarkName, ChangesetId>>,
heads_fetcher: HeadsFetcher,
) -> impl OldStream<Item = VOut, Error = Error>
) -> Result<(VOut, Vec<OutgoingEdge>), Error>
where
V: 'static + Clone + WalkVisitor<VOut> + Send,
VOut: 'static + Send,
{
bounded_traversal_stream(scheduled_max, walk_roots, {
// Each step returns the walk result, and next steps
move |walk_item| {
cloned!(ctx);
let logger = ctx.logger().clone();
let node = walk_item.target.clone();
let edge_label = walk_item.label;
let next = match node.clone() {
Node::Root => {
future::err(format_err!("Not expecting Roots to be generated")).boxify()
}
// Bonsai
Node::Bookmark(bookmark_name) => bookmark_step(
ctx.clone(),
repo.clone(),
bookmark_name,
public_heads.clone(),
),
Node::BonsaiChangeset(bcs_id) => bonsai_changeset_step(ctx, &repo, bcs_id),
Node::BonsaiHgMapping(bcs_id) => {
bonsai_to_hg_mapping_step(ctx, &repo, bcs_id, enable_derive)
}
Node::BonsaiPhaseMapping(bcs_id) => {
let phases_store = repo
.get_phases_factory()
.get_phases(repo.get_changeset_fetcher(), heads_fetcher.clone());
bonsai_phase_step(ctx, phases_store, bcs_id)
}
// Hg
Node::HgBonsaiMapping(hg_csid) => hg_to_bonsai_mapping_step(ctx, &repo, hg_csid),
Node::HgChangeset(hg_csid) => hg_changeset_step(ctx, &repo, hg_csid),
Node::HgFileEnvelope(hg_file_node_id) => {
hg_file_envelope_step(ctx, &repo, hg_file_node_id)
}
Node::HgFileNode((path, hg_file_node_id)) => {
hg_file_node_step(ctx, &repo, path, hg_file_node_id)
}
Node::HgManifest((path, hg_manifest_id)) => {
hg_manifest_step(ctx, &repo, path, hg_manifest_id)
}
// Content
Node::FileContent(content_id) => file_content_step(ctx, &repo, content_id),
Node::FileContentMetadata(content_id) => {
file_content_metadata_step(ctx, &repo, content_id, enable_derive)
}
Node::AliasContentMapping(alias) => alias_content_mapping_step(ctx, &repo, alias),
}
.then({
// Error as data logic
cloned!(
logger,
error_as_data_edge_types,
error_as_data_node_types,
walk_item,
mut scuba,
);
move |r| match r {
Ok(s) => Ok(s),
Err(e) => {
if error_as_data_node_types.contains(&walk_item.target.get_type()) {
if error_as_data_edge_types.is_empty()
|| error_as_data_edge_types.contains(&walk_item.label)
{
warn!(
logger,
"Could not step to {:?}, due to: {:?}", &walk_item, e
);
add_node_to_scuba(&walk_item.target, &mut scuba);
scuba
.add(EDGE_TYPE, edge_label.to_string())
.add(CHECK_TYPE, "step")
.add(CHECK_FAIL, 1)
.log();
Ok(StepOutput(NodeData::ErrorAsData(walk_item.target), vec![]))
} else {
Err(e)
}
} else {
Err(e)
}
}
}
})
.chain_err(ErrorKind::NotTraversable(walk_item))
.from_err()
.and_then({
cloned!(visitor);
move |StepOutput(node_data, children)| {
// make sure steps are valid. would be nice if this could be static
let children = children
.into_iter()
.map(|c| {
if c.label.outgoing_type() != c.target.get_type() {
Err(format_err!(
"Bad step {:?} to {:?}",
c.label,
c.target.get_type()
))
} else if c
.label
.incoming_type()
.map(|t| t != node.get_type())
.unwrap_or(false)
{
Err(format_err!(
"Bad step {:?} from {:?}",
c.label,
node.get_type()
))
} else {
Ok(c)
}
})
.collect::<Result<Vec<OutgoingEdge>, Error>>();
let children = children?;
// Allow WalkVisitor to record state and decline outgoing nodes if already visited
Ok(visitor.visit(
ResolvedNode::new(node, node_data, Some(edge_label)),
children,
))
}
});
spawn_future(next)
let logger = ctx.logger().clone();
let node = walk_item.target.clone();
let node_type = node.get_type();
let step_result = match node.clone() {
Node::Root => Err(format_err!("Not expecting Roots to be generated")),
// Bonsai
Node::Bookmark(bookmark_name) => {
bookmark_step(
ctx.clone(),
repo.clone(),
bookmark_name,
public_heads.clone(),
)
.await
}
})
Node::BonsaiChangeset(bcs_id) => bonsai_changeset_step(ctx, &repo, bcs_id).await,
Node::BonsaiHgMapping(bcs_id) => {
bonsai_to_hg_mapping_step(ctx, &repo, bcs_id, enable_derive).await
}
Node::BonsaiPhaseMapping(bcs_id) => {
let phases_store = repo
.get_phases_factory()
.get_phases(repo.get_changeset_fetcher(), heads_fetcher.clone());
bonsai_phase_step(ctx, phases_store, bcs_id).await
}
// Hg
Node::HgBonsaiMapping(hg_csid) => hg_to_bonsai_mapping_step(ctx, &repo, hg_csid).await,
Node::HgChangeset(hg_csid) => hg_changeset_step(ctx, &repo, hg_csid).await,
Node::HgFileEnvelope(hg_file_node_id) => {
hg_file_envelope_step(ctx, &repo, hg_file_node_id).await
}
Node::HgFileNode((path, hg_file_node_id)) => {
hg_file_node_step(ctx, &repo, path, hg_file_node_id).await
}
Node::HgManifest((path, hg_manifest_id)) => {
hg_manifest_step(ctx, &repo, path, hg_manifest_id).await
}
// Content
Node::FileContent(content_id) => file_content_step(ctx, &repo, content_id),
Node::FileContentMetadata(content_id) => {
file_content_metadata_step(ctx, &repo, content_id, enable_derive).await
}
Node::AliasContentMapping(alias) => alias_content_mapping_step(ctx, &repo, alias).await,
};
let edge_label = walk_item.label;
let step_output = match step_result {
Ok(s) => Ok(s),
Err(e) => {
if error_as_data_node_types.contains(&walk_item.target.get_type()) {
if error_as_data_edge_types.is_empty()
|| error_as_data_edge_types.contains(&walk_item.label)
{
warn!(
logger,
"Could not step to {:?}, due to: {:?}", &walk_item, e
);
add_node_to_scuba(&walk_item.target, &mut scuba);
scuba
.add(EDGE_TYPE, edge_label.to_string())
.add(CHECK_TYPE, "step")
.add(CHECK_FAIL, 1)
.log();
Ok(StepOutput(
NodeData::ErrorAsData(walk_item.target.clone()),
vec![],
))
} else {
Err(e)
}
} else {
Err(e)
}
}
}
.chain_err(ErrorKind::NotTraversable(walk_item))
.map_err(Error::from)?;
match step_output {
StepOutput(node_data, children) => {
// make sure steps are valid. would be nice if this could be static
let children = children
.into_iter()
.map(|c| {
if c.label.outgoing_type() != c.target.get_type() {
Err(format_err!(
"Bad step {:?} to {:?}",
c.label,
c.target.get_type()
))
} else if c
.label
.incoming_type()
.map(|t| t != node_type)
.unwrap_or(false)
{
Err(format_err!(
"Bad step {:?} from {:?}",
c.label,
node.get_type()
))
} else {
Ok(c)
}
})
.collect::<Result<Vec<OutgoingEdge>, Error>>();
let children = children?;
// Allow WalkVisitor to record state and decline outgoing nodes if already visited
Ok(visitor.visit(
ResolvedNode::new(node, node_data, Some(edge_label)),
children,
))
}
}
}
pub fn walk_exact<V, VOut>(
@@ -699,7 +706,7 @@ pub fn walk_exact<V, VOut>(
error_as_data_node_types: HashSet<NodeType>,
error_as_data_edge_types: HashSet<EdgeType>,
scuba: ScubaSampleBuilder,
) -> NewBoxStream<'static, Result<VOut, Error>>
) -> BoxStream<'static, Result<VOut, Error>>
where
V: 'static + Clone + WalkVisitor<VOut> + Send,
VOut: 'static + Send,