commit_graph: implement range_stream

Summary: Implements a stream that yields all changesets that are simulatenously a descendant of one changeset and a descendant of another, in topological order.

Reviewed By: markbt

Differential Revision: D45156176

fbshipit-source-id: 6c70d40252b5e2402ec1718e6cbe4c35ec8e9c1e
This commit is contained in:
Youssef Ibrahim 2023-04-26 14:15:50 -07:00 committed by Facebook GitHub Bot
parent c592cabeec
commit 3c4b3f792a
9 changed files with 218 additions and 6 deletions

View File

@ -17,7 +17,6 @@ context = { version = "0.1.0", path = "../../../server/context" }
facet = { version = "0.1.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "main" }
futures = { version = "0.3.28", features = ["async-await", "compat"] }
itertools = "0.10.3"
maplit = "1.0"
mononoke_types = { version = "0.1.0", path = "../../../mononoke_types" }
smallvec = { version = "1.6.1", features = ["serde", "specialization", "union"] }
vec1 = { version = "1", features = ["serde"] }

View File

@ -29,12 +29,12 @@ use commit_graph_types::storage::Prefetch;
use commit_graph_types::ChangesetParents;
use context::CoreContext;
use futures::stream;
use futures::stream::BoxStream;
use futures::Stream;
use futures::StreamExt;
use futures::TryStreamExt;
use itertools::Either;
use itertools::Itertools;
use maplit::hashset;
use mononoke_types::ChangesetId;
use mononoke_types::ChangesetIdPrefix;
use mononoke_types::ChangesetIdsResolvedFromPrefix;
@ -561,9 +561,7 @@ impl CommitGraph {
cs_id: ChangesetId,
) -> Result<ChangesetFrontier> {
let generation = self.changeset_generation_required(ctx, cs_id).await?;
let mut frontier = ChangesetFrontier::new();
frontier.insert(generation, hashset! { cs_id });
Ok(frontier)
Ok(ChangesetFrontier::new_single(cs_id, generation))
}
/// Obtain a frontier of changesets from a list of changeset ids, which
@ -840,6 +838,82 @@ impl CommitGraph {
.try_collect()
.await
}
pub async fn range_stream<'a>(
&'a self,
ctx: &'a CoreContext,
start_id: ChangesetId,
end_id: ChangesetId,
) -> Result<BoxStream<'a, ChangesetId>> {
let (start_generation, mut frontier) = futures::try_join!(
self.changeset_generation_required(ctx, start_id),
self.single_frontier(ctx, end_id)
)?;
let mut children: HashMap<ChangesetId, HashSet<(ChangesetId, Generation)>> =
Default::default();
let mut reached_start = false;
while let Some((gen, cs_ids)) = frontier.pop_last() {
let cs_ids = cs_ids.into_iter().collect::<Vec<_>>();
let all_edges = self
.storage
.fetch_many_edges_required(ctx, &cs_ids, Prefetch::for_p1_linear_traversal())
.await?;
reached_start |= cs_ids.contains(&start_id);
if gen > start_generation {
for (_, edges) in all_edges.into_iter() {
for parent in edges.parents.into_iter() {
children
.entry(parent.cs_id)
.or_default()
.insert((edges.node.cs_id, edges.node.generation));
frontier
.entry(parent.generation)
.or_default()
.insert(parent.cs_id);
}
}
}
}
if !reached_start {
return Ok(stream::empty().boxed());
}
struct RangeStreamState {
children: HashMap<ChangesetId, HashSet<(ChangesetId, Generation)>>,
upwards_frontier: ChangesetFrontier,
}
Ok(stream::unfold(
Box::new(RangeStreamState {
children,
upwards_frontier: ChangesetFrontier::new_single(start_id, start_generation),
}),
|mut state| async {
if let Some((_, cs_ids)) = state.upwards_frontier.pop_first() {
for cs_id in cs_ids.iter() {
if let Some(children) = state.children.get(cs_id) {
for (child, generation) in children.iter() {
state
.upwards_frontier
.entry(*generation)
.or_default()
.insert(*child);
}
}
}
Some((stream::iter(cs_ids), state))
} else {
None
}
},
)
.flatten()
.boxed())
}
}
#[async_trait]

