diff --git a/lib/dag/src/segment.rs b/lib/dag/src/segment.rs index 07a5834f52..499d4de425 100644 --- a/lib/dag/src/segment.rs +++ b/lib/dag/src/segment.rs @@ -546,20 +546,17 @@ impl Dag { // The algorithm works as follows: // - Iterate through level N segments [1]. // - Considering a level N segment S: - // Do S or parents of S overlap with `set`? - // - No: Skip S and check the next level N segment. - // - Yes: Is S a flat segment? + // Could we take the entire S? + // - If `set` covers `S - S.head + S.parents`, then yes, take S + // and continue with the next level N segment. + // Could we ignore the entire S and check the next level N segment? + // - If (S + S.parents) do not overlap with `set`, then yes, skip. + // No fast paths. Is S a flat segment? // - No: Iterate through level N-1 segments covered by S, // recursively (goto [1]). // - Yes: Figure out children in the flat segment. // Push them to the result. - // There could be a fast path if a high-level segment contains - // more information, like "Does this segment have roots without - // parents"? If that is marked "no", and the `set` covers - // `S - S.head + S.parents`. Then we can push S to result - // without checking lower level segments. - // FIXME: The algorithm relies on the fact that the highest level // segments contain all known ids, which is not guaranteed in all // setups (ex. build_high_level_segments can have "drop_last" set). @@ -579,6 +576,11 @@ impl Dag { break; } + let parents = seg.parents()?; + + // Count of parents overlapping with `set`. + let overlapped_parents = parents.iter().filter(|p| ctx.set.contains(**p)).count(); + // Remove the `high`. This segment cannot calculate // `children(high)`. If `high` matches a `parent` of // another segment, that segment will handle it. @@ -586,6 +588,19 @@ impl Dag { .set .intersection(&span.into()) .difference(&span.high.into()); + + if !seg.has_root()? { + // A segment must have at least one parent to be rootless. + debug_assert!(!parents.is_empty()); + // Fast path: Take the segment directly. + if overlapped_parents == parents.len() + && intersection.count() + 1 == span.count() + { + ctx.result.push_span(span); + continue; + } + } + if !intersection.is_empty() { if level > 0 { visit_segments(ctx, span, level - 1)?; @@ -601,7 +616,7 @@ impl Dag { } } - if seg.parents()?.into_iter().any(|p| ctx.set.contains(p)) { + if overlapped_parents > 0 { if level > 0 { visit_segments(ctx, span, level - 1)?; } else { diff --git a/lib/dag/src/spanset.rs b/lib/dag/src/spanset.rs index 5a1b2ea4a5..a54c154f43 100644 --- a/lib/dag/src/spanset.rs +++ b/lib/dag/src/spanset.rs @@ -62,7 +62,7 @@ impl Span { Self { low, high } } - fn count(self) -> u64 { + pub fn count(self) -> u64 { self.high - self.low + 1 }