save the visited nodes during skiplist traversal

Summary: I'm planning to use it in my next diff to power `find_merges` functionality.

Reviewed By: StanislavGlebik

Differential Revision: D22457898

fbshipit-source-id: 76c3f107fd8b5bbef96e978037be31efca0f9841
This commit is contained in:
Mateusz Kwapich 2020-07-23 07:31:40 -07:00 committed by Facebook GitHub Bot
parent 3f8db5fda3
commit 091f06f47d
2 changed files with 95 additions and 24 deletions

View File

@ -15,7 +15,6 @@ mononoke_types = { path = "../../mononoke_types" }
reachabilityindex = { path = ".." }
skiplist-thrift = { path = "../if" }
tunables = { path = "../../tunables" }
cloned = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
fbthrift = { git = "https://github.com/facebook/fbthrift.git", branch = "master" }
anyhow = "1.0"
async-trait = "0.1.29"
@ -34,6 +33,7 @@ fixtures = { path = "../../tests/fixtures" }
revset = { path = "../../revset" }
test-helpers = { path = "../test-helpers" }
async_unit = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
cloned = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
fbinit = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
futures_ext = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
futures-old = { package = "futures", version = "0.1" }

View File

@ -15,10 +15,9 @@ use async_trait::async_trait;
use blobstore::Blobstore;
use bytes::Bytes;
use chashmap::CHashMap;
use cloned::cloned;
use context::{CoreContext, PerfCounterType};
use futures::future::try_join_all;
use futures::stream::{self, futures_unordered::FuturesUnordered, TryStreamExt};
use futures::stream::{futures_unordered::FuturesUnordered, TryStreamExt};
use futures_util::compat::Future01CompatExt;
use futures_util::try_join;
use maplit::{hashmap, hashset};
@ -40,7 +39,7 @@ const DEFAULT_EDGE_COUNT: u32 = 10;
// Each indexed node fits into one of two categories:
// - It has skiplist edges
// - It only has edges to its parents.
#[derive(Clone)]
#[derive(Clone, Debug, PartialEq)]
pub enum SkiplistNodeType {
SingleEdge((ChangesetId, Generation)),
// A list of skip edges which keep doubling
@ -246,6 +245,7 @@ async fn compute_single_skip_edge(
skip_list_edges,
initial_frontier,
target_gen,
&None,
)
.await?;
@ -579,6 +579,7 @@ impl ReachabilityIndex for SkiplistIndex {
&self.skip_list_edges,
NodeFrontier::new(hashmap! {desc_gen => hashset!{desc_hash}}),
anc_gen,
&None,
)
.await?;
match frontier.get_all_changesets_for_gen_num(anc_gen) {
@ -588,12 +589,42 @@ impl ReachabilityIndex for SkiplistIndex {
}
}
/// Records every skiplist edge visited during a single traversal, stored as a
/// "reverse" mapping: ancestor -> list of (child, is_child_a_merge_commit).
/// Keying the edges by ancestor makes it possible to walk the visited part of
/// the graph from ancestors towards descendants.
///
/// The merge-commit flag is recorded so that merges can be found later
/// without consulting the commit graph.
struct SkiplistTraversalTrace(CHashMap<ChangesetId, Vec<(ChangesetId, bool)>>);

impl SkiplistTraversalTrace {
    /// Creates a trace with no recorded edges.
    pub fn new() -> Self {
        Self(CHashMap::new())
    }

    /// Gives shared access to the underlying ancestor -> children map.
    pub fn inner(&self) -> &CHashMap<ChangesetId, Vec<(ChangesetId, bool)>> {
        let Self(edges) = self;
        edges
    }

