From 0b083a74b1ed2c6ae70151e526dc0087da15ab0a Mon Sep 17 00:00:00 2001 From: Thomas Orozco Date: Thu, 15 Oct 2020 09:47:23 -0700 Subject: [PATCH] mononoke/blobrepo_hg: optimize case conflict check performance Summary: Our case conflict checking is very inefficient on large changesets. The root cause is that we traverse the parent manifest for every single file we are modifying in the new changeset. This results in very poor performance on large changes since we end up reparsing manifests and doing case comparisons a lot more than we should. In some pathological cases, it results in us taking several *minutes* to do a case conflict check, with all of that time being spent on CPU lower-casing strings and deserializing manifests. This is actually a step we do after having uploaded all the data for a commit, so this is pure overhead that is being added to the push process (but note it's not part of the pushrebase critical section). I ended up looking at this issue because it is contributing to the high latencies we are seeing in commit cloud right now. Some of the bundles I checked had 300+ seconds of on-CPU time being spent to check for case conflicts. The hope is that with this change, we'll get fewer pathological cases, and might be able to root cause remaining instances of latency (or have that finally fixed). This is pretty easy to repro. I added a binary that runs case conflict checks on an arbitrary commit, and tested it on `38c845c90d59ba65e7954be001c1eda1eb76a87d` (a commit that I noted was slow to ingest in commit cloud, despite all its data being present already, meaning it was basically a no-op). The old code takes ~3 minutes. The new one takes a second. I also backtested this by rigging up the hook tailer to do case conflict checks instead (P145550763). It is about the same speed for most commits (perhaps marginally slower on some, but we're talking microseconds here), but for some pathological commits, it is indeed much faster. 
This notably revealed one interesting case: 473b6e21e910fcdf7338df66ee0cbeb4b8d311989385745151fa7ac38d1b46ef (~8K files) took 118329us in the new code (~0.1s), and 86676677us in the old (~87 seconds). There are also commits with more files in recent history, but they're deletions, so they are just as fast in both (< 0.1 s). Reviewed By: StanislavGlebik Differential Revision: D24305563 fbshipit-source-id: eb548b54be14a846554fdf4c3194da8b8a466afe --- eden/mononoke/blobrepo/blobrepo_hg/Cargo.toml | 1 + eden/mononoke/blobrepo/blobrepo_hg/src/lib.rs | 4 +- .../blobrepo/blobrepo_hg/src/repo_commit.rs | 325 +++++++++--------- eden/mononoke/blobrepo/test/main.rs | 34 +- eden/mononoke/mononoke_types/src/path.rs | 10 +- 5 files changed, 180 insertions(+), 194 deletions(-) diff --git a/eden/mononoke/blobrepo/blobrepo_hg/Cargo.toml b/eden/mononoke/blobrepo/blobrepo_hg/Cargo.toml index a6e8ba4aec..8337413a85 100644 --- a/eden/mononoke/blobrepo/blobrepo_hg/Cargo.toml +++ b/eden/mononoke/blobrepo/blobrepo_hg/Cargo.toml @@ -13,6 +13,7 @@ blobrepo_errors = { path = "../errors" } blobstore = { path = "../../blobstore" } bonsai_hg_mapping = { path = "../../bonsai_hg_mapping" } bookmarks = { path = "../../bookmarks" } +bounded_traversal = { path = "../../common/bounded_traversal" } changeset_fetcher = { path = "../changeset_fetcher" } changesets = { path = "../../changesets" } context = { path = "../../server/context" } diff --git a/eden/mononoke/blobrepo/blobrepo_hg/src/lib.rs b/eden/mononoke/blobrepo/blobrepo_hg/src/lib.rs index c92dbc2bc9..fce90d8bc9 100644 --- a/eden/mononoke/blobrepo/blobrepo_hg/src/lib.rs +++ b/eden/mononoke/blobrepo/blobrepo_hg/src/lib.rs @@ -13,9 +13,7 @@ pub mod repo_commit; pub use crate::repo_commit::ChangesetHandle; pub use changeset_fetcher::ChangesetFetcher; // TODO: This is exported for testing - is this the right place for it? 
-pub use crate::repo_commit::{ - check_case_conflict_in_manifest, check_case_conflicts, compute_changed_files, UploadEntries, -}; +pub use crate::repo_commit::{check_case_conflicts, compute_changed_files, UploadEntries}; pub mod errors { pub use blobrepo_errors::*; } diff --git a/eden/mononoke/blobrepo/blobrepo_hg/src/repo_commit.rs b/eden/mononoke/blobrepo/blobrepo_hg/src/repo_commit.rs index bf49fc90b9..d61f8087d7 100644 --- a/eden/mononoke/blobrepo/blobrepo_hg/src/repo_commit.rs +++ b/eden/mononoke/blobrepo/blobrepo_hg/src/repo_commit.rs @@ -6,19 +6,19 @@ */ use crate::BlobRepoHg; -use anyhow::{format_err, Error, Result}; +use anyhow::{format_err, Context, Error, Result}; use cloned::cloned; use failure_ext::{Compat, FutureFailureErrorExt, StreamFailureErrorExt}; use futures::{ - compat::{Future01CompatExt, Stream01CompatExt}, + compat::Stream01CompatExt, future::{FutureExt, TryFutureExt}, - stream::{FuturesUnordered, StreamExt, TryStreamExt}, + stream::{self, TryStreamExt}, }; use futures_ext::{ BoxFuture as OldBoxFuture, BoxStream as OldBoxStream, FutureExt as OldFutureExt, }; use futures_old::future::{ - self as old_future, loop_fn, result, Future as OldFuture, Loop, Shared, SharedError, SharedItem, + self as old_future, result, Future as OldFuture, Shared, SharedError, SharedItem, }; use futures_old::stream::Stream as OldStream; use futures_old::sync::oneshot; @@ -40,7 +40,7 @@ use mercurial_types::{ nodehash::{HgFileNodeId, HgManifestId}, HgChangesetId, HgNodeHash, HgNodeKey, HgParents, MPath, RepoPath, NULL_HASH, }; -use mononoke_types::{self, BonsaiChangeset, ChangesetId, FileType}; +use mononoke_types::{self, BonsaiChangeset, ChangesetId}; use crate::errors::*; use crate::BlobRepo; @@ -402,91 +402,6 @@ impl UploadEntries { } } -async fn compute_files_with_status( - ctx: &CoreContext, - repo: &BlobRepo, - child: HgManifestId, - parent: Option, - filter_map: impl Fn(Diff>) -> Option, -) -> Result, Error> { - let s = match parent { - Some(parent) => parent 
- .diff(ctx.clone(), repo.get_blobstore(), child) - .compat() - .left_stream(), - None => child - .list_all_entries(ctx.clone(), repo.get_blobstore()) - .map(|(path, entry)| Diff::Added(path, entry)) - .compat() - .right_stream(), - }; - - s.try_filter_map(|e| async { Ok(filter_map(e)) }) - .try_collect() - .await -} - -/// Checks if new commit (or to be precise, it's manifest) introduces any new case conflicts -/// It does it in three stages: -/// 1) Checks that there are no case conflicts between added files -/// 2) Checks that added files do not create new case conflicts with already existing files -pub async fn check_case_conflicts( - ctx: &CoreContext, - repo: &BlobRepo, - child_root_mf: HgManifestId, - parent_root_mf: Option, -) -> Result<(), Error> { - let added_files = - compute_files_with_status( - ctx, - repo, - child_root_mf, - parent_root_mf, - |diff| match diff { - Diff::Added(path, _entry) => path, - _ => None, - }, - ) - .await?; - - if let Some(conflict) = mononoke_types::check_case_conflicts(added_files.iter()) { - return Err(ErrorKind::InternalCaseConflict(conflict.0, conflict.1).into()); - } - - let parent_root_mf = match parent_root_mf { - Some(parent_root_mf) => parent_root_mf, - None => { - return Ok(()); - } - }; - - let mut case_conflict_checks = added_files - .into_iter() - .map(|path| async move { - let conflicting = check_case_conflict_in_manifest( - repo.clone(), - ctx.clone(), - parent_root_mf, - child_root_mf, - path.clone(), - ) - .compat() - .await?; - - Result::<_, Error>::Ok((path, conflicting)) - }) - .collect::>(); - - while let Some(element) = case_conflict_checks.next().await { - let (path, conflicting) = element?; - if let Some(conflicting) = conflicting { - return Err(ErrorKind::ExternalCaseConflict(path, conflicting).into()); - } - } - - Ok(()) -} - pub fn process_entries( ctx: CoreContext, entry_processor: &UploadEntries, @@ -679,85 +594,163 @@ pub fn make_new_changeset( HgBlobChangeset::new(changeset) } -/// Check if 
adding a single path to manifest would cause case-conflict +/// Checks if new commit (or to be precise, its manifest) introduces any new case conflicts. It +/// does so in three stages: /// -/// Implementation traverses manifest and checks if correspoinding path element is present, -/// if path element is not present, it lowercases current path element and checks if it -/// collides with any existing elements inside manifest. if so it also needs to check that -/// child manifest contains this entry, because it might have been removed. -pub fn check_case_conflict_in_manifest( - repo: BlobRepo, - ctx: CoreContext, - parent_mf_id: HgManifestId, - child_mf_id: HgManifestId, - path: MPath, -) -> impl OldFuture, Error = Error> { - let child_mf_id = child_mf_id.clone(); - parent_mf_id - .load(ctx.clone(), &repo.get_blobstore()) - .compat() - .from_err() - .and_then(move |mf| { - loop_fn( - (None, mf, path.into_iter()), - move |(cur_path, mf, mut elements): (Option, _, _)| { - let element = match elements.next() { - None => return old_future::ok(Loop::Break(None)).boxify(), - Some(element) => element, - }; +/// - First, if there is no parent, we only check the manifest being added for conflicts. +/// - Second, we build a tree of lower cased paths, then visit the parent's manifest for branches +/// that overlap with this tree, and collect all the paths that do. +/// - Third, we check if there are any case conflicts in the union of the files added by this +/// change and all those paths we found in step 2 (minus those paths that were removed). +/// +/// Note that this assumes there are no path conflicts in the parent_root_mf, if any is provided. +/// If there are path conflicts there, this function may report those path conflicts if any file +/// is touched in one of the directories (case insensitively) that have conflicts. 
+pub async fn check_case_conflicts( + ctx: &CoreContext, + repo: &BlobRepo, + child_root_mf: HgManifestId, + parent_root_mf: Option, +) -> Result<(), Error> { + let parent_root_mf = match parent_root_mf { + Some(parent_root_mf) => parent_root_mf, + None => { + // We don't have a parent, just check for internal case conflicts here. + let paths = child_root_mf + .list_leaf_entries(ctx.clone(), repo.get_blobstore()) + .compat() + .map_ok(|(path, _)| path) + .try_collect::>() + .await + .with_context(|| "Error loading manifest")?; - match mf.lookup(&element) { - Some(entry) => { - let cur_path = MPath::join_opt_element(cur_path.as_ref(), &element); - match entry { - Entry::Leaf(..) => old_future::ok(Loop::Break(None)).boxify(), - Entry::Tree(manifest_id) => manifest_id - .load(ctx.clone(), repo.blobstore()) - .compat() - .from_err() - .map(move |mf| Loop::Continue((Some(cur_path), mf, elements))) - .boxify(), - } - } - None => { - let element_utf8 = String::from_utf8(Vec::from(element.as_ref())); - let mut potential_conflicts = vec![]; - // Find all entries in the manifests that can potentially be a conflict. - // Entry can potentially be a conflict if its lowercased version - // is the same as lowercased version of the current element + if let Some(conflict) = mononoke_types::check_case_conflicts(&paths) { + return Err(ErrorKind::InternalCaseConflict(conflict.0, conflict.1).into()); + } - for (basename, _) in mf.list() { - let path = - MPath::join_element_opt(cur_path.as_ref(), Some(&basename)); - match (&element_utf8, std::str::from_utf8(basename.as_ref())) { - (Ok(ref element), Ok(ref basename)) => { - if basename.to_lowercase() == element.to_lowercase() { - potential_conflicts.extend(path); - } - } - _ => {} - } - } + return Ok(()); + } + }; - // For each potential conflict we need to check if it's present in - // child manifest. If it is, then we've got a conflict, otherwise - // this has been deleted and it's no longer a conflict. 
- child_mf_id - .find_entries( - ctx.clone(), - repo.get_blobstore(), - potential_conflicts, - ) - .collect() - .map(|entries| { - // NOTE: We flatten here because we cannot have a conflict - // at the root. - Loop::Break(entries.into_iter().next().and_then(|x| x.0)) - }) - .boxify() - } + let mut added = Vec::new(); + let mut deleted = HashSet::new(); + + let mut diff = parent_root_mf + .diff(ctx.clone(), repo.get_blobstore(), child_root_mf) + .compat(); + + while let Some(diff) = diff + .try_next() + .await + .with_context(|| "Error loading diff")? + { + match diff { + Diff::Added(Some(path), _) => { + added.push(path); + } + Diff::Removed(Some(path), _) => { + deleted.insert(path); + } + _ => {} + }; + } + + + // Check if there are any conflicts internal to the change being landed. Past this point, the + // conflicts we'll report are external (i.e. they are dependent on the parent commit). + if let Some(conflict) = mononoke_types::check_case_conflicts(added.iter()) { + return Err(ErrorKind::InternalCaseConflict(conflict.0, conflict.1).into()); + } + + fn lowercase_mpath(e: &MPath) -> Option> { + e.into_iter() + .map(|e| mononoke_types::path::lowercase_mpath_element(e)) + .collect() + } + + let mut path_tree_builder = PathTreeBuilder::default(); + + for path in added.iter() { + let path = match lowercase_mpath(&path) { + Some(path) => path, + None => continue, // We ignore non-UTF8 paths + }; + path_tree_builder.insert(path); + } + + let path_tree = Arc::new(path_tree_builder.freeze()); + + let candidates = bounded_traversal::bounded_traversal_stream( + 256, + Some((parent_root_mf, path_tree, None)), + |(mf_id, path_tree, path)| async move { + let mf = mf_id.load(ctx.clone(), repo.blobstore()).await?; + let mut output = vec![]; + let mut recurse = vec![]; + + for (name, entry) in mf.list() { + let lowered_el = match mononoke_types::path::lowercase_mpath_element(&name) { + Some(lowered_el) => lowered_el, + None => continue, + }; + + if let Some(subtree) = 
path_tree.as_ref().subentries.get(&lowered_el) { + let path = MPath::join_opt_element(path.as_ref(), &name); + + if let Entry::Tree(sub_mf_id) = entry { + recurse.push((sub_mf_id, subtree.clone(), Some(path.clone()))); } - }, - ) - }) + + output.push(path); + }; + } + + Result::<_, Error>::Ok((output, recurse)) + }, + ) + .map_ok(|entries| stream::iter(entries.into_iter().map(Result::<_, Error>::Ok))) + .try_flatten() + .try_collect::>() + .await + .with_context(|| "Error scanning for conflicting paths")?; + + let files = added + .iter() + .chain(candidates.iter().filter(|c| !deleted.contains(c))); + + if let Some((child, parent)) = mononoke_types::check_case_conflicts(files) { + return Err(ErrorKind::ExternalCaseConflict(child, parent).into()); + } + + + Ok(()) +} + +#[derive(Default)] +struct PathTreeBuilder { + pub subentries: HashMap, +} + +impl PathTreeBuilder { + pub fn insert(&mut self, path: Vec) { + path.into_iter().fold(self, |node, element| { + node.subentries + .entry(element) + .or_insert_with(Default::default) + }); + } + + pub fn freeze(self) -> PathTree { + let subentries = self + .subentries + .into_iter() + .map(|(el, t)| (el, Arc::new(t.freeze()))) + .collect(); + + PathTree { subentries } + } +} + +struct PathTree { + pub subentries: HashMap>, } diff --git a/eden/mononoke/blobrepo/test/main.rs b/eden/mononoke/blobrepo/test/main.rs index 35b9f9bcb6..a7a0bb721d 100644 --- a/eden/mononoke/blobrepo/test/main.rs +++ b/eden/mononoke/blobrepo/test/main.rs @@ -17,8 +17,7 @@ use assert_matches::assert_matches; use blobrepo::BlobRepo; use blobrepo_errors::ErrorKind; use blobrepo_hg::{ - check_case_conflict_in_manifest, - repo_commit::{compute_changed_files, UploadEntries}, + repo_commit::{check_case_conflicts, compute_changed_files, UploadEntries}, BlobRepoHg, }; use blobstore::{Loadable, Storable}; @@ -850,7 +849,7 @@ fn test_case_conflict_in_manifest(fb: FacebookInit) { .unwrap() .unwrap(); - for (path, result) in &[ + for (path, expect_conflict) in &[ 
("dir1/file_1_in_dir1", false), ("dir1/file_1_IN_dir1", true), ("DiR1/file_1_in_dir1", true), @@ -878,23 +877,18 @@ fn test_case_conflict_in_manifest(fb: FacebookInit) { .await .unwrap() .manifestid(); - assert_eq!( - (check_case_conflict_in_manifest( - repo.clone(), - ctx.clone(), - mf, - child_mf, - MPath::new(path).unwrap() - )) - .compat() - .await - .unwrap() - .is_some(), - *result, - "{} expected to {} cause conflict", - path, - if *result { "" } else { "not" }, - ); + + let conflicts = check_case_conflicts(&ctx, &repo, child_mf, Some(mf)).await; + if *expect_conflict { + assert_matches!( + conflicts + .as_ref() + .map_err(|e| e.downcast_ref::()), + Err(Some(ErrorKind::ExternalCaseConflict(..))) + ); + } else { + assert_matches!(conflicts, Ok(())); + } } }); } diff --git a/eden/mononoke/mononoke_types/src/path.rs b/eden/mononoke/mononoke_types/src/path.rs index 8618359267..1d3f658a1c 100644 --- a/eden/mononoke/mononoke_types/src/path.rs +++ b/eden/mononoke/mononoke_types/src/path.rs @@ -1026,7 +1026,7 @@ impl CaseConflictTrie { }); } - if let Ok(lower) = lowercase_mpath_element(&element) { + if let Some(lower) = lowercase_mpath_element(&element) { if let Some(conflict) = self.lowercase_to_original.get(&lower) { return Err(ReverseMPath { elements: vec![conflict.clone()], @@ -1059,7 +1059,7 @@ impl CaseConflictTrie { if remove { self.children.remove(&element); - if let Ok(lower) = lowercase_mpath_element(&element) { + if let Some(lower) = lowercase_mpath_element(&element) { self.lowercase_to_original.remove(&lower); } } @@ -1156,10 +1156,10 @@ where } } -pub fn lowercase_mpath_element(e: &MPathElement) -> Result { - let s = std::str::from_utf8(e.as_ref())?; +pub fn lowercase_mpath_element(e: &MPathElement) -> Option { + let s = std::str::from_utf8(e.as_ref()).ok()?; let s = s.to_lowercase(); - Ok(s) + Some(s) } #[cfg(test)]