mononoke: more efficient known() method

Summary: Since we have changesets, we can answer known queries more efficiently.

Reviewed By: farnz

Differential Revision: D7534911

fbshipit-source-id: 72f75c9d9a6cd7f1464660eb0ec16f94ca24f33c
This commit is contained in:
Stanislau Hlebik 2018-04-07 10:57:56 -07:00 committed by Facebook Github Bot
parent b145efaa1a
commit e702e89c81

View File

@ -49,8 +49,7 @@ use blobrepo::BlobRepo;
use errors::*;
use repoinfo::RepoGenCache;
use revset::{AncestorsNodeStream, IntersectNodeStream, NodeStream, SetDifferenceNodeStream,
SingleNodeHash, UnionNodeStream};
use revset::{AncestorsNodeStream, NodeStream, SetDifferenceNodeStream, UnionNodeStream};
const METAKEYFLAG: &str = "f";
const METAKEYSIZE: &str = "s";
@ -630,73 +629,16 @@ impl HgCommands for RepoClient {
// @wireprotocommand('known', 'nodes *'), but the '*' is ignored
fn known(&self, nodes: Vec<NodeHash>) -> HgCommandRes<Vec<bool>> {
info!(self.logger, "known: {:?}", nodes);
let repo_generation = &self.repo.repo_generation;
let hgrepo = &self.repo.hgrepo;
let hgrepo = self.repo.hgrepo.clone();
let scuba = self.repo.scuba.clone();
let sample = self.repo.scuba_sample(ops::KNOWN);
let remote = self.repo.remote.clone();
// Ultimately, this block takes all ancestors of heads in this repo intersected with
// the nodes passed in by the client, and then returns a Vec<bool>, true if the
// intersection contains the matching node in nodes, false if it does not.
// Note that revsets are lazy, and will not generate unnecessary nodes.
hgrepo
.get_heads()
// Convert Stream<Heads> into Stream<Ancestors<Heads>>
.map({
let repo_generation = repo_generation.clone();
let hgrepo = hgrepo.clone();
move |hash| AncestorsNodeStream::new(&hgrepo, repo_generation.clone(), hash).boxed()
})
// Convert Stream<Ancestors<Heads>>> into Future<Vec<Ancestors<Heads>>>
.collect()
// Do the next few steps inside the Future; the parameter to the closure is
// Vec<Ancestors<Heads>>
.map({
let repo_generation = repo_generation.clone();
let hgrepo = hgrepo.clone();
let nodes = nodes.clone();
move |vec| {
// Intersect the union of the Vec<Ancestors<Heads>> that's passed in, with
// a union of the known nodes the client asked about.
IntersectNodeStream::new(
&hgrepo,
repo_generation.clone(),
vec![
// This is the union of all ancestors of heads
UnionNodeStream::new(&hgrepo, repo_generation.clone(), vec).boxed(),
// This is the union of all passed in nodes.
UnionNodeStream::new(
&hgrepo,
repo_generation,
nodes.into_iter().map({
let hgrepo = hgrepo.clone();
move |node| SingleNodeHash::new(node, &hgrepo).boxed()
}),
).boxed(),
],
// collect() below will result in a Future<Vec<NodeHash>> which is all
// nodes that are both an ancestor of a get_heads() head and were
// passed in by the client
).collect()
.from_err::<hgproto::Error>()
}
})
// We have a Future<Future<Vec<NodeHash>>> - collapse one layer of Future.
.flatten()
// Finally, within the Future, use the Vec<NodeHash> that's only nodes that were
// passed in by the client and that are ancestors of a get_heads() head to convert
// the Vec of client known nodes to a Vec<bool> telling the client if we also
// know of the nodes it asked us about.
.map(move |known| {
nodes
.iter()
.map(|node| known.contains(node))
.collect::<Vec<bool>>()
})
.timed(move |stats, _| {
add_common_stats_and_send_to_scuba(scuba, sample, stats, remote)
})
future::join_all(
nodes
.into_iter()
.map(move |node| hgrepo.changeset_exists(&HgChangesetId::new(node))),
).timed(move |stats, _| add_common_stats_and_send_to_scuba(scuba, sample, stats, remote))
.boxify()
}