sapling/eden/mononoke/cmds/statistics_collector.rs
Simon Farnsworth 454de31134 Switch Loadable and Storable interfaces to new-style futures
Summary:
Eventually, we want everything to be `async`/`await`; as a stepping stone in that direction, switch some of the blobstore interfaces to new-style `BoxFuture` with a `'static` lifetime.

This does not enable any fixes at this point, but does mean that `.compat()` moves to the places that need old-style futures instead of new. It also means that the work needed to make the transition fully complete is changed from a full conversion to new futures, to simply changing the lifetimes involved and fixing the resulting compile failures.
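As an illustration (a minimal sketch drawn from the code in this file), the new-style `Loadable::load` future is awaited directly, while an old-style stream such as `list_leaf_entries` keeps its `.compat()` at the call site:

    let changeset = hg_cs_id.load(ctx.clone(), repo.blobstore()).await?;
    let leaf_entries = manifest_id
        .list_leaf_entries(ctx.clone(), blobstore.clone())
        .compat();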

Reviewed By: krallin

Differential Revision: D22164315

fbshipit-source-id: dc655c36db4711d84d42d1e81b76e5dddd16f59d
2020-06-25 08:45:37 -07:00

/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* This software may be used and distributed according to the terms of the
* GNU General Public License version 2.
*/
#![deny(warnings)]
use anyhow::Error;
use blobrepo::BlobRepo;
use blobrepo_hg::BlobRepoHg;
use blobstore::{Blobstore, Loadable};
use bookmarks::BookmarkName;
use bytes::Bytes;
use changesets::{deserialize_cs_entries, ChangesetEntry};
use clap::{App, Arg, ArgMatches, SubCommand};
use cmdlib::{args, helpers::block_execute};
use context::CoreContext;
use fbinit::FacebookInit;
use futures::compat::{Future01CompatExt, Stream01CompatExt};
use futures::stream::{self, StreamExt, TryStreamExt};
use futures::{future::FutureExt, try_join};
use futures_ext::{BoxStream, StreamExt as OldStreamExt};
use futures_old::stream::Stream;
use manifest::{Diff, Entry, ManifestOps};
use mercurial_types::{FileBytes, HgChangesetId, HgFileNodeId, HgManifestId};
use mononoke_types::{FileType, RepositoryId};
use scuba_ext::ScubaSampleBuilder;
use slog::{info, Logger};
use stats::prelude::*;
use std::collections::HashMap;
use std::fs;
use std::ops::{Add, Sub};
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::delay_for;
define_stats! {
prefix = "mononoke.statistics_collector";
calculated_changesets: timeseries(Rate, Sum),
}
const ARG_IN_FILENAME: &str = "in-filename";
const SUBCOMMAND_STATISTICS_FROM_FILE: &str = "statistics-from-commits-in-file";
const SCUBA_DATASET_NAME: &str = "mononoke_repository_statistics";
// The tool doesn't count lines in files larger than 10 MB
const BIG_FILE_THRESHOLD: u64 = 10_000_000;
fn setup_app<'a, 'b>() -> App<'a, 'b> {
args::MononokeApp::new("Tool to calculate repo statistics")
.with_fb303_args()
.build()
.version("0.0.0")
.subcommand(
SubCommand::with_name(SUBCOMMAND_STATISTICS_FROM_FILE)
.about(
"calculate statistics for commits in provided file and save them to json file",
)
.arg(
Arg::with_name(ARG_IN_FILENAME)
.long(ARG_IN_FILENAME)
.takes_value(true)
.required(true)
.help("a file with a list of bonsai changesets to calculate stats for"),
),
)
.arg(
Arg::with_name("bookmark")
.long("bookmark")
.takes_value(true)
.required(false)
.help("bookmark from which we get statistics"),
)
.arg(
Arg::with_name("log-to-scuba")
.long("log-to-scuba")
.takes_value(false)
.required(false)
.help("if set then statistics are logged to scuba"),
)
}
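/// Aggregated repository statistics: number of files, total file size in
/// bytes and total number of lines.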
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct RepoStatistics {
num_files: i64,
total_file_size: i64,
num_lines: i64,
}
impl RepoStatistics {
pub fn new(num_files: i64, total_file_size: i64, num_lines: i64) -> Self {
Self {
num_files,
total_file_size,
num_lines,
}
}
}
impl Add for RepoStatistics {
type Output = RepoStatistics;
fn add(self, other: Self) -> Self {
Self {
num_files: self.num_files + other.num_files,
total_file_size: self.total_file_size + other.total_file_size,
num_lines: self.num_lines + other.num_lines,
}
}
}
impl Sub for RepoStatistics {
type Output = RepoStatistics;
fn sub(self, other: Self) -> Self {
Self {
num_files: self.num_files - other.num_files,
total_file_size: self.total_file_size - other.total_file_size,
num_lines: self.num_lines - other.num_lines,
}
}
}
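/// Counts newline (`b'\n'`) bytes across all chunks of the given file-content
/// stream.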
pub async fn number_of_lines(bytes_stream: BoxStream<FileBytes, Error>) -> Result<i64, Error> {
bytes_stream
.compat()
.map_ok(|bytes| {
bytes.into_iter().fold(0, |num_lines, byte| {
if byte == b'\n' {
num_lines + 1
} else {
num_lines
}
})
})
.try_fold(0, |result, num_lines| async move {
Ok::<_, Error>(result + num_lines)
})
.await
}
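/// Loads the changeset and returns the id of its root manifest.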
pub async fn get_manifest_from_changeset(
ctx: &CoreContext,
repo: &BlobRepo,
hg_cs_id: &HgChangesetId,
) -> Result<HgManifestId, Error> {
let changeset = hg_cs_id.load(ctx.clone(), repo.blobstore()).await?;
Ok(changeset.manifestid())
}
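/// Loads the changeset and returns its commit timestamp in seconds.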
pub async fn get_changeset_timestamp_from_changeset(
ctx: &CoreContext,
repo: &BlobRepo,
hg_cs_id: &HgChangesetId,
) -> Result<i64, Error> {
let changeset = hg_cs_id.load(ctx.clone(), repo.blobstore()).await?;
Ok(changeset.time().timestamp_secs())
}
// Calculates the number of lines only for regular files smaller than
// BIG_FILE_THRESHOLD; other leaves contribute their size only.
pub async fn get_statistics_from_entry(
ctx: &CoreContext,
repo: &BlobRepo,
entry: Entry<HgManifestId, (FileType, HgFileNodeId)>,
) -> Result<RepoStatistics, Error> {
match entry {
Entry::Leaf((file_type, filenode_id)) => {
let envelope = filenode_id.load(ctx.clone(), repo.blobstore()).await?;
let size = envelope.content_size();
let content_id = envelope.content_id();
let lines = if FileType::Regular == file_type && size < BIG_FILE_THRESHOLD {
let content = filestore::fetch_stream(repo.blobstore(), ctx.clone(), content_id)
.map(FileBytes)
.boxify();
number_of_lines(content).await?
} else {
0
};
Ok(RepoStatistics::new(1, size as i64, lines))
}
Entry::Tree(_) => Ok(RepoStatistics::default()),
}
}
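/// Calculates statistics for the full working copy of a changeset by listing
/// every leaf entry of its manifest and summing the per-entry statistics
/// (up to 100 entries are processed concurrently).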
pub async fn get_statistics_from_changeset(
ctx: &CoreContext,
repo: &BlobRepo,
blobstore: &(impl Blobstore + Clone),
hg_cs_id: &HgChangesetId,
) -> Result<RepoStatistics, Error> {
info!(
ctx.logger(),
"Started calculating statistics for changeset {}", hg_cs_id
);
let manifest_id = get_manifest_from_changeset(ctx, repo, hg_cs_id).await?;
let statistics = manifest_id
.list_leaf_entries(ctx.clone(), blobstore.clone())
.compat()
.map(move |result| match result {
Ok((_, leaf)) => get_statistics_from_entry(ctx, repo, Entry::Leaf(leaf)).boxed(),
Err(e) => async move { Err(e) }.boxed(),
})
.buffered(100usize)
.try_fold(
RepoStatistics::default(),
|statistics, new_stat| async move { Ok::<_, Error>(statistics + new_stat) },
)
.await?;
info!(
ctx.logger(),
"Finished calculating statistics for changeset {}", hg_cs_id
);
Ok(statistics)
}
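/// Applies a manifest diff to previously calculated statistics: added entries
/// are added, removed entries are subtracted, and changed entries contribute
/// the difference between the new and the old per-entry statistics.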
pub async fn update_statistics<'a>(
ctx: &'a CoreContext,
repo: &'a BlobRepo,
statistics: RepoStatistics,
diff: BoxStream<Diff<Entry<HgManifestId, (FileType, HgFileNodeId)>>, Error>,
) -> Result<RepoStatistics, Error> {
diff.compat()
.map(move |result| async move {
let diff = result?;
match diff {
Diff::Added(_, entry) => {
let stat = get_statistics_from_entry(ctx, repo, entry).await?;
Ok((stat, Operation::Add))
}
Diff::Removed(_, entry) => {
let stat = get_statistics_from_entry(ctx, repo, entry).await?;
Ok((stat, Operation::Sub))
}
Diff::Changed(_, old_entry, new_entry) => {
let (old_stats, new_stats) = try_join!(
get_statistics_from_entry(ctx, repo, old_entry),
get_statistics_from_entry(ctx, repo, new_entry),
)?;
let stat = new_stats - old_stats;
Ok((stat, Operation::Add))
}
}
})
.buffered(100usize)
.try_fold(
statistics,
|statistics, (file_stats, operation)| async move {
match operation {
Operation::Add => Ok::<_, Error>(statistics + file_stats),
Operation::Sub => Ok::<_, Error>(statistics - file_stats),
}
},
)
.await
}
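/// Logs statistics for a changeset to the tool's logger and to the given
/// Scuba sample builder (which discards samples unless --log-to-scuba is
/// set), using the changeset timestamp as the sample time.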
pub fn log_statistics(
ctx: &CoreContext,
mut scuba_logger: ScubaSampleBuilder,
cs_timestamp: i64,
repo_name: &String,
hg_cs_id: &HgChangesetId,
statistics: &RepoStatistics,
) {
info!(
ctx.logger(),
"Statistics for changeset {}\nCs timestamp: {}\nNumber of files {}\nTotal file size {}\nNumber of lines {}",
hg_cs_id,
cs_timestamp,
statistics.num_files,
statistics.total_file_size,
statistics.num_lines
);
scuba_logger
.add("repo_name", repo_name.clone())
.add("num_files", statistics.num_files)
.add("total_file_size", statistics.total_file_size)
.add("num_lines", statistics.num_lines)
.add("changeset", hg_cs_id.to_hex().to_string())
.log_with_time(cs_timestamp as u64);
}
fn parse_serialized_commits<P: AsRef<Path>>(file: P) -> Result<Vec<ChangesetEntry>, Error> {
let data = fs::read(file).map_err(Error::from)?;
deserialize_cs_entries(&Bytes::from(data))
}
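/// Reads serialized changeset entries from `in_path`, resolves their hg
/// changeset ids and timestamps, sorts them by timestamp and, per repository,
/// prints a CSV line of statistics for changesets that are more than
/// REQUIRED_COMMITS_DISTANCE seconds apart, reusing the previous statistics
/// plus a manifest diff where possible.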
pub async fn generate_statistics_from_file<P: AsRef<Path>>(
ctx: &CoreContext,
repo: &BlobRepo,
in_path: &P,
) -> Result<(), Error> {
// 1 day in seconds
const REQUIRED_COMMITS_DISTANCE: i64 = 60 * 60 * 24;
let blobstore = Arc::new(repo.get_blobstore());
// TODO(dgrzegorzewski): T55705023 consider creating csv file here and save statistics using
// e.g. serde deserialize. To avoid saving fields separately it may be necessary to add new
// fields to RepoStatistics struct, like cs_timestamp, hg_cs_id, repo_id and refactor code.
println!("repo_id,hg_cs_id,cs_timestamp,num_files,total_file_size,num_lines");
let changesets = parse_serialized_commits(in_path)?;
info!(ctx.logger(), "Started calculating changesets timestamps");
let mut changeset_info_vec = stream::iter(changesets)
.map({
move |changeset| async move {
let ChangesetEntry { repo_id, cs_id, .. } = changeset;
let hg_cs_id = repo
.get_hg_from_bonsai_changeset(ctx.clone(), cs_id)
.compat()
.await?;
let cs_timestamp =
get_changeset_timestamp_from_changeset(ctx, repo, &hg_cs_id).await?;
// the error type annotation in principle should be inferred,
// but the compiler currently needs it. See https://fburl.com/n1s2ujjb
Ok::<(HgChangesetId, i64, RepositoryId), Error>((hg_cs_id, cs_timestamp, repo_id))
}
})
.buffered(100)
.try_collect::<Vec<_>>()
.await?;
info!(
ctx.logger(),
"Timestamps calculated, sorting them and starting calculating statistics"
);
changeset_info_vec.sort_by_key(|(_, cs_timestamp, _)| *cs_timestamp);
// accumulate stats into a map
let mut repo_stats_map = HashMap::<RepositoryId, (i64, HgChangesetId, RepoStatistics)>::new();
for (hg_cs_id, cs_timestamp, repo_id) in changeset_info_vec {
match repo_stats_map.get(&repo_id).cloned() {
Some((old_cs_timestamp, old_hg_cs_id, old_stats)) => {
// Calculate statistics for this changeset only if it was created more
// than REQUIRED_COMMITS_DISTANCE seconds after the changeset we
// previously used to calculate statistics.
if cs_timestamp - old_cs_timestamp <= REQUIRED_COMMITS_DISTANCE {
continue;
}
info!(
ctx.logger(),
"Changeset {} with timestamp {} was created more than {} seconds after previous, calculating statistics for it",
hg_cs_id, cs_timestamp, REQUIRED_COMMITS_DISTANCE
);
let (old_manifest, manifest) = try_join!(
get_manifest_from_changeset(ctx, repo, &old_hg_cs_id,),
get_manifest_from_changeset(ctx, repo, &hg_cs_id),
)?;
let statistics = update_statistics(
ctx,
repo,
old_stats,
old_manifest.diff(ctx.clone(), blobstore.clone(), manifest.clone()),
)
.await?;
info!(
ctx.logger(),
"Statistics for changeset {} calculated", hg_cs_id
);
println!(
"{},{},{},{},{},{}",
repo_id.id(),
hg_cs_id.to_hex(),
cs_timestamp,
statistics.num_files,
statistics.total_file_size,
statistics.num_lines
);
repo_stats_map.insert(repo_id, (cs_timestamp, hg_cs_id, statistics));
}
None => {
info!(
ctx.logger(),
"Found first changeset for repo_id {}",
repo_id.id()
);
let statistics =
get_statistics_from_changeset(ctx, repo, &blobstore, &hg_cs_id).await?;
info!(
ctx.logger(),
"First changeset for repo_id {} calculated",
repo_id.id()
);
println!(
"{},{},{},{},{},{}",
repo_id.id(),
hg_cs_id.to_hex(),
cs_timestamp,
statistics.num_files,
statistics.total_file_size,
statistics.num_lines
);
repo_stats_map.insert(repo_id, (cs_timestamp, hg_cs_id, statistics));
}
}
}
Ok(())
}
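/// Whether per-entry statistics derived from a diff should be added to or
/// subtracted from the running totals.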
enum Operation {
Add,
Sub,
}
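/// Either calculates statistics for the commits listed in a file (the
/// statistics-from-commits-in-file subcommand) or follows the given bookmark
/// forever, incrementally updating the statistics via manifest diffs and
/// logging every result.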
#[allow(unreachable_code)]
async fn run_statistics<'a>(
fb: FacebookInit,
ctx: CoreContext,
logger: &Logger,
scuba_logger: ScubaSampleBuilder,
matches: ArgMatches<'a>,
repo_name: String,
bookmark: BookmarkName,
) -> Result<(), Error> {
let repo = args::open_repo(fb, &logger, &matches).compat().await?;
if let (SUBCOMMAND_STATISTICS_FROM_FILE, Some(sub_m)) = matches.subcommand() {
// The argument is declared as required, so it must be present here
let in_filename = sub_m
.value_of(ARG_IN_FILENAME)
.expect("missing required argument");
return Ok(generate_statistics_from_file(&ctx, &repo, &in_filename).await?);
}
let blobstore = Arc::new(repo.get_blobstore());
let mut changeset = repo
.get_bookmark(ctx.clone(), &bookmark)
.compat()
.await?
.ok_or(Error::msg("cannot load bookmark"))?;
// initialize the loop
let mut statistics = get_statistics_from_changeset(&ctx, &repo, &blobstore, &changeset).await?;
let cs_timestamp = get_changeset_timestamp_from_changeset(&ctx, &repo, &changeset).await?;
log_statistics(
&ctx,
scuba_logger.clone(),
cs_timestamp,
&repo_name,
&changeset,
&statistics,
);
STATS::calculated_changesets.add_value(1);
// run the loop
loop {
let prev_changeset = changeset;
changeset = repo
.get_bookmark(ctx.clone(), &bookmark)
.compat()
.await?
.ok_or(Error::msg("cannot load bookmark"))?;
if prev_changeset == changeset {
let duration = Duration::from_millis(1000);
info!(
ctx.logger(),
"Changeset hasn't changed, sleeping {:?}", duration
);
delay_for(duration).await;
} else {
info!(
ctx.logger(),
"Found new changeset: {}, updating statistics", changeset
);
let (prev_manifest_id, cur_manifest_id) = try_join!(
get_manifest_from_changeset(&ctx, &repo, &prev_changeset),
get_manifest_from_changeset(&ctx, &repo, &changeset),
)?;
statistics = update_statistics(
&ctx,
&repo,
statistics,
prev_manifest_id.diff(ctx.clone(), blobstore.clone(), cur_manifest_id.clone()),
)
.await?;
info!(ctx.logger(), "Statistics for new changeset updated.");
let cs_timestamp =
get_changeset_timestamp_from_changeset(&ctx, &repo, &changeset).await?;
log_statistics(
&ctx,
scuba_logger.clone(),
cs_timestamp,
&repo_name,
&changeset,
&statistics,
);
STATS::calculated_changesets.add_value(1);
}
}
// unreachable, but needed so that the future has type Result
// which lets us propagate Errors to main.
Ok(())
}
#[fbinit::main]
fn main(fb: FacebookInit) -> Result<(), Error> {
let matches = setup_app().get_matches();
args::init_cachelib(fb, &matches, None);
let logger = args::init_logging(fb, &matches);
let ctx = CoreContext::new_with_logger(fb, logger.clone());
let bookmark = match matches.value_of("bookmark") {
Some(name) => name.to_string(),
None => String::from("master"),
};
let bookmark = BookmarkName::new(bookmark)?;
let repo_name = args::get_repo_name(fb, &matches)?;
let scuba_logger = if matches.is_present("log-to-scuba") {
ScubaSampleBuilder::new(fb, SCUBA_DATASET_NAME)
} else {
ScubaSampleBuilder::with_discard()
};
block_execute(
run_statistics(
fb,
ctx,
&logger,
scuba_logger,
matches.clone(),
repo_name,
bookmark,
),
fb,
"statistics_collector",
&logger,
&matches,
cmdlib::monitoring::AliveService,
)
}
#[cfg(test)]
mod tests {
use super::*;
use bytes::Bytes;
use fixtures::linear;
use futures::future::TryFutureExt;
use futures_old::stream as old_stream;
use maplit::btreemap;
use std::str::FromStr;
use tests_utils::{create_commit, store_files};
use tokio_compat::runtime::Runtime;
#[test]
fn test_number_of_lines_empty_stream() -> Result<(), Error> {
let mut rt = Runtime::new().unwrap();
let stream: BoxStream<FileBytes, Error> =
Box::new(old_stream::once(Ok(FileBytes(Bytes::from(&b""[..])))));
let result = rt.block_on(number_of_lines(stream).boxed().compat())?;
assert_eq!(result, 0);
Ok(())
}
#[test]
fn test_number_of_lines_one_line() -> Result<(), Error> {
let mut rt = Runtime::new().unwrap();
let stream: BoxStream<FileBytes, Error> = Box::new(old_stream::once(Ok(FileBytes(
Bytes::from(&b"First line\n"[..]),
))));
let result = rt.block_on(number_of_lines(stream).boxed().compat())?;
assert_eq!(result, 1);
Ok(())
}
#[test]
fn test_number_of_lines_many_lines() -> Result<(), Error> {
let mut rt = Runtime::new().unwrap();
let stream: BoxStream<FileBytes, Error> = Box::new(old_stream::once(Ok(FileBytes(
Bytes::from(&b"First line\nSecond line\nThird line\n"[..]),
))));
let result = rt.block_on(number_of_lines(stream).boxed().compat())?;
assert_eq!(result, 3);
Ok(())
}
#[test]
fn test_number_of_lines_many_items() -> Result<(), Error> {
let mut rt = Runtime::new().unwrap();
let vec = vec![
FileBytes(Bytes::from(&b"First line\n"[..])),
FileBytes(Bytes::from(&b""[..])),
FileBytes(Bytes::from(&b"First line\nSecond line\nThird line\n"[..])),
];
let stream: BoxStream<FileBytes, Error> = Box::new(old_stream::iter_ok(vec));
let result = rt.block_on(number_of_lines(stream).boxed().compat())?;
assert_eq!(result, 4);
Ok(())
}
#[fbinit::test]
fn linear_test_get_statistics_from_changeset(fb: FacebookInit) {
let mut runtime = Runtime::new().unwrap();
runtime.block_on_std(async move {
let repo = linear::getrepo(fb).await;
let ctx = CoreContext::test_mock(fb);
let blobstore = repo.get_blobstore();
// Commit consists of two files (name => content):
// "1" => "1\n"
// "files" => "1\n"
let root = HgChangesetId::from_str("2d7d4ba9ce0a6ffd222de7785b249ead9c51c536").unwrap();
let p = repo
.get_bonsai_from_hg(ctx.clone(), root)
.compat()
.await
.unwrap()
.unwrap();
let parents = vec![p];
let bcs_id = create_commit(
ctx.clone(),
repo.clone(),
parents,
store_files(
ctx.clone(),
btreemap! {
"dir1/dir2/file1" => Some("first line\nsecond line\n"),
"dir1/dir3/file2" => Some("first line\n"),
},
repo.clone(),
)
.await,
)
.await;
let hg_cs_id = repo
.get_hg_from_bonsai_changeset(ctx.clone(), bcs_id)
.compat()
.await
.unwrap();
let stats = get_statistics_from_changeset(&ctx, &repo, &blobstore, &hg_cs_id)
.await
.unwrap();
// (num_files, total_file_size, num_lines)
assert_eq!(stats, RepoStatistics::new(4, 38, 5));
});
}
#[fbinit::test]
fn linear_test_get_statistics_from_entry_tree(fb: FacebookInit) {
let mut runtime = Runtime::new().unwrap();
runtime.block_on_std(async move {
let repo = linear::getrepo(fb).await;
let ctx = CoreContext::test_mock(fb);
let blobstore = repo.get_blobstore();
// Commit consists of two files (name => content):
// "1" => "1\n"
// "files" => "1\n"
let root = HgChangesetId::from_str("2d7d4ba9ce0a6ffd222de7785b249ead9c51c536").unwrap();
let p = repo
.get_bonsai_from_hg(ctx.clone(), root)
.compat()
.await
.unwrap()
.unwrap();
let parents = vec![p];
let bcs_id = create_commit(
ctx.clone(),
repo.clone(),
parents,
store_files(
ctx.clone(),
btreemap! {
"dir1/dir2/file1" => Some("first line\nsecond line\n"),
"dir1/dir3/file2" => Some("first line\n"),
},
repo.clone(),
)
.await,
)
.await;
let hg_cs_id = repo
.get_hg_from_bonsai_changeset(ctx.clone(), bcs_id)
.compat()
.await
.unwrap();
let manifest = get_manifest_from_changeset(&ctx, &repo, &hg_cs_id)
.await
.unwrap();
let mut tree_entries = manifest
.list_all_entries(ctx.clone(), blobstore.clone())
.filter_map(|(_, entry)| match entry {
Entry::Tree(_) => Some(entry),
_ => None,
})
.collect()
.compat()
.await
.unwrap();
let stats = get_statistics_from_entry(&ctx, &repo, tree_entries.pop().unwrap())
.await
.unwrap();
// For Entry::Tree we expect statistics with all fields equal to 0
// (num_files, total_file_size, num_lines)
assert_eq!(stats, RepoStatistics::default());
});
}
#[fbinit::test]
fn linear_test_update_statistics(fb: FacebookInit) {
let mut runtime = Runtime::new().unwrap();
runtime.block_on_std(async move {
let repo = linear::getrepo(fb).await;
let ctx = CoreContext::test_mock(fb);
let blobstore = repo.get_blobstore();
/*
Commit consists of two files (name => content):
"1" => "1\n"
"files" => "1\n"
*/
let prev_hg_cs_id =
HgChangesetId::from_str("2d7d4ba9ce0a6ffd222de7785b249ead9c51c536").unwrap();
/*
Commit consists of two files (name => content):
"2" => "2\n"
"files" => "1\n2\n"
*/
let cur_hg_cs_id =
HgChangesetId::from_str("3e0e761030db6e479a7fb58b12881883f9f8c63f").unwrap();
let stats = get_statistics_from_changeset(&ctx, &repo, &blobstore, &prev_hg_cs_id)
.await
.unwrap();
let (prev_manifest, cur_manifest) = try_join!(
get_manifest_from_changeset(&ctx, &repo, &prev_hg_cs_id),
get_manifest_from_changeset(&ctx, &repo, &cur_hg_cs_id),
)
.unwrap();
let new_stats = update_statistics(
&ctx,
&repo,
stats,
prev_manifest.diff(ctx.clone(), blobstore.clone(), cur_manifest.clone()),
)
.await
.unwrap();
// (num_files, total_file_size, num_lines)
assert_eq!(new_stats, RepoStatistics::new(3, 8, 4));
});
}
}