mononoke: use batch_derive method in derived data utils

Summary:
Previously the backfill_batch_dangerous method was calling the internal derive_impl()
method directly. That wasn't great (after all, we were calling a function whose name
suggests it should only be called from inside the derived data crate), so this diff
changes it to call the batch_derive() method instead.

This gives a few benefits:
1) We no longer call the internal derive_impl() function.
2) It allows different types of derived data to override the batching behaviour.
For example, we've already overridden it for fsnodes, and the next diff will override
it for blame as well.

To make it compatible with derive_impl(), batch_derive() now accepts the derive data
mode and the mapping. A condensed sketch of the new trait surface appears below, just
before the per-file diffs.

Reviewed By: krallin

Differential Revision: D22435044

fbshipit-source-id: a4d911606284676566583a94199195860ffe2ecf
Author:  Stanislau Hlebik (committed by Facebook GitHub Bot)
Date:    2020-07-09 10:43:53 -07:00
Parent:  df2b9b9009
Commit:  361f4e98a7
5 changed files with 113 additions and 90 deletions
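
Before the per-file diffs, here is a condensed sketch of the reshaped trait method,
assembled from the derived_data lib.rs hunks below (all names come from the diff;
this is an orientation aid, not a verbatim copy of the final source):

    // batch_derive is the public entry point: it now takes the mapping and the
    // mode explicitly, so callers such as backfill_batch_dangerous can substitute
    // e.g. an in-memory or regenerating mapping. Implementors override
    // batch_derive_impl (as fsnodes does) to batch more efficiently.
    async fn batch_derive<'a, Iter, M>(
        ctx: &CoreContext,
        repo: &BlobRepo,
        csids: Iter,
        mapping: &M,
        mode: Mode,
    ) -> Result<HashMap<ChangesetId, Self>, DeriveError>
    where
        Iter: IntoIterator<Item = ChangesetId> + Send,
        Iter::IntoIter: Send,
        M: BonsaiDerivedMapping<Value = Self> + Send + Sync + Clone + 'static,
    {
        // The enabled-for-this-repo check runs once up front;
        // DeriveMode::Unsafe (used by the backfiller) skips it.
        Self::check_if_derivation_allowed(repo, mode)?;
        Self::batch_derive_impl(ctx, repo, csids, mapping).await
    }

With this shape, backfill_batch_dangerous calls M::Value::batch_derive(&ctx, &repo,
csids, &in_memory_mapping, DeriveMode::Unsafe) instead of reaching into derive_impl,
as the last file below shows.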

@@ -21,10 +21,12 @@ unodes = { path = "../unodes" }
 cloned = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
 futures_ext = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
 anyhow = "1.0"
+async-trait = "0.1.29"
 bytes = { version = "0.5", features = ["serde"] }
 futures = { version = "0.3.5", features = ["async-await", "compat"] }
 futures-old = { package = "futures", version = "0.1" }
 thiserror = "1.0"
 tokio = { version = "=0.2.13", features = ["full"] }

 [dev-dependencies]
 blobrepo_factory = { path = "../../blobrepo/factory" }

@@ -13,7 +13,7 @@ use blobrepo::BlobRepo;
 use blobstore::{Blobstore, BlobstoreGetData};
 use bytes::Bytes;
 use context::CoreContext;
-use derived_data::{BonsaiDerived, BonsaiDerivedMapping};
+use derived_data::{BonsaiDerived, BonsaiDerivedMapping, DeriveError};
 use futures::{
     compat::Future01CompatExt, stream as new_stream, StreamExt as NewStreamExt, TryFutureExt,
     TryStreamExt,
@@ -95,20 +95,20 @@ impl BonsaiDerived for RootFsnodeId {
         .boxify()
     }

-    async fn batch_derive<'a, Iter>(
+    async fn batch_derive_impl<'a, Iter, M>(
         ctx: &CoreContext,
         repo: &BlobRepo,
         csids: Iter,
-    ) -> Result<HashMap<ChangesetId, Self>, Error>
+        mapping: &M,
+    ) -> Result<HashMap<ChangesetId, Self>, DeriveError>
     where
         Iter: IntoIterator<Item = ChangesetId> + Send,
         Iter::IntoIter: Send,
+        M: BonsaiDerivedMapping<Value = Self> + Send + Sync + Clone + 'static,
     {
         let csids = csids.into_iter().collect::<Vec<_>>();
         let derived = derive_fsnode_in_batch(ctx, repo, csids.clone()).await?;
-        let mapping = Self::mapping(ctx, repo);
         new_stream::iter(derived.into_iter().map(|(cs_id, derived)| {
             let mapping = mapping.clone();
             async move {
@@ -122,6 +122,7 @@ impl BonsaiDerived for RootFsnodeId {
         }))
         .buffered(100)
         .try_collect::<HashMap<_, _>>()
+        .map_err(DeriveError::Error)
         .await
     }
 }

@@ -5,7 +5,7 @@
  * GNU General Public License version 2.
  */

-use crate::{BonsaiDerived, BonsaiDerivedMapping, DeriveError, Mode};
+use crate::{BonsaiDerived, BonsaiDerivedMapping, DeriveError};
 use anyhow::Error;
 use blobrepo::BlobRepo;
 use blobstore::Loadable;
@@ -79,12 +79,11 @@ pub fn derive_impl<
     repo: BlobRepo,
     derived_mapping: Mapping,
     start_csid: ChangesetId,
-    mode: Mode,
 ) -> impl Future<Item = Derived, Error = DeriveError> {
     async move {
         let derivation = async {
             let all_csids =
-                find_topo_sorted_underived(&ctx, &repo, &derived_mapping, &start_csid, None, mode)
+                find_topo_sorted_underived(&ctx, &repo, &derived_mapping, &start_csid, None)
                     .await?;

             for csid in &all_csids {
@@ -149,18 +148,6 @@ pub fn derive_impl<
     .compat()
 }

-fn fail_if_disabled<Derived: BonsaiDerived>(repo: &BlobRepo) -> Result<(), DeriveError> {
-    if !repo
-        .get_derived_data_config()
-        .derived_data_types
-        .contains(Derived::NAME)
-    {
-        STATS::derived_data_disabled.add_value(1, (repo.get_repoid().id(), Derived::NAME));
-        return Err(DeriveError::Disabled(Derived::NAME, repo.get_repoid()));
-    }
-    Ok(())
-}
-
 pub(crate) async fn find_topo_sorted_underived<
     Derived: BonsaiDerived,
     Mapping: BonsaiDerivedMapping<Value = Derived> + Send + Sync + Clone + 'static,
@@ -170,12 +157,7 @@ pub(crate) async fn find_topo_sorted_underived<
     derived_mapping: &Mapping,
     start_csid: &ChangesetId,
     limit: Option<u64>,
-    mode: Mode,
 ) -> Result<Vec<ChangesetId>, Error> {
-    if mode == Mode::OnlyIfEnabled {
-        fail_if_disabled::<Derived>(repo)?;
-    }
-
     let changeset_fetcher = repo.get_changeset_fetcher();
     // This is necessary to avoid visiting the same commit a lot of times in mergy repos
     let visited: Arc<Mutex<HashSet<ChangesetId>>> = Arc::new(Mutex::new(HashSet::new()));
@@ -521,6 +503,7 @@ impl<Derived: BonsaiDerived> DeriveNode<Derived> {
 mod test {
     use super::*;

+    use crate::Mode;
     use anyhow::Error;
     use blobrepo_hg::BlobRepoHg;
     use blobrepo_override::DangerousOverride;
@@ -829,7 +812,10 @@ mod test {
         .await?;
         // Reverse them to derive parents before children
         let cs_ids = cs_ids.clone().into_iter().rev().collect::<Vec<_>>();
-        let derived_batch = TestGenNum::batch_derive(&ctx, &repo, cs_ids).await?;
+        let mapping = TestGenNum::mapping(&ctx, &repo);
+        let derived_batch =
+            TestGenNum::batch_derive(&ctx, &repo, cs_ids, &mapping, Mode::OnlyIfEnabled)
+                .await?;
         derived_batch
             .get(&master_cs_id)
             .unwrap_or_else(|| panic!("{} has not been derived", master_cs_id))

@@ -11,17 +11,18 @@ use anyhow::Error;
 use async_trait::async_trait;
 use blobrepo::BlobRepo;
 use context::CoreContext;
-use futures::{compat::Future01CompatExt, stream, StreamExt, TryStreamExt};
-use futures_ext::{BoxFuture, FutureExt as OldFutureExt};
+use futures::{compat::Future01CompatExt, stream, StreamExt, TryFutureExt, TryStreamExt};
+use futures_ext::{try_boxfuture, BoxFuture, FutureExt as OldFutureExt};
 use lock_ext::LockExt;
 use mononoke_types::{BonsaiChangeset, ChangesetId, RepositoryId};
 use stats::prelude::*;
 use std::{
     collections::{HashMap, HashSet},
     sync::{Arc, Mutex},
 };
 use thiserror::Error;

-pub mod derive_impl;
+mod derive_impl;

 #[derive(Copy, Clone, PartialEq, Eq)]
 pub enum Mode {
@@ -76,15 +77,12 @@ pub trait BonsaiDerived: Sized + 'static + Send + Sync + Clone {
     ///
     /// This function fails immediately if this type of derived data is not enabled for this repo.
     fn derive(ctx: CoreContext, repo: BlobRepo, csid: ChangesetId) -> BoxFuture<Self, DeriveError> {
+        try_boxfuture!(Self::check_if_derivation_allowed(
+            &repo,
+            Mode::OnlyIfEnabled
+        ));
         let mapping = Self::mapping(&ctx, &repo);
-        derive_impl::derive_impl::<Self, Self::Mapping>(
-            ctx,
-            repo,
-            mapping,
-            csid,
-            Mode::OnlyIfEnabled,
-        )
-        .boxify()
+        derive_impl::derive_impl::<Self, Self::Mapping>(ctx, repo, mapping, csid).boxify()
     }

     /// Derives derived data even if it's disabled in the config. Should normally
@@ -95,8 +93,9 @@ pub trait BonsaiDerived: Sized + 'static + Send + Sync + Clone {
         csid: ChangesetId,
         mode: Mode,
     ) -> BoxFuture<Self, DeriveError> {
+        try_boxfuture!(Self::check_if_derivation_allowed(&repo, mode));
         let mapping = Self::mapping(&ctx, &repo);
-        derive_impl::derive_impl::<Self, Self::Mapping>(ctx, repo, mapping, csid, mode).boxify()
+        derive_impl::derive_impl::<Self, Self::Mapping>(ctx, repo, mapping, csid).boxify()
     }

     /// Returns min(number of ancestors of `csid` to be derived, `limit`)
@@ -109,13 +108,13 @@ pub trait BonsaiDerived: Sized + 'static + Send + Sync + Clone {
         limit: u64,
     ) -> Result<u64, DeriveError> {
         let mapping = Self::mapping(&ctx, &repo);
+        Self::check_if_derivation_allowed(repo, Mode::OnlyIfEnabled)?;
         let underived = derive_impl::find_topo_sorted_underived::<Self, Self::Mapping>(
             ctx,
             repo,
             &mapping,
             csid,
             Some(limit),
-            Mode::OnlyIfEnabled,
         )
         .await?;
         Ok(underived.len() as u64)
@@ -127,13 +126,9 @@ pub trait BonsaiDerived: Sized + 'static + Send + Sync + Clone {
         csid: &ChangesetId,
     ) -> Result<Vec<ChangesetId>, DeriveError> {
         let mapping = Self::mapping(&ctx, &repo);
+        Self::check_if_derivation_allowed(repo, Mode::OnlyIfEnabled)?;
         let underived = derive_impl::find_topo_sorted_underived::<Self, Self::Mapping>(
-            ctx,
-            repo,
-            &mapping,
-            csid,
-            None,
-            Mode::OnlyIfEnabled,
+            ctx, repo, &mapping, csid, None,
         )
         .await?;
         Ok(underived)
@@ -148,26 +143,62 @@ pub trait BonsaiDerived: Sized + 'static + Send + Sync + Clone {
         Ok(count == 0)
     }

+    fn check_if_derivation_allowed(repo: &BlobRepo, mode: Mode) -> Result<(), DeriveError> {
+        if mode == Mode::OnlyIfEnabled {
+            if !repo
+                .get_derived_data_config()
+                .derived_data_types
+                .contains(Self::NAME)
+            {
+                derive_impl::STATS::derived_data_disabled
+                    .add_value(1, (repo.get_repoid().id(), Self::NAME));
+                return Err(DeriveError::Disabled(Self::NAME, repo.get_repoid()));
+            }
+        }
+        Ok(())
+    }
+
     /// This method might be overridden by BonsaiDerived implementors if there's a more efficient
     /// way to derive a batch of commits
-    async fn batch_derive<'a, Iter>(
+    async fn batch_derive<'a, Iter, M>(
         ctx: &CoreContext,
         repo: &BlobRepo,
         csids: Iter,
-    ) -> Result<HashMap<ChangesetId, Self>, Error>
+        mapping: &M,
+        mode: Mode,
+    ) -> Result<HashMap<ChangesetId, Self>, DeriveError>
     where
         Iter: IntoIterator<Item = ChangesetId> + Send,
         Iter::IntoIter: Send,
+        M: BonsaiDerivedMapping<Value = Self> + Send + Sync + Clone + 'static,
     {
+        Self::check_if_derivation_allowed(repo, mode)?;
+        Self::batch_derive_impl(ctx, repo, csids, mapping).await
+    }
+
+    async fn batch_derive_impl<'a, Iter, M>(
+        ctx: &CoreContext,
+        repo: &BlobRepo,
+        csids: Iter,
+        mapping: &M,
+    ) -> Result<HashMap<ChangesetId, Self>, DeriveError>
+    where
+        Iter: IntoIterator<Item = ChangesetId> + Send,
+        Iter::IntoIter: Send,
+        M: BonsaiDerivedMapping<Value = Self> + Send + Sync + Clone + 'static,
+    {
         let iter = csids.into_iter();
         stream::iter(iter.map(|cs_id| async move {
-            let derived = Self::derive(ctx.clone(), repo.clone(), cs_id)
-                .compat()
-                .await?;
+            let derived =
+                derive_impl::derive_impl(ctx.clone(), repo.clone(), mapping.clone(), cs_id)
+                    .compat()
+                    .await?;
             Ok((cs_id, derived))
         }))
         .buffered(100)
         .try_collect::<HashMap<_, _>>()
+        .map_err(DeriveError::Error)
         .await
     }
 }

@@ -5,7 +5,9 @@
  * GNU General Public License version 2.
  */

-use anyhow::{format_err, Error};
+#![type_length_limit = "4522397"]
+
+use anyhow::{anyhow, format_err, Error};
 use async_trait::async_trait;
 use blame::{BlameRoot, BlameRootMapping};
 use blobrepo::BlobRepo;
@@ -17,8 +19,7 @@ use cloned::cloned;
 use context::CoreContext;
 use deleted_files_manifest::{RootDeletedManifestId, RootDeletedManifestMapping};
 use derived_data::{
-    derive_impl::derive_impl, BonsaiDerived, BonsaiDerivedMapping, DeriveError, Mode as DeriveMode,
-    RegenerateMapping,
+    BonsaiDerived, BonsaiDerivedMapping, DeriveError, Mode as DeriveMode, RegenerateMapping,
 };
 use derived_data_filenodes::{FilenodesOnlyPublic, FilenodesOnlyPublicMapping};
 use fastlog::{RootFastlog, RootFastlogMapping};
@@ -27,7 +28,7 @@ use futures::{
     compat::Future01CompatExt,
     future::ready,
     stream::{self, futures_unordered::FuturesUnordered},
-    Future, StreamExt, TryStreamExt,
+    Future, FutureExt, StreamExt, TryFutureExt, TryStreamExt,
 };
 use futures_ext::{BoxFuture, FutureExt as OldFutureExt};
 use futures_old::{future, stream as stream_old, Future as OldFuture, Stream};
@@ -150,14 +151,23 @@ where
         repo: BlobRepo,
         csid: ChangesetId,
     ) -> BoxFuture<String, Error> {
-        // We call derive_impl directly so that we can pass
+        // We call batch_derive so that we can pass
         // `self.mapping` there. This will allow us to
         // e.g. regenerate derived data for the commit
         // even if it was already generated (see RegenerateMapping call).
-        derive_impl::<M::Value, _>(ctx, repo, self.mapping.clone(), csid, self.mode)
-            .map(|result| format!("{:?}", result))
-            .from_err()
-            .boxify()
+        let mode = self.mode;
+        let mapping = self.mapping.clone();
+        async move {
+            let res = M::Value::batch_derive(&ctx, &repo, vec![csid], &mapping, mode).await?;
+            let val = res
+                .get(&csid)
+                .ok_or(anyhow!("internal derived data error"))?;
+            Ok(format!("{:?}", val))
+        }
+        .boxed()
+        .compat()
+        .boxify()
     }

     /// !!!!This function is dangerous and should be used with care!!!!
@@ -194,37 +204,30 @@ where
         });
         let memblobstore = memblobstore.expect("memblobstore should have been updated");

-        stream_old::iter_ok(csids)
-            .for_each({
-                cloned!(ctx, in_memory_mapping, repo);
-                move |csid| {
-                    // create new context so each derivation would have its own trace
-                    let ctx = CoreContext::new_with_logger(ctx.fb, ctx.logger().clone());
-                    derive_impl::<M::Value, _>(
-                        ctx.clone(),
-                        repo.clone(),
-                        in_memory_mapping.clone(),
-                        csid,
-                        DeriveMode::Unsafe,
-                    )
-                    .map(|_| ())
-                    .from_err()
-                }
-            })
-            .and_then({
-                cloned!(ctx, memblobstore);
-                move |_| memblobstore.persist(ctx)
-            })
-            .and_then(move |_| {
-                let buffer = in_memory_mapping.into_buffer();
-                let buffer = buffer.lock().unwrap();
-                let mut futs = vec![];
-                for (cs_id, value) in buffer.iter() {
-                    futs.push(orig_mapping.put(ctx.clone(), *cs_id, value.clone()));
-                }
-                stream_old::futures_unordered(futs).for_each(|_| Ok(()))
-            })
-            .boxify()
+        {
+            cloned!(ctx, repo, in_memory_mapping);
+            async move {
+                M::Value::batch_derive(&ctx, &repo, csids, &in_memory_mapping, DeriveMode::Unsafe)
+                    .await
+            }
+        }
+        .boxed()
+        .compat()
+        .from_err()
+        .and_then({
+            cloned!(ctx, memblobstore);
+            move |_| memblobstore.persist(ctx)
+        })
+        .and_then(move |_| {
+            let buffer = in_memory_mapping.into_buffer();
+            let buffer = buffer.lock().unwrap();
+            let mut futs = vec![];
+            for (cs_id, value) in buffer.iter() {
+                futs.push(orig_mapping.put(ctx.clone(), *cs_id, value.clone()));
+            }
+            stream_old::futures_unordered(futs).for_each(|_| Ok(()))
+        })
+        .boxify()
     }

     fn pending(