mononoke: check if chunking in walker defer_visit()

Summary: Makes defer_visit return a Result, so that a call made when the walker is not chunking can be detected and reported as an error.

Reviewed By: farnz

Differential Revision: D29268346

fbshipit-source-id: b8ea503c2848adb5d7ca3fb0e61399be2930c3de
This commit is contained in:
Alex Hornby 2021-06-24 01:48:41 -07:00 committed by Facebook GitHub Bot
parent b73322e5ea
commit 3d59baacd5
7 changed files with 46 additions and 35 deletions

View File

@ -461,7 +461,7 @@ async fn run_one(
command.sampling_path_regex,
command.sampler,
job_params.enable_derive,
sub_params.tail_params.chunk_direction,
Some(sub_params.tail_params.chunk_direction),
);
let type_params = RepoWalkTypeParams {

View File

@ -63,7 +63,7 @@ impl<T> SamplingWalkVisitor<T> {
sample_path_regex: Option<Regex>,
sampler: Arc<T>,
enable_derive: bool,
chunk_direction: Direction,
chunk_direction: Option<Direction>,
) -> Self {
Self {
inner: WalkState::new(
@ -321,15 +321,18 @@ where
bcs_id: &ChangesetId,
walk_item: &OutgoingEdge,
route: Option<PathTrackingRoute<P>>,
) -> (
(WalkKeyOptPath<P>, WalkPayloadMtime, Option<StepStats>),
PathTrackingRoute<P>,
) {
) -> Result<
(
(WalkKeyOptPath<P>, WalkPayloadMtime, Option<StepStats>),
PathTrackingRoute<P>,
),
Error,
> {
let inner_route = route.as_ref().map(|_| EmptyRoute {});
let route = PathTrackingRoute::evolve(route, walk_item, None);
let ((n, _nd, stats), _inner_route) =
self.inner.defer_visit(bcs_id, walk_item, inner_route);
(
self.inner.defer_visit(bcs_id, walk_item, inner_route)?;
Ok((
(
WalkKeyOptPath {
node: n,
@ -339,7 +342,7 @@ where
stats,
),
route,
)
))
}
}
@ -426,15 +429,18 @@ where
bcs_id: &ChangesetId,
walk_item: &OutgoingEdge,
route: Option<EmptyRoute>,
) -> (
) -> Result<
(
WalkKeyOptPath<WrappedPathHash>,
WalkPayloadMtime,
Option<StepStats>,
(
WalkKeyOptPath<WrappedPathHash>,
WalkPayloadMtime,
Option<StepStats>,
),
EmptyRoute,
),
EmptyRoute,
) {
let ((n, nd, stats), route) = self.inner.defer_visit(bcs_id, walk_item, route);
Error,
> {
let ((n, nd, stats), route) = self.inner.defer_visit(bcs_id, walk_item, route)?;
let output = (
WalkKeyOptPath {
node: n,
@ -446,7 +452,7 @@ where
},
stats,
);
(output, route)
Ok((output, route))
}
}

View File

@ -547,7 +547,7 @@ async fn run_one(
None,
command.sampler,
job_params.enable_derive,
sub_params.tail_params.chunk_direction,
Some(sub_params.tail_params.chunk_direction),
);
let type_params = RepoWalkTypeParams {

View File

@ -386,7 +386,7 @@ async fn run_one(
None,
command.sampler,
job_params.enable_derive,
sub_params.tail_params.chunk_direction,
Some(sub_params.tail_params.chunk_direction),
);
let type_params = RepoWalkTypeParams {

View File

@ -170,7 +170,7 @@ pub struct WalkState {
include_edge_types: HashSet<EdgeType>,
always_emit_edge_types: HashSet<EdgeType>,
enable_derive: bool,
chunk_direction: Direction,
chunk_direction: Option<Direction>,
// Interning
bcs_ids: InternMap<ChangesetId, InternedId<ChangesetId>>,
hg_cs_ids: InternMap<HgChangesetId, InternedId<HgChangesetId>>,
@ -222,7 +222,7 @@ impl WalkState {
include_edge_types: HashSet<EdgeType>,
always_emit_edge_types: HashSet<EdgeType>,
enable_derive: bool,
chunk_direction: Direction,
chunk_direction: Option<Direction>,
) -> Self {
let fac = RandomState::default();
Self {
@ -493,7 +493,7 @@ impl WalkState {
if self.chunk_contains(id) {
self.record(&self.visited_bcs, &id)
} else {
if self.chunk_direction == Direction::NewestFirst
if self.chunk_direction == Some(Direction::NewestFirst)
&& !self.visited_bcs.contains_key(&id)
{
self.record_multi(&self.deferred_bcs, id, outgoing);
@ -569,7 +569,7 @@ impl WalkState {
if self.chunk_contains(id) {
self.record(&self.visited_changeset_info, &id)
} else {
if self.chunk_direction == Direction::NewestFirst
if self.chunk_direction == Some(Direction::NewestFirst)
&& !self.visited_changeset_info.contains_key(&id)
{
self.record_multi(&self.deferred_bcs, id, outgoing);
@ -975,18 +975,23 @@ impl WalkVisitor<(Node, Option<NodeData>, Option<StepStats>), EmptyRoute> for Wa
bcs_id: &ChangesetId,
walk_item: &OutgoingEdge,
_route: Option<EmptyRoute>,
) -> ((Node, Option<NodeData>, Option<StepStats>), EmptyRoute) {
) -> Result<((Node, Option<NodeData>, Option<StepStats>), EmptyRoute), Error> {
let node_data = match self.chunk_direction {
Direction::NewestFirst => {
Some(Direction::NewestFirst) => {
let i = self.bcs_ids.interned(bcs_id);
self.record_multi(&self.deferred_bcs, i, &walk_item);
None
}
// We'll never visit backward looking edges when running OldestFirst, so don't record them.
// returning Some for NodeData tells record_resolved_visit that we don't need to visit this node again if we see it.
Direction::OldestFirst => Some(NodeData::OutsideChunk),
Some(Direction::OldestFirst) => Some(NodeData::OutsideChunk),
None => bail!(
"Attempt to defer {:?} step {:?} when not chunking",
bcs_id,
walk_item
),
};
((walk_item.target.clone(), node_data, None), EmptyRoute {})
Ok(((walk_item.target.clone(), node_data, None), EmptyRoute {}))
}
}

View File

@ -185,7 +185,7 @@ impl ValidatingVisitor {
always_emit_edge_types: HashSet<EdgeType>,
enable_derive: bool,
lfs_threshold: Option<u64>,
chunk_direction: Direction,
chunk_direction: Option<Direction>,
) -> Self {
Self {
repo_stats_key,
@ -567,14 +567,14 @@ impl WalkVisitor<(Node, Option<CheckData>, Option<StepStats>), ValidateRoute>
bcs_id: &ChangesetId,
walk_item: &OutgoingEdge,
route: Option<ValidateRoute>,
) -> ((Node, Option<CheckData>, Option<StepStats>), ValidateRoute) {
) -> Result<((Node, Option<CheckData>, Option<StepStats>), ValidateRoute), Error> {
let ((node, _node_data, stats), _route) =
self.inner
.defer_visit(bcs_id, walk_item, Some(EmptyRoute {}));
(
.defer_visit(bcs_id, walk_item, Some(EmptyRoute {}))?;
Ok((
(node.clone(), None, stats),
ValidateRoute::next_route(route, node),
)
))
}
}
@ -939,7 +939,7 @@ async fn run_one(
always_emit_edge_types.clone(),
job_params.enable_derive,
sub_params.lfs_threshold,
sub_params.tail_params.chunk_direction,
Some(sub_params.tail_params.chunk_direction),
);
let type_params = RepoWalkTypeParams {

View File

@ -182,7 +182,7 @@ pub trait WalkVisitor<VOut, Route>: VisitOne {
bcs_id: &ChangesetId,
walk_item: &OutgoingEdge,
route: Option<Route>,
) -> (VOut, Route);
) -> Result<(VOut, Route), Error>;
}
// Visitor methods that are only needed during tailing
@ -2140,7 +2140,7 @@ where
let (vout, via, next) = match step_output {
StepOutput::Deferred(bcs_id) => {
let (vout, via) = visitor.defer_visit(&bcs_id, &walk_item, via);
let (vout, via) = visitor.defer_visit(&bcs_id, &walk_item, via)?;
(vout, via, vec![])
}
StepOutput::Done(node_data, children) => {