diff --git a/eden/mononoke/derived_data/blame/batch_v2.rs b/eden/mononoke/derived_data/blame/batch_v2.rs
index 970413a806..5e88857b5b 100644
--- a/eden/mononoke/derived_data/blame/batch_v2.rs
+++ b/eden/mononoke/derived_data/blame/batch_v2.rs
@@ -12,7 +12,7 @@ use anyhow::{anyhow, Error, Result};
 use borrowed::borrowed;
 use cloned::cloned;
 use context::CoreContext;
-use derived_data::batch::{split_bonsais_in_linear_stacks, FileChangeAggregation, FileConflicts};
+use derived_data::batch::{split_bonsais_in_linear_stacks, FileConflicts};
 use derived_data_manager::DerivationContext;
 use futures::stream::{FuturesOrdered, TryStreamExt};
 use lock_ext::LockExt;
@@ -30,13 +30,7 @@ pub async fn derive_blame_v2_in_batch(
 ) -> Result<HashMap<ChangesetId, RootBlameV2>, Error> {
     let batch_len = bonsais.len();
     // We must split on any change as blame data must use the parent file.
-    let linear_stacks = split_bonsais_in_linear_stacks(
-        &bonsais,
-        FileConflicts::AnyChange,
-        // For blame we don't care about file changes, so FileChangeAggregation can be anything
-        FileChangeAggregation::Aggregate,
-    )
-    .await?;
+    let linear_stacks = split_bonsais_in_linear_stacks(&bonsais, FileConflicts::AnyChange).await?;
     let bonsais = Mutex::new(
         bonsais
             .into_iter()
@@ -47,11 +41,11 @@
     let mut res = HashMap::new();
 
     for linear_stack in linear_stacks {
-        if let Some((cs_id, _fc)) = linear_stack.file_changes.first() {
+        if let Some(item) = linear_stack.file_changes.first() {
             debug!(
                 ctx.logger(),
                 "derive blame batch at {} (stack of {} from batch of {})",
-                cs_id.to_hex(),
+                item.cs_id.to_hex(),
                 linear_stack.file_changes.len(),
                 batch_len,
             );
@@ -60,10 +54,11 @@
         let new_blames = linear_stack
             .file_changes
             .into_iter()
-            .map(|(csid, _fc)| {
+            .map(|item| {
                 // Clone owned copies to pass into the spawned future.
                 cloned!(ctx, derivation_ctx);
                 async move {
+                    let csid = item.cs_id;
                     let bonsai = bonsais
                         .with(|bonsais| bonsais.remove(&csid))
                         .ok_or_else(|| anyhow!("changeset {} should be in bonsai batch", csid))?;
diff --git a/eden/mononoke/derived_data/fsnodes/batch.rs b/eden/mononoke/derived_data/fsnodes/batch.rs
index f271886490..e7dbc865f1 100644
--- a/eden/mononoke/derived_data/fsnodes/batch.rs
+++ b/eden/mononoke/derived_data/fsnodes/batch.rs
@@ -12,7 +12,7 @@ use blobrepo::BlobRepo;
 use borrowed::borrowed;
 use cloned::cloned;
 use context::CoreContext;
-use derived_data::batch::{split_batch_in_linear_stacks, FileChangeAggregation, FileConflicts};
+use derived_data::batch::{split_batch_in_linear_stacks, FileConflicts};
 use derived_data::{derive_impl, BonsaiDerivedMappingContainer};
 use futures::stream::{FuturesOrdered, TryStreamExt};
 use mononoke_types::{ChangesetId, FsnodeId};
@@ -62,7 +62,6 @@
         manager.repo_blobstore(),
         batch,
         FileConflicts::ChangeDelete,
-        FileChangeAggregation::Aggregate,
     )
     .await?;
     let mut res = HashMap::new();
@@ -101,15 +100,21 @@
         let new_fsnodes = to_derive
             .into_iter()
-            .map(|(cs_id, fc)| {
+            .map(|item| {
                 // Clone the values that we need owned copies of to move
                 // into the future we are going to spawn, which means it
                 // must have static lifetime.
                 cloned!(ctx, manager, parent_fsnodes);
                 async move {
+                    let cs_id = item.cs_id;
                     let derivation_fut = async move {
-                        derive_fsnode(&ctx, &manager, parent_fsnodes, fc.into_iter().collect())
-                            .await
+                        derive_fsnode(
+                            &ctx,
+                            &manager,
+                            parent_fsnodes,
+                            item.combined_file_changes.into_iter().collect(),
+                        )
+                        .await
                     };
                     let derivation_handle = tokio::spawn(derivation_fut);
                     let fsnode_id: FsnodeId = derivation_handle.await??;
diff --git a/eden/mononoke/derived_data/skeleton_manifest/batch.rs b/eden/mononoke/derived_data/skeleton_manifest/batch.rs
index a38cec7d6e..a3d7c09bee 100644
--- a/eden/mononoke/derived_data/skeleton_manifest/batch.rs
+++ b/eden/mononoke/derived_data/skeleton_manifest/batch.rs
@@ -12,7 +12,7 @@ use blobrepo::BlobRepo;
 use borrowed::borrowed;
 use cloned::cloned;
 use context::CoreContext;
-use derived_data::batch::{split_batch_in_linear_stacks, FileChangeAggregation, FileConflicts};
+use derived_data::batch::{split_batch_in_linear_stacks, FileConflicts};
 use derived_data::{derive_impl, BonsaiDerivedMappingContainer};
 use futures::stream::{FuturesOrdered, TryStreamExt};
 use mononoke_types::{ChangesetId, SkeletonManifestId};
@@ -40,7 +40,6 @@
         manager.repo_blobstore(),
         batch,
         FileConflicts::ChangeDelete,
-        FileChangeAggregation::Aggregate,
     )
     .await?;
     let mut res = HashMap::new();
@@ -80,18 +79,19 @@
         let new_skeleton_manifests = to_derive
             .into_iter()
-            .map(|(cs_id, fc)| {
+            .map(|item| {
                 // Clone the values that we need owned copies of to move
                 // into the future we are going to spawn, which means it
                 // must have static lifetime.
                 cloned!(ctx, manager, parent_skeleton_manifests);
                 async move {
+                    let cs_id = item.cs_id;
                     let derivation_fut = async move {
                         derive_skeleton_manifest(
                             &ctx,
                             &manager,
                             parent_skeleton_manifests,
-                            fc.into_iter().collect(),
+                            item.combined_file_changes.into_iter().collect(),
                         )
                         .await
                     };
diff --git a/eden/mononoke/derived_data/src/batch.rs b/eden/mononoke/derived_data/src/batch.rs
index 54cf7b7199..2922ec1264 100644
--- a/eden/mononoke/derived_data/src/batch.rs
+++ b/eden/mononoke/derived_data/src/batch.rs
@@ -28,23 +28,11 @@ pub enum FileConflicts {
     AnyChange,
 }
 
-#[derive(Copy, Clone, Debug)]
-pub enum FileChangeAggregation {
-    /// In a stack each commit will have the file changes from itself
-    /// and all its ancestors that are in the stack. E.g. first commit
-    /// has just its own file changes, second commit has its own changes and
-    /// file changes from first commit etc.
-    Aggregate,
-    /// Each commit in the stack has only its own file changes
-    DontAggregate,
-}
-
 pub async fn split_batch_in_linear_stacks(
     ctx: &CoreContext,
     blobstore: &RepoBlobstore,
     batch: Vec<ChangesetId>,
     file_conflicts: FileConflicts,
-    file_change_aggregation: FileChangeAggregation,
 ) -> Result<Vec<LinearStack>, Error> {
     let bonsais = stream::iter(
         batch
@@ -54,7 +42,7 @@
         .buffered(100)
         .try_collect::<Vec<_>>()
         .await?;
-    split_bonsais_in_linear_stacks(&bonsais, file_conflicts, file_change_aggregation).await
+    split_bonsais_in_linear_stacks(&bonsais, file_conflicts).await
 }
 
 /// We follow a few rules when splitting a batch in the stacks:
@@ -66,7 +54,6 @@
 pub async fn split_bonsais_in_linear_stacks(
     bonsais: &[BonsaiChangeset],
     file_conflicts: FileConflicts,
-    file_change_aggregation: FileChangeAggregation,
 ) -> Result<Vec<LinearStack>, Error> {
     let start_bcs = match bonsais.first() {
         Some(val) => val,
@@ -77,7 +64,7 @@
 
     let mut linear_stacks = Vec::new();
     let mut cur_linear_stack = LinearStack::new(start_bcs.parents().collect::<Vec<_>>());
-    cur_linear_stack.push(start_bcs, file_change_aggregation);
+    cur_linear_stack.push(start_bcs);
 
     for (prev_bcs, bcs) in bonsais.iter().tuple_windows() {
         if !cur_linear_stack.can_be_in_same_linear_stack(prev_bcs, bcs, file_conflicts) {
@@ -85,7 +72,7 @@
             cur_linear_stack = LinearStack::new(bcs.parents().collect::<Vec<_>>());
         }
 
-        cur_linear_stack.push(bcs, file_change_aggregation);
+        cur_linear_stack.push(bcs);
     }
     linear_stacks.push(cur_linear_stack);
 
@@ -96,9 +83,17 @@
 // Stores a linear stack.
 // `file_changes` contains the list of file changes that need to be applied to `parents`
 // to generate derived data for a particular commit.
+#[derive(Clone, Debug)]
 pub struct LinearStack {
     pub parents: Vec<ChangesetId>,
-    pub file_changes: Vec<(ChangesetId, FileToContent)>,
+    pub file_changes: Vec<StackItem>,
+}
+
+#[derive(Clone, Debug)]
+pub struct StackItem {
+    pub cs_id: ChangesetId,
+    pub combined_file_changes: FileToContent,
+    pub per_commit_file_changes: FileToContent,
 }
 
 impl LinearStack {
@@ -109,26 +104,26 @@
         }
     }
 
-    fn push(&mut self, bcs: &BonsaiChangeset, file_change_aggregation: FileChangeAggregation) {
+    fn push(&mut self, bcs: &BonsaiChangeset) {
         let cs_id = bcs.get_changeset_id();
-        let file_changes = bcs.file_changes().map(|(path, fc)| {
-            (
-                path.clone(),
-                fc.simplify().map(|bc| (bc.content_id(), bc.file_type())),
-            )
-        });
+        let file_changes = bcs
+            .file_changes()
+            .map(|(path, fc)| {
+                (
+                    path.clone(),
+                    fc.simplify().map(|bc| (bc.content_id(), bc.file_type())),
+                )
+            })
+            .collect::<BTreeMap<_, _>>();
 
-        use FileChangeAggregation::*;
-        match file_change_aggregation {
-            Aggregate => {
-                let mut fc = self.get_last_file_changes().cloned().unwrap_or_default();
-                fc.extend(file_changes);
-                self.file_changes.push((cs_id, fc));
-            }
-            DontAggregate => {
-                self.file_changes.push((cs_id, file_changes.collect()));
-            }
-        }
+
+        let mut combined_file_changes = self.get_last_file_changes().cloned().unwrap_or_default();
+        combined_file_changes.extend(file_changes.clone());
+        self.file_changes.push(StackItem {
+            cs_id,
+            combined_file_changes,
+            per_commit_file_changes: file_changes,
+        });
     }
 
     fn can_be_in_same_linear_stack(
@@ -160,7 +155,9 @@
     }
 
     fn get_last_file_changes(&self) -> Option<&FileToContent> {
-        self.file_changes.last().map(|(_, fc)| fc)
+        self.file_changes
+            .last()
+            .map(|item| &item.combined_file_changes)
     }
 }
 
@@ -245,7 +242,6 @@ mod test {
         repo.blobstore(),
         vec![root],
         FileConflicts::ChangeDelete,
-        FileChangeAggregation::Aggregate,
     )
     .await?;
     assert_linear_stacks(
@@ -264,7 +260,6 @@ mod test {
         repo.blobstore(),
         vec![second],
         FileConflicts::ChangeDelete,
-        FileChangeAggregation::Aggregate,
     )
     .await?;
     assert_linear_stacks(
@@ -283,13 +278,12 @@ mod test {
         repo.blobstore(),
         vec![root, second],
         FileConflicts::ChangeDelete,
-        FileChangeAggregation::Aggregate,
     )
     .await?;
     assert_linear_stacks(
         &ctx,
         &repo,
-        linear_stacks,
+        linear_stacks.clone(),
         vec![(
             vec![],
             vec![
                 btreemap! {file1 => Some(("content1".to_string(), FileType::Regular))},
                 btreemap! {
                     file1 => Some(("content1".to_string(), FileType::Regular)),
                     file2 => Some(("content2".to_string(), FileType::Regular)),
                 },
             ],
         )],
     )
     .await?;
@@ -303,30 +297,22 @@
-    // Now without aggregation
-    let linear_stacks = split_batch_in_linear_stacks(
-        &ctx,
-        repo.blobstore(),
-        vec![root, second],
-        FileConflicts::ChangeDelete,
-        FileChangeAggregation::DontAggregate,
-    )
-    .await?;
-    assert_linear_stacks(
-        &ctx,
-        &repo,
-        linear_stacks,
-        vec![(
-            vec![],
-            vec![
-                btreemap! {file1 => Some(("content1".to_string(), FileType::Regular))},
-                btreemap! {
-                    file2 => Some(("content2".to_string(), FileType::Regular)),
-                },
-            ],
-        )],
-    )
-    .await?;
+    // Check that per_commit_file_changes are correct
+    assert_eq!(
+        linear_stacks[0].file_changes[0]
+            .per_commit_file_changes
+            .keys()
+            .collect::<Vec<_>>(),
+        vec![&MPath::new(file1)?]
+    );
+
+    assert_eq!(
+        linear_stacks[0].file_changes[1]
+            .per_commit_file_changes
+            .keys()
+            .collect::<Vec<_>>(),
+        vec![&MPath::new(file2)?]
+    );
 
     Ok(())
 }
@@ -357,7 +343,6 @@
         repo.blobstore(),
         vec![p1, merge],
         FileConflicts::ChangeDelete,
-        FileChangeAggregation::Aggregate,
     )
     .await?;
     assert_linear_stacks(
@@ -382,7 +367,6 @@
         repo.blobstore(),
         vec![p1, p2],
         FileConflicts::ChangeDelete,
-        FileChangeAggregation::Aggregate,
     )
     .await?;
     assert_linear_stacks(
@@ -428,7 +412,6 @@
         repo.blobstore(),
         vec![root, child],
         FileConflicts::ChangeDelete,
-        FileChangeAggregation::Aggregate,
     )
     .await?;
     assert_linear_stacks(
@@ -471,7 +454,6 @@
         repo.blobstore(),
         vec![root, child],
         FileConflicts::ChangeDelete,
-        FileChangeAggregation::Aggregate,
     )
     .await?;
     assert_linear_stacks(
@@ -514,7 +496,6 @@
         repo.blobstore(),
         vec![root, child],
         FileConflicts::ChangeDelete,
-        FileChangeAggregation::Aggregate,
     )
     .await?;
     assert_linear_stacks(
@@ -537,7 +518,6 @@
         repo.blobstore(),
         vec![root, child],
         FileConflicts::AnyChange,
-        FileChangeAggregation::Aggregate,
    )
    .await?;
     assert_linear_stacks(
@@ -576,7 +556,8 @@
         } = linear_stack;
 
         let mut paths_for_the_whole_stack = vec![];
-        for (_, file_to_content) in file_changes {
+        for item in file_changes {
+            let file_to_content = item.combined_file_changes;
             let mut paths = btreemap![];
             for (path, maybe_content) in file_to_content {
                 let maybe_content = match maybe_content {
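
Usage sketch (illustrative only, not part of the patch): with `FileChangeAggregation` gone, each `StackItem` carries both aggregation views at once. `combined_file_changes` accumulates every change from the start of the stack through the item's commit (the old `Aggregate` behaviour, which the fsnodes and skeleton manifest callers above consume), while `per_commit_file_changes` holds only that commit's own changes (the old `DontAggregate` behaviour). The `derive_one` helper below is hypothetical, and the imports assume these items are public in `derived_data::batch`:

    use anyhow::Error;
    use derived_data::batch::{split_bonsais_in_linear_stacks, FileConflicts, FileToContent};
    use mononoke_types::{BonsaiChangeset, ChangesetId};

    // Hypothetical stand-in for a real per-commit derivation (blame, fsnodes, ...).
    async fn derive_one(_cs_id: ChangesetId, _changes: &FileToContent) -> Result<(), Error> {
        Ok(())
    }

    async fn derive_whole_batch(bonsais: &[BonsaiChangeset]) -> Result<(), Error> {
        let linear_stacks =
            split_bonsais_in_linear_stacks(bonsais, FileConflicts::ChangeDelete).await?;
        for stack in linear_stacks {
            for item in stack.file_changes {
                // Each caller now picks the view the removed enum used to select:
                // combined changes for manifest-style derivation, per-commit
                // changes when only the commit's own files matter.
                derive_one(item.cs_id, &item.combined_file_changes).await?;
            }
        }
        Ok(())
    }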