/*
 * Copyright (c) Facebook, Inc. and its affiliates.
 *
 * This software may be used and distributed according to the terms of the
 * GNU General Public License version 2.
 */

use anyhow::{anyhow, Error};
use bulkops::fetch_all_public_changesets;
use clap::{App, Arg, ArgMatches, SubCommand};
use cloned::cloned;
use fbinit::FacebookInit;
use fbthrift::compact_protocol;
use futures::{
    compat::{Future01CompatExt, Stream01CompatExt},
    future::{try_join, FutureExt as NewFutureExt, TryFutureExt},
    stream, StreamExt, TryStreamExt,
};
use futures_ext::{BoxFuture, FutureExt};
use futures_old::future::ok;
use futures_old::prelude::*;
use std::collections::HashMap;
use std::sync::Arc;

use blobrepo::BlobRepo;
use blobstore::Blobstore;
use changeset_fetcher::ChangesetFetcher;
use changesets::{ChangesetEntry, SqlChangesets};
use cmdlib::args;
use context::CoreContext;
use mononoke_types::{BlobstoreBytes, ChangesetId, Generation};
use skiplist::{deserialize_skiplist_index, sparse, SkiplistIndex, SkiplistNodeType};
use slog::{debug, info, Logger};
use std::num::NonZeroU64;

use crate::error::SubcommandError;

pub const SKIPLIST: &str = "skiplist";
const SKIPLIST_BUILD: &str = "build";
const ARG_SPARSE: &str = "sparse";
const SKIPLIST_READ: &str = "read";

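/// Builds the clap definition for the `skiplist` subcommand:
///
///     skiplist build <BLOBSTORE_KEY> [--rebuild] [--sparse]
///     skiplist read <BLOBSTORE_KEY>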
pub fn build_subcommand<'a, 'b>() -> App<'a, 'b> {
    SubCommand::with_name(SKIPLIST)
        .about("commands to build or read skiplist indexes")
        .subcommand(
            SubCommand::with_name(SKIPLIST_BUILD)
                .about("build skiplist index")
                .arg(
                    Arg::with_name("BLOBSTORE_KEY")
                        .required(true)
                        .index(1)
                        .help("Blobstore key where to store the built skiplist"),
                )
                .arg(
                    Arg::with_name("rebuild")
                        .long("rebuild")
                        .help("forces the full rebuild instead of incremental update"),
                )
                .arg(
                    Arg::with_name(ARG_SPARSE)
                        .long(ARG_SPARSE)
                        .help("EXPERIMENTAL: build sparse skiplist. Makes skiplist smaller"),
                ),
        )
        .subcommand(
            SubCommand::with_name(SKIPLIST_READ)
                .about("read skiplist index")
                .arg(
                    Arg::with_name("BLOBSTORE_KEY")
                        .required(true)
                        .index(1)
                        .help("Blobstore key from where to read the skiplist"),
                ),
        )
}

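/// Which flavor of skiplist to build: a full index, or a sparse one that
/// keeps fewer skip edges to make the stored index smaller (experimental).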
#[derive(Copy, Clone)]
enum SkiplistType {
    Full,
    Sparse,
}

impl SkiplistType {
    fn new(sparse: bool) -> Self {
        if sparse {
            SkiplistType::Sparse
        } else {
            SkiplistType::Full
        }
    }
}

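/// Entry point for the `skiplist` subcommand. Dispatches to the `build` and
/// `read` handlers; the old-style (0.1) futures they produce are bridged
/// back to async/await with `compat()`.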
pub async fn subcommand_skiplist<'a>(
    fb: FacebookInit,
    logger: Logger,
    matches: &'a ArgMatches<'_>,
    sub_m: &'a ArgMatches<'_>,
) -> Result<(), SubcommandError> {
    match sub_m.subcommand() {
        (SKIPLIST_BUILD, Some(sub_m)) => {
            let key = sub_m
                .value_of("BLOBSTORE_KEY")
                .expect("blobstore key is not specified")
                .to_string();
            let rebuild = sub_m.is_present("rebuild");
            let skiplist_ty = SkiplistType::new(sub_m.is_present(ARG_SPARSE));

            args::init_cachelib(fb, &matches, None);
            let ctx = CoreContext::new_with_logger(fb, logger.clone());
            let sql_changesets = args::open_sql::<SqlChangesets>(fb, &matches);
            let repo = args::open_repo(fb, &logger, &matches);
            repo.join(sql_changesets)
                .and_then(move |(repo, sql_changesets)| {
                    async move {
                        build_skiplist_index(
                            &ctx,
                            &repo,
                            key,
                            &logger,
                            &sql_changesets,
                            rebuild,
                            skiplist_ty,
                        )
                        .await
                    }
                    .boxed()
                    .compat()
                })
                .from_err()
                .boxify()
        }
        (SKIPLIST_READ, Some(sub_m)) => {
            let key = sub_m
                .value_of("BLOBSTORE_KEY")
                .expect("blobstore key is not specified")
                .to_string();

            args::init_cachelib(fb, &matches, None);
            let ctx = CoreContext::test_mock(fb);
            args::open_repo(fb, &logger, &matches)
                .and_then({
                    cloned!(logger);
                    move |repo| read_skiplist_index(ctx.clone(), repo, key, logger)
                })
                .map(move |maybe_index| {
                    match maybe_index {
                        Some(index) => {
                            info!(
                                logger,
                                "skiplist graph has {} entries",
                                index.indexed_node_count()
                            );
                        }
                        None => {
                            info!(logger, "skiplist not found");
                        }
                    }
                })
                .from_err()
                .boxify()
        }
        _ => Err(SubcommandError::InvalidArgs).into_future().boxify(),
    }
    .compat()
    .await
}

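/// Builds a skiplist index and stores it in the blobstore under `key`.
/// Unless a full rebuild is forced, the previously stored index is loaded
/// and incrementally extended from the current bonsai heads.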
async fn build_skiplist_index<'a, S: ToString>(
    ctx: &'a CoreContext,
    repo: &'a BlobRepo,
    key: S,
    logger: &'a Logger,
    sql_changesets: &'a SqlChangesets,
    force_full_rebuild: bool,
    skiplist_ty: SkiplistType,
) -> Result<(), Error> {
    let blobstore = repo.get_blobstore();
    // skiplist will jump up to 2^9 changesets
    let skiplist_depth = 10;
    // Index all changesets
    let max_index_depth = 20000000000;
    let key = key.to_string();
    let maybe_skiplist = if force_full_rebuild {
        None
    } else {
        read_skiplist_index(ctx.clone(), repo.clone(), key.clone(), logger.clone())
            .compat()
            .await?
    };

    let changeset_fetcher = repo.get_changeset_fetcher();
    let cs_fetcher_skiplist_func = async {
        match maybe_skiplist {
            Some(skiplist) => {
                info!(
                    logger,
                    "skiplist graph has {} entries",
                    skiplist.indexed_node_count()
                );
                Ok((changeset_fetcher, skiplist))
            }
            None => {
                info!(logger, "creating a skiplist from scratch");
                let skiplist_index = SkiplistIndex::with_skip_edge_count(skiplist_depth);
                let cs_fetcher = fetch_all_public_changesets_and_build_changeset_fetcher(
                    ctx,
                    repo,
                    sql_changesets,
                )
                .await?;
                Ok((cs_fetcher, skiplist_index))
            }
        }
    };

    let heads = repo
        .get_bonsai_heads_maybe_stale(ctx.clone())
        .compat()
        .try_collect::<Vec<_>>();

    let (heads, (cs_fetcher, skiplist_index)) = try_join(heads, cs_fetcher_skiplist_func).await?;

    let updated_skiplist = match skiplist_ty {
        SkiplistType::Full => {
            stream::iter(heads)
                .map(Ok)
                .try_for_each_concurrent(100, |head| {
                    skiplist_index.add_node(&ctx, &cs_fetcher, head, max_index_depth)
                })
                .await?;
            skiplist_index.get_all_skip_edges()
        }
        SkiplistType::Sparse => {
            let mut index = skiplist_index.get_all_skip_edges();
            let max_skip = NonZeroU64::new(2u64.pow(skiplist_depth - 1))
                .ok_or(anyhow!("invalid skiplist depth"))?;
            sparse::update_sparse_skiplist(&ctx, heads, &mut index, max_skip, &cs_fetcher).await?;
            index
        }
    };

    info!(logger, "built {} skiplist nodes", updated_skiplist.len());

    // We store only the latest skip entry (i.e. the entry with the longest jump).
    // This saves us storage space.
    let mut thrift_merge_graph = HashMap::new();
    for (cs_id, skiplist_node_type) in updated_skiplist {
        let skiplist_node_type = if let SkiplistNodeType::SkipEdges(skip_edges) = skiplist_node_type
        {
            SkiplistNodeType::SkipEdges(skip_edges.last().cloned().into_iter().collect())
        } else {
            skiplist_node_type
        };

        thrift_merge_graph.insert(cs_id.into_thrift(), skiplist_node_type.to_thrift());
    }
    let bytes = compact_protocol::serialize(&thrift_merge_graph);

    debug!(logger, "storing {} bytes", bytes.len());
    blobstore
        .put(ctx.clone(), key, BlobstoreBytes::from_bytes(bytes))
        .await
}

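/// Bulk-fetches all public changesets up front and wraps the repo's
/// changeset fetcher in an in-memory one, so that indexing can be served
/// from memory rather than one database lookup per changeset.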
async fn fetch_all_public_changesets_and_build_changeset_fetcher(
    ctx: &CoreContext,
    repo: &BlobRepo,
    sql_changesets: &SqlChangesets,
) -> Result<Arc<dyn ChangesetFetcher>, Error> {
    let fetched_changesets = fetch_all_public_changesets(
        &ctx,
        repo.get_repoid(),
        &sql_changesets,
        repo.get_phases().get_sql_phases(),
    )
    .try_collect::<Vec<_>>()
    .await?;

    let fetched_changesets: HashMap<_, _> = fetched_changesets
        .into_iter()
        .map(|cs_entry| (cs_entry.cs_id, cs_entry))
        .collect();
    let cs_fetcher: Arc<dyn ChangesetFetcher> = Arc::new(InMemoryChangesetFetcher {
        fetched_changesets: Arc::new(fetched_changesets),
        inner: repo.get_changeset_fetcher(),
    });

    Ok(cs_fetcher)
}

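/// Reads the blob stored under `key` and deserializes it into a
/// `SkiplistIndex`. Resolves to `None` if no skiplist blob exists.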
fn read_skiplist_index<S: ToString>(
    ctx: CoreContext,
    repo: BlobRepo,
    key: S,
    logger: Logger,
) -> BoxFuture<Option<SkiplistIndex>, Error> {
    repo.get_blobstore()
        .get(ctx, key.to_string())
        .compat()
        .and_then(move |maybebytes| {
            match maybebytes {
                Some(bytes) => {
                    debug!(
                        logger,
                        "received {} bytes from blobstore",
                        bytes.as_bytes().len()
                    );
                    let bytes = bytes.into_raw_bytes();
                    deserialize_skiplist_index(logger.clone(), bytes)
                        .into_future()
                        .map(Some)
                        .left_future()
                }
                None => ok(None).right_future(),
            }
        })
        .boxify()
}

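/// `ChangesetFetcher` that answers lookups from the prefetched map of public
/// changesets and falls back to the wrapped fetcher for anything not in it.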
#[derive(Clone)]
struct InMemoryChangesetFetcher {
    fetched_changesets: Arc<HashMap<ChangesetId, ChangesetEntry>>,
    inner: Arc<dyn ChangesetFetcher>,
}

impl ChangesetFetcher for InMemoryChangesetFetcher {
    fn get_generation_number(
        &self,
        ctx: CoreContext,
        cs_id: ChangesetId,
    ) -> BoxFuture<Generation, Error> {
        match self.fetched_changesets.get(&cs_id) {
            Some(cs_entry) => ok(Generation::new(cs_entry.gen)).boxify(),
            None => self.inner.get_generation_number(ctx, cs_id),
        }
    }

    fn get_parents(
        &self,
        ctx: CoreContext,
        cs_id: ChangesetId,
    ) -> BoxFuture<Vec<ChangesetId>, Error> {
        match self.fetched_changesets.get(&cs_id) {
            Some(cs_entry) => ok(cs_entry.parents.clone()).boxify(),
            None => self.inner.get_parents(ctx, cs_id),
        }
    }
}