mononoke: only emit NodeData from walker if required

Summary:
Only emit NodeData from the walker if required, to save some memory. Each walk can now specify which NodeData it is interested in observing in the output stream.

We still need to emit Some as part of the Option<NodeData> in the output stream, as it is used in things like the final count of loaded objects. Rather than streaming over Option<Option<NodeData>>, we instead add a NodeData::NotRequired variant.
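
To illustrate the mechanism, here is a minimal, self-contained sketch of the pattern the diff introduces. The NodeType, NodeData and Checker::step_data names mirror the real ones below, but the variants, payload types and Checker shape are simplified stand-ins, not the actual walker types.

use std::collections::HashSet;

// Trimmed-down stand-ins for the walker's NodeType and NodeData enums.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
enum NodeType {
    Bookmark,
    FileContent,
}

#[derive(Debug)]
enum NodeData {
    // Emitted in place of the payload, so the stream stays Option<NodeData>
    // rather than Option<Option<NodeData>>.
    NotRequired,
    Bookmark(String),
    FileContent(Vec<u8>),
}

struct Checker {
    required_node_data_types: HashSet<NodeType>,
}

impl Checker {
    // Only build the node data if this walk asked for it; the closure is
    // never invoked for types the walk did not request.
    fn step_data<D>(&self, t: NodeType, data_fn: D) -> NodeData
    where
        D: FnOnce() -> NodeData,
    {
        if self.required_node_data_types.contains(&t) {
            data_fn()
        } else {
            NodeData::NotRequired
        }
    }
}

fn main() {
    // e.g. a scrub-style walk that only wants file content payloads
    let checker = Checker {
        required_node_data_types: [NodeType::FileContent].iter().cloned().collect(),
    };

    // FileContent was requested, so the payload is constructed.
    let wanted = checker.step_data(NodeType::FileContent, || {
        NodeData::FileContent(b"blob bytes".to_vec())
    });

    // Bookmark data was not requested: the closure never runs and only the
    // lightweight NotRequired marker flows down the output stream.
    let skipped = checker.step_data(NodeType::Bookmark, || {
        NodeData::Bookmark("master".to_string())
    });

    println!("{:?} {:?}", wanted, skipped);
}

Passing a closure rather than an eagerly built NodeData is what saves the memory: payloads for unrequested node types are never allocated, yet every step still yields a NodeData value for downstream counting.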

Reviewed By: markbt

Differential Revision: D22849831

fbshipit-source-id: ef212103ac2deb9d66b017b8febe233eb53c9ed3
Authored by Alex Hornby on 2020-08-06 06:26:09 -07:00; committed by Facebook GitHub Bot
parent c0347c6baf
commit f07e0be8e3
7 changed files with 142 additions and 31 deletions


@ -416,6 +416,7 @@ pub async fn corpus<'a>(
logger,
datasources,
walk_params,
&[NodeType::FileContent],
None,
walk_state,
make_sink,


@ -439,6 +439,7 @@ pub enum FileContentData {
/// e.g. file content streams are passed to you to read, they aren't pre-loaded to bytes.
pub enum NodeData {
ErrorAsData(Node),
NotRequired,
// Bonsai
Bookmark(ChangesetId),
BonsaiChangeset(BonsaiChangeset),


@ -396,6 +396,7 @@ pub async fn scrub_objects<'a>(
logger,
datasources,
walk_params,
&[NodeType::FileContent],
None,
walk_state,
make_sink,


@ -372,6 +372,7 @@ pub async fn compression_benefit<'a>(
logger,
datasources,
walk_params,
&[NodeType::FileContent],
None,
walk_state,
make_sink,


@ -5,7 +5,7 @@
* GNU General Public License version 2.
*/
use crate::graph::EdgeType;
use crate::graph::{EdgeType, NodeType};
use crate::setup::{RepoWalkDatasources, RepoWalkParams};
use crate::walk::{walk_exact, StepRoute, WalkVisitor};
@ -16,7 +16,7 @@ use fbinit::FacebookInit;
use futures::{future::Future, stream::BoxStream};
use scuba_ext::ScubaSampleBuilder;
use slog::Logger;
use std::collections::HashSet;
use std::{collections::HashSet, iter::FromIterator};
use tokio::time::{Duration, Instant};
#[derive(Clone)]
@ -30,6 +30,7 @@ pub async fn walk_exact_tail<RunFac, SinkFac, SinkOut, V, VOut, Route>(
logger: Logger,
datasources: RepoWalkDatasources,
walk_params: RepoWalkParams,
required_node_data_types: &[NodeType],
always_emit_edge_types: Option<HashSet<EdgeType>>,
visitor: V,
make_run: RunFac,
@ -51,6 +52,9 @@ where
} else {
HashSet::new()
};
let required_node_data_types =
HashSet::from_iter(required_node_data_types.into_iter().cloned());
loop {
cloned!(make_run, repo, mut scuba_builder, visitor,);
@ -72,6 +76,7 @@ where
walk_params.error_as_data_edge_types.clone(),
walk_params.include_edge_types.clone(),
always_emit_edge_types.clone(),
required_node_data_types.clone(),
scuba_builder,
keep_edge_paths,
);


@ -688,6 +688,7 @@ pub async fn validate<'a>(
logger,
datasources,
walk_params,
&[NodeType::BonsaiPhaseMapping],
Some(always_emit_edge_types),
stateful_visitor,
make_sink,


@ -185,7 +185,10 @@ fn bookmark_step<'a, V: VisitOne>(
checker.add_edge(&mut edges, EdgeType::BookmarkToBonsaiHgMapping, || {
Node::BonsaiHgMapping(bcs_id)
});
future::ok(StepOutput(NodeData::Bookmark(bcs_id), edges))
future::ok(StepOutput(
checker.step_data(NodeType::Bookmark, || NodeData::Bookmark(bcs_id)),
edges,
))
}
None => future::err(format_err!("Unknown Bookmark {}", b)),
})
@ -208,22 +211,33 @@ fn published_bookmarks_step<V: VisitOne>(
|| Node::BonsaiHgMapping(bcs_id.clone()),
);
}
future::ok(StepOutput(NodeData::PublishedBookmarks, edges))
future::ok(StepOutput(
checker.step_data(NodeType::PublishedBookmarks, || {
NodeData::PublishedBookmarks
}),
edges,
))
}
fn bonsai_phase_step(
ctx: &CoreContext,
fn bonsai_phase_step<'a, V: VisitOne>(
ctx: &'a CoreContext,
checker: &'a Checker<V>,
phases_store: Arc<dyn Phases>,
bcs_id: ChangesetId,
) -> impl Future<Output = Result<StepOutput, Error>> {
) -> impl Future<Output = Result<StepOutput, Error>> + 'a {
phases_store
.get_public(ctx.clone(), vec![bcs_id], true)
.map(move |public| public.contains(&bcs_id))
.map(|is_public| {
let phase = if is_public { Some(Phase::Public) } else { None };
StepOutput(NodeData::BonsaiPhaseMapping(phase), vec![])
})
.compat()
.map_ok(move |is_public| {
let phase = if is_public { Some(Phase::Public) } else { None };
StepOutput(
checker.step_data(NodeType::BonsaiPhaseMapping, || {
NodeData::BonsaiPhaseMapping(phase)
}),
vec![],
)
})
}
async fn bonsai_changeset_step<V: VisitOne>(
@ -269,12 +283,16 @@ async fn bonsai_changeset_step<V: VisitOne>(
EdgeType::BonsaiChangesetToBonsaiFsnodeMapping,
|| Node::BonsaiFsnodeMapping(*bcs_id),
);
Ok(StepOutput(NodeData::BonsaiChangeset(bcs), edges))
Ok(StepOutput(
checker.step_data(NodeType::BonsaiChangeset, || NodeData::BonsaiChangeset(bcs)),
edges,
))
}
fn file_content_step(
fn file_content_step<V: VisitOne>(
ctx: CoreContext,
repo: &BlobRepo,
checker: &Checker<V>,
id: ContentId,
) -> Result<StepOutput, Error> {
let s = filestore::fetch_stream(repo.blobstore(), ctx, id)
@ -282,7 +300,9 @@ fn file_content_step(
.compat();
// We don't force file loading here, content may not be needed
Ok(StepOutput(
NodeData::FileContent(FileContentData::ContentStream(Box::pin(s))),
checker.step_data(NodeType::FileContent, || {
NodeData::FileContent(FileContentData::ContentStream(Box::pin(s)))
}),
vec![],
))
}
@ -319,9 +339,19 @@ fn file_content_metadata_step<'a, V: VisitOne>(
EdgeType::FileContentMetadataToGitSha1Alias,
|| Node::AliasContentMapping(Alias::GitSha1(metadata.git_sha1.sha1())),
);
StepOutput(NodeData::FileContentMetadata(Some(metadata)), edges)
StepOutput(
checker.step_data(NodeType::FileContentMetadata, || {
NodeData::FileContentMetadata(Some(metadata))
}),
edges,
)
}
Some(None) | None => StepOutput(NodeData::FileContentMetadata(None), vec![]),
Some(None) | None => StepOutput(
checker.step_data(NodeType::FileContentMetadata, || {
NodeData::FileContentMetadata(None)
}),
vec![],
),
})
.compat()
}
@ -390,9 +420,19 @@ fn bonsai_to_hg_mapping_step<'a, V: 'a + VisitOne>(
checker.add_edge(&mut edges, EdgeType::BonsaiHgMappingToHgChangeset, || {
Node::HgChangeset(hg_cs_id)
});
StepOutput(NodeData::BonsaiHgMapping(Some(hg_cs_id)), edges)
StepOutput(
checker.step_data(NodeType::BonsaiHgMapping, || {
NodeData::BonsaiHgMapping(Some(hg_cs_id))
}),
edges,
)
}
None => StepOutput(NodeData::BonsaiHgMapping(None), vec![]),
None => StepOutput(
checker.step_data(NodeType::BonsaiHgMapping, || {
NodeData::BonsaiHgMapping(None)
}),
vec![],
),
})
.compat()
}
@ -412,9 +452,19 @@ fn hg_to_bonsai_mapping_step<'a, V: VisitOne>(
EdgeType::HgBonsaiMappingToBonsaiChangeset,
|| Node::BonsaiChangeset(bcs_id),
);
StepOutput(NodeData::HgBonsaiMapping(Some(bcs_id)), edges)
StepOutput(
checker.step_data(NodeType::HgBonsaiMapping, || {
NodeData::HgBonsaiMapping(Some(bcs_id))
}),
edges,
)
}
None => StepOutput(NodeData::HgBonsaiMapping(None), vec![]),
None => StepOutput(
checker.step_data(NodeType::HgBonsaiMapping, || {
NodeData::HgBonsaiMapping(None)
}),
vec![],
),
})
.compat()
}
@ -437,7 +487,10 @@ fn hg_changeset_step<'a, V: VisitOne>(
Node::HgChangeset(HgChangesetId::new(p))
});
}
StepOutput(NodeData::HgChangeset(hgchangeset), edges)
StepOutput(
checker.step_data(NodeType::HgChangeset, || NodeData::HgChangeset(hgchangeset)),
edges,
)
})
}
@ -460,7 +513,12 @@ fn hg_file_envelope_step<'a, V: 'a + VisitOne>(
|| Node::FileContent(envelope.content_id()),
|| path.cloned(),
);
StepOutput(NodeData::HgFileEnvelope(envelope), edges)
StepOutput(
checker.step_data(NodeType::HgFileEnvelope, || {
NodeData::HgFileEnvelope(envelope)
}),
edges,
)
}
})
}
@ -511,9 +569,17 @@ fn hg_file_node_step<'a, V: VisitOne>(
))
})
}
StepOutput(NodeData::HgFileNode(Some(file_node_info)), edges)
StepOutput(
checker.step_data(NodeType::HgFileNode, || {
NodeData::HgFileNode(Some(file_node_info))
}),
edges,
)
}
None => StepOutput(NodeData::HgFileNode(None), vec![]),
None => StepOutput(
checker.step_data(NodeType::HgFileNode, || NodeData::HgFileNode(None)),
vec![],
),
})
.compat()
}
@ -555,7 +621,10 @@ fn hg_manifest_step<'a, V: VisitOne>(
Node::HgManifest((full_path, hg_child_manifest_id))
})
}
StepOutput(NodeData::HgManifest(hgmanifest), edges)
StepOutput(
checker.step_data(NodeType::HgManifest, || NodeData::HgManifest(hgmanifest)),
edges,
)
})
}
@ -574,7 +643,12 @@ fn alias_content_mapping_step<'a, V: VisitOne>(
EdgeType::AliasContentMappingToFileContent,
|| Node::FileContent(content_id),
);
StepOutput(NodeData::AliasContentMapping(content_id), edges)
StepOutput(
checker.step_data(NodeType::AliasContentMapping, || {
NodeData::AliasContentMapping(content_id)
}),
edges,
)
})
.map_err(Error::from)
}
@ -598,11 +672,18 @@ async fn bonsai_to_fsnode_mapping_step<V: VisitOne>(
Node::Fsnode((WrappedPath::Root, *root_fsnode_id.fsnode_id()))
});
Ok(StepOutput(
NodeData::BonsaiFsnodeMapping(Some(*root_fsnode_id.fsnode_id())),
checker.step_data(NodeType::BonsaiFsnodeMapping, || {
NodeData::BonsaiFsnodeMapping(Some(*root_fsnode_id.fsnode_id()))
}),
edges,
))
} else {
Ok(StepOutput(NodeData::BonsaiFsnodeMapping(None), vec![]))
Ok(StepOutput(
checker.step_data(NodeType::BonsaiFsnodeMapping, || {
NodeData::BonsaiFsnodeMapping(None)
}),
vec![],
))
}
}
@ -645,7 +726,10 @@ async fn fsnode_step<V: VisitOne>(
}
}
Ok(StepOutput(NodeData::Fsnode(fsnode), edges))
Ok(StepOutput(
checker.step_data(NodeType::Fsnode, || NodeData::Fsnode(fsnode)),
edges,
))
}
/// Expand nodes where check for a type is used as a check for other types.
@ -676,6 +760,7 @@ pub fn expand_checked_nodes(children: &mut Vec<OutgoingEdge>) -> () {
struct Checker<V: VisitOne> {
include_edge_types: HashSet<EdgeType>,
always_emit_edge_types: HashSet<EdgeType>,
required_node_data_types: HashSet<NodeType>,
keep_edge_paths: bool,
visitor: V,
}
@ -746,6 +831,18 @@ impl<V: VisitOne> Checker<V> {
}
None
}
// Only add the node data if requested
fn step_data<D>(&self, t: NodeType, data_fn: D) -> NodeData
where
D: FnOnce() -> NodeData,
{
if self.required_node_data_types.contains(&t) {
data_fn()
} else {
NodeData::NotRequired
}
}
}
/// Walk the graph from one or more starting points, providing stream of data for later reduction
@ -760,6 +857,7 @@ pub fn walk_exact<V, VOut, Route>(
error_as_data_edge_types: HashSet<EdgeType>,
include_edge_types: HashSet<EdgeType>,
always_emit_edge_types: HashSet<EdgeType>,
required_node_data_types: HashSet<NodeType>,
scuba: ScubaSampleBuilder,
keep_edge_paths: bool,
) -> BoxStream<'static, Result<VOut, Error>>
@ -791,6 +889,7 @@ where
always_emit_edge_types,
keep_edge_paths,
visitor: visitor.clone(),
required_node_data_types,
});
published_bookmarks
@ -896,7 +995,7 @@ where
repo.get_changeset_fetcher(),
heads_fetcher.clone(),
);
bonsai_phase_step(&ctx, phases_store, bcs_id).await
bonsai_phase_step(&ctx, &checker, phases_store, bcs_id).await
}
Node::PublishedBookmarks => {
published_bookmarks_step(published_bookmarks.clone(), &checker).await
@ -925,7 +1024,9 @@ where
hg_manifest_step(ctx.clone(), &repo, &checker, path, hg_manifest_id).await
}
// Content
Node::FileContent(content_id) => file_content_step(ctx.clone(), &repo, content_id),
Node::FileContent(content_id) => {
file_content_step(ctx.clone(), &repo, &checker, content_id)
}
Node::FileContentMetadata(content_id) => {
file_content_metadata_step(ctx.clone(), &repo, &checker, content_id, enable_derive)
.await