added backfill_derived_data utility

Summary:
Utility to backfill derived data:
 - currently only supports `unodes`

Reviewed By: krallin

Differential Revision: D16827897

fbshipit-source-id: aa0bb8ae4fe895233e3f11775b3432184afb37f9
Pavel Aslanov 2019-08-27 07:18:23 -07:00 committed by Facebook Github Bot
parent 7f331ea1b0
commit 242741ab89
8 changed files with 352 additions and 106 deletions

View File

@ -29,7 +29,7 @@ pub use crate::repo_commit::ChangesetHandle;
pub use changeset_fetcher::ChangesetFetcher;
// TODO: This is exported for testing - is this the right place for it?
pub use crate::repo_commit::compute_changed_files;
pub use utils::UnittestOverride;
pub use utils::DangerousOverride;
pub mod internal {
pub use crate::utils::{IncompleteFilenodeInfo, IncompleteFilenodes};

View File

@ -4,7 +4,7 @@
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2 or any later version.
use super::utils::{IncompleteFilenodeInfo, IncompleteFilenodes, UnittestOverride};
use super::utils::{DangerousOverride, IncompleteFilenodeInfo, IncompleteFilenodes};
use crate::bonsai_generation::{create_bonsai_changeset_object, save_bonsai_changeset_object};
use crate::derive_hg_manifest::derive_hg_manifest;
use crate::errors::*;
@ -2576,8 +2576,8 @@ where
.buffer_unordered(100)
}
impl UnittestOverride<Arc<dyn LeaseOps>> for BlobRepo {
fn unittest_override<F>(&self, modify: F) -> Self
impl DangerousOverride<Arc<dyn LeaseOps>> for BlobRepo {
fn dangerous_override<F>(&self, modify: F) -> Self
where
F: FnOnce(Arc<dyn LeaseOps>) -> Arc<dyn LeaseOps>,
{
@ -2588,3 +2588,22 @@ impl UnittestOverride<Arc<dyn LeaseOps>> for BlobRepo {
}
}
}
impl DangerousOverride<Arc<dyn Blobstore>> for BlobRepo {
fn dangerous_override<F>(&self, modify: F) -> Self
where
F: FnOnce(Arc<dyn Blobstore>) -> Arc<dyn Blobstore>,
{
let (blobstore, repoid) = RepoBlobstoreArgs::new_with_wrapped_inner_blobstore(
self.blobstore.clone(),
self.get_repoid(),
modify,
)
.into_blobrepo_parts();
BlobRepo {
repoid,
blobstore,
..self.clone()
}
}
}

View File

@ -105,10 +105,10 @@ impl IncompleteFilenodes {
/// Create a new instance of the implementing object with an overridden field of the specified type.
///
/// This trait is only supposed to be used from unittests, when it is necessary to replace
/// some of the fields to better test specific behaviour.
pub trait UnittestOverride<T> {
fn unittest_override<F>(&self, modify: F) -> Self
/// This override can be very dangerous; it should only be used in unittests, or if you
/// really know what you are doing.
pub trait DangerousOverride<T> {
fn dangerous_override<F>(&self, modify: F) -> Self
where
F: FnOnce(T) -> T;
}
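
A minimal usage sketch, assuming the `DummyLease` and `LeaseOps` types from cacheblob that this commit uses elsewhere (the helper name is hypothetical): tests can swap a single `BlobRepo` field without reconstructing the repo by hand.

use std::sync::Arc;

use blobrepo::{BlobRepo, DangerousOverride};
use cacheblob::{dummy::DummyLease, LeaseOps};

// Replace the repo's lease implementation with a no-op lease, leaving every
// other field untouched; this mirrors the override done in the backfill binary below.
fn without_lease(repo: &BlobRepo) -> BlobRepo {
    repo.dangerous_override(|_| Arc::new(DummyLease {}) as Arc<dyn LeaseOps>)
}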

View File

@ -4,87 +4,103 @@
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2 or any later version.
use failure_ext::Error;
use futures::future::Either;
use futures::{Future, IntoFuture};
use futures_ext::{BoxFuture, FutureExt};
use context::CoreContext;
use mononoke_types::BlobstoreBytes;
use blobstore::Blobstore;
use memblob::EagerMemblob;
use context::CoreContext;
use failure_ext::Error;
use futures::{future, stream, Future, Stream};
use futures_ext::{BoxFuture, FutureExt};
use lock_ext::LockExt;
use mononoke_types::BlobstoreBytes;
use std::{
collections::HashMap,
mem,
sync::{Arc, Mutex},
};
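/// Per-key state accumulated in memory: `Put` holds a pending local write,
/// while `Get` caches the result of a read-through to the inner blobstore.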
#[derive(Clone, Debug)]
enum Cache {
Put(BlobstoreBytes),
Get(Option<BlobstoreBytes>),
}
/// A blobstore wrapper that reads from the underlying blobstore but writes to memory.
#[derive(Clone, Debug)]
pub struct MemWritesBlobstore<T: Blobstore + Clone> {
inner: T,
// TODO: replace with chashmap or evmap
memblob: EagerMemblob,
cache: Arc<Mutex<HashMap<String, Cache>>>,
}
impl<T: Blobstore + Clone> MemWritesBlobstore<T> {
pub fn new(blobstore: T) -> Self {
Self {
inner: blobstore,
memblob: EagerMemblob::new(),
cache: Default::default(),
}
}
/// Write all in-memory entries to the underlying blobstore.
///
/// NOTE: In case of error all pending changes will be lost.
pub fn persist(&self, ctx: CoreContext) -> impl Future<Item = (), Error = Error> {
let items = self.cache.with(|cache| mem::replace(cache, HashMap::new()));
stream::iter_ok(items)
.filter_map(|(key, cache)| match cache {
Cache::Put(value) => Some((key, value)),
Cache::Get(_) => None,
})
.map({
let inner = self.inner.clone();
move |(key, value)| inner.put(ctx.clone(), key, value)
})
.buffered(4096)
.for_each(|_| Ok(()))
}
pub fn get_inner(&self) -> T {
self.inner.clone()
}
}
impl<T: Blobstore + Clone> Blobstore for MemWritesBlobstore<T> {
fn put(&self, ctx: CoreContext, key: String, value: BlobstoreBytes) -> BoxFuture<(), Error> {
// Don't write the key if it's already present.
self.is_present(ctx.clone(), key.clone())
.and_then({
let memblob = self.memblob.clone();
move |is_present| {
if is_present {
Either::A(Ok(()).into_future())
} else {
Either::B(memblob.put(ctx, key, value))
}
}
})
.boxify()
fn put(&self, _ctx: CoreContext, key: String, value: BlobstoreBytes) -> BoxFuture<(), Error> {
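// Writes never touch the inner blobstore: record the value in the in-memory
// cache and complete immediately; `persist` flushes it later.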
self.cache
.with(|cache| cache.insert(key, Cache::Put(value)));
future::ok(()).boxify()
}
fn get(&self, ctx: CoreContext, key: String) -> BoxFuture<Option<BlobstoreBytes>, Error> {
self.memblob
.get(ctx.clone(), key.clone())
.and_then({
let inner = self.inner.clone();
move |val| match val {
Some(val) => Either::A(Ok(Some(val)).into_future()),
None => Either::B(inner.get(ctx, key)),
}
})
.boxify()
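// Reads are served from the local cache when possible; on a miss, fall through
// to the inner blobstore and remember the result as a `Cache::Get` entry.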
match self.cache.with(|cache| cache.get(&key).cloned()) {
Some(cache) => {
let result = match cache {
Cache::Put(value) => Some(value),
Cache::Get(result) => result,
};
future::ok(result).boxify()
}
None => self
.inner
.get(ctx, key.clone())
.map({
let cache = self.cache.clone();
move |result| {
cache.with(|cache| cache.insert(key, Cache::Get(result.clone())));
result
}
})
.boxify(),
}
}
fn is_present(&self, ctx: CoreContext, key: String) -> BoxFuture<bool, Error> {
self.memblob
.is_present(ctx.clone(), key.clone())
.and_then({
let inner = self.inner.clone();
move |is_present| {
if is_present {
Either::A(Ok(true).into_future())
} else {
Either::B(inner.is_present(ctx, key))
}
}
})
.boxify()
self.get(ctx, key).map(|result| result.is_some()).boxify()
}
}
#[cfg(test)]
mod test {
use super::*;
use bytes::Bytes;
use memblob::EagerMemblob;
#[test]
fn basic_read() {
@ -161,53 +177,24 @@ mod test {
}
#[test]
fn present_in_inner() {
fn test_persist() -> Result<(), Error> {
let ctx = CoreContext::test_mock();
let mut rt = tokio::runtime::Runtime::new()?;
let inner = EagerMemblob::new();
let foo_key = "foo".to_string();
inner
.put(
ctx.clone(),
foo_key.clone(),
BlobstoreBytes::from_bytes("foobar"),
)
.wait()
.expect("initial put should work");
let outer = MemWritesBlobstore::new(inner.clone());
outer
.put(
ctx.clone(),
foo_key.clone(),
BlobstoreBytes::from_bytes("foobar"),
)
.wait()
.expect("put should work");
assert!(
outer
.is_present(ctx.clone(), foo_key.clone())
.wait()
.expect("is_present on outer should work"),
"foo should be present in outer",
);
let key = "key".to_string();
let value = BlobstoreBytes::from_bytes("value");
// Change the value in inner.
inner
.put(
ctx.clone(),
foo_key.clone(),
BlobstoreBytes::from_bytes("bazquux"),
)
.wait()
.expect("second put should work");
assert_eq!(
outer
.get(ctx, foo_key.clone())
.wait()
.expect("get to outer should work")
.expect("value should be present")
.into_bytes(),
Bytes::from("bazquux"),
);
rt.block_on(outer.put(ctx.clone(), key.clone(), value.clone()))?;
assert!(rt.block_on(inner.get(ctx.clone(), key.clone()))?.is_none());
rt.block_on(outer.persist(ctx.clone()))?;
assert_eq!(rt.block_on(inner.get(ctx.clone(), key))?, Some(value));
Ok(())
}
}

View File

@ -0,0 +1,234 @@
// Copyright (c) 2004-present, Facebook, Inc.
// All Rights Reserved.
//
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2 or any later version.
#![deny(warnings)]
#![feature(duration_float)]
use blobrepo::DangerousOverride;
use blobstore::Blobstore;
use cacheblob::{dummy::DummyLease, LeaseOps, MemWritesBlobstore};
use changesets::{ChangesetEntry, Changesets, SqlChangesets};
use clap::Arg;
use cloned::cloned;
use cmdlib::args;
use context::CoreContext;
use derive_unode_manifest::derived_data_unodes::{RootUnodeManifestId, RootUnodeManifestMapping};
use derived_data::{BonsaiDerived, BonsaiDerivedMapping};
use failure::{err_msg, format_err};
use failure_ext::Error;
use futures::{stream, Future, IntoFuture, Stream};
use futures_ext::{BoxFuture, FutureExt};
use futures_stats::Timed;
use lock_ext::LockExt;
use mononoke_types::{ChangesetId, MononokeId, RepositoryId};
use phases::SqlPhases;
use std::{
sync::{
atomic::{AtomicUsize, Ordering},
Arc, Mutex,
},
time::Duration,
};
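/// Splits [start, stop) into consecutive (low, high) pairs of width `step`,
/// clamping the last pair to `stop`; e.g. windows(0, 10, 4) yields (0, 4), (4, 8), (8, 10).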
fn windows(start: u64, stop: u64, step: u64) -> impl Iterator<Item = (u64, u64)> {
(0..)
.map(move |index| (start + index * step, start + (index + 1) * step))
.take_while(move |(low, _high)| *low < stop)
.map(move |(low, high)| (low, std::cmp::min(stop, high)))
}
// This function is not optimal, since it could be made faster by doing more processing
// on the XDB side, but for the purpose of this binary it is good enough
fn fetch_all_public_changesets(
ctx: CoreContext,
repo_id: RepositoryId,
changesets: Arc<SqlChangesets>,
phases: Arc<SqlPhases>,
) -> impl Stream<Item = ChangesetEntry, Error = Error> {
changesets
.get_changesets_ids_bounds(repo_id.clone())
.and_then(move |(start, stop)| {
let start = start.ok_or_else(|| err_msg("changesets table is empty"))?;
let stop = stop.ok_or_else(|| err_msg("changesets table is empty"))?;
let step = 65536;
Ok(stream::iter_ok(windows(start, stop, step)))
})
.flatten_stream()
.and_then(move |(lower_bound, upper_bound)| {
changesets
.get_list_bs_cs_id_in_range(repo_id, lower_bound, upper_bound)
.collect()
.and_then({
cloned!(ctx, changesets, phases);
move |ids| {
changesets
.get_many(ctx, repo_id, ids)
.and_then(move |mut entries| {
phases
.get_public_raw(
repo_id,
&entries.iter().map(|entry| entry.cs_id).collect(),
)
.map(move |public| {
entries.retain(|entry| public.contains(&entry.cs_id));
stream::iter_ok(entries)
})
})
}
})
})
.flatten()
}
const CHUNK_SIZE: usize = 4096;
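// Type-erased hooks selected in `main` based on DERIVED_DATA_TYPE: `DeriveFn`
// derives data for a single changeset, `DerivePrefetchFn` warms the mapping for
// a whole chunk before deriving.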
type DeriveFn = Arc<dyn Fn(ChangesetId) -> BoxFuture<(), Error> + Send + Sync + 'static>;
type DerivePrefetchFn =
Arc<dyn Fn(Vec<ChangesetId>) -> BoxFuture<(), Error> + Send + Sync + 'static>;
fn main() -> Result<(), Error> {
let matches = args::MononokeApp {
safe_writes: false,
hide_advanced_args: true,
default_glog: false,
}
.build("Utility to back-fill bonsai derived data")
.version("0.0.0")
.about("Utility to back-fill bonsai derived data")
.arg(
Arg::with_name("DERIVED_DATA_TYPE")
.required(true)
.index(1)
.possible_values(&[RootUnodeManifestId::NAME])
.help("derived data type for which backfill will be run"),
)
.get_matches();
args::init_cachelib(&matches);
let mut runtime = tokio::runtime::Runtime::new()?;
let ctx = CoreContext::test_mock();
let logger = args::get_logger(&matches);
// Use `MemWritesBlobstore` to avoid blocking on writes to the underlying blobstore.
// `::persist` is later used to bulk-write all pending data.
let mut memblobstore = None;
let repo = runtime
.block_on(args::open_repo(&logger, &matches))?
.dangerous_override(|_| Arc::new(DummyLease {}) as Arc<dyn LeaseOps>)
.dangerous_override(|blobstore| -> Arc<dyn Blobstore> {
let blobstore = Arc::new(MemWritesBlobstore::new(blobstore));
memblobstore = Some(blobstore.clone());
blobstore
});
let memblobstore = memblobstore.expect("memblobstore should have been updated");
let sql_changesets = Arc::new(runtime.block_on(args::open_sql::<SqlChangesets>(&matches))?);
let sql_phases = Arc::new(runtime.block_on(args::open_sql::<SqlPhases>(&matches))?);
let (derive, derive_prefetch): (DeriveFn, DerivePrefetchFn) = match matches
.value_of("DERIVED_DATA_TYPE")
{
Some(RootUnodeManifestId::NAME) => {
// TODO: we should probably add a generic layer of caching on top of the mapping
// which will store changesets one after another, to make it resilient
// to interrupts. Otherwise `MemWritesBlobstore` can create a mapping entry
// before storing the whole derived data.
let mapping = Arc::new(RootUnodeManifestMapping::new(repo.get_blobstore()));
let derive_unodes = {
cloned!(ctx, repo, mapping);
move |csid| {
RootUnodeManifestId::derive(ctx.clone(), repo.clone(), mapping.clone(), csid)
.map(|_| ())
.boxify()
}
};
let derive_prefetch_unodes = {
cloned!(ctx, mapping);
move |csids| mapping.get(ctx.clone(), csids).map(|_| ()).boxify()
};
(Arc::new(derive_unodes), Arc::new(derive_prefetch_unodes))
}
unsupported_type => {
return Err(format_err!(
"Unsupported derived data type: {:?}",
unsupported_type
));
}
};
println!("collecting all changest for: {:?}", repo.get_repoid());
runtime.block_on(
fetch_all_public_changesets(ctx.clone(), repo.get_repoid(), sql_changesets, sql_phases)
.collect()
.and_then(move |mut changesets| {
changesets.sort_by_key(|cs_entry| cs_entry.gen);
println!("starting deriving data for {} changesets", changesets.len());
let total_count = changesets.len();
let generated_count = Arc::new(AtomicUsize::new(0));
let total_duration = Arc::new(Mutex::new(Duration::from_secs(0)));
stream::iter_ok(changesets)
.map(|entry| entry.cs_id)
.chunks(CHUNK_SIZE)
.and_then({
let blobstore = repo.get_blobstore();
cloned!(ctx);
move |chunk| {
let changesets_prefetch = stream::iter_ok(chunk.clone())
.map({
cloned!(ctx, blobstore);
move |csid| blobstore.get(ctx.clone(), csid.blobstore_key())
})
.buffered(CHUNK_SIZE)
.for_each(|_| Ok(()));
(changesets_prefetch, derive_prefetch(chunk.clone()))
.into_future()
.map(move |_| chunk)
}
})
.for_each(move |chunk| {
let chunk_size = chunk.len();
stream::iter_ok(chunk)
.for_each({
cloned!(derive);
move |csid| derive(csid)
})
.and_then({
cloned!(ctx, memblobstore);
move |()| memblobstore.persist(ctx)
})
.timed({
cloned!(generated_count, total_duration);
move |stats, _| {
generated_count.fetch_add(chunk_size, Ordering::SeqCst);
let elapsed = total_duration.with(|total_duration| {
*total_duration += stats.completion_time;
*total_duration
});
let generated = generated_count.load(Ordering::SeqCst) as f32;
let total = total_count as f32;
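// Remaining-time estimate: scale the elapsed wall time by the ratio of
// changesets still to derive over those already derived.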
println!(
"{}/{} estimate:{:.2?} speed:{:.2}/s mean_speed:{:.2}/s",
generated,
total_count,
elapsed.mul_f32((total - generated) / generated),
chunk_size as f32 / stats.completion_time.as_secs() as f32,
generated / elapsed.as_secs() as f32,
);
Ok(())
}
})
})
}),
)?;
Ok(())
}

View File

@ -259,7 +259,7 @@ impl<Derived: BonsaiDerived> DeriveNode<Derived> {
mod test {
use super::*;
use blobrepo::UnittestOverride;
use blobrepo::DangerousOverride;
use bookmarks::BookmarkName;
use cacheblob::LeaseOps;
use failure::err_msg;
@ -543,7 +543,8 @@ mod test {
let ctx = CoreContext::test_mock();
let mut runtime = Runtime::new()?;
let mapping = Arc::new(TestMapping::new());
let repo = linear::getrepo().unittest_override(|_| Arc::new(FailingLease));
let repo =
linear::getrepo().dangerous_override(|_| Arc::new(FailingLease) as Arc<dyn LeaseOps>);
let hg_csid = HgChangesetId::from_str("2d7d4ba9ce0a6ffd222de7785b249ead9c51c536")?;
let csid = runtime

View File

@ -18,6 +18,7 @@ use std::{
iter::FromIterator,
mem,
};
use tracing::{trace_args, Traced};
/// Information passed to `create_tree` function when tree node is constructed
pub struct TreeInfo<TreeId, LeafId> {
@ -113,7 +114,10 @@ where
parents: parents.into_iter().map(Entry::Tree).collect(),
},
// unfold, all merge logic happens in this unfold function
move |merge_node| merge_node.merge(ctx.clone(), blobstore.clone()),
{
let ctx = ctx.clone();
move |merge_node| merge_node.merge(ctx.clone(), blobstore.clone())
},
// fold, this function only creates entries from merge result and already merged subentries
move |merge_result, subentries| match merge_result {
MergeResult::Reuse { name, entry } => future::ok(Some((name, entry))).boxify(),
@ -158,6 +162,7 @@ where
},
)
.map(|result: Option<_>| result.and_then(|(_, entry)| entry.into_tree()))
.traced(&ctx.trace(), "derive_manifest", trace_args! {})
}
// Change is isomorphic to Option, but it makes it easier to understand merge logic
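
The `Change` definition itself is outside this hunk; purely as an illustration of what "isomorphic to Option" means here (the names are hypothetical, not necessarily the actual Mononoke type):

enum Change<LeafId> {
    Add(LeafId), // plays the role of Some: the path now carries this value
    Remove,      // plays the role of None: the path is deleted
}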

View File

@ -197,7 +197,7 @@ impl SqlPhases {
.map(move |rows| rows.into_iter().next().map(|row| row.0))
}
fn get_public_raw(
pub fn get_public_raw(
&self,
repo_id: RepositoryId,
csids: &Vec<ChangesetId>,