    /// Appends `child` (changeset id plus its merge-commit flag) to the list
    /// stored under `ancestor`, starting a new list when `ancestor` has no
    /// entry yet.
    pub fn add(&self, ancestor: ChangesetId, child: (ChangesetId, bool)) {
        self.0.alter(ancestor, |existing| {
            // A missing entry behaves like an empty list of children.
            let mut children = existing.unwrap_or_default();
            children.push(child);
            Some(children)
        });
    }
}
// Takes all changesets from `all_cs_ids` that have skiplist edges in `skip_edges` and moves them.
// Returns the changesets that weren't moved and a NodeFrontier of the moved nodes.
fn move_skippable_nodes(
skip_edges: Arc<SkiplistEdgeMapping>,
all_cs_ids: Vec<ChangesetId>,
gen: Generation,
trace: &Option<&SkiplistTraversalTrace>,
) -> (Vec<ChangesetId>, NodeFrontier) {
let mut no_skiplist_edges = vec![];
let mut node_frontier = NodeFrontier::default();
@ -604,6 +635,9 @@ fn move_skippable_nodes(
SkiplistNodeType::SingleEdge(edge_pair) => {
if edge_pair.1 >= gen {
node_frontier.insert(edge_pair.clone());
if let Some(trace) = trace {
trace.add(edge_pair.0, (cs_id, false))
}
} else {
no_skiplist_edges.push(cs_id);
}
@ -616,6 +650,9 @@ fn move_skippable_nodes(
.cloned();
if let Some(edge_pair) = best_edge {
node_frontier.insert(edge_pair);
if let Some(trace) = trace {
trace.add(edge_pair.0, (cs_id, false))
}
} else {
no_skiplist_edges.push(cs_id);
}
@ -623,6 +660,9 @@ fn move_skippable_nodes(
SkiplistNodeType::ParentEdges(edges) => {
for edge_pair in edges {
node_frontier.insert(*edge_pair);
if let Some(trace) = trace {
trace.add(edge_pair.0, (cs_id, edges.len() > 1))
}
}
}
}
@ -640,32 +680,37 @@ async fn move_nonskippable_nodes(
ctx: &CoreContext,
changeset_fetcher: &Arc<dyn ChangesetFetcher>,
cs_ids: Vec<ChangesetId>,
trace: &Option<&SkiplistTraversalTrace>,
) -> Result<Vec<(ChangesetId, Generation)>, Error> {
cs_ids
let changeset_parent_gen = cs_ids
.into_iter()
.map(|cs_id| async move {
Ok::<_, Error>(stream::iter(
get_parents(ctx, changeset_fetcher, cs_id)
.await?
.into_iter()
.map(Ok::<_, Error>),
))
Ok::<_, Error>((cs_id, get_parents(ctx, changeset_fetcher, cs_id).await?))
})
.collect::<FuturesUnordered<_>>()
.try_flatten()
.and_then(|p| {
cloned!(ctx, changeset_fetcher);
async move {
let gen_num = changeset_fetcher
.get_generation_number(ctx.clone(), p)
.compat()
.await?;
let res: Result<_, Error> = Ok((p, gen_num));
res
}
.and_then(|(cs_id, parents)| async move {
let parent_gens = try_join_all(parents.into_iter().map(|p| async move {
let gen = fetch_generation(ctx, changeset_fetcher, p).await?;
Ok::<_, Error>((p, gen))
}))
.await?;
Ok((cs_id, parent_gens))
})
.try_collect::<Vec<_>>()
.await
.await?;
if let Some(trace) = trace {
for (cs_id, parent_gen) in changeset_parent_gen.iter() {
for (p, _gen) in parent_gen {
trace.add(*p, (*cs_id, parent_gen.len() > 1));
}
}
}
Ok(changeset_parent_gen
.into_iter()
.map(|(_cs_id, parent_gens)| parent_gens)
.flatten()
.collect())
}
/// Advances the node frontier towards the target generation by a single conceptual step.
@ -683,6 +728,7 @@ async fn process_frontier_single_skip(
skip_edges: &Arc<SkiplistEdgeMapping>,
mut node_frontier: NodeFrontier,
target_gen: Generation,
trace: &Option<&SkiplistTraversalTrace>,
) -> Result<(NodeFrontier, u64), Error> {
let old_max_gen = if let Some(val) = node_frontier.max_gen() {
if val <= target_gen {
@ -700,6 +746,7 @@ async fn process_frontier_single_skip(
skip_edges.clone(),
all_cs_ids.into_iter().collect(),
target_gen,
trace,
);
if skipped_frontier.is_empty() {
ctx.perf_counters()
@ -715,7 +762,7 @@ async fn process_frontier_single_skip(
}
}
let gen_cs = move_nonskippable_nodes(ctx, changeset_fetcher, no_skiplist_edges)
let gen_cs = move_nonskippable_nodes(ctx, changeset_fetcher, no_skiplist_edges, trace)
.await?
.into_iter();
@ -741,6 +788,7 @@ async fn process_frontier(
skip_edges: &Arc<SkiplistEdgeMapping>,
node_frontier: NodeFrontier,
max_gen: Generation,
trace: &Option<&SkiplistTraversalTrace>,
) -> Result<NodeFrontier, Error> {
let max_skips_without_yield = tunables::tunables().get_skiplist_max_skips_without_yield();
let mut skips_without_yield = 0;
@ -753,6 +801,7 @@ async fn process_frontier(
skip_edges,
node_frontier,
max_gen,
trace,
)
.await?;
node_frontier = new_node_frontier;
@ -788,6 +837,7 @@ impl LeastCommonAncestorsHint for SkiplistIndex {
&self.skip_list_edges,
node_frontier,
gen,
&None,
)
.await
}
@ -956,6 +1006,7 @@ mod test {
use blobrepo::BlobRepo;
use bookmarks::BookmarkName;
use chashmap::CHashMap;
use cloned::cloned;
use context::CoreContext;
use fbinit::FacebookInit;
use futures::compat::Future01CompatExt;
@ -1971,6 +2022,7 @@ mod test {
&skip_list_edges,
initial_frontier,
max_gen,
&None,
)
.await
}
@ -2570,6 +2622,7 @@ mod test {
&sli.skip_list_edges,
NodeFrontier::new(starting_frontier_map.clone()),
Generation::new(2),
&None,
)
.await;
assert_eq!(f.unwrap(), NodeFrontier::new(expected_gen_2_frontier_map));
@ -2577,15 +2630,33 @@ mod test {
let mut expected_root_frontier_map = HashMap::new();
expected_root_frontier_map
.insert(Generation::new(1), vec![root_node].into_iter().collect());
let mut trace = SkiplistTraversalTrace::new();
let f = process_frontier(
&ctx,
&repo.get_changeset_fetcher(),
&sli.skip_list_edges,
NodeFrontier::new(starting_frontier_map),
Generation::new(1),
&Some(&mut trace),
)
.await;
assert_eq!(f.unwrap(), NodeFrontier::new(expected_root_frontier_map));
let (cs_id, skiplist_node) = trace
.inner()
.get(&root_node)
.unwrap()
.get(0)
.unwrap()
.clone();
if let Some(_skiplist_edges) = sli.get_skip_edges(cs_id) {
// When the index is present we see if what's in skiplist for cs_id
// matches what's in the trace.
assert_eq!(skiplist_node, false,);
} else {
// When the index is empty, we check that parent edge was traversed.
assert_eq!(skiplist_node, false,);
}
}
async fn test_lca(