mononoke: remove FileChangeAggregation from split_batch_in_linear_stacks

Summary:
I recently added this feature, but it had a bug: when DontAggregate mode was
used, a new commit's file changes were compared only with the previous
commit's file changes instead of with all the changes in the stack.

Since FileChangeAggregation is broken, let's remove it and instead collect
both the file changes for the whole stack and the file changes for each
individual commit.
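
To illustrate the new behaviour, here is a minimal sketch of the combined plus
per-commit bookkeeping. It uses simplified stand-in types (u64 instead of
ChangesetId, String contents instead of content ids), not the real Mononoke
definitions:

use std::collections::BTreeMap;

// path -> content (None means the file was deleted)
type FileToContent = BTreeMap<String, Option<String>>;

struct StackItem {
    cs_id: u64,                             // stand-in for ChangesetId
    combined_file_changes: FileToContent,   // all changes in the stack up to this commit
    per_commit_file_changes: FileToContent, // this commit's own changes
}

#[derive(Default)]
struct LinearStack {
    file_changes: Vec<StackItem>,
}

impl LinearStack {
    fn push(&mut self, cs_id: u64, file_changes: FileToContent) {
        // Start from the combined changes of the previous stack item (if any)
        // and extend them with this commit's own changes.
        let mut combined = self
            .file_changes
            .last()
            .map(|item| item.combined_file_changes.clone())
            .unwrap_or_default();
        combined.extend(file_changes.clone());
        self.file_changes.push(StackItem {
            cs_id,
            combined_file_changes: combined,
            per_commit_file_changes: file_changes,
        });
    }
}

fn main() {
    let mut stack = LinearStack::default();
    stack.push(1, BTreeMap::from([("file1".into(), Some("content1".into()))]));
    stack.push(2, BTreeMap::from([("file2".into(), Some("content2".into()))]));
    // The second commit's own changes mention only file2...
    assert_eq!(stack.file_changes[1].per_commit_file_changes.len(), 1);
    // ...while its combined changes cover both file1 and file2.
    assert_eq!(stack.file_changes[1].combined_file_changes.len(), 2);
}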

Reviewed By: mitrandir77

Differential Revision: D31145055

fbshipit-source-id: 99dbedb919fb9edbdfaeaa658d49a08d008bd282
Author: Stanislau Hlebik (2021-09-23 23:23:19 -07:00), committed by Facebook GitHub Bot
parent 1477f6a520
commit 6760edfd9d
4 changed files with 72 additions and 91 deletions

=== File 1 of 4: blame batching (derive_blame_v2_in_batch) ===

@@ -12,7 +12,7 @@ use anyhow::{anyhow, Error, Result};
use borrowed::borrowed;
use cloned::cloned;
use context::CoreContext;
-use derived_data::batch::{split_bonsais_in_linear_stacks, FileChangeAggregation, FileConflicts};
+use derived_data::batch::{split_bonsais_in_linear_stacks, FileConflicts};
use derived_data_manager::DerivationContext;
use futures::stream::{FuturesOrdered, TryStreamExt};
use lock_ext::LockExt;
@@ -30,13 +30,7 @@ pub async fn derive_blame_v2_in_batch(
) -> Result<HashMap<ChangesetId, RootBlameV2>, Error> {
let batch_len = bonsais.len();
// We must split on any change as blame data must use the parent file.
-let linear_stacks = split_bonsais_in_linear_stacks(
-&bonsais,
-FileConflicts::AnyChange,
-// For blame we don't care about file changes, so FileChangeAggregation can be anything
-FileChangeAggregation::Aggregate,
-)
-.await?;
+let linear_stacks = split_bonsais_in_linear_stacks(&bonsais, FileConflicts::AnyChange).await?;
let bonsais = Mutex::new(
bonsais
.into_iter()
@@ -47,11 +41,11 @@ pub async fn derive_blame_v2_in_batch(
let mut res = HashMap::new();
for linear_stack in linear_stacks {
-if let Some((cs_id, _fc)) = linear_stack.file_changes.first() {
+if let Some(item) = linear_stack.file_changes.first() {
debug!(
ctx.logger(),
"derive blame batch at {} (stack of {} from batch of {})",
-cs_id.to_hex(),
+item.cs_id.to_hex(),
linear_stack.file_changes.len(),
batch_len,
);
@@ -60,10 +54,11 @@
let new_blames = linear_stack
.file_changes
.into_iter()
-.map(|(csid, _fc)| {
+.map(|item| {
// Clone owning copied to pass into the spawned future.
cloned!(ctx, derivation_ctx);
async move {
+let csid = item.cs_id;
let bonsai = bonsais
.with(|bonsais| bonsais.remove(&csid))
.ok_or_else(|| anyhow!("changeset {} should be in bonsai batch", csid))?;

=== File 2 of 4: fsnodes batching (derive_fsnode_in_batch) ===

@@ -12,7 +12,7 @@ use blobrepo::BlobRepo;
use borrowed::borrowed;
use cloned::cloned;
use context::CoreContext;
-use derived_data::batch::{split_batch_in_linear_stacks, FileChangeAggregation, FileConflicts};
+use derived_data::batch::{split_batch_in_linear_stacks, FileConflicts};
use derived_data::{derive_impl, BonsaiDerivedMappingContainer};
use futures::stream::{FuturesOrdered, TryStreamExt};
use mononoke_types::{ChangesetId, FsnodeId};
@@ -62,7 +62,6 @@ pub async fn derive_fsnode_in_batch(
manager.repo_blobstore(),
batch,
FileConflicts::ChangeDelete,
-FileChangeAggregation::Aggregate,
)
.await?;
let mut res = HashMap::new();
@@ -101,14 +100,20 @@ pub async fn derive_fsnode_in_batch(
let new_fsnodes = to_derive
.into_iter()
-.map(|(cs_id, fc)| {
+.map(|item| {
// Clone the values that we need owned copies of to move
// into the future we are going to spawn, which means it
// must have static lifetime.
cloned!(ctx, manager, parent_fsnodes);
async move {
+let cs_id = item.cs_id;
let derivation_fut = async move {
-derive_fsnode(&ctx, &manager, parent_fsnodes, fc.into_iter().collect())
+derive_fsnode(
+&ctx,
+&manager,
+parent_fsnodes,
+item.combined_file_changes.into_iter().collect(),
+)
.await
};
let derivation_handle = tokio::spawn(derivation_fut);

=== File 3 of 4: skeleton manifests batching (derive_skeleton_manifests_in_batch) ===

@@ -12,7 +12,7 @@ use blobrepo::BlobRepo;
use borrowed::borrowed;
use cloned::cloned;
use context::CoreContext;
-use derived_data::batch::{split_batch_in_linear_stacks, FileChangeAggregation, FileConflicts};
+use derived_data::batch::{split_batch_in_linear_stacks, FileConflicts};
use derived_data::{derive_impl, BonsaiDerivedMappingContainer};
use futures::stream::{FuturesOrdered, TryStreamExt};
use mononoke_types::{ChangesetId, SkeletonManifestId};
@@ -40,7 +40,6 @@ pub async fn derive_skeleton_manifests_in_batch(
manager.repo_blobstore(),
batch,
FileConflicts::ChangeDelete,
-FileChangeAggregation::Aggregate,
)
.await?;
let mut res = HashMap::new();
@@ -80,18 +79,19 @@ pub async fn derive_skeleton_manifests_in_batch(
let new_skeleton_manifests = to_derive
.into_iter()
-.map(|(cs_id, fc)| {
+.map(|item| {
// Clone the values that we need owned copies of to move
// into the future we are going to spawn, which means it
// must have static lifetime.
cloned!(ctx, manager, parent_skeleton_manifests);
async move {
+let cs_id = item.cs_id;
let derivation_fut = async move {
derive_skeleton_manifest(
&ctx,
&manager,
parent_skeleton_manifests,
-fc.into_iter().collect(),
+item.combined_file_changes.into_iter().collect(),
)
.await
};

=== File 4 of 4: stack splitting (split_batch_in_linear_stacks) ===

@@ -28,23 +28,11 @@ pub enum FileConflicts {
AnyChange,
}
-#[derive(Copy, Clone, Debug)]
-pub enum FileChangeAggregation {
-/// In a stack each commit will have the file changes from itself
-/// and all its ancestors that are in the stack. E.g. first commit
-/// has just its own file changes, second commit has its own changes and
-/// file changes from first commit etc.
-Aggregate,
-/// Each commit in the stack has only its own file changes
-DontAggregate,
-}
pub async fn split_batch_in_linear_stacks(
ctx: &CoreContext,
blobstore: &RepoBlobstore,
batch: Vec<ChangesetId>,
file_conflicts: FileConflicts,
-file_change_aggregation: FileChangeAggregation,
) -> Result<Vec<LinearStack>, Error> {
let bonsais = stream::iter(
batch
@@ -54,7 +42,7 @@ pub async fn split_batch_in_linear_stacks(
.buffered(100)
.try_collect::<Vec<_>>()
.await?;
-split_bonsais_in_linear_stacks(&bonsais, file_conflicts, file_change_aggregation).await
+split_bonsais_in_linear_stacks(&bonsais, file_conflicts).await
}
/// We follow a few rules when splitting a batch in the stacks:
@@ -66,7 +54,6 @@ pub async fn split_batch_in_linear_stacks(
pub async fn split_bonsais_in_linear_stacks(
bonsais: &[BonsaiChangeset],
file_conflicts: FileConflicts,
-file_change_aggregation: FileChangeAggregation,
) -> Result<Vec<LinearStack>, Error> {
let start_bcs = match bonsais.first() {
Some(val) => val,
@@ -77,7 +64,7 @@ pub async fn split_bonsais_in_linear_stacks(
let mut linear_stacks = Vec::new();
let mut cur_linear_stack = LinearStack::new(start_bcs.parents().collect::<Vec<_>>());
-cur_linear_stack.push(start_bcs, file_change_aggregation);
+cur_linear_stack.push(start_bcs);
for (prev_bcs, bcs) in bonsais.iter().tuple_windows() {
if !cur_linear_stack.can_be_in_same_linear_stack(prev_bcs, bcs, file_conflicts) {
@@ -85,7 +72,7 @@ pub async fn split_bonsais_in_linear_stacks(
cur_linear_stack = LinearStack::new(bcs.parents().collect::<Vec<_>>());
}
-cur_linear_stack.push(bcs, file_change_aggregation);
+cur_linear_stack.push(bcs);
}
linear_stacks.push(cur_linear_stack);
@@ -96,9 +83,17 @@ pub async fn split_bonsais_in_linear_stacks(
// Stores a linear stack.
// `file_changes` contains the list of file changes that need to be applied to `parents`
// to generate derived data for a particular commit.
+#[derive(Clone, Debug)]
pub struct LinearStack {
pub parents: Vec<ChangesetId>,
-pub file_changes: Vec<(ChangesetId, FileToContent)>,
+pub file_changes: Vec<StackItem>,
}
+#[derive(Clone, Debug)]
+pub struct StackItem {
+pub cs_id: ChangesetId,
+pub combined_file_changes: FileToContent,
+pub per_commit_file_changes: FileToContent,
+}
impl LinearStack {
@@ -109,26 +104,26 @@ impl LinearStack {
}
}
-fn push(&mut self, bcs: &BonsaiChangeset, file_change_aggregation: FileChangeAggregation) {
+fn push(&mut self, bcs: &BonsaiChangeset) {
let cs_id = bcs.get_changeset_id();
-let file_changes = bcs.file_changes().map(|(path, fc)| {
+let file_changes = bcs
+.file_changes()
+.map(|(path, fc)| {
(
path.clone(),
fc.simplify().map(|bc| (bc.content_id(), bc.file_type())),
)
-});
+})
+.collect::<BTreeMap<_, _>>();
-use FileChangeAggregation::*;
-match file_change_aggregation {
-Aggregate => {
-let mut fc = self.get_last_file_changes().cloned().unwrap_or_default();
-fc.extend(file_changes);
-self.file_changes.push((cs_id, fc));
-}
-DontAggregate => {
-self.file_changes.push((cs_id, file_changes.collect()));
-}
-}
+let mut combined_file_changes = self.get_last_file_changes().cloned().unwrap_or_default();
+combined_file_changes.extend(file_changes.clone());
+self.file_changes.push(StackItem {
+cs_id,
+combined_file_changes,
+per_commit_file_changes: file_changes,
+});
}
fn can_be_in_same_linear_stack(
@@ -160,7 +155,9 @@ impl LinearStack {
}
fn get_last_file_changes(&self) -> Option<&FileToContent> {
-self.file_changes.last().map(|(_, fc)| fc)
+self.file_changes
+.last()
+.map(|item| &item.combined_file_changes)
}
}
@@ -245,7 +242,6 @@ mod test {
repo.blobstore(),
vec![root],
FileConflicts::ChangeDelete,
-FileChangeAggregation::Aggregate,
)
.await?;
assert_linear_stacks(
@@ -264,7 +260,6 @@ mod test {
repo.blobstore(),
vec![second],
FileConflicts::ChangeDelete,
-FileChangeAggregation::Aggregate,
)
.await?;
assert_linear_stacks(
@@ -283,13 +278,12 @@ mod test {
repo.blobstore(),
vec![root, second],
FileConflicts::ChangeDelete,
-FileChangeAggregation::Aggregate,
)
.await?;
assert_linear_stacks(
&ctx,
&repo,
-linear_stacks,
+linear_stacks.clone(),
vec![(
vec![],
vec![
@@ -303,30 +297,22 @@ mod test {
)
.await?;
-// Now without aggregation
-let linear_stacks = split_batch_in_linear_stacks(
-&ctx,
-repo.blobstore(),
-vec![root, second],
-FileConflicts::ChangeDelete,
-FileChangeAggregation::DontAggregate,
-)
-.await?;
-assert_linear_stacks(
-&ctx,
-&repo,
-linear_stacks,
-vec![(
-vec![],
-vec![
-btreemap! {file1 => Some(("content1".to_string(), FileType::Regular))},
-btreemap! {
-file2 => Some(("content2".to_string(), FileType::Regular)),
-},
-],
-)],
-)
-.await?;
+// Check that per_commit_file_changes are correct
+assert_eq!(
+linear_stacks[0].file_changes[0]
+.per_commit_file_changes
+.keys()
+.collect::<Vec<_>>(),
+vec![&MPath::new(file1)?]
+);
+assert_eq!(
+linear_stacks[0].file_changes[1]
+.per_commit_file_changes
+.keys()
+.collect::<Vec<_>>(),
+vec![&MPath::new(file2)?]
+);
Ok(())
}
@@ -357,7 +343,6 @@ mod test {
repo.blobstore(),
vec![p1, merge],
FileConflicts::ChangeDelete,
-FileChangeAggregation::Aggregate,
)
.await?;
assert_linear_stacks(
@@ -382,7 +367,6 @@ mod test {
repo.blobstore(),
vec![p1, p2],
FileConflicts::ChangeDelete,
-FileChangeAggregation::Aggregate,
)
.await?;
assert_linear_stacks(
@@ -428,7 +412,6 @@ mod test {
repo.blobstore(),
vec![root, child],
FileConflicts::ChangeDelete,
-FileChangeAggregation::Aggregate,
)
.await?;
assert_linear_stacks(
@@ -471,7 +454,6 @@ mod test {
repo.blobstore(),
vec![root, child],
FileConflicts::ChangeDelete,
-FileChangeAggregation::Aggregate,
)
.await?;
assert_linear_stacks(
@@ -514,7 +496,6 @@ mod test {
repo.blobstore(),
vec![root, child],
FileConflicts::ChangeDelete,
-FileChangeAggregation::Aggregate,
)
.await?;
assert_linear_stacks(
@@ -537,7 +518,6 @@ mod test {
repo.blobstore(),
vec![root, child],
FileConflicts::AnyChange,
-FileChangeAggregation::Aggregate,
)
.await?;
assert_linear_stacks(
@@ -576,7 +556,8 @@ mod test {
} = linear_stack;
let mut paths_for_the_whole_stack = vec![];
-for (_, file_to_content) in file_changes {
+for item in file_changes {
+let file_to_content = item.combined_file_changes;
let mut paths = btreemap![];
for (path, maybe_content) in file_to_content {
let maybe_content = match maybe_content {