mononoke: add batch_derive() method

Summary:
A new method on BonsaiDerived trait that derives data for a batch of commits.
Default implementation just derives them in parallel, so it's not particularly
useful. However it might be overridden if a particular derived data has a more
efficient way of deriving a batch of commits.

Reviewed By: farnz

Differential Revision: D21039983

fbshipit-source-id: 3c6a7eaa682f5eaf6b8a768ca61d6f8a8f1258a7
This commit is contained in:
Stanislau Hlebik 2020-04-15 08:56:46 -07:00 committed by Facebook GitHub Bot
parent 06eaf3c226
commit d3ec8dd0f3
4 changed files with 73 additions and 0 deletions

View File

@ -27,6 +27,7 @@ stats = { git = "https://github.com/facebookexperimental/rust-shed.git", branch
time_ext = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
tracing = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
anyhow = "1.0"
async-trait = "0.1.29"
bytes = { version = "0.5", features = ["serde"] }
futures = { version = "0.3", features = ["async-await", "compat"] }
futures-old = { package = "futures", version = "0.1" }

View File

@ -28,6 +28,7 @@ futures_ext = { git = "https://github.com/facebookexperimental/rust-shed.git", b
lock_ext = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
anyhow = "1.0"
ascii = "1.0"
async-trait = "0.1.29"
bytes = { version = "0.5", features = ["serde"] }
digest = "0.8"
futures = { version = "0.3", features = ["async-await", "compat"] }

View File

@ -896,6 +896,51 @@ mod test {
Ok(())
}
// Checks that `TestGenNum::batch_derive` yields the same derived value for
// master as a plain `TestGenNum::derive` call does on a second repo instance.
#[fbinit::compat_test]
async fn test_batch_derive(fb: FacebookInit) -> Result<(), Error> {
    let ctx = CoreContext::test_mock(fb);

    // Derive all ancestors of master in one batch, then pick out master's value.
    let from_batch = {
        let repo = linear::getrepo(fb).await;
        // Add TestGenNum to the derived data types configured for this repo.
        let repo = repo.dangerous_override(|mut derived_data_config: DerivedDataConfig| {
            derived_data_config
                .derived_data_types
                .insert(TestGenNum::NAME.to_string());
            derived_data_config
        });
        let master_cs_id = resolve_cs_id(&ctx, &repo, "master").await?;

        let cs_ids =
            AncestorsNodeStream::new(ctx.clone(), &repo.get_changeset_fetcher(), master_cs_id)
                .collect()
                .compat()
                .await?;
        // Reverse them to derive parents before children.
        // The collected Vec is consumed here; it is not used again, so no clone is needed.
        let cs_ids = cs_ids.into_iter().rev().collect::<Vec<_>>();

        let derived_batch = TestGenNum::batch_derive(&ctx, &repo, cs_ids).await?;
        derived_batch
            .get(&master_cs_id)
            .unwrap_or_else(|| panic!("{} has not been derived", master_cs_id))
            .clone()
    };

    // Derive master directly on a separately obtained repo, for comparison.
    let sequential = {
        let repo = linear::getrepo(fb).await;
        // Same config override as above so both repos derive TestGenNum.
        let repo = repo.dangerous_override(|mut derived_data_config: DerivedDataConfig| {
            derived_data_config
                .derived_data_types
                .insert(TestGenNum::NAME.to_string());
            derived_data_config
        });
        let master_cs_id = resolve_cs_id(&ctx, &repo, "master").await?;
        TestGenNum::derive(ctx.clone(), repo.clone(), master_cs_id)
            .compat()
            .await?
    };

    assert_eq!(from_batch, sequential);
    Ok(())
}
#[fbinit::test]
fn test_leases(fb: FacebookInit) -> Result<(), Error> {
let ctx = CoreContext::test_mock(fb);

View File

@ -8,8 +8,10 @@
#![deny(warnings)]
use anyhow::Error;
use async_trait::async_trait;
use blobrepo::BlobRepo;
use context::CoreContext;
use futures::{compat::Future01CompatExt, stream, StreamExt, TryStreamExt};
use futures_ext::{BoxFuture, FutureExt};
use futures_old::Future;
use lock_ext::LockExt;
@ -41,6 +43,7 @@ pub enum DeriveError {
/// Trait for the data that can be derived from bonsai changeset.
/// Examples of that are hg changeset id, unodes root manifest id, git changeset ids etc
#[async_trait]
pub trait BonsaiDerived: Sized + 'static + Send + Sync + Clone {
/// Name of derived data
///
@ -128,6 +131,29 @@ pub trait BonsaiDerived: Sized + 'static + Send + Sync + Clone {
.map(|count| count == 0)
.boxify()
}
/// Derives `Self` for every changeset in `csids` and returns the results keyed
/// by changeset id.
///
/// The default implementation calls `Self::derive` once per changeset and keeps
/// up to 100 of those derivations in flight at a time; if any derivation fails,
/// the whole call returns an error. BonsaiDerived implementors may override this
/// method if there's a more efficient way to derive a batch of commits.
async fn batch_derive<'a, Iter>(
    ctx: &CoreContext,
    repo: &BlobRepo,
    csids: Iter,
) -> Result<HashMap<ChangesetId, Self>, Error>
where
    Iter: IntoIterator<Item = ChangesetId> + Send,
    Iter::IntoIter: Send,
{
    // Upper bound on the number of derivations polled concurrently.
    const MAX_IN_FLIGHT: usize = 100;

    // One lazily-created future per changeset; nothing runs until the stream
    // below polls it.
    let derivations = csids.into_iter().map(|cs_id| async move {
        let derived = Self::derive(ctx.clone(), repo.clone(), cs_id)
            .compat()
            .await?;
        Ok((cs_id, derived))
    });

    stream::iter(derivations)
        .buffered(MAX_IN_FLIGHT)
        .try_collect::<HashMap<_, _>>()
        .await
}
}
/// After derived data was generated then it will be stored in BonsaiDerivedMapping, which is