sapling/eden/mononoke/cmds/admin/skiplist_subcommand.rs
/*
 * Copyright (c) Facebook, Inc. and its affiliates.
 *
 * This software may be used and distributed according to the terms of the
 * GNU General Public License version 2.
 */

use anyhow::{anyhow, Error};
use async_trait::async_trait;
use bulkops::fetch_all_public_changesets;
use clap::{App, Arg, ArgMatches, SubCommand};
use fbinit::FacebookInit;
use fbthrift::compact_protocol;
use futures::{
    compat::Future01CompatExt,
    future::{try_join, FutureExt, TryFutureExt},
    stream, StreamExt, TryStreamExt,
};
use futures_ext::{BoxFuture, FutureExt as _};
use futures_old::future::ok;
use futures_old::prelude::*;
use std::collections::HashMap;
use std::sync::Arc;

use blobrepo::BlobRepo;
use blobstore::Blobstore;
use changeset_fetcher::ChangesetFetcher;
use changesets::{ChangesetEntry, SqlChangesets};
use cmdlib::args::{self, MononokeMatches};
use context::CoreContext;
use mononoke_types::{BlobstoreBytes, ChangesetId, Generation};
use skiplist::{deserialize_skiplist_index, sparse, SkiplistIndex, SkiplistNodeType};
use slog::{debug, info, Logger};
use std::num::NonZeroU64;

use crate::error::SubcommandError;

pub const SKIPLIST: &str = "skiplist";

const SKIPLIST_BUILD: &str = "build";
const ARG_SPARSE: &str = "sparse";
const SKIPLIST_READ: &str = "read";
const ARG_EXPONENT: &str = "exponent";
// With the default exponent of 9, skip edges jump up to 2^9 changesets.
const DEFAULT_SKIPLIST_EXPONENT_STR: &str = "9";
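
/// Builds the `skiplist` subcommand tree for the admin CLI.
///
/// A usage sketch (the binary name is illustrative; the subcommand and flag
/// names come from the definitions in this function):
///
/// ```text
/// admin skiplist build <BLOBSTORE_KEY> [--rebuild] [--sparse] [--exponent N]
/// admin skiplist read <BLOBSTORE_KEY>
/// ```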
pub fn build_subcommand<'a, 'b>() -> App<'a, 'b> {
    SubCommand::with_name(SKIPLIST)
        .about("commands to build or read skiplist indexes")
        .subcommand(
            SubCommand::with_name(SKIPLIST_BUILD)
                .about("build skiplist index")
                .arg(
                    Arg::with_name("BLOBSTORE_KEY")
                        .required(true)
                        .index(1)
                        .help("Blobstore key where to store the built skiplist"),
                )
                .arg(
                    Arg::with_name("rebuild")
                        .long("rebuild")
                        .help("forces a full rebuild instead of an incremental update"),
                )
                .arg(
                    Arg::with_name(ARG_SPARSE)
                        .long(ARG_SPARSE)
                        .help("EXPERIMENTAL: build a sparse skiplist, which makes the skiplist smaller"),
                )
                .arg(
                    Arg::with_name(ARG_EXPONENT)
                        .long(ARG_EXPONENT)
                        .default_value(DEFAULT_SKIPLIST_EXPONENT_STR)
                        .help("Skiplist will skip up to 2^EXPONENT commits"),
                ),
        )
        .subcommand(
            SubCommand::with_name(SKIPLIST_READ)
                .about("read skiplist index")
                .arg(
                    Arg::with_name("BLOBSTORE_KEY")
                        .required(true)
                        .index(1)
                        .help("Blobstore key from which to read the skiplist"),
                ),
        )
}
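
/// The flavour of skiplist to build: a full index covering every changeset
/// reachable from the heads, or an experimental sparse index that stores
/// fewer nodes (selected with `--sparse`).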
#[derive(Copy, Clone)]
enum SkiplistType {
    Full,
    Sparse,
}

impl SkiplistType {
    fn new(sparse: bool) -> Self {
        if sparse {
            SkiplistType::Sparse
        } else {
            SkiplistType::Full
        }
    }
}
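
/// Entry point for the `skiplist` subcommand: either builds an index and
/// stores it under the given blobstore key, or reads an existing index back
/// and reports how many nodes it contains.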
pub async fn subcommand_skiplist<'a>(
    fb: FacebookInit,
    logger: Logger,
    matches: &'a MononokeMatches<'_>,
    sub_m: &'a ArgMatches<'_>,
) -> Result<(), SubcommandError> {
    match sub_m.subcommand() {
        (SKIPLIST_BUILD, Some(sub_m)) => {
            let key = sub_m
                .value_of("BLOBSTORE_KEY")
                .expect("blobstore key is not specified")
                .to_string();
            let rebuild = sub_m.is_present("rebuild");
            let skiplist_ty = SkiplistType::new(sub_m.is_present(ARG_SPARSE));
            let exponent = sub_m
                .value_of(ARG_EXPONENT)
                .expect("exponent must be set")
                .parse::<u32>()
                .map_err(Error::from)?;

            args::init_cachelib(fb, &matches);
            let config_store = args::init_config_store(fb, &logger, matches)?;
            let ctx = CoreContext::new_with_logger(fb, logger.clone());
            let sql_changesets = args::open_sql::<SqlChangesets>(fb, config_store, &matches);
            let repo = args::open_repo(fb, &logger, &matches);
            let (repo, sql_changesets) = try_join(repo, sql_changesets).await?;

            build_skiplist_index(
                &ctx,
                &repo,
                key,
                &logger,
                &sql_changesets,
                rebuild,
                skiplist_ty,
                exponent,
            )
            .await
            .map_err(SubcommandError::Error)
        }
        (SKIPLIST_READ, Some(sub_m)) => {
            let key = sub_m
                .value_of("BLOBSTORE_KEY")
                .expect("blobstore key is not specified")
                .to_string();

            args::init_cachelib(fb, &matches);
            let ctx = CoreContext::test_mock(fb);
            let repo = args::open_repo(fb, &logger, &matches).await?;
            let maybe_index = read_skiplist_index(ctx.clone(), repo, key, logger.clone())
                .compat()
                .await?;
            match maybe_index {
                Some(index) => {
                    info!(
                        logger,
                        "skiplist graph has {} entries",
                        index.indexed_node_count()
                    );
                }
                None => {
                    info!(logger, "skiplist not found");
                }
            }
            Ok(())
        }
        _ => Err(SubcommandError::InvalidArgs),
    }
}
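
/// Builds (or incrementally updates) a skiplist index and stores it in the
/// repo blobstore under `key`.
///
/// Unless `force_full_rebuild` is set, an existing index under the same key
/// is loaded and extended rather than rebuilt from scratch. Skip edges jump
/// up to 2^`exponent` changesets, so the index is built with a depth of
/// `exponent + 1`.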
async fn build_skiplist_index<'a, S: ToString>(
    ctx: &'a CoreContext,
    repo: &'a BlobRepo,
    key: S,
    logger: &'a Logger,
    sql_changesets: &'a SqlChangesets,
    force_full_rebuild: bool,
    skiplist_ty: SkiplistType,
    exponent: u32,
) -> Result<(), Error> {
    let blobstore = repo.get_blobstore();
    // Depth must be one more than the maximum exponent.
    let skiplist_depth = exponent + 1;
    // Index all changesets
    let max_index_depth = 20_000_000_000;
    let key = key.to_string();
    let maybe_skiplist = if force_full_rebuild {
        None
    } else {
        read_skiplist_index(ctx.clone(), repo.clone(), key.clone(), logger.clone())
            .compat()
            .await?
    };

    let changeset_fetcher = repo.get_changeset_fetcher();
    let cs_fetcher_skiplist_func = async {
        match maybe_skiplist {
            Some(skiplist) => {
                info!(
                    logger,
                    "skiplist graph has {} entries",
                    skiplist.indexed_node_count()
                );
                Ok((changeset_fetcher, skiplist))
            }
            None => {
                info!(logger, "creating a skiplist from scratch");
                let skiplist_index = SkiplistIndex::with_skip_edge_count(skiplist_depth);
                let cs_fetcher = fetch_all_public_changesets_and_build_changeset_fetcher(
                    ctx,
                    repo,
                    sql_changesets,
                )
                .await?;
                Ok((cs_fetcher, skiplist_index))
            }
        }
    };

    let heads = repo
        .get_bonsai_heads_maybe_stale(ctx.clone())
        .try_collect::<Vec<_>>();
    let (heads, (cs_fetcher, skiplist_index)) = try_join(heads, cs_fetcher_skiplist_func).await?;

    let updated_skiplist = match skiplist_ty {
        SkiplistType::Full => {
            stream::iter(heads)
                .map(Ok)
                .try_for_each_concurrent(100, |head| {
                    skiplist_index.add_node(&ctx, &cs_fetcher, head, max_index_depth)
                })
                .await?;
            skiplist_index.get_all_skip_edges()
        }
        SkiplistType::Sparse => {
            let mut index = skiplist_index.get_all_skip_edges();
            // 2^(skiplist_depth - 1) == 2^exponent: the longest jump the index may take.
            let max_skip = NonZeroU64::new(2u64.pow(skiplist_depth - 1))
                .ok_or(anyhow!("invalid skiplist depth"))?;
            sparse::update_sparse_skiplist(&ctx, heads, &mut index, max_skip, &cs_fetcher).await?;
            index
        }
    };

    info!(logger, "built {} skiplist nodes", updated_skiplist.len());

    // We store only the latest skip entry (i.e. the entry with the longest jump).
    // This saves us storage space.
    let mut thrift_merge_graph = HashMap::new();
    for (cs_id, skiplist_node_type) in updated_skiplist {
        let skiplist_node_type = if let SkiplistNodeType::SkipEdges(skip_edges) = skiplist_node_type
        {
            SkiplistNodeType::SkipEdges(skip_edges.last().cloned().into_iter().collect())
        } else {
            skiplist_node_type
        };

        thrift_merge_graph.insert(cs_id.into_thrift(), skiplist_node_type.to_thrift());
    }
    let bytes = compact_protocol::serialize(&thrift_merge_graph);

    debug!(logger, "storing {} bytes", bytes.len());
    blobstore
        .put(&ctx, key, BlobstoreBytes::from_bytes(bytes))
        .await
}
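
/// Prefetches every public changeset into memory and wraps the repo's
/// changeset fetcher in an `InMemoryChangesetFetcher`, so that building the
/// skiplist does not need one changesets-table query per commit.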
async fn fetch_all_public_changesets_and_build_changeset_fetcher(
    ctx: &CoreContext,
    repo: &BlobRepo,
    sql_changesets: &SqlChangesets,
) -> Result<Arc<dyn ChangesetFetcher>, Error> {
    let fetched_changesets = fetch_all_public_changesets(
        &ctx,
        repo.get_repoid(),
        &sql_changesets,
        repo.get_phases().get_sql_phases(),
    )
    .try_collect::<Vec<_>>()
    .await?;

    let fetched_changesets: HashMap<_, _> = fetched_changesets
        .into_iter()
        .map(|cs_entry| (cs_entry.cs_id, cs_entry))
        .collect();
    let cs_fetcher: Arc<dyn ChangesetFetcher> = Arc::new(InMemoryChangesetFetcher {
        fetched_changesets: Arc::new(fetched_changesets),
        inner: repo.get_changeset_fetcher(),
    });

    Ok(cs_fetcher)
}
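
/// Fetches the serialized skiplist blob stored under `key` and deserializes
/// it, returning `None` if nothing is stored under that key.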
fn read_skiplist_index<S: ToString>(
    ctx: CoreContext,
    repo: BlobRepo,
    key: S,
    logger: Logger,
) -> BoxFuture<Option<SkiplistIndex>, Error> {
    let blobstore = repo.get_blobstore();
    let key = key.to_string();
    async move { blobstore.get(&ctx, &key).await }
        .boxed()
        .compat()
        .and_then(move |maybebytes| {
            match maybebytes {
                Some(bytes) => {
                    debug!(
                        logger,
                        "received {} bytes from blobstore",
                        bytes.as_bytes().len()
                    );
                    let bytes = bytes.into_raw_bytes();
                    deserialize_skiplist_index(logger.clone(), bytes)
                        .into_future()
                        .map(Some)
                        .left_future()
                }
                None => ok(None).right_future(),
            }
        })
        .boxify()
}
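
/// A `ChangesetFetcher` that serves generation numbers and parents from a
/// prefetched in-memory map, falling back to the wrapped fetcher for any
/// changeset missing from the map (e.g. commits that are not public).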
#[derive(Clone)]
struct InMemoryChangesetFetcher {
    fetched_changesets: Arc<HashMap<ChangesetId, ChangesetEntry>>,
    inner: Arc<dyn ChangesetFetcher>,
}

#[async_trait]
impl ChangesetFetcher for InMemoryChangesetFetcher {
    async fn get_generation_number(
        &self,
        ctx: CoreContext,
        cs_id: ChangesetId,
    ) -> Result<Generation, Error> {
        match self.fetched_changesets.get(&cs_id) {
            Some(cs_entry) => Ok(Generation::new(cs_entry.gen)),
            None => self.inner.get_generation_number(ctx, cs_id).await,
        }
    }

    async fn get_parents(
        &self,
        ctx: CoreContext,
        cs_id: ChangesetId,
    ) -> Result<Vec<ChangesetId>, Error> {
        match self.fetched_changesets.get(&cs_id) {
            Some(cs_entry) => Ok(cs_entry.parents.clone()),
            None => self.inner.get_parents(ctx, cs_id).await,
        }
    }
}