View File

@ -13,6 +13,7 @@ commit_graph = { version = "0.1.0", path = "../commit_graph" }
commit_graph_types = { version = "0.1.0", path = "../commit_graph_types" }
context = { version = "0.1.0", path = "../../../server/context" }
drawdag = { version = "0.1.0", path = "../../../../scm/lib/drawdag" }
futures = { version = "0.3.28", features = ["async-await", "compat"] }
in_memory_commit_graph_storage = { version = "0.1.0", path = "../in_memory_commit_graph_storage" }
mononoke_types = { version = "0.1.0", path = "../../../mononoke_types" }
smallvec = { version = "1.6.1", features = ["serde", "specialization", "union"] }

View File

@ -37,6 +37,7 @@ macro_rules! impl_commit_graph_tests {
test_add_recursive,
test_add_recursive_many_changesets,
test_ancestors_frontier_with,
test_range_stream,
);
};
}
@ -680,3 +681,35 @@ pub async fn test_ancestors_frontier_with(
Ok(())
}
pub async fn test_range_stream(
ctx: CoreContext,
storage: Arc<dyn CommitGraphStorage>,
) -> Result<()> {
let graph = from_dag(
&ctx,
r##"
A-B-C-D-G-H---J-K
\ / \ /
E-F I
L-M-N-O-P-Q-R-S-T-U
"##,
storage.clone(),
)
.await?;
assert_range_stream(
&graph,
&ctx,
"A",
"K",
vec!["A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "K"],
)
.await?;
assert_range_stream(&graph, &ctx, "D", "K", vec!["D", "G", "H", "I", "J", "K"]).await?;
assert_range_stream(&graph, &ctx, "A", "U", vec![]).await?;
assert_range_stream(&graph, &ctx, "O", "T", vec!["O", "P", "Q", "R", "S", "T"]).await?;
Ok(())
}

View File

@ -14,6 +14,7 @@ use commit_graph::CommitGraph;
use commit_graph_types::edges::ChangesetNode;
use commit_graph_types::storage::CommitGraphStorage;
use context::CoreContext;
use futures::stream::StreamExt;
use mononoke_types::ChangesetId;
use mononoke_types::Generation;
@ -199,6 +200,54 @@ pub async fn assert_ancestors_difference(
Ok(())
}
async fn assert_topological_order(
graph: &CommitGraph,
ctx: &CoreContext,
cs_ids: &Vec<ChangesetId>,
) -> Result<()> {
let all_cs_ids: HashSet<ChangesetId> = cs_ids.iter().copied().collect();
let mut previous_cs_ids: HashSet<ChangesetId> = Default::default();
for cs_id in cs_ids {
let parents = graph.changeset_parents_required(ctx, *cs_id).await?;
// Check that each parent of cs_id either isn't contained in cs_ids
// or is found before cs_id.
assert!(
parents
.into_iter()
.all(|parent| !all_cs_ids.contains(&parent) || previous_cs_ids.contains(&parent))
);
previous_cs_ids.insert(*cs_id);
}
Ok(())
}
pub async fn assert_range_stream(
graph: &CommitGraph,
ctx: &CoreContext,
start: &str,
end: &str,
range: Vec<&str>,
) -> Result<()> {
let start_id = name_cs_id(start);
let end_id = name_cs_id(end);
let range_stream_cs_ids = graph
.range_stream(ctx, start_id, end_id)
.await?
.collect::<Vec<_>>()
.await;
assert_topological_order(graph, ctx, &range_stream_cs_ids).await?;
assert_eq!(
range_stream_cs_ids.into_iter().collect::<HashSet<_>>(),
range.into_iter().map(name_cs_id).collect::<HashSet<_>>()
);
Ok(())
}
pub async fn assert_ancestors_frontier_with(
graph: &CommitGraph,
ctx: &CoreContext,

View File

@ -13,6 +13,7 @@ abomonation_derive = "0.5"
anyhow = "1.0.65"
async-trait = "0.1.58"
context = { version = "0.1.0", path = "../../../server/context" }
maplit = "1.0"
mononoke_types = { version = "0.1.0", path = "../../../mononoke_types" }
smallvec = { version = "1.6.1", features = ["serde", "specialization", "union"] }
vec1 = { version = "1", features = ["serde"] }

View File

@ -13,6 +13,8 @@ use std::ops::Deref;
use std::ops::DerefMut;
use abomonation_derive::Abomonation;
use maplit::btreemap;
use maplit::hashset;
use mononoke_types::ChangesetId;
use mononoke_types::Generation;
use smallvec::SmallVec;
@ -82,6 +84,10 @@ impl ChangesetFrontier {
Self(Default::default())
}
pub fn new_single(cs_id: ChangesetId, generation: Generation) -> Self {
Self(btreemap! { generation => hashset! { cs_id }})
}
pub fn highest_generation_contains(&self, cs_id: ChangesetId, generation: Generation) -> bool {
match self.last_key_value() {
None => false,

View File

@ -9,6 +9,7 @@ mod ancestors_difference;
mod backfill;
mod backfill_one;
mod checkpoints;
mod range_stream;
use ancestors_difference::AncestorsDifferenceArgs;
use anyhow::Result;
@ -26,6 +27,7 @@ use commit_graph::CommitGraph;
use metaconfig_types::RepoConfig;
use mononoke_app::args::RepoArgs;
use mononoke_app::MononokeApp;
use range_stream::RangeStreamArgs;
use repo_identity::RepoIdentity;
#[derive(Parser)]
@ -43,8 +45,10 @@ pub enum CommitGraphSubcommand {
Backfill(BackfillArgs),
/// Backfill a commit and all of its missing ancestors.
BackfillOne(BackfillOneArgs),
/// Display ids of all commits that are ancestors of one set of commits (heads), excluding ancestors of another set of commits (common).
/// Display ids of all commits that are ancestors of one set of commits (heads), excluding ancestors of another set of commits (common) in reverse topological order.
AncestorsDifference(AncestorsDifferenceArgs),
/// Display ids of all commits that are simultaneously a descendant of one commit (start) and an ancestor of another (end) in topological order.
RangeStream(RangeStreamArgs),
}
#[facet::container]
@ -89,5 +93,8 @@ pub async fn run(app: MononokeApp, args: CommandArgs) -> Result<()> {
CommitGraphSubcommand::AncestorsDifference(args) => {
ancestors_difference::ancestors_difference(&ctx, &repo, args).await
}
CommitGraphSubcommand::RangeStream(args) => {
range_stream::range_stream(&ctx, &repo, args).await
}
}
}

View File

@ -0,0 +1,42 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This software may be used and distributed according to the terms of the
* GNU General Public License version 2.
*/
use anyhow::Result;
use clap::Args;
use commit_graph::CommitGraphRef;
use context::CoreContext;
use futures::try_join;
use futures::StreamExt;
use super::Repo;
use crate::commit_id::parse_commit_id;
#[derive(Args)]
pub struct RangeStreamArgs {
/// Commit ID of the start of the range.
#[clap(long)]
start: String,
/// Commit ID of the end of the range
#[clap(long)]
end: String,
}
pub async fn range_stream(ctx: &CoreContext, repo: &Repo, args: RangeStreamArgs) -> Result<()> {
let (start, end) = try_join!(
parse_commit_id(ctx, repo, &args.start),
parse_commit_id(ctx, repo, &args.end),
)?;
let mut range_stream = repo.commit_graph().range_stream(ctx, start, end).await?;
while let Some(cs_id) = range_stream.next().await {
println!("{}", cs_id);
}
Ok(())
}