migrate query_reachability internal implementation to async/await syntax

Summary:
NOTE: this diff doesn't change the public API of the trait - that's why we do
`compat()` and `boxify()`. This change will follow later in the stack.

Reviewed By: StanislavGlebik

Differential Revision: D21937308

fbshipit-source-id: 11bef4c5087a9ac06e209685d3281483b804c394
This commit is contained in:
Mateusz Kwapich 2020-06-15 02:53:40 -07:00 committed by Facebook GitHub Bot
parent 592e696146
commit fc05b8d667

View File

@ -31,7 +31,7 @@ use mononoke_types::{ChangesetId, Generation, FIRST_GENERATION};
use common::{
advance_bfs_layer, changesets_with_generation_numbers, check_if_node_exists, fetch_generation,
fetch_generation_and_join, get_parents,
get_parents,
};
use reachabilityindex::{errors::*, LeastCommonAncestorsHint, NodeFrontier, ReachabilityIndex};
@ -501,35 +501,38 @@ impl ReachabilityIndex for SkiplistIndex {
anc_hash: ChangesetId,
) -> BoxFuture<bool, Error> {
cloned!(self.skip_list_edges);
fetch_generation_and_join(ctx.clone(), changeset_fetcher.clone(), desc_hash)
.join(fetch_generation_and_join(
async move {
let (anc_gen, desc_gen) = try_join!(
changeset_fetcher
.get_generation_number(ctx.clone(), anc_hash)
.compat(),
changeset_fetcher
.get_generation_number(ctx.clone(), desc_hash)
.compat()
)?;
ctx.perf_counters()
.set_counter(PerfCounterType::SkiplistAncestorGen, anc_gen.value() as i64);
ctx.perf_counters().set_counter(
PerfCounterType::SkiplistDescendantGen,
desc_gen.value() as i64,
);
let frontier = process_frontier(
ctx.clone(),
changeset_fetcher.clone(),
anc_hash,
))
.and_then(move |((desc_hash, desc_gen), (anc_hash, anc_gen))| {
ctx.perf_counters()
.set_counter(PerfCounterType::SkiplistAncestorGen, anc_gen.value() as i64);
ctx.perf_counters().set_counter(
PerfCounterType::SkiplistDescendantGen,
desc_gen.value() as i64,
);
process_frontier(
ctx.clone(),
changeset_fetcher,
skip_list_edges,
NodeFrontier::new(hashmap! {desc_gen => hashset!{desc_hash}}),
anc_gen,
)
.map(move |frontier| {
match frontier.get_all_changesets_for_gen_num(anc_gen) {
Some(cs_ids) => cs_ids.contains(&anc_hash),
None => false,
}
})
})
.boxify()
changeset_fetcher,
skip_list_edges,
NodeFrontier::new(hashmap! {desc_gen => hashset!{desc_hash}}),
anc_gen,
)
.compat()
.await?;
match frontier.get_all_changesets_for_gen_num(anc_gen) {
Some(cs_ids) => Ok(cs_ids.contains(&anc_hash)),
None => Ok(false),
}
}
.boxed()
.compat()
.boxify()
}
}