mononoke: admin command to build and read skiplist indexes

Summary:
Let's add a command that builds and reads skiplist indexes. These indexes will
be used by the getbundle wireproto request to decrease latency and CPU usage.

Note that we save only the longest "jump" for each node of the skiplist. This
is done in order to save space.
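
For illustration only (this sketch is not part of the diff; plain integers
stand in for the (ChangesetId, Generation) targets of skip edges, which are
appended shortest-jump-first, so the last edge is the longest):

    fn keep_longest_jump(skip_edges: Vec<u64>) -> Vec<u64> {
        // Keep only the final (longest) skip edge, as the diff below does
        // with skip_edges.last().cloned().into_iter().collect().
        skip_edges.last().cloned().into_iter().collect()
    }

    fn main() {
        // Jumps of 1, 2, 4 and 8 generations back: only the 8-jump survives.
        assert_eq!(keep_longest_jump(vec![1, 2, 4, 8]), vec![8]);
    }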

Reviewed By: jsgf

Differential Revision: D13169018

fbshipit-source-id: 4d654284b0c0d8a579444816781419ba6ad86baa
Stanislau Hlebik 2018-11-29 08:15:24 -08:00 committed by Facebook Github Bot
parent a48e88dc28
commit af03216ee2
6 changed files with 215 additions and 29 deletions

View File

@@ -186,7 +186,7 @@ impl From<ReachabilityIndexError> for ErrorKind {
CheckExistenceFailed(s, t) => {
ErrorKind::NotFound(s.clone(), Some(CheckExistenceFailed(s, t).into()))
}
-e @ GenerationFetchFailed(_) | e @ ParentsFetchFailed(_) => {
+e @ GenerationFetchFailed(_) | e @ ParentsFetchFailed(_) | e @ UknownSkiplistThriftEncoding => {
ErrorKind::InternalError(e.into())
}
}

View File

@@ -72,6 +72,7 @@ use repo_commit::*;
define_stats! {
prefix = "mononoke.blobrepo";
get_bonsai_changeset: timeseries(RATE, SUM),
get_bonsai_heads_maybe_stale: timeseries(RATE, SUM),
get_file_content: timeseries(RATE, SUM),
get_raw_hg_content: timeseries(RATE, SUM),
get_changesets: timeseries(RATE, SUM),
@@ -659,6 +660,13 @@ impl BlobRepo {
})
}
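// The heads come from listing every bookmark value; those values can lag
// behind the newest writes, hence "maybe_stale". The skiplist builder in
// this commit tolerates slightly stale heads.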
pub fn get_bonsai_heads_maybe_stale(&self) -> impl Stream<Item = ChangesetId, Error = Error> {
STATS::get_bonsai_heads_maybe_stale.add_value(1);
self.bookmarks
.list_by_prefix(&BookmarkPrefix::empty(), &self.repoid)
.map(|(_, cs_id)| cs_id)
}
// TODO(stash): make it accept ChangesetId
pub fn changeset_exists(&self, changesetid: &HgChangesetId) -> BoxFuture<bool, Error> {
STATS::changeset_exists.add_value(1);

View File

@@ -29,7 +29,9 @@ extern crate futures_ext;
extern crate manifoldblob;
extern crate mercurial_types;
extern crate mononoke_types;
extern crate reachabilityindex;
extern crate revset;
extern crate rust_thrift;
#[macro_use]
extern crate slog;
extern crate tempdir;
@@ -39,7 +41,7 @@ mod config_repo;
mod bookmarks_manager;
use std::borrow::Borrow;
-use std::collections::BTreeMap;
+use std::collections::{BTreeMap, HashMap};
use std::fmt;
use std::io;
@@ -48,7 +50,7 @@ use std::sync::Arc;
use clap::{App, Arg, SubCommand};
use failure::{err_msg, Error, Result};
-use futures::future;
+use futures::future::{self, loop_fn, ok, Loop};
use futures::prelude::*;
use futures::stream::iter_ok;
@@ -63,7 +65,9 @@ use mercurial_types::{Changeset, HgChangesetEnvelope, HgChangesetId, HgFileEnvel
HgManifestEnvelope, HgManifestId, MPath, MPathElement, Manifest};
use mercurial_types::manifest::Content;
use mononoke_types::{BlobstoreBytes, BlobstoreValue, BonsaiChangeset, FileContents};
use reachabilityindex::{deserialize_skiplist_map, SkiplistIndex, SkiplistNodeType};
use revset::RangeNodeStream;
use rust_thrift::compact_protocol;
use slog::Logger;
const BLOBSTORE_FETCH: &'static str = "blobstore-fetch";
@@ -71,10 +75,13 @@ const BONSAI_FETCH: &'static str = "bonsai-fetch";
const CONTENT_FETCH: &'static str = "content-fetch";
const CONFIG_REPO: &'static str = "config";
const BOOKMARKS: &'static str = "bookmarks";
const SKIPLIST: &'static str = "skiplist";
const HG_CHANGESET: &'static str = "hg-changeset";
const HG_CHANGESET_DIFF: &'static str = "diff";
const HG_CHANGESET_RANGE: &'static str = "range";
const SKIPLIST_BUILD: &'static str = "build";
const SKIPLIST_READ: &'static str = "read";
fn setup_app<'a, 'b>() -> App<'a, 'b> {
let blobstore_fetch = SubCommand::with_name(BLOBSTORE_FETCH)
@@ -137,6 +144,23 @@ fn setup_app<'a, 'b>() -> App<'a, 'b> {
),
);
let skiplist = SubCommand::with_name(SKIPLIST)
.about("commands to build or read skiplist indexes")
.subcommand(
SubCommand::with_name(SKIPLIST_BUILD)
.about("build skiplist index")
.args_from_usage(
"<BLOBSTORE_KEY> 'Blobstore key where to store the built skiplist'",
),
)
.subcommand(
SubCommand::with_name(SKIPLIST_READ)
.about("read skiplist index")
.args_from_usage(
"<BLOBSTORE_KEY> 'Blobstore key from where to read the skiplist'",
),
);
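// Hypothetical example invocations (binary name and blobstore key are
// made up for illustration):
//
//   mononoke_admin skiplist build my_skiplist_key
//   mononoke_admin skiplist read my_skiplist_key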
let app = args::MononokeApp {
safe_writes: false,
hide_advanced_args: true,
@@ -156,6 +180,7 @@ fn setup_app<'a, 'b>() -> App<'a, 'b> {
BOOKMARKS,
)))
.subcommand(hg_changeset)
.subcommand(skiplist)
}
fn fetch_content_from_manifest(
@@ -393,6 +418,89 @@ fn hg_changeset_diff(
})
}
fn build_skiplist_index<S: ToString>(
repo: BlobRepo,
key: S,
logger: Logger,
) -> BoxFuture<(), Error> {
let blobstore = repo.get_blobstore();
let skiplist_depth = 16;
let max_index_depth = 2000000000;
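// Keep up to 16 skip edges per node while building; only the longest
// jump survives serialization (see the trimming in the .map below).
// The huge max_index_depth effectively removes the indexing depth cap.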
let skiplist_index = SkiplistIndex::with_skip_edge_count(skiplist_depth);
let cs_fetcher = repo.get_changeset_fetcher();
let key = key.to_string();
repo.get_bonsai_heads_maybe_stale()
.collect()
.and_then(move |heads| {
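// loop_fn threads (remaining heads, index built so far) through each
// iteration, indexing one head at a time until the iterator is drained.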
loop_fn(
(heads.into_iter(), skiplist_index),
move |(mut heads, skiplist_index)| match heads.next() {
Some(head) => {
let f = skiplist_index.add_node(cs_fetcher.clone(), head, max_index_depth);
f.map(move |()| Loop::Continue((heads, skiplist_index)))
.boxify()
}
None => ok(Loop::Break(skiplist_index)).boxify(),
},
)
})
.inspect(|skiplist_index| {
println!(
"build {} skiplist nodes",
skiplist_index.indexed_node_count()
);
})
.map(|skiplist_index| {
// We store only the last skip entry (i.e. the one with the longest jump).
// This saves us storage space.
let mut thrift_merge_graph = HashMap::new();
for (cs_id, skiplist_node_type) in skiplist_index.get_all_skip_edges() {
let skiplist_node_type = if let SkiplistNodeType::SkipEdges(skip_edges) =
skiplist_node_type
{
SkiplistNodeType::SkipEdges(skip_edges.last().cloned().into_iter().collect())
} else {
skiplist_node_type
};
thrift_merge_graph.insert(cs_id.into_thrift(), skiplist_node_type.to_thrift());
}
compact_protocol::serialize(&thrift_merge_graph)
})
.and_then(move |bytes| {
debug!(logger, "storing {} bytes", bytes.len());
blobstore.put(key, BlobstoreBytes::from_bytes(bytes))
})
.boxify()
}
fn read_skiplist_index<S: ToString>(
repo: BlobRepo,
key: S,
logger: Logger,
) -> BoxFuture<(), Error> {
repo.get_blobstore()
.get(key.to_string())
.and_then(move |maybebytes| {
match maybebytes {
Some(bytes) => {
debug!(logger, "received {} bytes from blobstore", bytes.len());
let bytes = bytes.into_bytes();
let skiplist_map = try_boxfuture!(deserialize_skiplist_map(bytes));
info!(logger, "skiplist graph has {} entries", skiplist_map.len());
}
None => {
println!("not found map");
}
};
ok(()).boxify()
})
.boxify()
}
fn main() -> Result<()> {
let matches = setup_app().get_matches();
@@ -605,6 +713,30 @@ fn main() -> Result<()> {
::std::process::exit(1);
}
},
(SKIPLIST, Some(sub_m)) => match sub_m.subcommand() {
(SKIPLIST_BUILD, Some(sub_m)) => {
args::init_cachelib(&matches);
let repo = args::open_repo(&logger, &matches)?.blobrepo().clone();
build_skiplist_index(repo, sub_m.value_of("BLOBSTORE_KEY").unwrap(), logger)
}
(SKIPLIST_READ, Some(sub_m)) => {
args::init_cachelib(&matches);
let repo = args::open_repo(&logger, &matches)?.blobrepo().clone();
read_skiplist_index(
repo,
sub_m
.value_of("BLOBSTORE_KEY")
.expect("blobstore key is not specified"),
logger,
)
}
_ => {
println!("{}", sub_m.usage());
::std::process::exit(1);
}
},
_ => {
println!("{}", matches.usage());
::std::process::exit(1);

View File

@@ -52,4 +52,5 @@ pub enum ErrorKind {
ParentsFetchFailed(#[cause] BlobRepoErrorCause),
#[fail(display = "checking existence failed")]
CheckExistenceFailed(String, #[cause] BlobRepoErrorCause),
#[fail(display = "Unknown field in thrift encoding")] UknownSkiplistThriftEncoding,
}

View File

@@ -33,7 +33,7 @@ mod genbfs;
pub use genbfs::GenerationNumberBFS;
mod skiplist;
-pub use skiplist::{SkiplistIndex, SkiplistNodeType};
+pub use skiplist::{deserialize_skiplist_map, SkiplistIndex, SkiplistNodeType};
#[cfg(test)]
pub extern crate async_unit;
#[cfg(test)]

View File

@@ -10,7 +10,7 @@ use std::sync::Arc;
use bytes::Bytes;
use chashmap::CHashMap;
-use failure::Error;
+use failure::{Error, Result, SyncFailure};
use futures::Stream;
use futures::future::{join_all, ok, Future};
use futures::future::{loop_fn, Loop};
@@ -27,6 +27,8 @@ use skiplist_thrift;
use rust_thrift::compact_protocol;
use errors::*;
const DEFAULT_EDGE_COUNT: u32 = 10;
// Each indexed node fits into one of two categories:
@@ -70,12 +72,54 @@ impl SkiplistNodeType {
}
}
pub fn from_thrift(skiplist_node: skiplist_thrift::SkiplistNodeType) -> Result<Self> {
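// Inner helper: decode thrift (commit, generation number) pairs back
// into (ChangesetId, Generation), failing on a malformed changeset id.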
fn decode_vec_to_thrift(
edges: Vec<skiplist_thrift::CommitAndGenerationNumber>,
) -> Result<Vec<(ChangesetId, Generation)>> {
edges
.into_iter()
.map(|commit_gen_num| {
let cs_id = commit_gen_num.cs_id;
let gen_num = commit_gen_num.gen;
ChangesetId::from_thrift(cs_id)
.map(|cs_id| (cs_id, Generation::new(gen_num.0 as u64)))
})
.collect()
}
match skiplist_node {
skiplist_thrift::SkiplistNodeType::SkipEdges(thrift_edges) => {
decode_vec_to_thrift(thrift_edges.edges).map(SkiplistNodeType::SkipEdges)
}
skiplist_thrift::SkiplistNodeType::ParentEdges(thrift_edges) => {
decode_vec_to_thrift(thrift_edges.edges).map(SkiplistNodeType::ParentEdges)
}
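// Any other thrift variant is unknown to this decoder, so refuse to
// build an index from it.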
_ => Err(ErrorKind::UknownSkiplistThriftEncoding.into()),
}
}
pub fn serialize(&self) -> Bytes {
let thrift_skiplist_node_type = self.to_thrift();
compact_protocol::serialize(&thrift_skiplist_node_type)
}
}
pub fn deserialize_skiplist_map(bytes: Bytes) -> Result<HashMap<ChangesetId, SkiplistNodeType>> {
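// SyncFailure wraps the deserialization error (presumably because the
// thrift error type is not Sync) so `?` can convert it into a
// failure::Error.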
let map: HashMap<_, skiplist_thrift::SkiplistNodeType> =
compact_protocol::deserialize(&bytes).map_err(SyncFailure::new)?;
let v: Result<Vec<_>> = map.into_iter()
.map(|(cs_id, skiplist_thrift)| {
ChangesetId::from_thrift(cs_id).map(|cs_id| {
SkiplistNodeType::from_thrift(skiplist_thrift)
.map(move |skiplist| (cs_id, skiplist))
})
})
.collect();
v?.into_iter().collect()
}
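// Round-trip sketch: the bytes written by the admin `skiplist build`
// command are a compact_protocol-serialized map, so reading them back
// is just: let index_map = deserialize_skiplist_map(bytes)?;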
struct SkiplistEdgeMapping {
pub mapping: CHashMap<ChangesetId, SkiplistNodeType>,
pub skip_edges_per_node: u32,
@@ -306,37 +350,34 @@ fn query_reachability_with_generation_hints(
dst_hash_gen,
)
}
-}))
-.map(|parent_results| parent_results.into_iter().any(|x| x))
-.boxify();
+})).map(|parent_results| parent_results.into_iter().any(|x| x))
+.boxify();
}
}
}
-changeset_fetcher.get_parents(src_hash_gen.0)
+changeset_fetcher
+    .get_parents(src_hash_gen.0)
.and_then({
cloned!(changeset_fetcher);
|parent_changesets| {
-changesets_with_generation_numbers(
-    changeset_fetcher,
-    parent_changesets,
-)
+changesets_with_generation_numbers(changeset_fetcher, parent_changesets)
}
})
-.and_then(move |parent_edges| {
-    join_all(parent_edges.into_iter().map({
-        move |parent_gen_pair| {
-            query_reachability_with_generation_hints(
-                changeset_fetcher.clone(),
-                skip_list_edges.clone(),
-                parent_gen_pair,
-                dst_hash_gen,
-            )
-        }
-    }))
-})
-.map(|parent_results| parent_results.into_iter().any(|x| x))
-.boxify()
+.and_then(move |parent_edges| {
+    join_all(parent_edges.into_iter().map({
+        move |parent_gen_pair| {
+            query_reachability_with_generation_hints(
+                changeset_fetcher.clone(),
+                skip_list_edges.clone(),
+                parent_gen_pair,
+                dst_hash_gen,
+            )
+        }
+    }))
+})
+.map(|parent_results| parent_results.into_iter().any(|x| x))
+.boxify()
}
}
@@ -369,7 +410,7 @@ impl SkiplistIndex {
}
}
-pub fn with_skip_edge_count(self, skip_edges_per_node: u32) -> Self {
+pub fn with_skip_edge_count(skip_edges_per_node: u32) -> Self {
SkiplistIndex {
skip_list_edges: Arc::new(
SkiplistEdgeMapping::new().with_skip_edge_count(skip_edges_per_node),
@@ -410,6 +451,10 @@ impl SkiplistIndex {
}
}
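// Clones the concurrent CHashMap into a plain HashMap snapshot; the
// admin `skiplist build` command serializes this snapshot.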
pub fn get_all_skip_edges(&self) -> HashMap<ChangesetId, SkiplistNodeType> {
self.skip_list_edges.mapping.clone().into_iter().collect()
}
pub fn is_node_indexed(&self, node: ChangesetId) -> bool {
self.skip_list_edges.mapping.contains_key(&node)
}
@@ -588,7 +633,7 @@ mod test {
let sli = SkiplistIndex::new();
assert_eq!(sli.skip_edge_count(), DEFAULT_EDGE_COUNT);
-let sli_with_20 = SkiplistIndex::new().with_skip_edge_count(20);
+let sli_with_20 = SkiplistIndex::with_skip_edge_count(20);
assert_eq!(sli_with_20.skip_edge_count(), 20);
});
}