From 091f06f47d5e92785b4ca6d43e1a90ab6ec3f31c Mon Sep 17 00:00:00 2001 From: Mateusz Kwapich Date: Thu, 23 Jul 2020 07:31:40 -0700 Subject: [PATCH] save the visited nodes during skiplist traversal Summary: I'm planning to use it in my next diff to power `find_merges` functionality. Reviewed By: StanislavGlebik Differential Revision: D22457898 fbshipit-source-id: 76c3f107fd8b5bbef96e978037be31efca0f9841 --- .../reachabilityindex/skiplist/Cargo.toml | 2 +- .../reachabilityindex/skiplist/src/lib.rs | 117 ++++++++++++++---- 2 files changed, 95 insertions(+), 24 deletions(-) diff --git a/eden/mononoke/reachabilityindex/skiplist/Cargo.toml b/eden/mononoke/reachabilityindex/skiplist/Cargo.toml index 9eee8633a9..0adef79ff9 100644 --- a/eden/mononoke/reachabilityindex/skiplist/Cargo.toml +++ b/eden/mononoke/reachabilityindex/skiplist/Cargo.toml @@ -15,7 +15,6 @@ mononoke_types = { path = "../../mononoke_types" } reachabilityindex = { path = ".." } skiplist-thrift = { path = "../if" } tunables = { path = "../../tunables" } -cloned = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" } fbthrift = { git = "https://github.com/facebook/fbthrift.git", branch = "master" } anyhow = "1.0" async-trait = "0.1.29" @@ -34,6 +33,7 @@ fixtures = { path = "../../tests/fixtures" } revset = { path = "../../revset" } test-helpers = { path = "../test-helpers" } async_unit = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" } +cloned = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" } fbinit = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" } futures_ext = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" } futures-old = { package = "futures", version = "0.1" } diff --git a/eden/mononoke/reachabilityindex/skiplist/src/lib.rs b/eden/mononoke/reachabilityindex/skiplist/src/lib.rs index e88dcf596b..e51074dd7d 100644 --- a/eden/mononoke/reachabilityindex/skiplist/src/lib.rs +++ b/eden/mononoke/reachabilityindex/skiplist/src/lib.rs @@ -15,10 +15,9 @@ use async_trait::async_trait; use blobstore::Blobstore; use bytes::Bytes; use chashmap::CHashMap; -use cloned::cloned; use context::{CoreContext, PerfCounterType}; use futures::future::try_join_all; -use futures::stream::{self, futures_unordered::FuturesUnordered, TryStreamExt}; +use futures::stream::{futures_unordered::FuturesUnordered, TryStreamExt}; use futures_util::compat::Future01CompatExt; use futures_util::try_join; use maplit::{hashmap, hashset}; @@ -40,7 +39,7 @@ const DEFAULT_EDGE_COUNT: u32 = 10; // Each indexed node fits into one of two categories: // - It has skiplist edges // - It only has edges to its parents. -#[derive(Clone)] +#[derive(Clone, Debug, PartialEq)] pub enum SkiplistNodeType { SingleEdge((ChangesetId, Generation)), // A list of skip edges which keep doubling @@ -246,6 +245,7 @@ async fn compute_single_skip_edge( skip_list_edges, initial_frontier, target_gen, + &None, ) .await?; @@ -579,6 +579,7 @@ impl ReachabilityIndex for SkiplistIndex { &self.skip_list_edges, NodeFrontier::new(hashmap! {desc_gen => hashset!{desc_hash}}), anc_gen, + &None, ) .await?; match frontier.get_all_changesets_for_gen_num(anc_gen) { @@ -588,12 +589,42 @@ impl ReachabilityIndex for SkiplistIndex { } } +/// A structure to hold all the visited skiplist edges during a single +/// traversal in a "reverse" mapping: ancestor -> (child, is_child_a_merge_commit) +/// Such structure allows to traverse the graph from ancestor to descendants. +/// +/// The merge-commit bit is the information we use for finding merges later without consulting the +/// commit graph. +struct SkiplistTraversalTrace(CHashMap>); + +impl SkiplistTraversalTrace { + pub fn new() -> Self { + SkiplistTraversalTrace(CHashMap::new()) + } + + pub fn inner(&self) -> &CHashMap> { + &self.0 + } + + pub fn add(&self, ancestor: ChangesetId, child: (ChangesetId, bool)) { + self.0.alter(ancestor, |old_val| { + if let Some(mut old_val) = old_val { + old_val.push(child); + Some(old_val) + } else { + Some(vec![child]) + } + }); + } +} + // Take all changesets from `all_cs_ids` that have skiplist edges in `skip_edges` and moves them. // Returns changesets that wasn't moved and a NodeFrontier of moved nodes fn move_skippable_nodes( skip_edges: Arc, all_cs_ids: Vec, gen: Generation, + trace: &Option<&SkiplistTraversalTrace>, ) -> (Vec, NodeFrontier) { let mut no_skiplist_edges = vec![]; let mut node_frontier = NodeFrontier::default(); @@ -604,6 +635,9 @@ fn move_skippable_nodes( SkiplistNodeType::SingleEdge(edge_pair) => { if edge_pair.1 >= gen { node_frontier.insert(edge_pair.clone()); + if let Some(trace) = trace { + trace.add(edge_pair.0, (cs_id, false)) + } } else { no_skiplist_edges.push(cs_id); } @@ -616,6 +650,9 @@ fn move_skippable_nodes( .cloned(); if let Some(edge_pair) = best_edge { node_frontier.insert(edge_pair); + if let Some(trace) = trace { + trace.add(edge_pair.0, (cs_id, false)) + } } else { no_skiplist_edges.push(cs_id); } @@ -623,6 +660,9 @@ fn move_skippable_nodes( SkiplistNodeType::ParentEdges(edges) => { for edge_pair in edges { node_frontier.insert(*edge_pair); + if let Some(trace) = trace { + trace.add(edge_pair.0, (cs_id, edges.len() > 1)) + } } } } @@ -640,32 +680,37 @@ async fn move_nonskippable_nodes( ctx: &CoreContext, changeset_fetcher: &Arc, cs_ids: Vec, + trace: &Option<&SkiplistTraversalTrace>, ) -> Result, Error> { - cs_ids + let changeset_parent_gen = cs_ids .into_iter() .map(|cs_id| async move { - Ok::<_, Error>(stream::iter( - get_parents(ctx, changeset_fetcher, cs_id) - .await? - .into_iter() - .map(Ok::<_, Error>), - )) + Ok::<_, Error>((cs_id, get_parents(ctx, changeset_fetcher, cs_id).await?)) }) .collect::>() - .try_flatten() - .and_then(|p| { - cloned!(ctx, changeset_fetcher); - async move { - let gen_num = changeset_fetcher - .get_generation_number(ctx.clone(), p) - .compat() - .await?; - let res: Result<_, Error> = Ok((p, gen_num)); - res - } + .and_then(|(cs_id, parents)| async move { + let parent_gens = try_join_all(parents.into_iter().map(|p| async move { + let gen = fetch_generation(ctx, changeset_fetcher, p).await?; + Ok::<_, Error>((p, gen)) + })) + .await?; + Ok((cs_id, parent_gens)) }) .try_collect::>() - .await + .await?; + + if let Some(trace) = trace { + for (cs_id, parent_gen) in changeset_parent_gen.iter() { + for (p, _gen) in parent_gen { + trace.add(*p, (*cs_id, parent_gen.len() > 1)); + } + } + } + Ok(changeset_parent_gen + .into_iter() + .map(|(_cs_id, parent_gens)| parent_gens) + .flatten() + .collect()) } /// Advances the node frontier towards the target generation by a single conceptual step. @@ -683,6 +728,7 @@ async fn process_frontier_single_skip( skip_edges: &Arc, mut node_frontier: NodeFrontier, target_gen: Generation, + trace: &Option<&SkiplistTraversalTrace>, ) -> Result<(NodeFrontier, u64), Error> { let old_max_gen = if let Some(val) = node_frontier.max_gen() { if val <= target_gen { @@ -700,6 +746,7 @@ async fn process_frontier_single_skip( skip_edges.clone(), all_cs_ids.into_iter().collect(), target_gen, + trace, ); if skipped_frontier.is_empty() { ctx.perf_counters() @@ -715,7 +762,7 @@ async fn process_frontier_single_skip( } } - let gen_cs = move_nonskippable_nodes(ctx, changeset_fetcher, no_skiplist_edges) + let gen_cs = move_nonskippable_nodes(ctx, changeset_fetcher, no_skiplist_edges, trace) .await? .into_iter(); @@ -741,6 +788,7 @@ async fn process_frontier( skip_edges: &Arc, node_frontier: NodeFrontier, max_gen: Generation, + trace: &Option<&SkiplistTraversalTrace>, ) -> Result { let max_skips_without_yield = tunables::tunables().get_skiplist_max_skips_without_yield(); let mut skips_without_yield = 0; @@ -753,6 +801,7 @@ async fn process_frontier( skip_edges, node_frontier, max_gen, + trace, ) .await?; node_frontier = new_node_frontier; @@ -788,6 +837,7 @@ impl LeastCommonAncestorsHint for SkiplistIndex { &self.skip_list_edges, node_frontier, gen, + &None, ) .await } @@ -956,6 +1006,7 @@ mod test { use blobrepo::BlobRepo; use bookmarks::BookmarkName; use chashmap::CHashMap; + use cloned::cloned; use context::CoreContext; use fbinit::FacebookInit; use futures::compat::Future01CompatExt; @@ -1971,6 +2022,7 @@ mod test { &skip_list_edges, initial_frontier, max_gen, + &None, ) .await } @@ -2570,6 +2622,7 @@ mod test { &sli.skip_list_edges, NodeFrontier::new(starting_frontier_map.clone()), Generation::new(2), + &None, ) .await; assert_eq!(f.unwrap(), NodeFrontier::new(expected_gen_2_frontier_map)); @@ -2577,15 +2630,33 @@ mod test { let mut expected_root_frontier_map = HashMap::new(); expected_root_frontier_map .insert(Generation::new(1), vec![root_node].into_iter().collect()); + let mut trace = SkiplistTraversalTrace::new(); let f = process_frontier( &ctx, &repo.get_changeset_fetcher(), &sli.skip_list_edges, NodeFrontier::new(starting_frontier_map), Generation::new(1), + &Some(&mut trace), ) .await; assert_eq!(f.unwrap(), NodeFrontier::new(expected_root_frontier_map)); + let (cs_id, skiplist_node) = trace + .inner() + .get(&root_node) + .unwrap() + .get(0) + .unwrap() + .clone(); + + if let Some(_skiplist_edges) = sli.get_skip_edges(cs_id) { + // When the index is present we see if what's in skiplist for cs_id + // matches what's in the trace. + assert_eq!(skiplist_node, false,); + } else { + // When the index is empty, we check that parent edge was traversed. + assert_eq!(skiplist_node, false,); + } } async fn test_lca(