mononoke: add phases helpers in mononoke_admin

Summary:
Add two helpful commands (example invocations below):
1) Fetch the phase of a commit
2) List all public commits
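
Example invocations (a sketch: the global repo/config flags that mononoke_admin
requires are omitted here; the subcommand and argument names are taken from the
diff below, and the noted outputs match the println! calls in phases.rs):

    mononoke_admin phases fetch --changeset-type hg <hash>        # prints "public" or "draft"
    mononoke_admin phases list-public --changeset-type bonsai    # one public changeset id per line
    mononoke_admin phases add-public <input-file> --chunk-size 16384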

Reviewed By: ikostia

Differential Revision: D16830576

fbshipit-source-id: 03f503cb30a7f150ea383d62fb71913dd2b93e6e
Stanislau Hlebik, 2019-08-24 04:11:28 -07:00 (committed by Facebook Github Bot)
parent 01ea2d3917
commit 2fcd6c0d82
5 changed files with 299 additions and 130 deletions

@@ -4,11 +4,14 @@
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2 or any later version.
pub const ADD_PUBLIC_PHASES: &'static str = "add-public";
pub const BLOBSTORE_FETCH: &'static str = "blobstore-fetch";
pub const BONSAI_FETCH: &'static str = "bonsai-fetch";
pub const CONTENT_FETCH: &'static str = "content-fetch";
pub const BOOKMARKS: &'static str = "bookmarks";
pub const SKIPLIST: &'static str = "skiplist";
pub const CONTENT_FETCH: &'static str = "content-fetch";
pub const FETCH_PHASE: &'static str = "fetch";
pub const FILENODES: &'static str = "filenodes";
pub const FILESTORE: &'static str = "filestore";
pub const HASH_CONVERT: &'static str = "convert";
pub const HG_CHANGESET: &'static str = "hg-changeset";
pub const HG_CHANGESET_DIFF: &'static str = "diff";
@@ -19,12 +22,12 @@ pub const HG_SYNC_SHOW: &'static str = "show";
pub const HG_SYNC_FETCH_BUNDLE: &'static str = "fetch-bundle";
pub const HG_SYNC_LAST_PROCESSED: &'static str = "last-processed";
pub const HG_SYNC_VERIFY: &'static str = "verify";
pub const LIST_PUBLIC: &'static str = "list-public";
pub const SKIPLIST_BUILD: &'static str = "build";
pub const SKIPLIST_READ: &'static str = "read";
pub const ADD_PUBLIC_PHASES: &'static str = "add-public-phases";
pub const PHASES: &'static str = "phases";
pub const REDACTION: &'static str = "redaction";
pub const REDACTION_ADD: &'static str = "add";
pub const REDACTION_REMOVE: &'static str = "remove";
pub const REDACTION_LIST: &'static str = "list";
pub const FILENODES: &'static str = "filenodes";
pub const FILESTORE: &'static str = "filestore";
pub const SKIPLIST: &'static str = "skiplist";

@ -5,6 +5,7 @@
// GNU General Public License version 2 or any later version.
#![deny(warnings)]
#![feature(async_await)]
#![feature(process_exitcode_placeholder)]
use clap::{App, Arg, SubCommand};
@@ -20,11 +21,11 @@ use slog::error;
use crate::blobstore_fetch::subcommand_blobstore_fetch;
use crate::bonsai_fetch::subcommand_bonsai_fetch;
use crate::cmdargs::{
ADD_PUBLIC_PHASES, BLOBSTORE_FETCH, BONSAI_FETCH, BOOKMARKS, CONTENT_FETCH, FILENODES,
FILESTORE, HASH_CONVERT, HG_CHANGESET, HG_CHANGESET_DIFF, HG_CHANGESET_RANGE, HG_SYNC_BUNDLE,
HG_SYNC_FETCH_BUNDLE, HG_SYNC_LAST_PROCESSED, HG_SYNC_REMAINS, HG_SYNC_SHOW, HG_SYNC_VERIFY,
REDACTION, REDACTION_ADD, REDACTION_LIST, REDACTION_REMOVE, SKIPLIST, SKIPLIST_BUILD,
SKIPLIST_READ,
ADD_PUBLIC_PHASES, BLOBSTORE_FETCH, BONSAI_FETCH, BOOKMARKS, CONTENT_FETCH, FETCH_PHASE,
FILENODES, FILESTORE, HASH_CONVERT, HG_CHANGESET, HG_CHANGESET_DIFF, HG_CHANGESET_RANGE,
HG_SYNC_BUNDLE, HG_SYNC_FETCH_BUNDLE, HG_SYNC_LAST_PROCESSED, HG_SYNC_REMAINS, HG_SYNC_SHOW,
HG_SYNC_VERIFY, LIST_PUBLIC, PHASES, REDACTION, REDACTION_ADD, REDACTION_LIST,
REDACTION_REMOVE, SKIPLIST, SKIPLIST_BUILD, SKIPLIST_READ,
};
use crate::content_fetch::subcommand_content_fetch;
use crate::error::SubcommandError;
@@ -32,7 +33,6 @@ use crate::filenodes::subcommand_filenodes;
use crate::hash_convert::subcommand_hash_convert;
use crate::hg_changeset::subcommand_hg_changeset;
use crate::hg_sync::subcommand_process_hg_sync;
use crate::public_phases::subcommand_add_public_phases;
use crate::redaction::subcommand_redaction;
use crate::skiplist_subcommand::subcommand_skiplist;
@@ -48,7 +48,7 @@ mod filestore;
mod hash_convert;
mod hg_changeset;
mod hg_sync;
mod public_phases;
mod phases;
mod redaction;
mod skiplist_subcommand;
@@ -238,19 +238,58 @@ fn setup_app<'a, 'b>() -> App<'a, 'b> {
.about("verify the consistency of yet-to-be-processed bookmark log entries"),
);
let add_public_phases = SubCommand::with_name(ADD_PUBLIC_PHASES)
.about("mark mercurial commits as public from provided new-line separated list")
.arg(
Arg::with_name("input-file")
.help("new-line separated mercurial public commits")
.required(true)
.index(1),
let phases = SubCommand::with_name(PHASES)
.about("commands to work with phases")
.subcommand(
SubCommand::with_name(ADD_PUBLIC_PHASES)
.about("mark mercurial commits as public from provided new-line separated list")
.arg(
Arg::with_name("input-file")
.help("new-line separated mercurial public commits")
.required(true)
.index(1),
)
.arg(
Arg::with_name("chunk-size")
.help("partition input file to chunks of specified size")
.long("chunk-size")
.takes_value(true),
),
)
.arg(
Arg::with_name("chunk-size")
.help("partition input file to chunks of specified size")
.long("chunk-size")
.takes_value(true),
.subcommand(
SubCommand::with_name(FETCH_PHASE)
.about("fetch phase of a commit")
.arg(
Arg::with_name("changeset-type")
.long("changeset-type")
.short("c")
.takes_value(true)
.possible_values(&["bonsai", "hg"])
.required(false)
.help(
"What changeset type to return, either bonsai or hg. Defaults to hg.",
),
)
.arg(
Arg::with_name("hash")
.help("changeset hash")
.takes_value(true),
),
)
.subcommand(
SubCommand::with_name(LIST_PUBLIC)
.arg(
Arg::with_name("changeset-type")
.long("changeset-type")
.short("c")
.takes_value(true)
.possible_values(&["bonsai", "hg"])
.required(false)
.help(
"What changeset type to return, either bonsai or hg. Defaults to hg.",
),
)
.about("List all public commits"),
);
let redaction = SubCommand::with_name(REDACTION)
@@ -321,9 +360,9 @@ fn setup_app<'a, 'b>() -> App<'a, 'b> {
.subcommand(skiplist)
.subcommand(convert)
.subcommand(hg_sync)
.subcommand(add_public_phases)
.subcommand(redaction)
.subcommand(filenodes::build_subcommand(FILENODES))
.subcommand(phases)
.subcommand(filestore::build_subcommand(FILESTORE))
}
@@ -349,10 +388,10 @@ fn main() -> ExitCode {
}
(SKIPLIST, Some(sub_m)) => subcommand_skiplist(logger, &matches, sub_m),
(HASH_CONVERT, Some(sub_m)) => subcommand_hash_convert(logger, &matches, sub_m),
(ADD_PUBLIC_PHASES, Some(sub_m)) => subcommand_add_public_phases(logger, &matches, sub_m),
(REDACTION, Some(sub_m)) => subcommand_redaction(logger, &matches, sub_m),
(FILENODES, Some(sub_m)) => subcommand_filenodes(logger, &matches, sub_m),
(FILESTORE, Some(sub_m)) => filestore::execute_command(logger, &matches, sub_m),
(PHASES, Some(sub_m)) => phases::subcommand_phases(logger, &matches, sub_m),
_ => Err(SubcommandError::InvalidArgs).into_future().boxify(),
};

cmds/admin/phases.rs (new file, 215 lines)

@@ -0,0 +1,215 @@
// Copyright (c) 2004-present, Facebook, Inc.
// All Rights Reserved.
//
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2 or any later version.
use crate::cmdargs::{ADD_PUBLIC_PHASES, FETCH_PHASE, LIST_PUBLIC};
use clap::ArgMatches;
use cloned::cloned;
use failure_ext::{err_msg, Error};
use futures::{stream, Future, IntoFuture, Stream};
use futures_ext::{try_boxfuture, BoxFuture, FutureExt};
use futures_preview::{
compat::Future01CompatExt,
future::{FutureExt as PreviewFutureExt, TryFutureExt},
};
use std::{
fs::File,
io::{BufRead, BufReader, Write},
str::FromStr,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
time::Duration,
};
use blobrepo::BlobRepo;
use cmdlib::args;
use context::CoreContext;
use mercurial_types::HgChangesetId;
use mononoke_types::ChangesetId;
use phases::{Phases, SqlPhases};
use slog::{info, Logger};
use crate::error::SubcommandError;
pub fn subcommand_phases(
logger: Logger,
matches: &ArgMatches<'_>,
sub_m: &ArgMatches<'_>,
) -> BoxFuture<(), SubcommandError> {
let repo = args::open_repo(&logger, &matches);
let phases = args::open_sql::<SqlPhases>(&matches);
args::init_cachelib(&matches);
let ctx = CoreContext::new_with_logger(logger.clone());
match sub_m.subcommand() {
(FETCH_PHASE, Some(sub_m)) => {
let ty = sub_m
.value_of("changeset-type")
.map(|s| s)
.unwrap_or("hg")
.to_string();
let hash = sub_m
.value_of("hash")
.map(|s| s.to_string())
.ok_or(err_msg("changeset hash is not specified"));
subcommand_fetch_phase_impl(repo, phases, hash, ty)
.boxed()
.compat()
.from_err()
.boxify()
}
(ADD_PUBLIC_PHASES, Some(sub_m)) => {
let path = String::from(sub_m.value_of("input-file").unwrap());
let chunk_size = sub_m
.value_of("chunk-size")
.and_then(|chunk_size| chunk_size.parse::<usize>().ok())
.unwrap_or(16384);
repo.join(phases)
.and_then(move |(repo, phases)| {
add_public_phases(ctx, repo, Arc::new(phases), logger, path, chunk_size)
})
.from_err()
.boxify()
}
(LIST_PUBLIC, Some(sub_m)) => {
let ty = sub_m
.value_of("changeset-type")
.map(|s| s)
.unwrap_or("hg")
.to_string();
subcommand_list_public_impl(ctx, ty, repo, phases)
.boxed()
.compat()
.from_err()
.boxify()
}
_ => Err(SubcommandError::InvalidArgs).into_future().boxify(),
}
}
fn add_public_phases(
ctx: CoreContext,
repo: BlobRepo,
phases: Arc<SqlPhases>,
logger: Logger,
path: impl AsRef<str>,
chunk_size: usize,
) -> impl Future<Item = (), Error = Error> {
let file = try_boxfuture!(File::open(path.as_ref()).map_err(Error::from));
let hg_changesets = BufReader::new(file).lines().filter_map(|id_str| {
id_str
.map_err(Error::from)
.and_then(|v| HgChangesetId::from_str(&v))
.ok()
});
let entries_processed = Arc::new(AtomicUsize::new(0));
info!(logger, "start processing hashes");
stream::iter_ok(hg_changesets)
.chunks(chunk_size)
.and_then(move |chunk| {
let count = chunk.len();
repo.get_hg_bonsai_mapping(ctx.clone(), chunk)
.and_then({
cloned!(ctx, repo, phases);
move |changesets| {
phases.add_public(
ctx,
repo,
changesets.into_iter().map(|(_, cs)| cs).collect(),
)
}
})
.and_then({
cloned!(entries_processed);
move |_| {
print!(
"\x1b[Khashes processed: {}\r",
entries_processed.fetch_add(count, Ordering::SeqCst) + count,
);
std::io::stdout().flush().expect("flush on stdout failed");
tokio_timer::sleep(Duration::from_secs(5)).map_err(Error::from)
}
})
})
.for_each(|_| Ok(()))
.boxify()
}
async fn subcommand_list_public_impl(
ctx: CoreContext,
ty: String,
repo: impl Future<Item = BlobRepo, Error = Error>,
phases: impl Future<Item = SqlPhases, Error = Error>,
) -> Result<(), Error> {
let repo = repo.compat().await?;
let phases = phases.compat().await?;
let public = phases
.list_all_public(ctx.clone(), repo.get_repoid())
.compat()
.await?;
if ty == "bonsai" {
for p in public {
println!("{}", p);
}
} else {
for chunk in public.chunks(1000) {
let bonsais: Vec<_> = chunk.iter().cloned().collect();
let hg_bonsais = repo
.get_hg_bonsai_mapping(ctx.clone(), bonsais)
.compat()
.await?;
let hg_css: Vec<HgChangesetId> = hg_bonsais
.clone()
.into_iter()
.map(|(hg_cs_id, _)| hg_cs_id)
.collect();
for hg_cs in hg_css {
println!("{}", hg_cs);
}
}
}
Ok(())
}
pub async fn subcommand_fetch_phase_impl<'a>(
repo: impl Future<Item = BlobRepo, Error = Error>,
phases: impl Future<Item = SqlPhases, Error = Error>,
hash: Result<String, Error>,
ty: String,
) -> Result<(), Error> {
let ctx = CoreContext::test_mock();
let repo = repo.compat().await?;
let phases = phases.compat().await?;
let hash = hash?;
let bcs_id = if ty == "bonsai" {
ChangesetId::from_str(&hash)?
} else if ty == "hg" {
let maybe_bonsai = repo
.get_bonsai_from_hg(ctx.clone(), HgChangesetId::from_str(&hash)?)
.compat()
.await?;
maybe_bonsai.ok_or(err_msg(format!("bonsai not found for {}", hash)))?
} else {
return Err(err_msg(format!("unknown hash type: {}", ty)));
};
let public_phases = phases.get_public(ctx, repo, vec![bcs_id]).compat().await?;
if public_phases.contains(&bcs_id) {
println!("public");
} else {
println!("draft");
}
Ok(())
}
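
Note on the async plumbing above: phases.rs mixes futures 0.1 combinators with
async fns, bridging with Future01CompatExt::compat() in one direction and
.boxed().compat() in the other. A minimal, self-contained sketch of that
pattern, not part of the commit (the Cargo setup is an assumption: futures 0.1,
futures-preview 0.3.0-alpha renamed to futures_preview with its compat feature
enabled, tokio 0.1, and a nightly toolchain of that era):

    #![feature(async_await)] // nightly-only at the time, as in main.rs above

    use futures::{future, Future as OldFuture};               // futures 0.1
    use futures_preview::compat::Future01CompatExt;           // 0.1 future -> std::future
    use futures_preview::future::{FutureExt, TryFutureExt};   // std::future -> 0.1 future

    // Await a futures-0.1 future inside an async fn, as subcommand_fetch_phase_impl does.
    async fn double(input: impl OldFuture<Item = u64, Error = ()>) -> Result<u64, ()> {
        let n = input.compat().await?;
        Ok(n * 2)
    }

    fn main() {
        // Convert the async fn's future back to a futures-0.1 future with
        // `.boxed().compat()` (boxing first makes it Unpin, which `compat()`
        // requires), then drive it with a 0.1 runtime, much as main.rs does
        // before `.boxify()`.
        let fut = double(future::ok::<u64, ()>(21)).boxed().compat();
        let mut runtime = tokio::runtime::Runtime::new().unwrap();
        assert_eq!(runtime.block_on(fut), Ok(42));
    }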

@@ -1,104 +0,0 @@
// Copyright (c) 2004-present, Facebook, Inc.
// All Rights Reserved.
//
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2 or any later version.
use clap::ArgMatches;
use cloned::cloned;
use failure_ext::{Error, FutureFailureErrorExt};
use futures::{stream, Future, Stream};
use futures_ext::{try_boxfuture, BoxFuture, FutureExt};
use std::{
fs::File,
io::{BufRead, BufReader, Write},
str::FromStr,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
time::Duration,
};
use blobrepo::BlobRepo;
use cmdlib::args;
use context::CoreContext;
use mercurial_types::HgChangesetId;
use phases::SqlPhases;
use slog::{info, Logger};
use crate::error::SubcommandError;
fn add_public_phases(
ctx: CoreContext,
repo: BlobRepo,
phases: Arc<SqlPhases>,
logger: Logger,
path: impl AsRef<str>,
chunk_size: usize,
) -> impl Future<Item = (), Error = Error> {
let file = try_boxfuture!(File::open(path.as_ref()).map_err(Error::from));
let hg_changesets = BufReader::new(file).lines().filter_map(|id_str| {
id_str
.map_err(Error::from)
.and_then(|v| HgChangesetId::from_str(&v))
.ok()
});
let entries_processed = Arc::new(AtomicUsize::new(0));
info!(logger, "start processing hashes");
stream::iter_ok(hg_changesets)
.chunks(chunk_size)
.and_then(move |chunk| {
let count = chunk.len();
repo.get_hg_bonsai_mapping(ctx.clone(), chunk)
.and_then({
cloned!(ctx, repo, phases);
move |changesets| {
phases.add_public(
ctx,
repo,
changesets.into_iter().map(|(_, cs)| cs).collect(),
)
}
})
.and_then({
cloned!(entries_processed);
move |_| {
print!(
"\x1b[Khashes processed: {}\r",
entries_processed.fetch_add(count, Ordering::SeqCst) + count,
);
std::io::stdout().flush().expect("flush on stdout failed");
tokio_timer::sleep(Duration::from_secs(5)).map_err(Error::from)
}
})
})
.for_each(|_| Ok(()))
.boxify()
}
pub fn subcommand_add_public_phases(
logger: Logger,
matches: &ArgMatches<'_>,
sub_m: &ArgMatches<'_>,
) -> BoxFuture<(), SubcommandError> {
let path = String::from(sub_m.value_of("input-file").unwrap());
let chunk_size = sub_m
.value_of("chunk-size")
.and_then(|chunk_size| chunk_size.parse::<usize>().ok())
.unwrap_or(16384);
let ctx = CoreContext::new_with_logger(logger.clone());
args::init_cachelib(&matches);
let phases = args::open_sql::<SqlPhases>(&matches)
.context("While opening SqlPhases")
.from_err();
args::open_repo(&logger, &matches)
.join(phases)
.and_then(move |(repo, phases)| {
add_public_phases(ctx, repo, Arc::new(phases), logger, path, chunk_size)
})
.from_err()
.boxify()
}

@@ -170,6 +170,13 @@ queries! {
WHERE repo_id = {repo_id}
AND cs_id IN {cs_ids}"
}
read SelectAllPublic(repo_id: RepositoryId) -> (ChangesetId, ) {
"SELECT cs_id
FROM phases
WHERE repo_id = {repo_id}
AND phase = 'Public'"
}
}
#[derive(Clone)]
@@ -232,6 +239,15 @@ impl SqlPhases {
.map(|_| ())
.right_future()
}
pub fn list_all_public(
&self,
_ctx: CoreContext,
repo_id: RepositoryId,
) -> impl Future<Item = Vec<ChangesetId>, Error = Error> {
SelectAllPublic::query(&self.read_connection, &repo_id)
.map(|ans| ans.into_iter().map(|x| x.0).collect())
}
}
impl SqlConstructors for SqlPhases {