Tool to manually populate public phases

Summary: Tool to populate public commits

Reviewed By: HarveyHunt

Differential Revision: D14686428

fbshipit-source-id: 98457596c273ca2561bc457d35275bb15e5415e0
Author: Pavel Aslanov (committed by Facebook Github Bot)
Date: 2019-04-03 03:37:04 -07:00
Parent: 731b56b977
Commit: a5ecb01926
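For context, a usage sketch of the new subcommand based on the arguments defined in this diff; the admin binary name and the input path are placeholders, any global repo arguments required by cmdlib::args are omitted, and --chunk-size falls back to 16384 when not given:

    <mononoke-admin-binary> add-public-phases public_commits.txt --chunk-size 1000

Here public_commits.txt would hold newline-separated mercurial changeset hashes to be marked as Phase::Public.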


@@ -5,11 +5,26 @@
// GNU General Public License version 2 or any later version.
#![deny(warnings)]
use std::borrow::Borrow;
use std::collections::{BTreeMap, HashMap};
use std::fmt;
use std::fs::File;
use std::io::{self, BufRead, BufReader, Write};
use std::str::FromStr;
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
use std::time::Duration;
mod bookmarks_manager;
use clap::{App, Arg, ArgMatches, SubCommand};
use cloned::cloned;
use failure_ext::{err_msg, format_err, Error, Result};
use futures::future::{self, loop_fn, ok, Loop};
use futures::prelude::*;
use futures::stream::iter_ok;
use futures_ext::{try_boxfuture, BoxFuture, FutureExt};
use rust_thrift::compact_protocol;
use serde_derive::Serialize;
use blobrepo::BlobRepo;
@@ -19,15 +34,9 @@ use bookmarks::{Bookmark, Bookmarks};
use cacheblob::{new_memcache_blobstore, CacheBlobstoreExt};
use changeset_fetcher::ChangesetFetcher;
use changesets::{ChangesetEntry, Changesets, SqlChangesets};
use clap::{App, Arg, ArgMatches, SubCommand};
use cmdlib::args;
use context::CoreContext;
use dbbookmarks::SqlBookmarks;
use failure_ext::{err_msg, format_err, Error, Result};
use futures::future::{self, loop_fn, ok, Loop};
use futures::prelude::*;
use futures::stream::iter_ok;
use futures_ext::{try_boxfuture, BoxFuture, FutureExt};
use manifoldblob::ManifoldBlob;
use mercurial_types::manifest::Content;
use mercurial_types::{
@@ -40,17 +49,13 @@ use mononoke_types::{
FileContents, Generation, RepositoryId,
};
use mutable_counters::{MutableCounters, SqlMutableCounters};
use phases::{Phase, Phases, SqlPhases};
use prefixblob::PrefixBlobstore;
use revset::RangeNodeStream;
use rust_thrift::compact_protocol;
use skiplist::{deserialize_skiplist_map, SkiplistIndex, SkiplistNodeType};
use slog::{debug, info, warn, Logger};
use std::borrow::Borrow;
use std::collections::{BTreeMap, HashMap};
use std::fmt;
use std::io;
use std::str::FromStr;
use std::sync::Arc;
mod bookmarks_manager;
const BLOBSTORE_FETCH: &'static str = "blobstore-fetch";
const BONSAI_FETCH: &'static str = "bonsai-fetch";
@@ -66,6 +71,7 @@ const HG_SYNC_REMAINS: &'static str = "remains";
const HG_SYNC_LAST_PROCESSED: &'static str = "last-processed";
const SKIPLIST_BUILD: &'static str = "build";
const SKIPLIST_READ: &'static str = "read";
const ADD_PUBLIC_PHASES: &'static str = "add-public-phases";
fn setup_app<'a, 'b>() -> App<'a, 'b> {
let blobstore_fetch = SubCommand::with_name(BLOBSTORE_FETCH)
@@ -195,6 +201,21 @@ fn setup_app<'a, 'b>() -> App<'a, 'b> {
),
);
let add_public_phases = SubCommand::with_name(ADD_PUBLIC_PHASES)
.about("mark mercurial commits as public from provided new-line separated list")
.arg(
Arg::with_name("input-file")
.help("new-line separated mercurial public commits")
.required(true)
.index(1),
)
.arg(
Arg::with_name("chunk-size")
.help("partition input file to chunks of specified size")
.long("chunk-size")
.takes_value(true),
);
let app = args::MononokeApp {
safe_writes: false,
hide_advanced_args: true,
@@ -214,6 +235,7 @@ fn setup_app<'a, 'b>() -> App<'a, 'b> {
.subcommand(skiplist)
.subcommand(convert)
.subcommand(hg_sync)
.subcommand(add_public_phases)
}
fn fetch_content_from_manifest(
@@ -659,6 +681,54 @@ fn read_skiplist_index<S: ToString>(
.boxify()
}
fn add_public_phases(
ctx: CoreContext,
repo: BlobRepo,
phases: Arc<SqlPhases>,
logger: Logger,
path: impl AsRef<str>,
chunk_size: usize,
) -> BoxFuture<(), Error> {
let file = try_boxfuture!(File::open(path.as_ref()).map_err(Error::from));
let hg_changesets = BufReader::new(file).lines().filter_map(|id_str| {
id_str
.map_err(Error::from)
.and_then(|v| HgChangesetId::from_str(&v))
.ok()
});
let entries_processed = Arc::new(AtomicUsize::new(0));
info!(logger, "start processing hashes");
iter_ok(hg_changesets)
.chunks(chunk_size)
.and_then(move |chunk| {
let count = chunk.len();
repo.get_hg_bonsai_mapping(ctx.clone(), chunk)
.map(|changesets| {
changesets
.into_iter()
.map(|(_, cs)| (cs, Phase::Public))
.collect()
})
.and_then({
cloned!(ctx, repo, phases);
move |phases_mapping| phases.add_all(ctx, repo, phases_mapping)
})
.and_then({
cloned!(entries_processed);
move |_| {
print!(
"\x1b[Khashes processed: {}\r",
entries_processed.fetch_add(count, Ordering::SeqCst) + count,
);
std::io::stdout().flush().expect("flush on stdout failed");
tokio_timer::sleep(Duration::from_secs(5)).map_err(Error::from)
}
})
})
.for_each(|_| ok(()))
.boxify()
}
const LATEST_REPLAYED_REQUEST_KEY: &'static str = "latest-replayed-request";
fn process_hg_sync_subcommand<'a>(
@@ -732,10 +802,9 @@ fn process_hg_sync_subcommand<'a>(
})
.and_then({
cloned!(ctx, repo_id);
move |counter|
bookmarks.count_further_bookmark_log_entries(
ctx, counter as u64, repo_id
)
move |counter| {
bookmarks.count_further_bookmark_log_entries(ctx, counter as u64, repo_id)
}
})
.map({
cloned!(logger, repo_id);
@@ -743,14 +812,20 @@ fn process_hg_sync_subcommand<'a>(
if quiet {
println!("{}", remaining);
} else {
info!(logger, "Remaining bundles to replay in {:?}: {}", repo_id, remaining);
info!(
logger,
"Remaining bundles to replay in {:?}: {}", repo_id, remaining
);
}
}
})
.map_err({
cloned!(logger, repo_id);
move |e| {
info!(logger, "Failed to fetch remaining bundles to replay for {:?}", repo_id);
info!(
logger,
"Failed to fetch remaining bundles to replay for {:?}", repo_id
);
e
}
})
@@ -1039,7 +1114,9 @@ fn main() -> Result<()> {
::std::process::exit(1);
}
},
(HG_SYNC_BUNDLE, Some(sub_m)) => process_hg_sync_subcommand(sub_m, &matches, repo_id, logger.clone()),
(HG_SYNC_BUNDLE, Some(sub_m)) => {
process_hg_sync_subcommand(sub_m, &matches, repo_id, logger.clone())
}
(SKIPLIST, Some(sub_m)) => match sub_m.subcommand() {
(SKIPLIST_BUILD, Some(sub_m)) => {
let key = sub_m
@@ -1122,6 +1199,23 @@ fn main() -> Result<()> {
})
.boxify()
}
(ADD_PUBLIC_PHASES, Some(sub_m)) => {
let path = String::from(sub_m.value_of("input-file").unwrap());
let chunk_size = sub_m
.value_of("chunk-size")
.and_then(|chunk_size| chunk_size.parse::<usize>().ok())
.unwrap_or(16384);
let ctx = CoreContext::test_mock();
args::init_cachelib(&matches);
let phases: Arc<SqlPhases> = Arc::new(
args::open_sql(&matches, "phases").expect("Failed to open the db with phases"),
);
args::open_repo(&logger, &matches)
.and_then(move |repo| {
add_public_phases(ctx, repo, phases, logger, path, chunk_size)
})
.boxify()
}
_ => {
println!("{}", matches.usage());
::std::process::exit(1);