mononoke: check whether to emit an edge from the walker earlier

Summary:
Check whether to emit an edge from the walker earlier, to reduce Vec allocation of unnecessary edges that would immediately be dropped in WalkVisitor::visit.

The VisitOne trait is introduced as a simpler API to the Visitor that can be used to check if one edge needs to be visited, and the Checker struct in walk.rs is a helper around it that will only call the VisitOne API if necessary. Checker also takes on responsibility for respecting keep_edge_paths when returning paths, so that parameter has been removed for migrated steps.

To keep the diff size reasonable, this change has all the necessary Checker/VisitOne changes but only converts hg_manifest_step, with the remainder of the steps converted in the next diff in the stack. The marked TODOs labelling unmigrated types as always-emit types will be removed as part of converting the remaining steps.

Reviewed By: farnz

Differential Revision: D22864136

fbshipit-source-id: 431c3637634c6a02ab08662261b10815ea6ce293
This commit is contained in:
Alex Hornby 2020-08-04 04:29:08 -07:00 committed by Facebook GitHub Bot
parent fe60eeff85
commit 5fb309a7b2
8 changed files with 246 additions and 100 deletions

View File

@ -416,6 +416,7 @@ pub async fn corpus<'a>(
logger,
datasources,
walk_params,
None,
walk_state,
make_sink,
true,

View File

