skeleton manifest: switch to manager-based derivation
Summary:
Same as D30974102 (91c4748c5b) but for skeleton manifests.
Needed some changes to use `DerivationContext` instead of `DerivedDataManager`.
Reviewed By: StanislavGlebik
Differential Revision: D31111484
fbshipit-source-id: eacc1d3247dffac4537745ec2a2071ef0abcbd43
This commit is contained in:
parent
803c704617
commit
4dccacc11b
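The hunks below show the mechanical shape of the change: helpers that previously received the full `DerivedDataManager` (and reached into it for the repo blobstore and mappings) now receive a `DerivationContext` and call `derivation_ctx.blobstore()` or `derivation_ctx.fetch_dependency(...)` instead. A minimal, self-contained Rust sketch of that pattern, using hypothetical stand-in types rather than the real Mononoke API:

// A minimal sketch (hypothetical stand-in types, not the real Mononoke API)
// of the pattern this commit adopts: derivation code takes a narrow
// per-derivation context instead of the whole manager.

use std::collections::HashMap;
use std::sync::Arc;

// Stand-in for the repo blobstore.
type Blobstore = HashMap<String, Vec<u8>>;

#[derive(Clone, Copy, Debug)]
struct SkeletonManifestId(u64);

// Previously the helper was handed the whole manager...
#[allow(dead_code)]
struct DerivedDataManager {
    blobstore: Arc<Blobstore>,
    // ...plus mappings, config, scuba, and other state it never needed here.
}

// ...now it is handed only what a single derivation needs.
struct DerivationContext {
    blobstore: Arc<Blobstore>,
}

impl DerivationContext {
    fn blobstore(&self) -> &Blobstore {
        &self.blobstore
    }
}

// Mirrors the new shape of `derive_skeleton_manifest` in the hunks below:
// the blobstore comes from `derivation_ctx.blobstore()` rather than
// `manager.repo_blobstore()`.
fn derive_skeleton_manifest(
    derivation_ctx: &DerivationContext,
    parents: Vec<SkeletonManifestId>,
) -> SkeletonManifestId {
    let _blobstore = derivation_ctx.blobstore();
    // Real derivation work would go here; return a dummy id for the sketch.
    SkeletonManifestId(parents.first().map_or(0, |p| p.0) + 1)
}

fn main() {
    let derivation_ctx = DerivationContext {
        blobstore: Arc::new(Blobstore::new()),
    };
    let id = derive_skeleton_manifest(&derivation_ctx, vec![SkeletonManifestId(41)]);
    println!("derived {:?}", id);
}

Consistent with this, the per-type mapping struct goes away: the diff replaces `RootSkeletonManifestMapping` and its `BlobstoreRootIdMapping` impl with `store_mapping`/`fetch` functions that write and read the blobstore key directly through the `DerivationContext`, and the type also disappears from the `pub use` in lib.rs and from `derived_data_utils_impl` further down.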
@@ -24,7 +24,6 @@ itertools = "0.10.1"
lock_ext = { version = "0.1.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
metaconfig_types = { version = "0.1.0", path = "../metaconfig/types" }
mononoke_types = { version = "0.1.0", path = "../mononoke_types" }
repo_blobstore = { version = "0.1.0", path = "../blobrepo/repo_blobstore" }
repo_derived_data = { version = "0.1.0", path = "../repo_attributes/repo_derived_data" }
scuba_ext = { version = "0.1.0", path = "../common/scuba_ext" }
slog = { version = "2.5", features = ["max_level_trace", "nested-values"] }
@@ -13,7 +13,6 @@ path = "lib.rs"
[dependencies]
anyhow = "1.0"
async-trait = "0.1.51"
blobrepo = { version = "0.1.0", path = "../../blobrepo" }
blobstore = { version = "0.1.0", path = "../../blobstore" }
borrowed = { version = "0.1.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
bytes = { version = "1.0", features = ["serde"] }
@@ -24,13 +23,12 @@ derived_data_manager = { version = "0.1.0", path = "../manager" }
futures = { version = "0.3.13", features = ["async-await", "compat"] }
manifest = { version = "0.1.0", path = "../../manifest" }
mononoke_types = { version = "0.1.0", path = "../../mononoke_types" }
repo_blobstore = { version = "0.1.0", path = "../../blobrepo/repo_blobstore" }
repo_derived_data = { version = "0.1.0", path = "../../repo_attributes/repo_derived_data" }
sorted_vector_map = { version = "0.1.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
thiserror = "1.0.29"
tokio = { version = "1.10", features = ["full", "test-util", "tracing"] }

[dev-dependencies]
blobrepo = { version = "0.1.0", path = "../../blobrepo" }
blobrepo_hg = { version = "0.1.0", path = "../../blobrepo/blobrepo_hg" }
bookmarks = { version = "0.1.0", path = "../../bookmarks" }
derived_data_test_utils = { version = "0.1.0", path = "../test_utils" }
@@ -39,6 +37,7 @@ fbinit-tokio = { version = "0.1.0", git = "https://github.com/facebookexperiment
fixtures = { version = "0.1.0", path = "../../tests/fixtures" }
mercurial_types = { version = "0.1.0", path = "../../mercurial/types" }
pretty_assertions = "0.6"
repo_derived_data = { version = "0.1.0", path = "../../repo_attributes/repo_derived_data" }
revset = { version = "0.1.0", path = "../../revset" }
test_repo_factory = { version = "0.1.0", path = "../../repo_factory/test_repo_factory" }
tests_utils = { version = "0.1.0", path = "../../tests/utils" }
@@ -8,15 +8,13 @@
use std::collections::HashMap;

use anyhow::Error;
use blobrepo::BlobRepo;
use borrowed::borrowed;
use cloned::cloned;
use context::CoreContext;
use derived_data::batch::{split_batch_in_linear_stacks, FileConflicts};
use derived_data::{derive_impl, BonsaiDerivedMappingContainer};
use derived_data_manager::DerivationContext;
use futures::stream::{FuturesOrdered, TryStreamExt};
use mononoke_types::{ChangesetId, SkeletonManifestId};
use repo_derived_data::RepoDerivedDataRef;
use mononoke_types::ChangesetId;

use crate::derive::derive_skeleton_manifest;
use crate::RootSkeletonManifestId;
@@ -29,20 +27,18 @@ use crate::RootSkeletonManifestId;
/// more details.
pub async fn derive_skeleton_manifests_in_batch(
ctx: &CoreContext,
repo: &BlobRepo,
mapping: &BonsaiDerivedMappingContainer<RootSkeletonManifestId>,
derivation_ctx: &DerivationContext,
batch: Vec<ChangesetId>,
gap_size: Option<usize>,
) -> Result<HashMap<ChangesetId, SkeletonManifestId>, Error> {
let manager = repo.repo_derived_data().manager();
) -> Result<HashMap<ChangesetId, RootSkeletonManifestId>, Error> {
let linear_stacks = split_batch_in_linear_stacks(
ctx,
manager.repo_blobstore(),
derivation_ctx.blobstore(),
batch,
FileConflicts::ChangeDelete,
)
.await?;
let mut res = HashMap::new();
let mut res: HashMap<ChangesetId, RootSkeletonManifestId> = HashMap::new();
for linear_stack in linear_stacks {
// Fetch the parent skeleton manifests, either from a previous
// iteration of this loop (which will have stored the mapping in
@@ -54,14 +50,13 @@ pub async fn derive_skeleton_manifests_in_batch(
.map(|p| {
borrowed!(res);
async move {
match res.get(&p) {
Some(sk_mf_id) => Ok::<_, Error>(*sk_mf_id),
None => Ok(derive_impl::derive_impl::<RootSkeletonManifestId>(
ctx, repo, mapping, p,
)
.await?
.into_skeleton_manifest_id()),
}
anyhow::Result::<_>::Ok(
match res.get(&p) {
Some(sk_mf_id) => sk_mf_id.clone(),
None => derivation_ctx.fetch_dependency(ctx, p).await?,
}
.into_skeleton_manifest_id(),
)
}
})
.collect::<FuturesOrdered<_>>()
@@ -83,20 +78,20 @@ pub async fn derive_skeleton_manifests_in_batch(
// Clone the values that we need owned copies of to move
// into the future we are going to spawn, which means it
// must have static lifetime.
cloned!(ctx, manager, parent_skeleton_manifests);
cloned!(ctx, derivation_ctx, parent_skeleton_manifests);
async move {
let cs_id = item.cs_id;
let derivation_fut = async move {
derive_skeleton_manifest(
&ctx,
&manager,
&derivation_ctx,
parent_skeleton_manifests,
item.combined_file_changes.into_iter().collect(),
)
.await
};
let derivation_handle = tokio::spawn(derivation_fut);
let sk_mf_id: SkeletonManifestId = derivation_handle.await??;
let sk_mf_id = RootSkeletonManifestId(derivation_handle.await??);
Result::<_, Error>::Ok((cs_id, sk_mf_id))
}
})
@@ -113,12 +108,12 @@ pub async fn derive_skeleton_manifests_in_batch(
#[cfg(test)]
mod test {
use super::*;
use derived_data::BonsaiDerivedOld;
use derived_data_manager::BatchDeriveOptions;
use fbinit::FacebookInit;
use fixtures::linear;
use futures::compat::Stream01CompatExt;
use repo_derived_data::RepoDerivedDataRef;
use revset::AncestorsNodeStream;
use std::sync::Arc;
use tests_utils::resolve_cs_id;

#[fbinit::test]
@@ -128,27 +123,34 @@ mod test {
let repo = linear::getrepo(fb).await;
let master_cs_id = resolve_cs_id(&ctx, &repo, "master").await?;

let mapping = BonsaiDerivedMappingContainer::new(
ctx.fb,
repo.name(),
repo.get_derived_data_config().scuba_table.as_deref(),
Arc::new(RootSkeletonManifestId::default_mapping(&ctx, &repo)?),
);
let mut cs_ids =
AncestorsNodeStream::new(ctx.clone(), &repo.get_changeset_fetcher(), master_cs_id)
.compat()
.try_collect::<Vec<_>>()
.await?;
cs_ids.reverse();
let sk_mf_ids =
derive_skeleton_manifests_in_batch(&ctx, &repo, &mapping, cs_ids, None).await?;
sk_mf_ids.get(&master_cs_id).unwrap().clone()
let manager = repo.repo_derived_data().manager();
manager
.backfill_batch::<RootSkeletonManifestId>(
&ctx,
cs_ids,
BatchDeriveOptions::Parallel { gap_size: None },
None,
)
.await?;
manager
.fetch_derived::<RootSkeletonManifestId>(&ctx, master_cs_id, None)
.await?
.unwrap()
.into_skeleton_manifest_id()
};

let sequential = {
let repo = linear::getrepo(fb).await;
let master_cs_id = resolve_cs_id(&ctx, &repo, "master").await?;
RootSkeletonManifestId::derive(&ctx, &repo, master_cs_id)
repo.repo_derived_data()
.manager()
.derive::<RootSkeletonManifestId>(&ctx, master_cs_id, None)
.await?
.into_skeleton_manifest_id()
};
@@ -6,13 +6,14 @@
*/

use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::Arc;

use anyhow::{format_err, Context, Error, Result};
use blobstore::{Blobstore, Loadable};
use borrowed::borrowed;
use cloned::cloned;
use context::CoreContext;
use derived_data_manager::DerivedDataManager;
use derived_data_manager::DerivationContext;
use futures::channel::mpsc;
use futures::future::{BoxFuture, FutureExt};
use futures::stream::{FuturesOrdered, FuturesUnordered, TryStreamExt};
@@ -23,7 +24,6 @@ use mononoke_types::skeleton_manifest::{
use mononoke_types::{
BlobstoreValue, ContentId, FileType, MPath, MPathElement, MononokeId, SkeletonManifestId,
};
use repo_blobstore::RepoBlobstore;
use sorted_vector_map::SortedVectorMap;

use crate::SkeletonManifestDerivationError;
@@ -34,11 +34,11 @@ use crate::SkeletonManifestDerivationError;
/// single fsnode, and check that the leaf entries are valid during merges.
pub(crate) async fn derive_skeleton_manifest(
ctx: &CoreContext,
manager: &DerivedDataManager,
derivation_ctx: &DerivationContext,
parents: Vec<SkeletonManifestId>,
changes: Vec<(MPath, Option<(ContentId, FileType)>)>,
) -> Result<SkeletonManifestId> {
let blobstore = manager.repo_blobstore();
let blobstore = derivation_ctx.blobstore();

// We must box and store the derivation future, otherwise lifetime
// analysis is unable to see that the blobstore lasts long enough.
@@ -81,7 +81,7 @@ pub(crate) async fn derive_skeleton_manifest(
/// from the parent skeleton manifests to avoid fetching too much.
async fn collect_skeleton_subentries(
ctx: &CoreContext,
blobstore: &RepoBlobstore,
blobstore: &Arc<dyn Blobstore>,
parents: &[SkeletonManifestId],
subentries: BTreeMap<
MPathElement,
@@ -171,7 +171,7 @@ async fn collect_skeleton_subentries(
/// Create a new skeleton manifest for the tree described by `tree_info`.
async fn create_skeleton_manifest(
ctx: &CoreContext,
blobstore: &RepoBlobstore,
blobstore: &Arc<dyn Blobstore>,
sender: Option<mpsc::UnboundedSender<BoxFuture<'static, Result<(), Error>>>>,
tree_info: TreeInfo<SkeletonManifestId, (), Option<SkeletonManifestSummary>>,
) -> Result<(Option<SkeletonManifestSummary>, SkeletonManifestId)> {
@@ -382,11 +382,12 @@ mod test {
async fn test_skeleton_manifests(fb: FacebookInit) -> Result<()> {
let ctx = CoreContext::test_mock(fb);
let (repo, changesets) = init_repo(&ctx).await?;
let manager = repo.repo_derived_data().manager();
let derivation_ctx = repo.repo_derived_data().manager().derivation_context(None);

let a_bcs = changesets["A"].load(&ctx, repo.blobstore()).await?;
let a_skeleton_id =
derive_skeleton_manifest(&ctx, manager, vec![], get_file_changes(&a_bcs)).await?;
derive_skeleton_manifest(&ctx, &derivation_ctx, vec![], get_file_changes(&a_bcs))
.await?;
let a_skeleton = a_skeleton_id.load(&ctx, repo.blobstore()).await?;
assert_eq!(
a_skeleton.lookup(&MPathElement::new(b"A".to_vec())?),
@@ -405,9 +406,13 @@ mod test {

// Changeset B introduces some subdirectories
let b_bcs = changesets["B"].load(&ctx, repo.blobstore()).await?;
let b_skeleton_id =
derive_skeleton_manifest(&ctx, manager, vec![a_skeleton_id], get_file_changes(&b_bcs))
.await?;
let b_skeleton_id = derive_skeleton_manifest(
&ctx,
&derivation_ctx,
vec![a_skeleton_id],
get_file_changes(&b_bcs),
)
.await?;
let b_skeleton = b_skeleton_id.load(&ctx, repo.blobstore()).await?;
assert_eq!(
b_skeleton.summary(),
@@ -442,9 +447,13 @@ mod test {

// Changeset C introduces some case conflicts
let c_bcs = changesets["C"].load(&ctx, repo.blobstore()).await?;
let c_skeleton_id =
derive_skeleton_manifest(&ctx, manager, vec![b_skeleton_id], get_file_changes(&c_bcs))
.await?;
let c_skeleton_id = derive_skeleton_manifest(
&ctx,
&derivation_ctx,
vec![b_skeleton_id],
get_file_changes(&c_bcs),
)
.await?;
let c_skeleton = c_skeleton_id.load(&ctx, repo.blobstore()).await?;
assert_eq!(
c_skeleton.summary(),
@@ -511,9 +520,13 @@ mod test {

// Changeset D removes some of the conflicts
let d_bcs = changesets["D"].load(&ctx, repo.blobstore()).await?;
let d_skeleton_id =
derive_skeleton_manifest(&ctx, manager, vec![c_skeleton_id], get_file_changes(&d_bcs))
.await?;
let d_skeleton_id = derive_skeleton_manifest(
&ctx,
&derivation_ctx,
vec![c_skeleton_id],
get_file_changes(&d_bcs),
)
.await?;
let d_skeleton = d_skeleton_id.load(&ctx, repo.blobstore()).await?;
assert_eq!(d_skeleton.summary().child_case_conflicts, false);
assert_eq!(d_skeleton.summary().descendant_case_conflicts, true);
@@ -550,9 +563,13 @@ mod test {

// Changeset E removes them all
let e_bcs = changesets["E"].load(&ctx, repo.blobstore()).await?;
let e_skeleton_id =
derive_skeleton_manifest(&ctx, manager, vec![d_skeleton_id], get_file_changes(&e_bcs))
.await?;
let e_skeleton_id = derive_skeleton_manifest(
&ctx,
&derivation_ctx,
vec![d_skeleton_id],
get_file_changes(&e_bcs),
)
.await?;
let e_skeleton = e_skeleton_id.load(&ctx, repo.blobstore()).await?;
assert_eq!(
e_skeleton.summary(),
@@ -569,9 +586,13 @@ mod test {

// Changeset F adds a non-UTF-8 filename
let f_bcs = changesets["F"].load(&ctx, repo.blobstore()).await?;
let f_skeleton_id =
derive_skeleton_manifest(&ctx, manager, vec![e_skeleton_id], get_file_changes(&f_bcs))
.await?;
let f_skeleton_id = derive_skeleton_manifest(
&ctx,
&derivation_ctx,
vec![e_skeleton_id],
get_file_changes(&f_bcs),
)
.await?;
let f_skeleton = f_skeleton_id.load(&ctx, repo.blobstore()).await?;
assert_eq!(
f_skeleton.summary(),
@@ -602,9 +623,13 @@ mod test {

// Changeset G adds some files that are not valid on Windows
let g_bcs = changesets["G"].load(&ctx, repo.blobstore()).await?;
let g_skeleton_id =
derive_skeleton_manifest(&ctx, manager, vec![f_skeleton_id], get_file_changes(&g_bcs))
.await?;
let g_skeleton_id = derive_skeleton_manifest(
&ctx,
&derivation_ctx,
vec![f_skeleton_id],
get_file_changes(&g_bcs),
)
.await?;
let g_skeleton = g_skeleton_id.load(&ctx, repo.blobstore()).await?;
assert_eq!(
g_skeleton.summary(),
@@ -650,9 +675,13 @@ mod test {
// Changeset H introduces a new case conflict on top of the ones
// already in changeset C.
let h_bcs = changesets["H"].load(&ctx, repo.blobstore()).await?;
let h_skeleton_id =
derive_skeleton_manifest(&ctx, manager, vec![c_skeleton_id], get_file_changes(&h_bcs))
.await?;
let h_skeleton_id = derive_skeleton_manifest(
&ctx,
&derivation_ctx,
vec![c_skeleton_id],
get_file_changes(&h_bcs),
)
.await?;
let h_skeleton = h_skeleton_id.load(&ctx, repo.blobstore()).await?;
assert_eq!(
h_skeleton.summary(),
@@ -690,7 +719,8 @@ mod test {
// Changeset J has an internal case conflict.
let j_bcs = changesets["J"].load(&ctx, repo.blobstore()).await?;
let j_skeleton_id =
derive_skeleton_manifest(&ctx, manager, vec![], get_file_changes(&j_bcs)).await?;
derive_skeleton_manifest(&ctx, &derivation_ctx, vec![], get_file_changes(&j_bcs))
.await?;
let j_skeleton = j_skeleton_id.load(&ctx, repo.blobstore()).await?;
assert_eq!(
j_skeleton
@@ -704,7 +734,7 @@ mod test {
let k_bcs = changesets["K"].load(&ctx, repo.blobstore()).await?;
let k_skeleton_id = derive_skeleton_manifest(
&ctx,
manager,
&derivation_ctx,
vec![h_skeleton_id, j_skeleton_id],
get_file_changes(&k_bcs),
)
@@ -724,7 +754,8 @@ mod test {
// Changeset L has no case conflicts.
let l_bcs = changesets["L"].load(&ctx, repo.blobstore()).await?;
let l_skeleton_id =
derive_skeleton_manifest(&ctx, manager, vec![], get_file_changes(&l_bcs)).await?;
derive_skeleton_manifest(&ctx, &derivation_ctx, vec![], get_file_changes(&l_bcs))
.await?;
let l_skeleton = l_skeleton_id.load(&ctx, repo.blobstore()).await?;
assert_eq!(
l_skeleton
@@ -738,7 +769,7 @@ mod test {
let m_bcs = changesets["M"].load(&ctx, repo.blobstore()).await?;
let m_skeleton_id = derive_skeleton_manifest(
&ctx,
manager,
&derivation_ctx,
vec![h_skeleton_id, l_skeleton_id],
get_file_changes(&m_bcs),
)
@@ -14,7 +14,7 @@ mod batch;
mod derive;
mod mapping;

pub use mapping::{RootSkeletonManifestId, RootSkeletonManifestMapping};
pub use mapping::RootSkeletonManifestId;

#[derive(Debug, Error)]
pub enum SkeletonManifestDerivationError {
@@ -7,29 +7,23 @@

use std::collections::HashMap;
use std::convert::{TryFrom, TryInto};
use std::sync::Arc;

use anyhow::{Error, Result};
use async_trait::async_trait;
use blobrepo::BlobRepo;
use blobstore::{Blobstore, BlobstoreGetData};
use bytes::Bytes;
use context::CoreContext;
use derived_data::{
impl_bonsai_derived_mapping, BlobstoreRootIdMapping, BonsaiDerivable,
BonsaiDerivedMappingContainer, DerivedDataTypesConfig,
};
use futures::stream::{self, StreamExt, TryStreamExt};
use derived_data::impl_bonsai_derived_via_manager;
use derived_data_manager::{dependencies, BonsaiDerivable, DerivationContext};
use mononoke_types::{
BlobstoreBytes, BonsaiChangeset, ChangesetId, ContentId, FileType, MPath, SkeletonManifestId,
};
use repo_derived_data::RepoDerivedDataRef;

use crate::batch::derive_skeleton_manifests_in_batch;
use crate::derive::derive_skeleton_manifest;

#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub struct RootSkeletonManifestId(SkeletonManifestId);
pub struct RootSkeletonManifestId(pub(crate) SkeletonManifestId);

impl RootSkeletonManifestId {
pub fn skeleton_manifest_id(&self) -> &SkeletonManifestId {
@@ -64,23 +58,25 @@ impl From<RootSkeletonManifestId> for BlobstoreBytes {
}
}

fn format_key(changeset_id: ChangesetId) -> String {
format!("derived_root_skeletonmanifest.{}", changeset_id)
}

#[async_trait]
impl BonsaiDerivable for RootSkeletonManifestId {
const NAME: &'static str = "skeleton_manifests";

type Options = ();
type Dependencies = dependencies![];

async fn derive_from_parents_impl(
ctx: CoreContext,
repo: BlobRepo,
async fn derive_single(
ctx: &CoreContext,
derivation_ctx: &DerivationContext,
bonsai: BonsaiChangeset,
parents: Vec<Self>,
_options: &Self::Options,
) -> Result<Self, Error> {
let manager = repo.repo_derived_data().manager();
let skeleton_manifest_id = derive_skeleton_manifest(
&ctx,
manager,
let id = derive_skeleton_manifest(
ctx,
derivation_ctx,
parents
.into_iter()
.map(RootSkeletonManifestId::into_skeleton_manifest_id)
@@ -88,59 +84,50 @@ impl BonsaiDerivable for RootSkeletonManifestId {
get_file_changes(&bonsai),
)
.await?;
Ok(RootSkeletonManifestId(skeleton_manifest_id))
Ok(RootSkeletonManifestId(id))
}

async fn batch_derive_impl(
async fn derive_batch(
ctx: &CoreContext,
repo: &BlobRepo,
csids: Vec<ChangesetId>,
mapping: &BonsaiDerivedMappingContainer<Self>,
derivation_ctx: &DerivationContext,
bonsais: Vec<BonsaiChangeset>,
gap_size: Option<usize>,
) -> Result<HashMap<ChangesetId, Self>, Error> {
let derived =
derive_skeleton_manifests_in_batch(ctx, repo, mapping, csids.clone(), gap_size).await?;

stream::iter(derived.into_iter().map(|(cs_id, derived)| async move {
let derived = RootSkeletonManifestId(derived);
mapping.put(ctx, cs_id, &derived).await?;
Ok((cs_id, derived))
}))
.buffered(100)
.try_collect::<HashMap<_, _>>()
) -> Result<HashMap<ChangesetId, Self>> {
derive_skeleton_manifests_in_batch(
ctx,
derivation_ctx,
bonsais.into_iter().map(|b| b.get_changeset_id()).collect(),
gap_size,
)
.await
}
}

#[derive(Clone)]
pub struct RootSkeletonManifestMapping {
blobstore: Arc<dyn Blobstore>,
}

#[async_trait]
impl BlobstoreRootIdMapping for RootSkeletonManifestMapping {
type Value = RootSkeletonManifestId;

fn new(blobstore: Arc<dyn Blobstore>, _config: &DerivedDataTypesConfig) -> Result<Self> {
Ok(Self { blobstore })
async fn store_mapping(
self,
ctx: &CoreContext,
derivation_ctx: &DerivationContext,
changeset_id: ChangesetId,
) -> Result<()> {
let key = format_key(changeset_id);
derivation_ctx.blobstore().put(ctx, key, self.into()).await
}

fn blobstore(&self) -> &dyn Blobstore {
&self.blobstore
async fn fetch(
ctx: &CoreContext,
derivation_ctx: &DerivationContext,
changeset_id: ChangesetId,
) -> Result<Option<Self>> {
let key = format_key(changeset_id);
Ok(derivation_ctx
.blobstore()
.get(ctx, &key)
.await?
.map(TryInto::try_into)
.transpose()?)
}

fn prefix(&self) -> &'static str {
"derived_root_skeletonmanifest."
}

fn options(&self) {}
}

impl_bonsai_derived_mapping!(
RootSkeletonManifestMapping,
BlobstoreRootIdMapping,
RootSkeletonManifestId
);
impl_bonsai_derived_via_manager!(RootSkeletonManifestId);

pub(crate) fn get_file_changes(
bcs: &BonsaiChangeset,
@@ -160,11 +147,11 @@ pub(crate) fn get_file_changes(
#[cfg(test)]
mod test {
use super::*;
use blobrepo::BlobRepo;
use blobrepo_hg::BlobRepoHg;
use blobstore::Loadable;
use bookmarks::BookmarkName;
use borrowed::borrowed;
use derived_data::BonsaiDerived;
use derived_data_test_utils::iterate_all_manifest_entries;
use fbinit::FacebookInit;
use fixtures::{
@@ -178,6 +165,7 @@ mod test {
use manifest::Entry;
use mercurial_types::{HgChangesetId, HgManifestId};
use mononoke_types::ChangesetId;
use repo_derived_data::RepoDerivedDataRef;
use revset::AncestorsNodeStream;
use tokio::runtime::Runtime;

@@ -195,7 +183,9 @@ mod test {
bcs_id: ChangesetId,
hg_cs_id: HgChangesetId,
) -> Result<()> {
let root_skeleton_manifest_id = RootSkeletonManifestId::derive(ctx, repo, bcs_id)
let manager = repo.repo_derived_data().manager();
let root_skeleton_manifest_id = manager
.derive::<RootSkeletonManifestId>(ctx, bcs_id, None)
.await?
.into_skeleton_manifest_id();
@@ -9,13 +9,12 @@ use std::cmp::Ordering as CmpOrdering
use std::collections::BTreeMap;

use anyhow::Error;
use blobstore::Loadable;
use blobstore::{Blobstore, Loadable};
use context::CoreContext;
use futures::stream::{self, StreamExt, TryStreamExt};
use itertools::Itertools;
use mononoke_types::{BonsaiChangeset, FileChange, MPath};
use mononoke_types::{ChangesetId, ContentId, FileType};
use repo_blobstore::RepoBlobstore;

pub type FileToContent = BTreeMap<MPath, Option<(ContentId, FileType)>>;

@@ -30,7 +29,7 @@ pub enum FileConflicts {

pub async fn split_batch_in_linear_stacks(
ctx: &CoreContext,
blobstore: &RepoBlobstore,
blobstore: &impl Blobstore,
batch: Vec<ChangesetId>,
file_conflicts: FileConflicts,
) -> Result<Vec<LinearStack>, Error> {
@@ -47,7 +47,7 @@ use mononoke_types::ChangesetId;
use repo_blobstore::RepoBlobstoreRef;
use repo_derived_data::RepoDerivedDataRef;
use scuba_ext::MononokeScubaSampleBuilder;
use skeleton_manifest::{RootSkeletonManifestId, RootSkeletonManifestMapping};
use skeleton_manifest::RootSkeletonManifestId;
use std::{
collections::{HashMap, HashSet},
hash::{Hash, Hasher},
@@ -729,14 +729,9 @@ fn derived_data_utils_impl(
repo.clone(),
)))
}
RootSkeletonManifestId::NAME => {
let mapping = RootSkeletonManifestMapping::new(blobstore, config)?;
Ok(Arc::new(DerivedUtilsFromMapping::new(
fb,
mapping,
repo.clone(),
)))
}
RootSkeletonManifestId::NAME => Ok(Arc::new(DerivedUtilsFromManager::<
RootSkeletonManifestId,
>::new(repo, config))),
TreeHandle::NAME => {
let mapping = TreeMapping::new(blobstore, config);
Ok(Arc::new(DerivedUtilsFromMapping::new(
@@ -46,17 +46,7 @@ enable some more derived data types for normal usage and backfilling
start the tailer with tailing and backfilling some different types
normally the tailer runs forever, but for this test we will make it
stop when it becomes idle.
  $ backfill_derived_data backfill-all --parallel --batch-size=10 --gap-size=3 2>&1 | grep -v count:8
  *] enabled stdlog with level: Error (set RUST_LOG to configure) (glob)
  *] Initializing tunables: * (glob)
  *] using repo "repo" repoid RepositoryId(0) (glob)
  *] derived data types: {*} (glob)
  *] Deriving 1 heads (glob)
  *] found changesets: 8 * (glob)
  *] deriving data 32 (glob)
  *] backfill unodes batch from 9feb8ddd3e8eddcfa3a4913b57df7842bedf84b8ea3b7b3fcb14c6424aa81fec to 8ea58cff262ad56732037fb42189d6262dacdaf8032c18ddebcb6b5b310d1298 (glob)
  *] backfill blame batch from 9feb8ddd3e8eddcfa3a4913b57df7842bedf84b8ea3b7b3fcb14c6424aa81fec to 8ea58cff262ad56732037fb42189d6262dacdaf8032c18ddebcb6b5b310d1298 (glob)
  *] derive blame batch at 9feb8ddd3e8eddcfa3a4913b57df7842bedf84b8ea3b7b3fcb14c6424aa81fec (stack of 8 from batch of 8) (glob)
  $ backfill_derived_data backfill-all --parallel --batch-size=10 --gap-size=3 &>/dev/null

Heads should all be derived
  $ mononoke_admin --log-level ERROR derived-data exists fsnodes main