commit_graph: parallelize calculating disjoint segments

Summary: Each call to disjoint_segments is completely independent, so let's try to run them in parallel.

Reviewed By: markbt

Differential Revision: D46801400

fbshipit-source-id: 979ddaed4ac81d395debbea89b7a2cd84af25a0b
This commit is contained in:
Youssef Ibrahim 2023-06-26 05:12:38 -07:00 committed by Facebook GitHub Bot
parent 7d9641bc72
commit efda6b6aae

View File

@ -20,6 +20,9 @@ use commit_graph_types::segments::ChangesetSegmentParent;
use commit_graph_types::storage::CommitGraphStorage;
use commit_graph_types::storage::Prefetch;
use context::CoreContext;
use futures::stream;
use futures::StreamExt;
use futures::TryStreamExt;
use futures_stats::TimedTryFutureExt;
use mononoke_types::ChangesetId;
use mononoke_types::Generation;
@ -471,7 +474,7 @@ impl CommitGraph {
self.segment_frontier(ctx, common)
)?;
let mut difference_segments = vec![];
let mut difference_segments_futures = vec![];
while let Some((generation, segments)) = heads_segment_frontier.segments.pop_last() {
self.lower_segment_frontier(ctx, &mut common_segment_frontier, generation)
@ -495,10 +498,12 @@ impl CommitGraph {
vec![]
}
};
difference_segments.extend(
self.disjoint_segments(ctx, base, heads.into_iter().collect(), common)
.await?,
);
difference_segments_futures.push(self.disjoint_segments(
ctx,
base,
heads.into_iter().collect(),
common,
));
}
let all_edges = self
@ -529,7 +534,12 @@ impl CommitGraph {
}
}
Ok(difference_segments)
stream::iter(difference_segments_futures)
.buffered(100)
.map_ok(|segments| stream::iter(segments).map(Ok))
.try_flatten()
.try_collect()
.await
}
/// Returns all changesets in a segment in reverse topological order, verifying