filenodes: switch to manager-based derivation

Summary: Same as D30974102 (91c4748c5b), but for Mercurial filenodes.

Reviewed By: markbt

Differential Revision: D31170597

fbshipit-source-id: fda62e251f9eb0e1b6b4aa950d93560b1ff81f67
Author: Yan Soares Couto
Date: 2021-10-05 06:25:28 -07:00
Committed by: Facebook GitHub Bot
Parent: ddba827364
Commit: 075a4a1148
10 changed files with 267 additions and 290 deletions
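
The diffs below all follow one pattern: call sites that used the old `BonsaiDerivedOld` entry points (`FilenodesOnlyPublic::derive`, `default_mapping`, `batch_derive`) against a `BlobRepo` now go through the repo's derived data manager. A minimal before/after sketch of a call site, assuming `ctx: CoreContext`, `repo: BlobRepo`, and `cs: ChangesetId` are in scope; the calls are the ones that appear in the diffs, and the trailing `None` is read here as the optional rederivation override:

// Before: the derivable type itself drove derivation against the repo.
let derived = FilenodesOnlyPublic::derive(&ctx, &repo, cs).await?;

// After: the repo's derived-data manager drives derivation; the type only
// implements the manager's BonsaiDerivable trait.
let manager = repo.repo_derived_data().manager();
let derived = manager
    .derive::<FilenodesOnlyPublic>(&ctx, cs, None)
    .await?;
// Reads also go through the manager (Ok(None) if not yet derived):
let fetched = manager
    .fetch_derived::<FilenodesOnlyPublic>(&ctx, cs, None)
    .await?;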

File 1 of 10:

@@ -23,7 +23,6 @@ use changeset_info::ChangesetInfo;
 use cloned::cloned;
 use context::{CoreContext, SessionClass};
 use deleted_files_manifest::RootDeletedManifestId;
-use derived_data::BonsaiDerivable;
 use derived_data_filenodes::FilenodesOnlyPublic;
 use derived_data_manager::BonsaiDerivable as NewBonsaiDerivable;
 use fastlog::RootFastlog;

File 2 of 10:

@@ -18,8 +18,8 @@ use cmdlib::{
     helpers::block_execute,
 };
 use context::{CoreContext, SessionContainer};
-use derived_data::BonsaiDerivable;
 use derived_data_filenodes::FilenodesOnlyPublic;
+use derived_data_manager::BonsaiDerivable;
 use derived_data_utils::POSSIBLE_DERIVED_TYPES;
 use failure_ext::SlogKVError;
 use fbinit::FacebookInit;

File 3 of 10:

@@ -6,49 +6,59 @@
  */

 use crate::mapping::{FilenodesOnlyPublic, PreparedRootFilenode};
-use anyhow::{format_err, Error};
-use blobrepo::BlobRepo;
-use blobrepo_hg::BlobRepoHg;
+use anyhow::{format_err, Result};
 use blobstore::Loadable;
 use context::CoreContext;
+use derived_data_manager::DerivationContext;
 use filenodes::{FilenodeInfo, FilenodeResult, PreparedFilenode};
 use futures::{
     future::try_join_all, pin_mut, stream, FutureExt, StreamExt, TryFutureExt, TryStreamExt,
 };
-use futures_util::try_join;
 use itertools::{Either, Itertools};
 use manifest::{find_intersection_of_diffs_and_parents, Entry};
+use mercurial_derived_data::MappedHgChangesetId;
 use mercurial_types::{
     blobs::File, fetch_manifest_envelope, HgChangesetId, HgFileEnvelope, HgFileNodeId,
-    HgManifestEnvelope, HgManifestId,
+    HgManifestEnvelope,
 };
 use mononoke_types::{BonsaiChangeset, ChangesetId, MPath, RepoPath};

 pub async fn derive_filenodes(
     ctx: &CoreContext,
-    repo: &BlobRepo,
+    derivation_ctx: &DerivationContext,
     bcs: BonsaiChangeset,
-) -> Result<FilenodesOnlyPublic, Error> {
-    let (_, public_filenode, non_roots) = prepare_filenodes_for_cs(ctx, repo, bcs).await?;
+) -> Result<FilenodesOnlyPublic> {
+    if tunables::tunables().get_filenodes_disabled() {
+        return Ok(FilenodesOnlyPublic::Disabled);
+    }
+    let (_, public_filenode, non_roots) =
+        prepare_filenodes_for_cs(ctx, derivation_ctx, bcs).await?;
     if !non_roots.is_empty() {
-        if let FilenodeResult::Disabled = repo.get_filenodes().add_filenodes(ctx, non_roots).await?
+        if let FilenodeResult::Disabled = derivation_ctx
+            .filenodes()
+            .add_filenodes(ctx, non_roots)
+            .await?
         {
             return Ok(FilenodesOnlyPublic::Disabled);
         }
     }
+    // In case it got updated while deriving
+    if tunables::tunables().get_filenodes_disabled() {
+        return Ok(FilenodesOnlyPublic::Disabled);
+    }
     Ok(public_filenode)
 }

 pub async fn derive_filenodes_in_batch(
     ctx: &CoreContext,
-    repo: &BlobRepo,
+    derivation_ctx: &DerivationContext,
     batch: Vec<BonsaiChangeset>,
-) -> Result<Vec<(ChangesetId, FilenodesOnlyPublic, Vec<PreparedFilenode>)>, Error> {
+) -> Result<Vec<(ChangesetId, FilenodesOnlyPublic, Vec<PreparedFilenode>)>> {
     stream::iter(
         batch
             .clone()
             .into_iter()
-            .map(|bcs| async move { prepare_filenodes_for_cs(ctx, repo, bcs).await }),
+            .map(|bcs| async move { prepare_filenodes_for_cs(ctx, derivation_ctx, bcs).await }),
     )
     .buffered(100)
     .try_collect()
@@ -57,10 +67,10 @@ pub async fn derive_filenodes_in_batch(
 pub async fn prepare_filenodes_for_cs(
     ctx: &CoreContext,
-    repo: &BlobRepo,
+    derivation_ctx: &DerivationContext,
     bcs: BonsaiChangeset,
-) -> Result<(ChangesetId, FilenodesOnlyPublic, Vec<PreparedFilenode>), Error> {
-    let filenodes = generate_all_filenodes(ctx, repo, &bcs).await?;
+) -> Result<(ChangesetId, FilenodesOnlyPublic, Vec<PreparedFilenode>)> {
+    let filenodes = generate_all_filenodes(ctx, derivation_ctx, &bcs).await?;
     if filenodes.is_empty() {
         // This commit didn't create any new filenodes, and it's root manifest is the
         // same as one of the parents (that can happen if this commit is empty).
@@ -98,21 +108,30 @@ pub async fn prepare_filenodes_for_cs(
 pub async fn generate_all_filenodes(
     ctx: &CoreContext,
-    repo: &BlobRepo,
+    derivation_ctx: &DerivationContext,
     bcs: &BonsaiChangeset,
-) -> Result<Vec<PreparedFilenode>, Error> {
-    let blobstore = repo.blobstore();
-    let root_mf = fetch_root_manifest_id(&ctx, bcs.get_changeset_id(), &repo);
+) -> Result<Vec<PreparedFilenode>> {
+    let blobstore = derivation_ctx.blobstore();
+    let hg_id = derivation_ctx
+        .derive_dependency::<MappedHgChangesetId>(ctx, bcs.get_changeset_id())
+        .await?
+        .0;
+    let root_mf = hg_id.load(ctx, &blobstore).await?.manifestid();

     // Bonsai might have > 2 parents, while mercurial supports at most 2.
     // That's fine for us - we just won't generate filenodes for paths that came from
     // stepparents. That means that linknode for these filenodes will point to a stepparent
     let parents = try_join_all(
-        bcs.parents()
-            .map(|p| fetch_root_manifest_id(&ctx, p, &repo)),
-    );
-    let linknode = repo.get_hg_from_bonsai_changeset(ctx.clone(), bcs.get_changeset_id());
-    let (root_mf, parents, linknode) = try_join!(root_mf, parents, linknode)?;
+        derivation_ctx
+            .fetch_parents::<MappedHgChangesetId>(ctx, &bcs)
+            .await?
+            .into_iter()
+            .map(
+                |id| async move { Result::<_>::Ok(id.0.load(ctx, &blobstore).await?.manifestid()) },
+            ),
+    )
+    .await?;
+    let linknode = hg_id;

     (async_stream::stream! {
         let s = find_intersection_of_diffs_and_parents(
             ctx.clone(),
@@ -214,7 +233,7 @@ fn create_file_filenode(
     path: Option<MPath>,
     envelope: HgFileEnvelope,
     linknode: HgChangesetId,
-) -> Result<PreparedFilenode, Error> {
+) -> Result<PreparedFilenode> {
     let path = match path {
         Some(path) => RepoPath::FilePath(path),
         None => {
@@ -238,27 +257,15 @@ fn create_file_filenode(
     })
 }

-async fn fetch_root_manifest_id(
-    ctx: &CoreContext,
-    cs_id: ChangesetId,
-    repo: &BlobRepo,
-) -> Result<HgManifestId, Error> {
-    let hg_cs_id = repo
-        .get_hg_from_bonsai_changeset(ctx.clone(), cs_id)
-        .await?;
-    let hg_cs = hg_cs_id.load(ctx, repo.blobstore()).await?;
-    Ok(hg_cs.manifestid())
-}
-
 #[cfg(test)]
 mod tests {
     use super::*;
     use anyhow::{anyhow, Context, Result};
     use async_trait::async_trait;
+    use blobrepo::BlobRepo;
+    use blobrepo_hg::BlobRepoHg;
     use cloned::cloned;
-    use derived_data::{
-        BonsaiDerivable, BonsaiDerivedMapping, BonsaiDerivedMappingContainer, BonsaiDerivedOld,
-    };
+    use derived_data_manager::BatchDeriveOptions;
     use fbinit::FacebookInit;
     use filenodes::{FilenodeRangeResult, Filenodes};
     use fixtures::linear;
@@ -266,6 +273,7 @@ mod tests {
     use manifest::ManifestOps;
     use maplit::hashmap;
     use mononoke_types::FileType;
+    use repo_derived_data::RepoDerivedDataRef;
     use revset::AncestorsNodeStream;
     use slog::info;
     use std::{
@@ -282,9 +290,14 @@ mod tests {
         repo: &BlobRepo,
         cs_id: ChangesetId,
         expected_paths: Vec<RepoPath>,
-    ) -> Result<(), Error> {
+    ) -> Result<()> {
         let bonsai = cs_id.load(ctx, repo.blobstore()).await?;
-        let filenodes = generate_all_filenodes(&ctx, &repo, &bonsai).await?;
+        let filenodes = generate_all_filenodes(
+            &ctx,
+            &repo.repo_derived_data().manager().derivation_context(None),
+            &bonsai,
+        )
+        .await?;

         assert_eq!(filenodes.len(), expected_paths.len());
         for path in expected_paths {
@@ -306,7 +319,7 @@ mod tests {
         Ok(())
     }

-    async fn test_generate_filenodes_simple(fb: FacebookInit) -> Result<(), Error> {
+    async fn test_generate_filenodes_simple(fb: FacebookInit) -> Result<()> {
         let ctx = CoreContext::test_mock(fb);
         let repo: BlobRepo = test_repo_factory::build_empty()?;
         let filename = "path";
@@ -328,12 +341,12 @@ mod tests {
     }

     #[fbinit::test]
-    fn generate_filenodes_simple(fb: FacebookInit) -> Result<(), Error> {
+    fn generate_filenodes_simple(fb: FacebookInit) -> Result<()> {
         let runtime = tokio::runtime::Runtime::new()?;
         runtime.block_on(test_generate_filenodes_simple(fb))
     }

-    async fn test_generate_filenodes_merge(fb: FacebookInit) -> Result<(), Error> {
+    async fn test_generate_filenodes_merge(fb: FacebookInit) -> Result<()> {
         let ctx = CoreContext::test_mock(fb);
         let repo: BlobRepo = test_repo_factory::build_empty()?;
         let first_p1 = CreateCommitContext::new_root(&ctx, &repo)
@@ -357,12 +370,12 @@ mod tests {
     }

     #[fbinit::test]
-    fn generate_filenodes_merge(fb: FacebookInit) -> Result<(), Error> {
+    fn generate_filenodes_merge(fb: FacebookInit) -> Result<()> {
         let runtime = tokio::runtime::Runtime::new()?;
         runtime.block_on(test_generate_filenodes_merge(fb))
     }

-    async fn test_generate_type_change(fb: FacebookInit) -> Result<(), Error> {
+    async fn test_generate_type_change(fb: FacebookInit) -> Result<()> {
         let ctx = CoreContext::test_mock(fb);
         let repo: BlobRepo = test_repo_factory::build_empty()?;
         let parent = CreateCommitContext::new_root(&ctx, &repo)
@@ -382,12 +395,12 @@ mod tests {
     }

     #[fbinit::test]
-    fn generate_filenodes_type_change(fb: FacebookInit) -> Result<(), Error> {
+    fn generate_filenodes_type_change(fb: FacebookInit) -> Result<()> {
         let runtime = tokio::runtime::Runtime::new()?;
         runtime.block_on(test_generate_type_change(fb))
     }

-    async fn test_many_parents(fb: FacebookInit) -> Result<(), Error> {
+    async fn test_many_parents(fb: FacebookInit) -> Result<()> {
         let ctx = CoreContext::test_mock(fb);
         let repo: BlobRepo = test_repo_factory::build_empty()?;
         let p1 = CreateCommitContext::new_root(&ctx, &repo)
@@ -423,12 +436,12 @@ mod tests {
     }

     #[fbinit::test]
-    fn many_parents(fb: FacebookInit) -> Result<(), Error> {
+    fn many_parents(fb: FacebookInit) -> Result<()> {
         let runtime = tokio::runtime::Runtime::new()?;
         runtime.block_on(test_many_parents(fb))
     }

-    async fn test_derive_empty_commits(fb: FacebookInit) -> Result<(), Error> {
+    async fn test_derive_empty_commits(fb: FacebookInit) -> Result<()> {
         let ctx = CoreContext::test_mock(fb);
         let repo: BlobRepo = test_repo_factory::build_empty()?;
         let parent_empty = CreateCommitContext::new_root(&ctx, &repo).commit().await?;
@@ -438,11 +451,15 @@ mod tests {
             .commit()
             .await?;

-        FilenodesOnlyPublic::derive(&ctx, &repo, child_empty).await?;
+        let manager = repo.repo_derived_data().manager();
+        manager
+            .derive::<FilenodesOnlyPublic>(&ctx, child_empty, None)
+            .await?;

         // Make sure they are in the mapping
-        let maps = FilenodesOnlyPublic::default_mapping(&ctx, &repo)?
-            .get(&ctx, vec![parent_empty, child_empty])
+        let maps = manager
+            .fetch_derived_batch::<FilenodesOnlyPublic>(&ctx, vec![parent_empty, child_empty], None)
             .await?;

         assert_eq!(maps.len(), 2);
@@ -450,12 +467,12 @@ mod tests {
     }

     #[fbinit::test]
-    fn derive_empty_commits(fb: FacebookInit) -> Result<(), Error> {
+    fn derive_empty_commits(fb: FacebookInit) -> Result<()> {
         let runtime = tokio::runtime::Runtime::new()?;
         runtime.block_on(test_derive_empty_commits(fb))
     }

-    async fn test_derive_only_empty_commits(fb: FacebookInit) -> Result<(), Error> {
+    async fn test_derive_only_empty_commits(fb: FacebookInit) -> Result<()> {
         let ctx = CoreContext::test_mock(fb);
         let repo: BlobRepo = test_repo_factory::build_empty()?;
@@ -464,49 +481,61 @@ mod tests {
             .commit()
             .await?;

-        let mapping = FilenodesOnlyPublic::default_mapping(&ctx, &repo)?;
-        FilenodesOnlyPublic::derive(&ctx, &repo, child_empty).await?;
+        let manager = repo.repo_derived_data().manager();
+        manager
+            .derive::<FilenodesOnlyPublic>(&ctx, child_empty, None)
+            .await?;

         // Make sure they are in the mapping
-        let maps = mapping.get(&ctx, vec![child_empty, parent_empty]).await?;
+        let maps = manager
+            .fetch_derived_batch::<FilenodesOnlyPublic>(&ctx, vec![child_empty, parent_empty], None)
+            .await?;
         assert_eq!(maps.len(), 2);
         Ok(())
     }

     #[fbinit::test]
-    fn derive_only_empty_commits(fb: FacebookInit) -> Result<(), Error> {
+    fn derive_only_empty_commits(fb: FacebookInit) -> Result<()> {
         let runtime = tokio::runtime::Runtime::new()?;
         runtime.block_on(test_derive_only_empty_commits(fb))
     }

     #[fbinit::test]
-    fn derive_disabled_filenodes(fb: FacebookInit) -> Result<(), Error> {
+    fn derive_disabled_filenodes(fb: FacebookInit) -> Result<()> {
         let tunables = MononokeTunables::default();
         tunables.update_bools(&hashmap! {"filenodes_disabled".to_string() => true});
         with_tunables(tunables, || {
-            let runtime = tokio::runtime::Runtime::new()?;
+            let runtime = tokio::runtime::Builder::new_current_thread()
+                .enable_time()
+                .build()?;
             runtime.block_on(test_derive_disabled_filenodes(fb))
         })
     }

-    async fn test_derive_disabled_filenodes(fb: FacebookInit) -> Result<(), Error> {
+    async fn test_derive_disabled_filenodes(fb: FacebookInit) -> Result<()> {
         let ctx = CoreContext::test_mock(fb);
         let repo: BlobRepo = test_repo_factory::build_empty()?;
         let cs = CreateCommitContext::new_root(&ctx, &repo).commit().await?;
-        let derived = FilenodesOnlyPublic::derive(&ctx, &repo, cs).await?;
+        let derived = repo
+            .repo_derived_data()
+            .derive::<FilenodesOnlyPublic>(&ctx, cs)
+            .await?;
         assert_eq!(derived, FilenodesOnlyPublic::Disabled);

-        let mapping = FilenodesOnlyPublic::default_mapping(&ctx, &repo)?;
-        let res = mapping.get(&ctx, vec![cs]).await?;
-
-        assert_eq!(res.get(&cs).unwrap(), &FilenodesOnlyPublic::Disabled);
+        assert_eq!(
+            repo.repo_derived_data()
+                .fetch_derived::<FilenodesOnlyPublic>(&ctx, cs)
+                .await?
+                .unwrap(),
+            FilenodesOnlyPublic::Disabled
+        );
         Ok(())
     }

     #[fbinit::test]
-    async fn verify_batch_and_sequential_derive(fb: FacebookInit) -> Result<(), Error> {
+    async fn verify_batch_and_sequential_derive(fb: FacebookInit) -> Result<()> {
         let ctx = CoreContext::test_mock(fb);
         let repo1 = linear::getrepo(fb).await;
         let repo2 = linear::getrepo(fb).await;
@@ -518,19 +547,26 @@ mod tests {
             .await?;
         cs_ids.reverse();

-        let mapping = BonsaiDerivedMappingContainer::new(
-            ctx.fb,
-            repo1.name(),
-            repo1.get_derived_data_config().scuba_table.as_deref(),
-            Arc::new(FilenodesOnlyPublic::default_mapping(&ctx, &repo1)?),
-        );
-        let batch =
-            FilenodesOnlyPublic::batch_derive(&ctx, &repo1, cs_ids.clone(), &mapping, None).await?;
+        let manager1 = repo1.repo_derived_data().manager();
+        manager1
+            .backfill_batch::<FilenodesOnlyPublic>(
+                &ctx,
+                cs_ids.clone(),
+                BatchDeriveOptions::Parallel { gap_size: None },
+                None,
+            )
+            .await?;
+        let batch = manager1
+            .fetch_derived_batch::<FilenodesOnlyPublic>(&ctx, cs_ids.clone(), None)
+            .await?;

         let sequential = {
             let mut res = HashMap::new();
             for cs in cs_ids.clone() {
-                let root_filenode = FilenodesOnlyPublic::derive(&ctx, &repo2, cs).await?;
+                let root_filenode = repo2
+                    .repo_derived_data()
+                    .derive::<FilenodesOnlyPublic>(&ctx, cs)
+                    .await?;
                 res.insert(cs, root_filenode);
             }
             res
@@ -544,7 +580,7 @@ mod tests {
     }

     #[fbinit::test]
-    async fn derive_parents_before_children(fb: FacebookInit) -> Result<(), Error> {
+    async fn derive_parents_before_children(fb: FacebookInit) -> Result<()> {
         let ctx = CoreContext::test_mock(fb);
         let filenodes_cs_id = Arc::new(Mutex::new(None));
         let mut factory = TestRepoFactory::new()?;
@@ -574,13 +610,17 @@ mod tests {
             .await?;
         cs_ids.reverse();

-        let mapping = BonsaiDerivedMappingContainer::new(
-            ctx.fb,
-            repo.name(),
-            repo.get_derived_data_config().scuba_table.as_deref(),
-            Arc::new(FilenodesOnlyPublic::default_mapping(&ctx, &repo)?),
-        );
-        match FilenodesOnlyPublic::batch_derive(&ctx, &repo, cs_ids.clone(), &mapping, None).await {
+        let manager = repo.repo_derived_data().manager();
+
+        match manager
+            .backfill_batch::<FilenodesOnlyPublic>(
+                &ctx,
+                cs_ids.clone(),
+                BatchDeriveOptions::Parallel { gap_size: None },
+                None,
+            )
+            .await
+        {
             Ok(_) => {}
             Err(_) => {}
         };
@@ -588,8 +628,10 @@ mod tests {
         // FilenodesWrapper prevents writing of root filenode for a9473beb2eb03ddb1cccc3fbaeb8a4820f9cd157 (8th commit in repo)
         // so all children (9, 10, 11) should not have root_filenodes written
         for cs_id in cs_ids.into_iter().skip(8) {
-            let filenodes = mapping.get(&ctx, vec![cs_id]).await?;
-            assert_eq!(filenodes.len(), 0);
+            let filenode = manager
+                .fetch_derived::<FilenodesOnlyPublic>(&ctx, cs_id, None)
+                .await?;
+            assert_eq!(filenode, None);
         }
         Ok(())
     }
@@ -653,12 +695,16 @@ mod tests {
         repo: &BlobRepo,
         backup_repo: &BlobRepo,
         cs: ChangesetId,
-    ) -> Result<(), Error> {
+    ) -> Result<()> {
         let prod_filenodes = repo.get_filenodes();
         let backup_filenodes = backup_repo.get_filenodes();
-        let manifest = fetch_root_manifest_id(ctx, cs, repo)
+        let manifest = repo
+            .get_hg_from_bonsai_changeset(ctx.clone(), cs)
+            .await?
+            .load(ctx, repo.blobstore())
             .await
-            .with_context(|| format!("while fetching manifest from prod for cs {:?}", cs))?;
+            .with_context(|| format!("while fetching manifest from prod for cs {:?}", cs))?
+            .manifestid();
         manifest
             .list_all_entries(ctx.clone(), repo.get_blobstore())
             .map_ok(|(path, entry)| {
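
Note the new dependency flow in generate_all_filenodes above: instead of resolving hg changesets through BlobRepoHg, the derivation context derives them as a declared dependency. A condensed sketch of that pattern, assuming `derivation_ctx: &DerivationContext` and `bcs: &BonsaiChangeset` are in scope (the calls are the ones in the hunk; the comment about parent ordering is our reading of the manager contract, not something this diff states):

// Derive (or fetch) the MappedHgChangesetId this type depends on; `.0`
// unwraps the HgChangesetId newtype.
let hg_id = derivation_ctx
    .derive_dependency::<MappedHgChangesetId>(ctx, bcs.get_changeset_id())
    .await?
    .0;
// Parents are fetched as already-derived values; presumably the manager
// guarantees parents derive before children, so this does not miss.
let parent_hg_ids = derivation_ctx
    .fetch_parents::<MappedHgChangesetId>(ctx, &bcs)
    .await?;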

File 4 of 10:

@@ -11,4 +11,4 @@ mod derive;
 mod mapping;

 pub use derive::generate_all_filenodes;
-pub use mapping::{FilenodesOnlyPublic, FilenodesOnlyPublicMapping, PreparedRootFilenode};
+pub use mapping::{FilenodesOnlyPublic, PreparedRootFilenode};

File 5 of 10:

@@ -5,24 +5,16 @@
  * GNU General Public License version 2.
  */

-use anyhow::{format_err, Error};
+use anyhow::{bail, format_err, Error, Result};
 use async_trait::async_trait;
-use blobrepo::BlobRepo;
-use blobrepo_hg::BlobRepoHg;
-use blobstore::{Blobstore, Loadable};
-use bonsai_hg_mapping::{BonsaiHgMapping, BonsaiHgMappingArc};
+use blobstore::Loadable;
 use context::CoreContext;
-use derived_data::{
-    BonsaiDerivable, BonsaiDerivedMapping, BonsaiDerivedMappingContainer, BonsaiDerivedOld,
-    DeriveError, DerivedDataTypesConfig,
-};
-use filenodes::{FilenodeInfo, FilenodeResult, Filenodes, FilenodesArc, PreparedFilenode};
-use futures::{stream, StreamExt, TryFutureExt, TryStreamExt};
+use derived_data::impl_bonsai_derived_via_manager;
+use derived_data_manager::{dependencies, BonsaiDerivable, DerivationContext};
+use filenodes::{FilenodeInfo, FilenodeResult, PreparedFilenode};
+use mercurial_derived_data::MappedHgChangesetId;
 use mercurial_types::{HgChangesetId, HgFileNodeId, NULL_HASH};
-use mononoke_types::{BonsaiChangeset, ChangesetId, RepoPath, RepositoryId};
-use repo_blobstore::RepoBlobstoreRef;
-use repo_identity::RepoIdentityRef;
-use std::sync::Arc;
+use mononoke_types::{BonsaiChangeset, ChangesetId, RepoPath};
 use std::{collections::HashMap, convert::TryFrom};

 use crate::derive::{derive_filenodes, derive_filenodes_in_batch};
@@ -62,7 +54,7 @@ impl From<PreparedRootFilenode> for PreparedFilenode {
 impl TryFrom<PreparedFilenode> for PreparedRootFilenode {
     type Error = Error;

-    fn try_from(filenode: PreparedFilenode) -> Result<Self, Self::Error> {
+    fn try_from(filenode: PreparedFilenode) -> Result<Self> {
         let PreparedFilenode { path, info } = filenode;

         let FilenodeInfo {
@@ -103,36 +95,25 @@ pub enum FilenodesOnlyPublic {
 impl BonsaiDerivable for FilenodesOnlyPublic {
     const NAME: &'static str = "filenodes";

-    type Options = ();
+    type Dependencies = dependencies![MappedHgChangesetId];

-    async fn derive_from_parents_impl(
-        ctx: CoreContext,
-        repo: BlobRepo,
+    async fn derive_single(
+        ctx: &CoreContext,
+        derivation_ctx: &DerivationContext,
         bonsai: BonsaiChangeset,
         _parents: Vec<Self>,
-        _options: &Self::Options,
-    ) -> Result<Self, Error> {
-        derive_filenodes(&ctx, &repo, bonsai).await
+    ) -> Result<Self> {
+        derive_filenodes(ctx, derivation_ctx, bonsai).await
     }

-    async fn batch_derive_impl(
+    async fn derive_batch(
         ctx: &CoreContext,
-        repo: &BlobRepo,
-        csids: Vec<ChangesetId>,
-        mapping: &BonsaiDerivedMappingContainer<Self>,
+        derivation_ctx: &DerivationContext,
+        bonsais: Vec<BonsaiChangeset>,
         _gap_size: Option<usize>,
-    ) -> Result<HashMap<ChangesetId, Self>, Error> {
-        let filenodes = repo.get_filenodes();
-        let blobstore = repo.blobstore();
-        let bonsais = stream::iter(
-            csids
-                .into_iter()
-                .map(|bcs_id| async move { bcs_id.load(ctx, blobstore).await }),
-        )
-        .buffered(100)
-        .try_collect::<Vec<_>>()
-        .await?;
-        let prepared = derive_filenodes_in_batch(ctx, repo, bonsais).await?;
+    ) -> Result<HashMap<ChangesetId, Self>> {
+        let filenodes = derivation_ctx.filenodes();
+        let prepared = derive_filenodes_in_batch(ctx, derivation_ctx, bonsais).await?;
         let mut res = HashMap::with_capacity(prepared.len());
         for (cs_id, public_filenode, non_roots) in prepared.into_iter() {
             let filenode = match public_filenode {
@@ -150,166 +131,118 @@ impl BonsaiDerivable for FilenodesOnlyPublic {
                 FilenodesOnlyPublic::Disabled => FilenodesOnlyPublic::Disabled,
             };
             res.insert(cs_id, filenode.clone());
-            if let FilenodesOnlyPublic::Disabled = filenode {
-                continue;
-            }
-            mapping.put(ctx, cs_id, &filenode).await?;
         }
         Ok(res)
     }
-}

-#[derive(Clone)]
-pub struct FilenodesOnlyPublicMapping {
-    repo_id: RepositoryId,
-    bonsai_hg_mapping: Arc<dyn BonsaiHgMapping>,
-    filenodes: Arc<dyn Filenodes>,
-    blobstore: Arc<dyn Blobstore>,
-}
-
-impl FilenodesOnlyPublicMapping {
-    pub fn new(
-        repo: &(impl RepoIdentityRef + BonsaiHgMappingArc + FilenodesArc + RepoBlobstoreRef),
-        _config: &DerivedDataTypesConfig,
-    ) -> Result<Self, DeriveError> {
-        Ok(Self {
-            repo_id: repo.repo_identity().id(),
-            bonsai_hg_mapping: repo.bonsai_hg_mapping_arc(),
-            filenodes: repo.filenodes_arc(),
-            blobstore: repo.repo_blobstore().boxed(),
-        })
-    }
-}
-
-#[async_trait]
-impl BonsaiDerivedOld for FilenodesOnlyPublic {
-    type DefaultMapping = FilenodesOnlyPublicMapping;
-
-    fn default_mapping(
-        _ctx: &CoreContext,
-        repo: &BlobRepo,
-    ) -> Result<Self::DefaultMapping, DeriveError> {
-        let config = derived_data::enabled_type_config(repo, Self::NAME)?;
-        FilenodesOnlyPublicMapping::new(repo, config)
-    }
-}
-
-#[async_trait]
-impl BonsaiDerivedMapping for FilenodesOnlyPublicMapping {
-    type Value = FilenodesOnlyPublic;
-
-    async fn get(
-        &self,
-        ctx: &CoreContext,
-        csids: Vec<ChangesetId>,
-    ) -> Result<HashMap<ChangesetId, Self::Value>, Error> {
-        stream::iter(csids.into_iter())
-            .map({
-                move |cs_id| async move {
-                    let filenode_res = self.fetch_root_filenode(ctx, cs_id).await?;
-                    let maybe_root_filenode = match filenode_res {
-                        FilenodeResult::Present(maybe_root_filenode) => maybe_root_filenode,
-                        FilenodeResult::Disabled => {
-                            return Ok(Some((cs_id, FilenodesOnlyPublic::Disabled)));
-                        }
-                    };
-
-                    Ok(maybe_root_filenode.map(move |filenode| {
-                        (
-                            cs_id,
-                            FilenodesOnlyPublic::Present {
-                                root_filenode: Some(filenode),
-                            },
-                        )
-                    }))
-                }
-            })
-            .buffer_unordered(100)
-            .try_filter_map(|x| async { Ok(x) })
-            .try_collect()
-            .await
-    }
-
-    async fn put(
-        &self,
-        ctx: &CoreContext,
-        _csid: ChangesetId,
-        id: &Self::Value,
-    ) -> Result<(), Error> {
-        let root_filenode = match id {
-            FilenodesOnlyPublic::Present { root_filenode } => root_filenode.as_ref(),
-            FilenodesOnlyPublic::Disabled => None,
-        };
-
-        match root_filenode {
-            Some(root_filenode) => {
-                self.filenodes
-                    .add_filenodes(ctx, vec![root_filenode.clone().into()])
-                    .map_ok(|res| match res {
-                        // If filenodes are disabled then just return success
-                        // but use explicit match here in case we add more variants
-                        // to FilenodeResult enum
-                        FilenodeResult::Present(()) | FilenodeResult::Disabled => {}
-                    })
-                    .await
-            }
-            None => Ok(()),
-        }
-    }
-
-    fn options(&self) {}
-}
-
-impl FilenodesOnlyPublicMapping {
-    async fn fetch_root_filenode(
-        &self,
-        ctx: &CoreContext,
-        cs_id: ChangesetId,
-    ) -> Result<FilenodeResult<Option<PreparedRootFilenode>>, Error> {
-        // If hg changeset is not generated, then root filenode can't possible be generated
-        // Check it and return None if hg changeset is not generated
-        let maybe_hg_cs_id = self
-            .bonsai_hg_mapping
-            .get_hg_from_bonsai(ctx, self.repo_id, cs_id)
-            .await?;
-        let hg_cs_id = if let Some(hg_cs_id) = maybe_hg_cs_id {
-            hg_cs_id
-        } else {
-            return Ok(FilenodeResult::Present(None));
-        };
-
-        let mf_id = hg_cs_id.load(ctx, &self.blobstore).await?.manifestid();
-
-        // Special case null manifest id if we run into it
-        let mf_id = mf_id.into_nodehash();
-        if mf_id == NULL_HASH {
-            Ok(FilenodeResult::Present(Some(PreparedRootFilenode {
-                filenode: HgFileNodeId::new(NULL_HASH),
-                p1: None,
-                p2: None,
-                copyfrom: None,
-                linknode: HgChangesetId::new(NULL_HASH),
-            })))
-        } else {
-            let filenode_res = self
-                .filenodes
-                .get_filenode(ctx, &RepoPath::RootPath, HgFileNodeId::new(mf_id))
-                .await?;
-            match filenode_res {
-                FilenodeResult::Present(maybe_info) => {
-                    let info = maybe_info
-                        .map(|info| {
-                            PreparedRootFilenode::try_from(PreparedFilenode {
-                                path: RepoPath::RootPath,
-                                info,
-                            })
-                        })
-                        .transpose()?;
-                    Ok(FilenodeResult::Present(info))
-                }
-                FilenodeResult::Disabled => Ok(FilenodeResult::Disabled),
-            }
-        }
-    }
-}
+    async fn store_mapping(
+        self,
+        ctx: &CoreContext,
+        derivation_ctx: &DerivationContext,
+        _changeset_id: ChangesetId,
+    ) -> Result<()> {
+        let root_filenode = match self {
+            FilenodesOnlyPublic::Present { root_filenode } => match root_filenode {
+                Some(root_filenode) => root_filenode,
+                None => return Ok(()),
+            },
+            FilenodesOnlyPublic::Disabled => return Ok(()),
+        };
+        match derivation_ctx
+            .filenodes()
+            .add_filenodes(ctx, vec![root_filenode.into()])
+            .await?
+        {
+            FilenodeResult::Present(()) => Ok(()),
+            FilenodeResult::Disabled => {
+                // Filenodes got disabled just after we finished deriving them
+                // but before we stored the mapping. Ideally we would return
+                // FilenodesMaybePublic::Disabled to the caller, but in this
+                // very small window there is no way to do that. Instead we
+                // must fail the request.
+                bail!("filenodes were disabled after being successfully derived")
+            }
+        }
+    }
+
+    async fn fetch(
+        ctx: &CoreContext,
+        derivation_ctx: &DerivationContext,
+        changeset_id: ChangesetId,
+    ) -> Result<Option<Self>> {
+        if tunables::tunables().get_filenodes_disabled() {
+            return Ok(Some(FilenodesOnlyPublic::Disabled));
+        }
+        let filenode_res = fetch_root_filenode(ctx, derivation_ctx, changeset_id).await?;
+        let maybe_root_filenode = match filenode_res {
+            FilenodeResult::Present(maybe_root_filenode) => maybe_root_filenode,
+            FilenodeResult::Disabled => {
+                return Ok(Some(FilenodesOnlyPublic::Disabled));
+            }
+        };
+
+        Ok(
+            maybe_root_filenode.map(move |filenode| FilenodesOnlyPublic::Present {
+                root_filenode: Some(filenode),
+            }),
+        )
+    }
+}
+
+async fn fetch_root_filenode(
+    ctx: &CoreContext,
+    derivation_ctx: &DerivationContext,
+    cs_id: ChangesetId,
+) -> Result<FilenodeResult<Option<PreparedRootFilenode>>> {
+    // If hg changeset is not generated, then root filenode can't possible be generated
+    // Check it and return None if hg changeset is not generated
+    let maybe_hg_cs_id = derivation_ctx
+        .bonsai_hg_mapping()
+        .get_hg_from_bonsai(ctx, derivation_ctx.repo_id(), cs_id)
+        .await?;
+    let hg_cs_id = if let Some(hg_cs_id) = maybe_hg_cs_id {
+        hg_cs_id
+    } else {
+        return Ok(FilenodeResult::Present(None));
+    };
+
+    let mf_id = hg_cs_id
+        .load(ctx, &derivation_ctx.blobstore())
+        .await?
+        .manifestid();
+
+    // Special case null manifest id if we run into it
+    let mf_id = mf_id.into_nodehash();
+    if mf_id == NULL_HASH {
+        Ok(FilenodeResult::Present(Some(PreparedRootFilenode {
+            filenode: HgFileNodeId::new(NULL_HASH),
+            p1: None,
+            p2: None,
+            copyfrom: None,
+            linknode: HgChangesetId::new(NULL_HASH),
+        })))
+    } else {
+        let filenode_res = derivation_ctx
+            .filenodes()
+            .get_filenode(ctx, &RepoPath::RootPath, HgFileNodeId::new(mf_id))
+            .await?;
+        match filenode_res {
+            FilenodeResult::Present(maybe_info) => {
+                let info = maybe_info
+                    .map(|info| {
+                        PreparedRootFilenode::try_from(PreparedFilenode {
+                            path: RepoPath::RootPath,
+                            info,
+                        })
+                    })
+                    .transpose()?;
+                Ok(FilenodeResult::Present(info))
+            }
+            FilenodeResult::Disabled => Ok(FilenodeResult::Disabled),
+        }
+    }
+}
+
+impl_bonsai_derived_via_manager!(FilenodesOnlyPublic);
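
Taken together, these hunks replace the three old pieces (the `Options`-based `BonsaiDerivable`, `BonsaiDerivedOld`, and the `BonsaiDerivedMapping` struct) with a single manager-facing trait implementation. The surface filenodes now implements, reduced to a skeleton with signatures copied from the hunks above (the bodies are elided; the comment on the closing macro is our reading of its purpose, not stated in the diff):

#[async_trait]
impl BonsaiDerivable for FilenodesOnlyPublic {
    const NAME: &'static str = "filenodes";

    // Deriving filenodes requires hg changesets to be derived first.
    type Dependencies = dependencies![MappedHgChangesetId];

    // One changeset; parents' derived values are passed in but unused here.
    async fn derive_single(
        ctx: &CoreContext,
        derivation_ctx: &DerivationContext,
        bonsai: BonsaiChangeset,
        _parents: Vec<Self>,
    ) -> Result<Self> { /* ... */ }

    // A batch of changesets, as used by backfill_batch.
    async fn derive_batch(
        ctx: &CoreContext,
        derivation_ctx: &DerivationContext,
        bonsais: Vec<BonsaiChangeset>,
        _gap_size: Option<usize>,
    ) -> Result<HashMap<ChangesetId, Self>> { /* ... */ }

    // Persist the derived value; for filenodes this writes the root filenode.
    async fn store_mapping(
        self,
        ctx: &CoreContext,
        derivation_ctx: &DerivationContext,
        _changeset_id: ChangesetId,
    ) -> Result<()> { /* ... */ }

    // Read back a previously derived value, if any.
    async fn fetch(
        ctx: &CoreContext,
        derivation_ctx: &DerivationContext,
        changeset_id: ChangesetId,
    ) -> Result<Option<Self>> { /* ... */ }
}

// Presumably wires this manager-based impl into the legacy derived_data
// entry points so existing callers keep working.
impl_bonsai_derived_via_manager!(FilenodesOnlyPublic);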

File 6 of 10:

@@ -25,7 +25,7 @@ use derived_data::{
     derive_impl, BonsaiDerivable, BonsaiDerivedMapping, BonsaiDerivedMappingContainer,
     DerivedDataTypesConfig, RegenerateMapping,
 };
-use derived_data_filenodes::{FilenodesOnlyPublic, FilenodesOnlyPublicMapping};
+use derived_data_filenodes::FilenodesOnlyPublic;
 use derived_data_manager::{
     BatchDeriveOptions, BatchDeriveStats, BonsaiDerivable as NewBonsaiDerivable,
     DerivedDataManager, Rederivation,
@@ -216,6 +216,7 @@ where
     Derivable: BonsaiDerivable,
     Mapping: BonsaiDerivedMapping<Value = Derivable> + 'static,
 {
+    #[allow(dead_code)]
     fn new(fb: FacebookInit, mapping: Mapping, repo: BlobRepo) -> Self {
         let orig_mapping = Arc::new(RegenerateMapping::new(mapping));
         let mapping = BonsaiDerivedMappingContainer::new(
@@ -672,7 +673,7 @@ pub fn derived_data_utils_for_backfill(
 }

 fn derived_data_utils_impl(
-    fb: FacebookInit,
+    _fb: FacebookInit,
     repo: &BlobRepo,
     name: &str,
     config: &DerivedDataTypesConfig,
@@ -704,14 +705,9 @@ fn derived_data_utils_impl(
         RootDeletedManifestId::NAME => Ok(Arc::new(
             DerivedUtilsFromManager::<RootDeletedManifestId>::new(repo, config),
         )),
-        FilenodesOnlyPublic::NAME => {
-            let mapping = FilenodesOnlyPublicMapping::new(repo, config)?;
-            Ok(Arc::new(DerivedUtilsFromMapping::new(
-                fb,
-                mapping,
-                repo.clone(),
-            )))
-        }
+        FilenodesOnlyPublic::NAME => Ok(Arc::new(
+            DerivedUtilsFromManager::<FilenodesOnlyPublic>::new(repo, config),
+        )),
         RootSkeletonManifestId::NAME => Ok(Arc::new(DerivedUtilsFromManager::<
             RootSkeletonManifestId,
         >::new(repo, config))),

File 7 of 10:

@@ -20,6 +20,7 @@ use derived_data_filenodes::generate_all_filenodes;
 use fbinit::FacebookInit;
 use futures::future::{join_all, FutureExt};
 use mercurial_types::{HgChangesetId, HgNodeHash};
+use repo_derived_data::RepoDerivedDataRef;
 use slog::info;
 use std::fs::File;
 use std::str::FromStr;
@@ -52,7 +53,12 @@ async fn regenerate_single_manifest(
     let cs_id = maybe_cs_id.ok_or(format_err!("changeset not found {}", hg_cs))?;
     let bonsai = cs_id.load(&ctx, repo.blobstore()).await?;

-    let toinsert = generate_all_filenodes(&ctx, &repo, &bonsai).await?;
+    let toinsert = generate_all_filenodes(
+        &ctx,
+        &repo.repo_derived_data().manager().derivation_context(None),
+        &bonsai,
+    )
+    .await?;

     repo.get_filenodes()
         .add_or_replace_filenodes(&ctx, toinsert)
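
Both this caller and the tests in the filenodes crate obtain a DerivationContext the same way. A short sketch of the pattern, with the caveat that reading the `None` argument as the optional rederivation override is our assumption; the parameter name is not shown in this diff:

// One-off DerivationContext for calling generate_all_filenodes directly.
let derivation_ctx = repo.repo_derived_data().manager().derivation_context(None);
let toinsert = generate_all_filenodes(&ctx, &derivation_ctx, &bonsai).await?;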

File 8 of 10:

@@ -15,7 +15,6 @@ use bookmarks::BookmarkName;
 use changeset_info::ChangesetInfo;
 use context::CoreContext;
 use deleted_files_manifest::RootDeletedManifestId;
-use derived_data::BonsaiDerivable;
 use derived_data_filenodes::FilenodesOnlyPublic;
 use derived_data_manager::BonsaiDerivable as NewBonsaiDerivable;
 use fastlog::{unode_entry_to_fastlog_batch_key, RootFastlog};

File 9 of 10:

@@ -32,7 +32,6 @@ use cmdlib::args::{
     self, ArgType, CachelibSettings, MononokeClapApp, MononokeMatches, RepoRequirement,
     ResolvedRepo,
 };
-use derived_data::BonsaiDerivable;
 use derived_data_filenodes::FilenodesOnlyPublic;
 use derived_data_manager::BonsaiDerivable as NewBonsaiDerivable;
 use fbinit::FacebookInit;

File 10 of 10:

@@ -20,7 +20,6 @@ use bonsai_hg_mapping::BonsaiOrHgChangesetIds;
 use bulkops::{Direction, PublicChangesetBulkFetch, MAX_FETCH_STEP};
 use cloned::cloned;
 use context::CoreContext;
-use derived_data::BonsaiDerivable;
 use derived_data_filenodes::FilenodesOnlyPublic;
 use derived_data_manager::BonsaiDerivable as NewBonsaiDerivable;
 use fbinit::FacebookInit;