diff --git a/cmds/admin/cmdargs.rs b/cmds/admin/cmdargs.rs
index c0b284b9c3..17f5a1d7c2 100644
--- a/cmds/admin/cmdargs.rs
+++ b/cmds/admin/cmdargs.rs
@@ -4,11 +4,14 @@
 // This software may be used and distributed according to the terms of the
 // GNU General Public License version 2 or any later version.
 
+pub const ADD_PUBLIC_PHASES: &'static str = "add-public";
 pub const BLOBSTORE_FETCH: &'static str = "blobstore-fetch";
 pub const BONSAI_FETCH: &'static str = "bonsai-fetch";
-pub const CONTENT_FETCH: &'static str = "content-fetch";
 pub const BOOKMARKS: &'static str = "bookmarks";
-pub const SKIPLIST: &'static str = "skiplist";
+pub const CONTENT_FETCH: &'static str = "content-fetch";
+pub const FETCH_PHASE: &'static str = "fetch";
+pub const FILENODES: &'static str = "filenodes";
+pub const FILESTORE: &'static str = "filestore";
 pub const HASH_CONVERT: &'static str = "convert";
 pub const HG_CHANGESET: &'static str = "hg-changeset";
 pub const HG_CHANGESET_DIFF: &'static str = "diff";
@@ -19,12 +22,12 @@ pub const HG_SYNC_SHOW: &'static str = "show";
 pub const HG_SYNC_FETCH_BUNDLE: &'static str = "fetch-bundle";
 pub const HG_SYNC_LAST_PROCESSED: &'static str = "last-processed";
 pub const HG_SYNC_VERIFY: &'static str = "verify";
+pub const LIST_PUBLIC: &'static str = "list-public";
 pub const SKIPLIST_BUILD: &'static str = "build";
 pub const SKIPLIST_READ: &'static str = "read";
-pub const ADD_PUBLIC_PHASES: &'static str = "add-public-phases";
+pub const PHASES: &'static str = "phases";
 pub const REDACTION: &'static str = "redaction";
 pub const REDACTION_ADD: &'static str = "add";
 pub const REDACTION_REMOVE: &'static str = "remove";
 pub const REDACTION_LIST: &'static str = "list";
-pub const FILENODES: &'static str = "filenodes";
-pub const FILESTORE: &'static str = "filestore";
+pub const SKIPLIST: &'static str = "skiplist";
diff --git a/cmds/admin/main.rs b/cmds/admin/main.rs
index 803fee5a80..5eff52fc30 100644
--- a/cmds/admin/main.rs
+++ b/cmds/admin/main.rs
@@ -5,6 +5,7 @@
 // This software may be used and distributed according to the terms of the
 // GNU General Public License version 2 or any later version.
 
 #![deny(warnings)]
+#![feature(async_await)]
 #![feature(process_exitcode_placeholder)]
 
 use clap::{App, Arg, SubCommand};
@@ -20,11 +21,11 @@ use slog::error;
 use crate::blobstore_fetch::subcommand_blobstore_fetch;
 use crate::bonsai_fetch::subcommand_bonsai_fetch;
 use crate::cmdargs::{
-    ADD_PUBLIC_PHASES, BLOBSTORE_FETCH, BONSAI_FETCH, BOOKMARKS, CONTENT_FETCH, FILENODES,
-    FILESTORE, HASH_CONVERT, HG_CHANGESET, HG_CHANGESET_DIFF, HG_CHANGESET_RANGE, HG_SYNC_BUNDLE,
-    HG_SYNC_FETCH_BUNDLE, HG_SYNC_LAST_PROCESSED, HG_SYNC_REMAINS, HG_SYNC_SHOW, HG_SYNC_VERIFY,
-    REDACTION, REDACTION_ADD, REDACTION_LIST, REDACTION_REMOVE, SKIPLIST, SKIPLIST_BUILD,
-    SKIPLIST_READ,
+    ADD_PUBLIC_PHASES, BLOBSTORE_FETCH, BONSAI_FETCH, BOOKMARKS, CONTENT_FETCH, FETCH_PHASE,
+    FILENODES, FILESTORE, HASH_CONVERT, HG_CHANGESET, HG_CHANGESET_DIFF, HG_CHANGESET_RANGE,
+    HG_SYNC_BUNDLE, HG_SYNC_FETCH_BUNDLE, HG_SYNC_LAST_PROCESSED, HG_SYNC_REMAINS, HG_SYNC_SHOW,
+    HG_SYNC_VERIFY, LIST_PUBLIC, PHASES, REDACTION, REDACTION_ADD, REDACTION_LIST,
+    REDACTION_REMOVE, SKIPLIST, SKIPLIST_BUILD, SKIPLIST_READ,
 };
 use crate::content_fetch::subcommand_content_fetch;
 use crate::error::SubcommandError;
@@ -32,7 +33,6 @@ use crate::filenodes::subcommand_filenodes;
 use crate::hash_convert::subcommand_hash_convert;
 use crate::hg_changeset::subcommand_hg_changeset;
 use crate::hg_sync::subcommand_process_hg_sync;
-use crate::public_phases::subcommand_add_public_phases;
 use crate::redaction::subcommand_redaction;
 use crate::skiplist_subcommand::subcommand_skiplist;
@@ -48,7 +48,7 @@ mod filestore;
 mod hash_convert;
 mod hg_changeset;
 mod hg_sync;
-mod public_phases;
+mod phases;
 mod redaction;
 mod skiplist_subcommand;
@@ -238,19 +238,58 @@ fn setup_app<'a, 'b>() -> App<'a, 'b> {
             .about("verify the consistency of yet-to-be-processed bookmark log entries"),
     );
 
-    let add_public_phases = SubCommand::with_name(ADD_PUBLIC_PHASES)
-        .about("mark mercurial commits as public from provided new-line separated list")
-        .arg(
-            Arg::with_name("input-file")
-                .help("new-line separated mercurial public commits")
-                .required(true)
-                .index(1),
+    let phases = SubCommand::with_name(PHASES)
+        .about("commands to work with phases")
+        .subcommand(
+            SubCommand::with_name(ADD_PUBLIC_PHASES)
+                .about("mark mercurial commits as public from provided new-line separated list")
+                .arg(
+                    Arg::with_name("input-file")
+                        .help("new-line separated mercurial public commits")
+                        .required(true)
+                        .index(1),
+                )
+                .arg(
+                    Arg::with_name("chunk-size")
+                        .help("partition input file to chunks of specified size")
+                        .long("chunk-size")
+                        .takes_value(true),
+                ),
         )
-        .arg(
-            Arg::with_name("chunk-size")
-                .help("partition input file to chunks of specified size")
-                .long("chunk-size")
-                .takes_value(true),
+        .subcommand(
+            SubCommand::with_name(FETCH_PHASE)
+                .about("fetch phase of a commit")
+                .arg(
+                    Arg::with_name("changeset-type")
+                        .long("changeset-type")
+                        .short("c")
+                        .takes_value(true)
+                        .possible_values(&["bonsai", "hg"])
+                        .required(false)
+                        .help(
+                            "What changeset type to return, either bonsai or hg. Defaults to hg.",
+                        ),
+                )
+                .arg(
+                    Arg::with_name("hash")
+                        .help("changeset hash")
+                        .takes_value(true),
+                ),
+        )
+        .subcommand(
+            SubCommand::with_name(LIST_PUBLIC)
+                .arg(
+                    Arg::with_name("changeset-type")
+                        .long("changeset-type")
+                        .short("c")
+                        .takes_value(true)
+                        .possible_values(&["bonsai", "hg"])
+                        .required(false)
+                        .help(
+                            "What changeset type to return, either bonsai or hg. Defaults to hg.",
+                        ),
+                )
+                .about("List all public commits"),
         );
 
     let redaction = SubCommand::with_name(REDACTION)
@@ -321,9 +360,9 @@ fn setup_app<'a, 'b>() -> App<'a, 'b> {
         .subcommand(skiplist)
         .subcommand(convert)
         .subcommand(hg_sync)
-        .subcommand(add_public_phases)
         .subcommand(redaction)
         .subcommand(filenodes::build_subcommand(FILENODES))
+        .subcommand(phases)
         .subcommand(filestore::build_subcommand(FILESTORE))
 }
@@ -349,10 +388,10 @@ fn main() -> ExitCode {
         }
         (SKIPLIST, Some(sub_m)) => subcommand_skiplist(logger, &matches, sub_m),
         (HASH_CONVERT, Some(sub_m)) => subcommand_hash_convert(logger, &matches, sub_m),
-        (ADD_PUBLIC_PHASES, Some(sub_m)) => subcommand_add_public_phases(logger, &matches, sub_m),
         (REDACTION, Some(sub_m)) => subcommand_redaction(logger, &matches, sub_m),
         (FILENODES, Some(sub_m)) => subcommand_filenodes(logger, &matches, sub_m),
         (FILESTORE, Some(sub_m)) => filestore::execute_command(logger, &matches, sub_m),
+        (PHASES, Some(sub_m)) => phases::subcommand_phases(logger, &matches, sub_m),
         _ => Err(SubcommandError::InvalidArgs).into_future().boxify(),
     };
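For orientation, the clap wiring above yields the following command tree. The invocations below are sketched from the argument definitions in this hunk; the `admin` binary name is illustrative and not taken from this diff:

    admin phases add-public <input-file> --chunk-size 8192
    admin phases fetch --changeset-type bonsai <hash>
    admin phases list-public -c hg

`add-public` keeps the behavior of the old top-level `add-public-phases` command (a required positional input file plus an optional `--chunk-size`), while `fetch` and `list-public` are new; both accept an optional `--changeset-type`/`-c` of `bonsai` or `hg`, defaulting to `hg`.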
diff --git a/cmds/admin/phases.rs b/cmds/admin/phases.rs
new file mode 100644
index 0000000000..dbaa8ad746
--- /dev/null
+++ b/cmds/admin/phases.rs
@@ -0,0 +1,215 @@
+// Copyright (c) 2004-present, Facebook, Inc.
+// All Rights Reserved.
+//
+// This software may be used and distributed according to the terms of the
+// GNU General Public License version 2 or any later version.
+
+use crate::cmdargs::{ADD_PUBLIC_PHASES, FETCH_PHASE, LIST_PUBLIC};
+use clap::ArgMatches;
+use cloned::cloned;
+use failure_ext::{err_msg, Error};
+use futures::{stream, Future, IntoFuture, Stream};
+use futures_ext::{try_boxfuture, BoxFuture, FutureExt};
+use futures_preview::{
+    compat::Future01CompatExt,
+    future::{FutureExt as PreviewFutureExt, TryFutureExt},
+};
+use std::{
+    fs::File,
+    io::{BufRead, BufReader, Write},
+    str::FromStr,
+    sync::{
+        atomic::{AtomicUsize, Ordering},
+        Arc,
+    },
+    time::Duration,
+};
+
+use blobrepo::BlobRepo;
+use cmdlib::args;
+use context::CoreContext;
+use mercurial_types::HgChangesetId;
+use mononoke_types::ChangesetId;
+use phases::{Phases, SqlPhases};
+use slog::{info, Logger};
+
+use crate::error::SubcommandError;
+
+pub fn subcommand_phases(
+    logger: Logger,
+    matches: &ArgMatches<'_>,
+    sub_m: &ArgMatches<'_>,
+) -> BoxFuture<(), SubcommandError> {
+    let repo = args::open_repo(&logger, &matches);
+    let phases = args::open_sql::<SqlPhases>(&matches);
+    args::init_cachelib(&matches);
+    let ctx = CoreContext::new_with_logger(logger.clone());
+
+    match sub_m.subcommand() {
+        (FETCH_PHASE, Some(sub_m)) => {
+            let ty = sub_m
+                .value_of("changeset-type")
+                .map(|s| s)
+                .unwrap_or("hg")
+                .to_string();
+            let hash = sub_m
+                .value_of("hash")
+                .map(|s| s.to_string())
+                .ok_or(err_msg("changeset hash is not specified"));
+
+            subcommand_fetch_phase_impl(repo, phases, hash, ty)
+                .boxed()
+                .compat()
+                .from_err()
+                .boxify()
+        }
+        (ADD_PUBLIC_PHASES, Some(sub_m)) => {
+            let path = String::from(sub_m.value_of("input-file").unwrap());
+            let chunk_size = sub_m
+                .value_of("chunk-size")
+                .and_then(|chunk_size| chunk_size.parse::<usize>().ok())
+                .unwrap_or(16384);
+
+            repo.join(phases)
+                .and_then(move |(repo, phases)| {
+                    add_public_phases(ctx, repo, Arc::new(phases), logger, path, chunk_size)
+                })
+                .from_err()
+                .boxify()
+        }
+        (LIST_PUBLIC, Some(sub_m)) => {
+            let ty = sub_m
+                .value_of("changeset-type")
+                .map(|s| s)
+                .unwrap_or("hg")
+                .to_string();
+
+            subcommand_list_public_impl(ctx, ty, repo, phases)
+                .boxed()
+                .compat()
+                .from_err()
+                .boxify()
+        }
+        _ => Err(SubcommandError::InvalidArgs).into_future().boxify(),
+    }
+}
+
+fn add_public_phases(
+    ctx: CoreContext,
+    repo: BlobRepo,
+    phases: Arc<SqlPhases>,
+    logger: Logger,
+    path: impl AsRef<str>,
+    chunk_size: usize,
+) -> impl Future<Item = (), Error = Error> {
+    let file = try_boxfuture!(File::open(path.as_ref()).map_err(Error::from));
+    let hg_changesets = BufReader::new(file).lines().filter_map(|id_str| {
+        id_str
+            .map_err(Error::from)
+            .and_then(|v| HgChangesetId::from_str(&v))
+            .ok()
+    });
+    let entries_processed = Arc::new(AtomicUsize::new(0));
+    info!(logger, "start processing hashes");
+    stream::iter_ok(hg_changesets)
+        .chunks(chunk_size)
+        .and_then(move |chunk| {
+            let count = chunk.len();
+            repo.get_hg_bonsai_mapping(ctx.clone(), chunk)
+                .and_then({
+                    cloned!(ctx, repo, phases);
+                    move |changesets| {
+                        phases.add_public(
+                            ctx,
+                            repo,
+                            changesets.into_iter().map(|(_, cs)| cs).collect(),
+                        )
+                    }
+                })
+                .and_then({
+                    cloned!(entries_processed);
+                    move |_| {
+                        print!(
+                            "\x1b[Khashes processed: {}\r",
+                            entries_processed.fetch_add(count, Ordering::SeqCst) + count,
+                        );
+                        std::io::stdout().flush().expect("flush on stdout failed");
+                        tokio_timer::sleep(Duration::from_secs(5)).map_err(Error::from)
+                    }
+                })
+        })
+        .for_each(|_| Ok(()))
+        .boxify()
+}
+
+async fn subcommand_list_public_impl(
+    ctx: CoreContext,
+    ty: String,
+    repo: impl Future<Item = BlobRepo, Error = Error>,
+    phases: impl Future<Item = SqlPhases, Error = Error>,
+) -> Result<(), Error> {
+    let repo = repo.compat().await?;
+    let phases = phases.compat().await?;
+
+    let public = phases
+        .list_all_public(ctx.clone(), repo.get_repoid())
+        .compat()
+        .await?;
+    if ty == "bonsai" {
+        for p in public {
+            println!("{}", p);
+        }
+    } else {
+        for chunk in public.chunks(1000) {
+            let bonsais: Vec<_> = chunk.iter().cloned().collect();
+            let hg_bonsais = repo
+                .get_hg_bonsai_mapping(ctx.clone(), bonsais)
+                .compat()
+                .await?;
+            let hg_css: Vec<HgChangesetId> = hg_bonsais
+                .clone()
+                .into_iter()
+                .map(|(hg_cs_id, _)| hg_cs_id)
+                .collect();
+
+            for hg_cs in hg_css {
+                println!("{}", hg_cs);
+            }
+        }
+    }
+    Ok(())
+}
+
+pub async fn subcommand_fetch_phase_impl<'a>(
+    repo: impl Future<Item = BlobRepo, Error = Error>,
+    phases: impl Future<Item = SqlPhases, Error = Error>,
+    hash: Result<String, Error>,
+    ty: String,
+) -> Result<(), Error> {
+    let ctx = CoreContext::test_mock();
+    let repo = repo.compat().await?;
+    let phases = phases.compat().await?;
+    let hash = hash?;
+
+    let bcs_id = if ty == "bonsai" {
+        ChangesetId::from_str(&hash)?
+    } else if ty == "hg" {
+        let maybe_bonsai = repo
+            .get_bonsai_from_hg(ctx.clone(), HgChangesetId::from_str(&hash)?)
+            .compat()
+            .await?;
+        maybe_bonsai.ok_or(err_msg(format!("bonsai not found for {}", hash)))?
+    } else {
+        return Err(err_msg(format!("unknown hash type: {}", ty)));
+    };
+
+    let public_phases = phases.get_public(ctx, repo, vec![bcs_id]).compat().await?;
+
+    if public_phases.contains(&bcs_id) {
+        println!("public");
+    } else {
+        println!("draft");
+    }
+
+    Ok(())
+}
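The subcommand implementations above are `async fn`s, while the admin tool's plumbing still speaks futures 0.1; that is why main.rs now gates on `#![feature(async_await)]`. `.compat().await` crosses the bridge in one direction and `.boxed().compat()` crosses back. A minimal sketch of the same round trip, assuming only the futures / futures-preview compat APIs imported in this file (the functions themselves are made up for illustration):

    use failure_ext::Error;
    use futures::Future as Future01; // futures 0.1
    use futures_preview::compat::Future01CompatExt; // 0.1 future -> std::future
    use futures_preview::future::{FutureExt, TryFutureExt}; // std::future -> 0.1

    // Await a futures-0.1 future from async code.
    async fn double(n: impl Future01<Item = u32, Error = Error>) -> Result<u32, Error> {
        let n = n.compat().await?;
        Ok(2 * n)
    }

    // Hand the async fn back to 0.1 combinators, as subcommand_phases does;
    // boxed() pins the future so the Compat wrapper can poll it.
    fn double_01(
        n: impl Future01<Item = u32, Error = Error> + Send + 'static,
    ) -> impl Future01<Item = u32, Error = Error> {
        double(n).boxed().compat()
    }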
diff --git a/cmds/admin/public_phases.rs b/cmds/admin/public_phases.rs
deleted file mode 100644
index b496b07596..0000000000
--- a/cmds/admin/public_phases.rs
+++ /dev/null
@@ -1,104 +0,0 @@
-// Copyright (c) 2004-present, Facebook, Inc.
-// All Rights Reserved.
-//
-// This software may be used and distributed according to the terms of the
-// GNU General Public License version 2 or any later version.
-
-use clap::ArgMatches;
-use cloned::cloned;
-use failure_ext::{Error, FutureFailureErrorExt};
-use futures::{stream, Future, Stream};
-use futures_ext::{try_boxfuture, BoxFuture, FutureExt};
-use std::{
-    fs::File,
-    io::{BufRead, BufReader, Write},
-    str::FromStr,
-    sync::{
-        atomic::{AtomicUsize, Ordering},
-        Arc,
-    },
-    time::Duration,
-};
-
-use blobrepo::BlobRepo;
-use cmdlib::args;
-use context::CoreContext;
-use mercurial_types::HgChangesetId;
-use phases::SqlPhases;
-use slog::{info, Logger};
-
-use crate::error::SubcommandError;
-
-fn add_public_phases(
-    ctx: CoreContext,
-    repo: BlobRepo,
-    phases: Arc<SqlPhases>,
-    logger: Logger,
-    path: impl AsRef<str>,
-    chunk_size: usize,
-) -> impl Future<Item = (), Error = Error> {
-    let file = try_boxfuture!(File::open(path.as_ref()).map_err(Error::from));
-    let hg_changesets = BufReader::new(file).lines().filter_map(|id_str| {
-        id_str
-            .map_err(Error::from)
-            .and_then(|v| HgChangesetId::from_str(&v))
-            .ok()
-    });
-    let entries_processed = Arc::new(AtomicUsize::new(0));
-    info!(logger, "start processing hashes");
-    stream::iter_ok(hg_changesets)
-        .chunks(chunk_size)
-        .and_then(move |chunk| {
-            let count = chunk.len();
-            repo.get_hg_bonsai_mapping(ctx.clone(), chunk)
-                .and_then({
-                    cloned!(ctx, repo, phases);
-                    move |changesets| {
-                        phases.add_public(
-                            ctx,
-                            repo,
-                            changesets.into_iter().map(|(_, cs)| cs).collect(),
-                        )
-                    }
-                })
-                .and_then({
-                    cloned!(entries_processed);
-                    move |_| {
-                        print!(
-                            "\x1b[Khashes processed: {}\r",
-                            entries_processed.fetch_add(count, Ordering::SeqCst) + count,
-                        );
-                        std::io::stdout().flush().expect("flush on stdout failed");
-                        tokio_timer::sleep(Duration::from_secs(5)).map_err(Error::from)
-                    }
-                })
-        })
-        .for_each(|_| Ok(()))
-        .boxify()
-}
-
-pub fn subcommand_add_public_phases(
-    logger: Logger,
-    matches: &ArgMatches<'_>,
-    sub_m: &ArgMatches<'_>,
-) -> BoxFuture<(), SubcommandError> {
-    let path = String::from(sub_m.value_of("input-file").unwrap());
-    let chunk_size = sub_m
-        .value_of("chunk-size")
-        .and_then(|chunk_size| chunk_size.parse::<usize>().ok())
-        .unwrap_or(16384);
-    let ctx = CoreContext::new_with_logger(logger.clone());
-    args::init_cachelib(&matches);
-
-    let phases = args::open_sql::<SqlPhases>(&matches)
-        .context("While opening SqlPhases")
-        .from_err();
-
-    args::open_repo(&logger, &matches)
-        .join(phases)
-        .and_then(move |(repo, phases)| {
-            add_public_phases(ctx, repo, Arc::new(phases), logger, path, chunk_size)
-        })
-        .from_err()
-        .boxify()
-}
diff --git a/phases/src/lib.rs b/phases/src/lib.rs
index 7eaba7fe91..d8922d8912 100644
--- a/phases/src/lib.rs
+++ b/phases/src/lib.rs
@@ -170,6 +170,13 @@ queries! {
             WHERE repo_id = {repo_id}
               AND cs_id IN {cs_ids}"
     }
+
+    read SelectAllPublic(repo_id: RepositoryId) -> (ChangesetId, ) {
+        "SELECT cs_id
+         FROM phases
+         WHERE repo_id = {repo_id}
+           AND phase = 'Public'"
+    }
 }
 
 #[derive(Clone)]
@@ -232,6 +239,15 @@ impl SqlPhases {
             .map(|_| ())
             .right_future()
     }
+
+    pub fn list_all_public(
+        &self,
+        _ctx: CoreContext,
+        repo_id: RepositoryId,
+    ) -> impl Future<Item = Vec<ChangesetId>, Error = Error> {
+        SelectAllPublic::query(&self.read_connection, &repo_id)
+            .map(|ans| ans.into_iter().map(|x| x.0).collect())
+    }
 }
 
 impl SqlConstructors for SqlPhases {
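`SelectAllPublic::query` is generated by the `queries!` macro invocation above, and `list_all_public` just unwraps the single-column result tuples. A hedged caller sketch from synchronous futures-0.1 code; paths for `CoreContext`, `ChangesetId`, and `SqlPhases` follow the new cmds/admin/phases.rs, `RepositoryId` living in mononoke_types is an assumption, and a long-lived service would stay async rather than call `.wait()`:

    use context::CoreContext;
    use failure_ext::Error;
    use futures::Future; // futures 0.1, provides .wait()
    use mononoke_types::{ChangesetId, RepositoryId};
    use phases::SqlPhases;

    fn dump_public(ctx: CoreContext, repo_id: RepositoryId, phases: &SqlPhases) -> Result<(), Error> {
        // Single round trip; the query filters on repo_id and phase = 'Public'.
        let public: Vec<ChangesetId> = phases.list_all_public(ctx, repo_id).wait()?;
        for cs_id in public {
            println!("{}", cs_id);
        }
        Ok(())
    }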