@ -7,7 +7,7 @@
use crate::graph::{EdgeType, Node, NodeData, NodeType, WrappedPath};
use crate::state::{StepStats, WalkState};
use crate::walk::{EmptyRoute, OutgoingEdge, StepRoute, WalkVisitor};
use crate::walk::{EmptyRoute, OutgoingEdge, StepRoute, VisitOne, WalkVisitor};
use context::{CoreContext, SamplingKey};
use dashmap::DashMap;
@ -39,8 +39,15 @@ impl<T> SamplingWalkVisitor<T> {
sample_rate: u64,
sample_offset: u64,
) -> Self {
// TODO(ahornby) temporary until later in stack so don't need to review
// all types update to new checker at once
let mut unmigrated_types = include_edge_types.clone();
unmigrated_types.remove(&EdgeType::HgManifestToHgFileEnvelope);
unmigrated_types.remove(&EdgeType::HgManifestToHgFileNode);
unmigrated_types.remove(&EdgeType::HgManifestToChildHgManifest);
Self {
inner: WalkState::new(include_node_types, include_edge_types),
inner: WalkState::new(include_node_types, include_edge_types, unmigrated_types),
sample_node_types,
sample_path_regex,
sampler,
@ -50,6 +57,12 @@ impl<T> SamplingWalkVisitor<T> {
}
}
impl<T> VisitOne for SamplingWalkVisitor<T> {
fn needs_visit(&self, outgoing: &OutgoingEdge) -> bool {
self.inner.needs_visit(outgoing)
}
}
#[derive(Clone, Debug, Default, PartialEq, Eq, Hash)]
pub struct PathTrackingRoute {
// The path we reached this by

View File

@ -396,6 +396,7 @@ pub async fn scrub_objects<'a>(
logger,
datasources,
walk_params,
None,
walk_state,
make_sink,
false,

View File

@ -372,6 +372,7 @@ pub async fn compression_benefit<'a>(
logger,
datasources,
walk_params,
None,
walk_state,
make_sink,
true,

View File

@ -6,7 +6,7 @@
*/
use crate::graph::{EdgeType, Node, NodeData, NodeType, WrappedPath};
use crate::walk::{expand_checked_nodes, EmptyRoute, OutgoingEdge, WalkVisitor};
use crate::walk::{expand_checked_nodes, EmptyRoute, OutgoingEdge, VisitOne, WalkVisitor};
use array_init::array_init;
use context::CoreContext;
use dashmap::DashMap;
@ -45,6 +45,7 @@ pub struct WalkState {
// e.g. ChangesetId, HgChangesetId, HgFileNodeId
include_node_types: HashSet<NodeType>,
include_edge_types: HashSet<EdgeType>,
always_emit_edge_types: HashSet<EdgeType>,
visited_bcs: DashMap<ChangesetId, ()>,
visited_bcs_mapping: DashMap<ChangesetId, ()>,
visited_bcs_phase: DashMap<ChangesetId, ()>,
@ -75,10 +76,12 @@ impl WalkState {
pub fn new(
include_node_types: HashSet<NodeType>,
include_edge_types: HashSet<EdgeType>,
always_emit_edge_types: HashSet<EdgeType>,
) -> Self {
Self {
include_node_types,
include_edge_types,
always_emit_edge_types,
visited_bcs: DashMap::new(),
visited_bcs_mapping: DashMap::new(),
visited_bcs_phase: DashMap::new(),
@ -93,36 +96,6 @@ impl WalkState {
}
}
/// If the set did not have this value present, true is returned.
fn needs_visit(&self, outgoing: &OutgoingEdge) -> bool {
let target_node: &Node = &outgoing.target;
let k = target_node.get_type();
self.visit_count[k as usize].fetch_add(1, Ordering::Release);
match &target_node {
Node::BonsaiChangeset(bcs_id) => self.visited_bcs.insert(*bcs_id, ()).is_none(),
// TODO - measure if worth tracking - the mapping is cachelib enabled.
Node::BonsaiHgMapping(bcs_id) => {
// Does not insert, see record_resolved_visit
!self.visited_bcs_mapping.contains_key(bcs_id)
}
Node::BonsaiPhaseMapping(bcs_id) => {
// Does not insert, as can only prune visits once data resolved, see record_resolved_visit
!self.visited_bcs_phase.contains_key(bcs_id)
}
Node::HgBonsaiMapping(hg_cs_id) => {
self.visited_hg_cs_mapping.insert(*hg_cs_id, ()).is_none()
}
Node::HgChangeset(hg_cs_id) => self.visited_hg_cs.insert(*hg_cs_id, ()).is_none(),
Node::HgManifest(k) => record_with_path(&self.visited_hg_manifest, k),
Node::HgFileNode(k) => record_with_path(&self.visited_hg_filenode, k),
Node::HgFileEnvelope(id) => self.visited_hg_file_envelope.insert(*id, ()).is_none(),
Node::FileContent(content_id) => self.visited_file.insert(*content_id, ()).is_none(),
Node::Fsnode(k) => record_with_path(&self.visited_fsnode, k),
_ => true,
}
}
fn record_resolved_visit(&self, resolved: &OutgoingEdge, node_data: Option<&NodeData>) {
match (&resolved.target, node_data) {
(
@ -153,6 +126,38 @@ impl WalkState {
}
}
impl VisitOne for WalkState {
    /// Report whether the target node of `outgoing` still needs to be visited.
    ///
    /// Returns true if the relevant visited set did not already have the value present.
    /// The per node type check counter is incremented on every call, whatever the answer.
    fn needs_visit(&self, outgoing: &OutgoingEdge) -> bool {
        let target = &outgoing.target;
        let type_index = target.get_type() as usize;
        self.visit_count[type_index].fetch_add(1, Ordering::Release);

        match target {
            // Lookup only for the two mappings below: nothing is inserted here.
            // TODO - measure if worth tracking - the mapping is cachelib enabled.
            Node::BonsaiHgMapping(bcs_id) => {
                // Does not insert, see record_resolved_visit
                !self.visited_bcs_mapping.contains_key(bcs_id)
            }
            Node::BonsaiPhaseMapping(bcs_id) => {
                // Does not insert, as can only prune visits once data resolved,
                // see record_resolved_visit
                !self.visited_bcs_phase.contains_key(bcs_id)
            }

            // Keyed by id alone: inserted on first sight, true only for that first insert.
            Node::BonsaiChangeset(bcs_id) => self.visited_bcs.insert(*bcs_id, ()).is_none(),
            Node::HgBonsaiMapping(hg_cs_id) => {
                self.visited_hg_cs_mapping.insert(*hg_cs_id, ()).is_none()
            }
            Node::HgChangeset(hg_cs_id) => self.visited_hg_cs.insert(*hg_cs_id, ()).is_none(),
            Node::HgFileEnvelope(envelope_id) => self
                .visited_hg_file_envelope
                .insert(*envelope_id, ())
                .is_none(),
            Node::FileContent(content_id) => self.visited_file.insert(*content_id, ()).is_none(),

            // Keyed by (path, id): handled by the record_with_path helper.
            Node::HgManifest(key) => record_with_path(&self.visited_hg_manifest, key),
            Node::HgFileNode(key) => record_with_path(&self.visited_hg_filenode, key),
            Node::Fsnode(key) => record_with_path(&self.visited_fsnode, key),

            // Every other node type is untracked and always needs a visit.
            _ => true,
        }
    }
}
impl WalkVisitor<(Node, Option<NodeData>, Option<StepStats>), EmptyRoute> for WalkState {
fn start_step(
&self,
@ -168,18 +173,30 @@ impl WalkVisitor<(Node, Option<NodeData>, Option<StepStats>), EmptyRoute> for Wa
_ctx: &CoreContext,
resolved: OutgoingEdge,
node_data: Option<NodeData>,
_route: Option<EmptyRoute>,
route: Option<EmptyRoute>,
mut outgoing: Vec<OutgoingEdge>,
) -> (
(Node, Option<NodeData>, Option<StepStats>),
EmptyRoute,
Vec<OutgoingEdge>,
) {
// Filter things we don't want to enter the WalkVisitor at all.
outgoing.retain(|e| self.retain_edge(e) && self.needs_visit(&e));
if route.is_none() || !self.always_emit_edge_types.is_empty() {
outgoing.retain(|e| {
if e.label.incoming_type().is_none() {
// Make sure stats are updated for root nodes
self.needs_visit(&e);
true
} else {
// Check the always emit edges, outer visitor has now processed them.
self.retain_edge(e)
&& (!self.always_emit_edge_types.contains(&e.label) || self.needs_visit(&e))
}
});
}
let num_outgoing = outgoing.len();
expand_checked_nodes(&mut outgoing);
// Make sure we don't expand to types of node and edge not wanted
if num_outgoing != outgoing.len() {
outgoing.retain(|e| self.retain_edge(e));

View File

@ -5,6 +5,7 @@
* GNU General Public License version 2.
*/
use crate::graph::EdgeType;
use crate::setup::{RepoWalkDatasources, RepoWalkParams};
use crate::walk::{walk_exact, StepRoute, WalkVisitor};
@ -15,6 +16,7 @@ use fbinit::FacebookInit;
use futures::{future::Future, stream::BoxStream};
use scuba_ext::ScubaSampleBuilder;
use slog::Logger;
use std::collections::HashSet;
use tokio::time::{Duration, Instant};
#[derive(Clone)]
@ -28,6 +30,7 @@ pub async fn walk_exact_tail<RunFac, SinkFac, SinkOut, V, VOut, Route>(
logger: Logger,
datasources: RepoWalkDatasources,
walk_params: RepoWalkParams,
always_emit_edge_types: Option<HashSet<EdgeType>>,
visitor: V,
make_run: RunFac,
keep_edge_paths: bool,
@ -36,13 +39,18 @@ where
RunFac: 'static + Clone + Send + Sync + FnOnce(RepoWalkRun) -> SinkFac,
SinkFac: 'static + FnOnce(BoxStream<'static, Result<VOut, Error>>) -> SinkOut + Clone + Send,
SinkOut: Future<Output = Result<(), Error>> + 'static + Send,
V: 'static + Clone + WalkVisitor<VOut, Route> + Send,
V: 'static + Clone + WalkVisitor<VOut, Route> + Send + Sync,
VOut: 'static + Send,
Route: 'static + Send + Clone + StepRoute,
{
let scuba_builder = datasources.scuba_builder;
let repo = datasources.blobrepo;
let tail_secs = walk_params.tail_secs.clone();
let always_emit_edge_types = if let Some(always_emit_edge_types) = always_emit_edge_types {
always_emit_edge_types
} else {
HashSet::new()
};
loop {
cloned!(make_run, repo, mut scuba_builder, visitor,);
@ -62,6 +70,8 @@ where
walk_params.scheduled_max,
walk_params.error_as_data_node_types.clone(),
walk_params.error_as_data_edge_types.clone(),
walk_params.include_edge_types.clone(),
always_emit_edge_types.clone(),
scuba_builder,
keep_edge_paths,
);

View File

@ -23,7 +23,7 @@ use crate::setup::{
};
use crate::state::{StepStats, WalkState};
use crate::tail::{walk_exact_tail, RepoWalkRun};
use crate::walk::{EmptyRoute, OutgoingEdge, StepRoute, WalkVisitor};
use crate::walk::{EmptyRoute, OutgoingEdge, StepRoute, VisitOne, WalkVisitor};
use anyhow::{format_err, Error};
use clap::ArgMatches;
@ -157,10 +157,15 @@ impl ValidatingVisitor {
include_node_types: HashSet<NodeType>,
include_edge_types: HashSet<EdgeType>,
include_checks: HashSet<CheckType>,
always_emit_edge_types: HashSet<EdgeType>,
) -> Self {
Self {
repo_stats_key,
inner: WalkState::new(include_node_types, include_edge_types),
inner: WalkState::new(
include_node_types,
include_edge_types,
always_emit_edge_types,
),
checks_by_node_type: include_checks
.into_iter()
.group_by(|c| c.node_type())
@ -171,6 +176,12 @@ impl ValidatingVisitor {
}
}
impl VisitOne for ValidatingVisitor {
fn needs_visit(&self, outgoing: &OutgoingEdge) -> bool {
self.inner.needs_visit(outgoing)
}
}
fn check_bonsai_phase_is_public(
node: &Node,
node_data: Option<&NodeData>,
@ -661,11 +672,24 @@ pub async fn validate<'a>(
}
};
let mut always_emit_edge_types = vec![EdgeType::HgLinkNodeToHgChangeset];
// TODO(ahornby) temporary until later in stack so don't need to review
// all types update to new checker at once
let mut unmigrated_types = include_edge_types.clone();
unmigrated_types.remove(&EdgeType::HgManifestToHgFileEnvelope);
unmigrated_types.remove(&EdgeType::HgManifestToHgFileNode);
unmigrated_types.remove(&EdgeType::HgManifestToChildHgManifest);
always_emit_edge_types.append(&mut unmigrated_types.into_iter().collect());
let always_emit_edge_types = HashSet::from_iter(always_emit_edge_types.into_iter());
let stateful_visitor = Arc::new(ValidatingVisitor::new(
repo_stats_key,
include_node_types,
include_edge_types,
include_check_types,
always_emit_edge_types.clone(),
));
walk_exact_tail(
@ -673,6 +697,7 @@ pub async fn validate<'a>(
logger,
datasources,
walk_params,
Some(always_emit_edge_types),
stateful_visitor,
make_sink,
false,

View File

@ -96,7 +96,15 @@ pub enum ErrorKind {
NotTraversable(OutgoingEdge, String),
}
pub trait WalkVisitor<VOut, Route> {
/// Simpler visitor trait used inside each step to decide
/// whether to emit an edge.
///
/// It is a supertrait of `WalkVisitor`, so every full visitor can also
/// answer this single-edge question.
pub trait VisitOne {
/// Returns true when the edge `outgoing` should be emitted, i.e. its
/// target still needs to be visited.
fn needs_visit(&self, outgoing: &OutgoingEdge) -> bool;
}
// Overall trait with support for route tracking and handling
// partially derived types (it can see the node_data)
pub trait WalkVisitor<VOut, Route>: VisitOne {
// Called before the step is attempted
fn start_step(
&self,
@ -116,6 +124,15 @@ pub trait WalkVisitor<VOut, Route> {
) -> (VOut, Route, Vec<OutgoingEdge>);
}
impl<V: VisitOne> VisitOne for Arc<V> {
    /// Forward to the shared visitor behind the `Arc`, so an `Arc<V>` can be
    /// used wherever a `VisitOne` is required.
    fn needs_visit(&self, outgoing: &OutgoingEdge) -> bool {
        V::needs_visit(&**self, outgoing)
    }
}
impl<V, VOut, Route> WalkVisitor<VOut, Route> for Arc<V>
where
V: 'static + WalkVisitor<VOut, Route> + Sync + Send,
@ -507,70 +524,44 @@ fn hg_file_node_step(
.compat()
}
fn hg_manifest_step(
fn hg_manifest_step<'a, V: VisitOne>(
ctx: CoreContext,
repo: &BlobRepo,
repo: &'a BlobRepo,
checker: &'a Checker<V>,
path: WrappedPath,
hg_manifest_id: HgManifestId,
keep_edge_paths: bool,
) -> impl Future<Output = Result<StepOutput, Error>> {
) -> impl Future<Output = Result<StepOutput, Error>> + 'a {
hg_manifest_id
.load(ctx, repo.blobstore())
.map_err(Error::from)
.map_ok({
move |hgmanifest| {
let (manifests, filenodes): (Vec<_>, Vec<_>) =
hgmanifest.list().partition_map(|child| {
let path_opt = WrappedPath::from(MPath::join_element_opt(
path.as_ref(),
child.get_name(),
));
match child.get_hash() {
HgEntryId::File(_, filenode_id) => {
Either::Right((path_opt, filenode_id))
}
HgEntryId::Manifest(manifest_id) => {
Either::Left((path_opt, manifest_id))
}
}
});
let mut children: Vec<_> = filenodes
.into_iter()
.map(move |(full_path, hg_file_node_id)| {
vec![
OutgoingEdge::new_with_path(
EdgeType::HgManifestToHgFileEnvelope,
Node::HgFileEnvelope(hg_file_node_id),
if keep_edge_paths {
Some(full_path.clone())
} else {
None
},
),
OutgoingEdge::new(
EdgeType::HgManifestToHgFileNode,
Node::HgFileNode((full_path, hg_file_node_id)),
),
]
})
.flatten()
.collect();
let mut children_manifests: Vec<_> = manifests
.into_iter()
.map(move |(full_path, hg_child_manifest_id)| {
OutgoingEdge::new(
EdgeType::HgManifestToChildHgManifest,
Node::HgManifest((full_path, hg_child_manifest_id)),
)
})
.collect();
children.append(&mut children_manifests);
StepOutput(NodeData::HgManifest(hgmanifest), children)
.map_ok(move |hgmanifest| {
let (manifests, filenodes): (Vec<_>, Vec<_>) =
hgmanifest.list().partition_map(|child| {
let path_opt =
WrappedPath::from(MPath::join_element_opt(path.as_ref(), child.get_name()));
match child.get_hash() {
HgEntryId::File(_, filenode_id) => Either::Right((path_opt, filenode_id)),
HgEntryId::Manifest(manifest_id) => Either::Left((path_opt, manifest_id)),
}
});
let mut edges = vec![];
for (full_path, hg_file_node_id) in filenodes {
checker.add_edge_with_path(
&mut edges,
EdgeType::HgManifestToHgFileEnvelope,
|| Node::HgFileEnvelope(hg_file_node_id),
|| Some(full_path.clone()),
);
checker.add_edge(&mut edges, EdgeType::HgManifestToHgFileNode, || {
Node::HgFileNode((full_path, hg_file_node_id))
});
}
for (full_path, hg_child_manifest_id) in manifests {
checker.add_edge(&mut edges, EdgeType::HgManifestToChildHgManifest, || {
Node::HgManifest((full_path, hg_child_manifest_id))
})
}
StepOutput(NodeData::HgManifest(hgmanifest), edges)
})
}
@ -683,6 +674,81 @@ pub fn expand_checked_nodes(children: &mut Vec<OutgoingEdge>) -> () {
}
}
/// Helper used by walk steps to decide, edge by edge, whether an outgoing
/// edge should be constructed and emitted at all.
struct Checker<V: VisitOne> {
// Edge types that may be emitted, subject to the visitor's needs_visit check
include_edge_types: HashSet<EdgeType>,
// Edge types that are emitted without consulting the visitor
always_emit_edge_types: HashSet<EdgeType>,
// When false, make_edge_with_path builds the edge without a path and
// never calls the path closure
keep_edge_paths: bool,
// Visitor consulted via VisitOne::needs_visit for edges that are not always emitted
visitor: V,
}
impl<V: VisitOne> Checker<V> {
    /// Convenience method around `make_edge`: appends the edge to `edges`
    /// if one was produced, otherwise leaves `edges` untouched.
    fn add_edge<N>(&self, edges: &mut Vec<OutgoingEdge>, edge_type: EdgeType, node_fn: N)
    where
        N: FnOnce() -> Node,
    {
        // An Option iterates over zero or one items, so this pushes at most once.
        edges.extend(self.make_edge(edge_type, node_fn));
    }

    /// Convenience method around `make_edge_with_path`: appends the edge to
    /// `edges` if one was produced, otherwise leaves `edges` untouched.
    fn add_edge_with_path<N, P>(
        &self,
        edges: &mut Vec<OutgoingEdge>,
        edge_type: EdgeType,
        node_fn: N,
        path_fn: P,
    ) where
        N: FnOnce() -> Node,
        P: FnOnce() -> Option<WrappedPath>,
    {
        edges.extend(self.make_edge_with_path(edge_type, node_fn, path_fn));
    }

    /// Construct a new edge, only calling the visitor to check if the edge_type is needed.
    ///
    /// `node_fn` is only invoked when `edge_type` is in the always-emit or include sets.
    /// Always-emit edge types skip the visitor check entirely.
    fn make_edge<N>(&self, edge_type: EdgeType, node_fn: N) -> Option<OutgoingEdge>
    where
        N: FnOnce() -> Node,
    {
        let forced = self.always_emit_edge_types.contains(&edge_type);
        if !forced && !self.include_edge_types.contains(&edge_type) {
            // Edge type not wanted: do not even build the node.
            return None;
        }

        let candidate = OutgoingEdge::new(edge_type, node_fn());
        if forced || self.visitor.needs_visit(&candidate) {
            Some(candidate)
        } else {
            None
        }
    }

    /// Construct a new edge, only calling the visitor to check if the edge_type is needed.
    ///
    /// Behaves like `make_edge`, except that when `keep_edge_paths` is set the edge is
    /// built with the path returned by `path_fn`. When it is not set, `path_fn` is never
    /// called and the edge is built without a path.
    fn make_edge_with_path<N, P>(
        &self,
        edge_type: EdgeType,
        node_fn: N,
        path_fn: P,
    ) -> Option<OutgoingEdge>
    where
        N: FnOnce() -> Node,
        P: FnOnce() -> Option<WrappedPath>,
    {
        let forced = self.always_emit_edge_types.contains(&edge_type);
        if !forced && !self.include_edge_types.contains(&edge_type) {
            // Edge type not wanted: neither the node nor the path is computed.
            return None;
        }

        let candidate = match self.keep_edge_paths {
            true => OutgoingEdge::new_with_path(edge_type, node_fn(), path_fn()),
            false => OutgoingEdge::new(edge_type, node_fn()),
        };
        if forced || self.visitor.needs_visit(&candidate) {
            Some(candidate)
        } else {
            None
        }
    }
}
/// Walk the graph from one or more starting points, providing stream of data for later reduction
pub fn walk_exact<V, VOut, Route>(
ctx: CoreContext,
@ -693,11 +759,13 @@ pub fn walk_exact<V, VOut, Route>(
scheduled_max: usize,
error_as_data_node_types: HashSet<NodeType>,
error_as_data_edge_types: HashSet<EdgeType>,
include_edge_types: HashSet<EdgeType>,
always_emit_edge_types: HashSet<EdgeType>,
scuba: ScubaSampleBuilder,
keep_edge_paths: bool,
) -> BoxStream<'static, Result<VOut, Error>>
where
V: 'static + Clone + WalkVisitor<VOut, Route> + Send,
V: 'static + Clone + WalkVisitor<VOut, Route> + Send + Sync,
VOut: 'static + Send,
Route: 'static + Send + Clone + StepRoute,
{
@ -719,6 +787,13 @@ where
let walk_roots: Vec<(Option<Route>, OutgoingEdge)> =
walk_roots.into_iter().map(|e| (None, e)).collect();
let checker = Arc::new(Checker {
include_edge_types,
always_emit_edge_types,
keep_edge_paths,
visitor: visitor.clone(),
});
published_bookmarks
.map_ok(move |published_bookmarks| {
let published_bookmarks = Arc::new(published_bookmarks);
@ -731,7 +806,8 @@ where
published_bookmarks,
repo,
scuba,
visitor
visitor,
checker,
);
// Each step returns the walk result, and next steps
async move {
@ -756,6 +832,7 @@ where
)
.boxed()
}),
checker,
keep_edge_paths,
);
@ -782,6 +859,7 @@ async fn walk_one<V, VOut, Route>(
mut scuba: ScubaSampleBuilder,
published_bookmarks: Arc<HashMap<BookmarkName, ChangesetId>>,
heads_fetcher: HeadsFetcher,
checker: Arc<Checker<V>>,
keep_edge_paths: bool,
) -> Result<(VOut, Vec<(Option<Route>, OutgoingEdge)>), Error>
where
@ -845,7 +923,7 @@ where
hg_file_node_step(ctx.clone(), &repo, path, hg_file_node_id).await
}
Node::HgManifest((path, hg_manifest_id)) => {
hg_manifest_step(ctx.clone(), &repo, path, hg_manifest_id, keep_edge_paths).await
hg_manifest_step(ctx.clone(), &repo, &checker, path, hg_manifest_id).await
}
// Content
Node::FileContent(content_id) => file_content_step(ctx.clone(), &repo, content_id),