mononoke/blobrepo_hg: optimize case conflict check performance

Summary:
Our case conflict checking is very inefficient on large changesets. The root
cause is that we traverse the parent manifest for every single file we are
modifying in the new changeset.

This results in very poor performance on large changes since we end up
reparsing manifests and doing case comparisons a lot more than we should. In
some pathological cases, it results in us taking several *minutes* to do a case
conflict check, with all of that time being spent on CPU lower-casing strings
and deserializing manifests.

This is actually a step we do after having uploaded all the data for a commit,
so this is pure overhead that is being added to the push process (but note it's
not part of the pushrebase critical section).

I ended up looking at this issue because it is contributing to the high
latencies we are seeing in commit cloud right now. Some of the bundles I
checked had 300+ seconds of on-CPU time being spent to check for case
conflicts. The hope is that with this change we'll hit fewer pathological cases, and
will be able to root-cause any remaining latency (or see it fixed outright).

This is pretty easy to repro.

I added a binary that runs case conflict checks on an arbitrary commit, and
tested it on `38c845c90d59ba65e7954be001c1eda1eb76a87d` (a commit that I noted
was slow to ingest in commit cloud, despite all its data being present already,
meaning it was basically a no-op). The old code takes ~3 minutes. The new one
takes a second.

I also backtested this by rigging up the hook tailer to do case conflict checks
instead (P145550763). It is about the same speed for most commits (perhaps
marginally slower on some, but we're talking microseconds here), but for some
pathological commits, it is indeed much faster.

This notably revealed one interesting case:

473b6e21e910fcdf7338df66ee0cbeb4b8d311989385745151fa7ac38d1b46ef (~8K files)
took 118329us in the new code (~0.1s), and 86676677us in the old (~87 seconds).

There are also commits with more files in recent history, but they're
deletions, so they are just as fast in both (< 0.1 s).

Reviewed By: StanislavGlebik

Differential Revision: D24305563

fbshipit-source-id: eb548b54be14a846554fdf4c3194da8b8a466afe
This commit is contained in:
Thomas Orozco 2020-10-15 09:47:23 -07:00 committed by Facebook GitHub Bot
parent 1dc25648bf
commit 0b083a74b1
5 changed files with 180 additions and 194 deletions

View File

@ -13,6 +13,7 @@ blobrepo_errors = { path = "../errors" }
blobstore = { path = "../../blobstore" }
bonsai_hg_mapping = { path = "../../bonsai_hg_mapping" }
bookmarks = { path = "../../bookmarks" }
bounded_traversal = { path = "../../common/bounded_traversal" }
changeset_fetcher = { path = "../changeset_fetcher" }
changesets = { path = "../../changesets" }
context = { path = "../../server/context" }

View File

@ -13,9 +13,7 @@ pub mod repo_commit;
pub use crate::repo_commit::ChangesetHandle;
pub use changeset_fetcher::ChangesetFetcher;
// TODO: This is exported for testing - is this the right place for it?
pub use crate::repo_commit::{
check_case_conflict_in_manifest, check_case_conflicts, compute_changed_files, UploadEntries,
};
pub use crate::repo_commit::{check_case_conflicts, compute_changed_files, UploadEntries};
pub mod errors {
pub use blobrepo_errors::*;
}

View File

@ -6,19 +6,19 @@
*/
use crate::BlobRepoHg;
use anyhow::{format_err, Error, Result};
use anyhow::{format_err, Context, Error, Result};
use cloned::cloned;
use failure_ext::{Compat, FutureFailureErrorExt, StreamFailureErrorExt};
use futures::{
compat::{Future01CompatExt, Stream01CompatExt},
compat::Stream01CompatExt,
future::{FutureExt, TryFutureExt},
stream::{FuturesUnordered, StreamExt, TryStreamExt},
stream::{self, TryStreamExt},
};
use futures_ext::{
BoxFuture as OldBoxFuture, BoxStream as OldBoxStream, FutureExt as OldFutureExt,
};
use futures_old::future::{
self as old_future, loop_fn, result, Future as OldFuture, Loop, Shared, SharedError, SharedItem,
self as old_future, result, Future as OldFuture, Shared, SharedError, SharedItem,
};
use futures_old::stream::Stream as OldStream;
use futures_old::sync::oneshot;
@ -40,7 +40,7 @@ use mercurial_types::{
nodehash::{HgFileNodeId, HgManifestId},
HgChangesetId, HgNodeHash, HgNodeKey, HgParents, MPath, RepoPath, NULL_HASH,
};
use mononoke_types::{self, BonsaiChangeset, ChangesetId, FileType};
use mononoke_types::{self, BonsaiChangeset, ChangesetId};
use crate::errors::*;
use crate::BlobRepo;
@ -402,91 +402,6 @@ impl UploadEntries {
}
}
/// Computes the paths selected by `filter_map` from the diff between `child`
/// and `parent`. When there is no parent, every entry of `child` is treated as
/// an addition and fed through the same filter.
async fn compute_files_with_status(
    ctx: &CoreContext,
    repo: &BlobRepo,
    child: HgManifestId,
    parent: Option<HgManifestId>,
    filter_map: impl Fn(Diff<Entry<HgManifestId, (FileType, HgFileNodeId)>>) -> Option<MPath>,
) -> Result<Vec<MPath>, Error> {
    // With a parent, stream only the manifest differences; without one, list
    // everything and wrap each entry as Diff::Added so both arms yield the
    // same item type.
    let s = match parent {
        Some(parent) => parent
            .diff(ctx.clone(), repo.get_blobstore(), child)
            .compat()
            .left_stream(),
        None => child
            .list_all_entries(ctx.clone(), repo.get_blobstore())
            .map(|(path, entry)| Diff::Added(path, entry))
            .compat()
            .right_stream(),
    };
    // Keep only the paths the caller's filter selects.
    s.try_filter_map(|e| async { Ok(filter_map(e)) })
        .try_collect()
        .await
}
/// Checks if a new commit (or, to be precise, its manifest) introduces any new case conflicts.
/// It does so in two stages:
/// 1) Checks that there are no case conflicts among the files added by the commit
/// 2) Checks that the added files do not create new case conflicts with already existing files
pub async fn check_case_conflicts(
    ctx: &CoreContext,
    repo: &BlobRepo,
    child_root_mf: HgManifestId,
    parent_root_mf: Option<HgManifestId>,
) -> Result<(), Error> {
    // Collect the paths added relative to the parent (or every path, when
    // there is no parent).
    let added_files =
        compute_files_with_status(
            ctx,
            repo,
            child_root_mf,
            parent_root_mf,
            |diff| match diff {
                Diff::Added(path, _entry) => path,
                _ => None,
            },
        )
        .await?;
    // Stage 1: conflicts internal to the set of added files.
    if let Some(conflict) = mononoke_types::check_case_conflicts(added_files.iter()) {
        return Err(ErrorKind::InternalCaseConflict(conflict.0, conflict.1).into());
    }
    // No parent means there is nothing pre-existing to conflict with.
    let parent_root_mf = match parent_root_mf {
        Some(parent_root_mf) => parent_root_mf,
        None => {
            return Ok(());
        }
    };
    // Stage 2: check each added path against the parent manifest. The checks
    // run concurrently via FuturesUnordered; note this traverses the parent
    // manifest once per added file.
    let mut case_conflict_checks = added_files
        .into_iter()
        .map(|path| async move {
            let conflicting = check_case_conflict_in_manifest(
                repo.clone(),
                ctx.clone(),
                parent_root_mf,
                child_root_mf,
                path.clone(),
            )
            .compat()
            .await?;
            Result::<_, Error>::Ok((path, conflicting))
        })
        .collect::<FuturesUnordered<_>>();
    // Fail on the first conflict found; the path that conflicts with an
    // existing file is reported alongside the existing path.
    while let Some(element) = case_conflict_checks.next().await {
        let (path, conflicting) = element?;
        if let Some(conflicting) = conflicting {
            return Err(ErrorKind::ExternalCaseConflict(path, conflicting).into());
        }
    }
    Ok(())
}
pub fn process_entries(
ctx: CoreContext,
entry_processor: &UploadEntries,
@ -679,85 +594,163 @@ pub fn make_new_changeset(
HgBlobChangeset::new(changeset)
}
/// Check if adding a single path to manifest would cause case-conflict
/// Checks if new commit (or to be precise, its manifest) introduces any new case conflicts. It
/// does so in three stages:
///
/// Implementation traverses manifest and checks if correspoinding path element is present,
/// if path element is not present, it lowercases current path element and checks if it
/// collides with any existing elements inside manifest. if so it also needs to check that
/// child manifest contains this entry, because it might have been removed.
pub fn check_case_conflict_in_manifest(
repo: BlobRepo,
ctx: CoreContext,
parent_mf_id: HgManifestId,
child_mf_id: HgManifestId,
path: MPath,
) -> impl OldFuture<Item = Option<MPath>, Error = Error> {
let child_mf_id = child_mf_id.clone();
parent_mf_id
.load(ctx.clone(), &repo.get_blobstore())
.compat()
.from_err()
.and_then(move |mf| {
loop_fn(
(None, mf, path.into_iter()),
move |(cur_path, mf, mut elements): (Option<MPath>, _, _)| {
let element = match elements.next() {
None => return old_future::ok(Loop::Break(None)).boxify(),
Some(element) => element,
};
/// - First, if there is no parent, we only check the manifest being added for conflicts.
/// - Second, we build a tree of lower cased paths, then visit the parent's manifest for branches
/// that overlap with this tree, and collect all the paths that do.
/// - Third, we check if there are any case conflicts in the union of the files added by this
/// change and all those paths we found in step 2 (minus those paths that were removed).
///
/// Note that this assumes there are no path conflicts in the parent_root_mf, if any is provided.
/// If there are path conflicts there, this function may report those pre-existing conflicts
/// when any file touched by this change falls (case-insensitively) within one of the
/// conflicting directories.
pub async fn check_case_conflicts(
ctx: &CoreContext,
repo: &BlobRepo,
child_root_mf: HgManifestId,
parent_root_mf: Option<HgManifestId>,
) -> Result<(), Error> {
let parent_root_mf = match parent_root_mf {
Some(parent_root_mf) => parent_root_mf,
None => {
// We don't have a parent, just check for internal case conflicts here.
let paths = child_root_mf
.list_leaf_entries(ctx.clone(), repo.get_blobstore())
.compat()
.map_ok(|(path, _)| path)
.try_collect::<Vec<_>>()
.await
.with_context(|| "Error loading manifest")?;
match mf.lookup(&element) {
Some(entry) => {
let cur_path = MPath::join_opt_element(cur_path.as_ref(), &element);
match entry {
Entry::Leaf(..) => old_future::ok(Loop::Break(None)).boxify(),
Entry::Tree(manifest_id) => manifest_id
.load(ctx.clone(), repo.blobstore())
.compat()
.from_err()
.map(move |mf| Loop::Continue((Some(cur_path), mf, elements)))
.boxify(),
}
}
None => {
let element_utf8 = String::from_utf8(Vec::from(element.as_ref()));
let mut potential_conflicts = vec![];
// Find all entries in the manifests that can potentially be a conflict.
// Entry can potentially be a conflict if its lowercased version
// is the same as lowercased version of the current element
if let Some(conflict) = mononoke_types::check_case_conflicts(&paths) {
return Err(ErrorKind::InternalCaseConflict(conflict.0, conflict.1).into());
}
for (basename, _) in mf.list() {
let path =
MPath::join_element_opt(cur_path.as_ref(), Some(&basename));
match (&element_utf8, std::str::from_utf8(basename.as_ref())) {
(Ok(ref element), Ok(ref basename)) => {
if basename.to_lowercase() == element.to_lowercase() {
potential_conflicts.extend(path);
}
}
_ => {}
}
}
return Ok(());
}
};
// For each potential conflict we need to check if it's present in
// child manifest. If it is, then we've got a conflict, otherwise
// this has been deleted and it's no longer a conflict.
child_mf_id
.find_entries(
ctx.clone(),
repo.get_blobstore(),
potential_conflicts,
)
.collect()
.map(|entries| {
// NOTE: We flatten here because we cannot have a conflict
// at the root.
Loop::Break(entries.into_iter().next().and_then(|x| x.0))
})
.boxify()
}
let mut added = Vec::new();
let mut deleted = HashSet::new();
let mut diff = parent_root_mf
.diff(ctx.clone(), repo.get_blobstore(), child_root_mf)
.compat();
while let Some(diff) = diff
.try_next()
.await
.with_context(|| "Error loading diff")?
{
match diff {
Diff::Added(Some(path), _) => {
added.push(path);
}
Diff::Removed(Some(path), _) => {
deleted.insert(path);
}
_ => {}
};
}
// Check if there any conflicts internal to the change being landed. Past this point, the
// conflicts we'll report are external (i.e. they are dependent on the parent commit).
if let Some(conflict) = mononoke_types::check_case_conflicts(added.iter()) {
return Err(ErrorKind::InternalCaseConflict(conflict.0, conflict.1).into());
}
fn lowercase_mpath(e: &MPath) -> Option<Vec<String>> {
e.into_iter()
.map(|e| mononoke_types::path::lowercase_mpath_element(e))
.collect()
}
let mut path_tree_builder = PathTreeBuilder::default();
for path in added.iter() {
let path = match lowercase_mpath(&path) {
Some(path) => path,
None => continue, // We ignore non-UTF8 paths
};
path_tree_builder.insert(path);
}
let path_tree = Arc::new(path_tree_builder.freeze());
let candidates = bounded_traversal::bounded_traversal_stream(
256,
Some((parent_root_mf, path_tree, None)),
|(mf_id, path_tree, path)| async move {
let mf = mf_id.load(ctx.clone(), repo.blobstore()).await?;
let mut output = vec![];
let mut recurse = vec![];
for (name, entry) in mf.list() {
let lowered_el = match mononoke_types::path::lowercase_mpath_element(&name) {
Some(lowered_el) => lowered_el,
None => continue,
};
if let Some(subtree) = path_tree.as_ref().subentries.get(&lowered_el) {
let path = MPath::join_opt_element(path.as_ref(), &name);
if let Entry::Tree(sub_mf_id) = entry {
recurse.push((sub_mf_id, subtree.clone(), Some(path.clone())));
}
},
)
})
output.push(path);
};
}
Result::<_, Error>::Ok((output, recurse))
},
)
.map_ok(|entries| stream::iter(entries.into_iter().map(Result::<_, Error>::Ok)))
.try_flatten()
.try_collect::<Vec<_>>()
.await
.with_context(|| "Error scanning for conflicting paths")?;
let files = added
.iter()
.chain(candidates.iter().filter(|c| !deleted.contains(c)));
if let Some((child, parent)) = mononoke_types::check_case_conflicts(files) {
return Err(ErrorKind::ExternalCaseConflict(child, parent).into());
}
Ok(())
}
/// Mutable builder for a [`PathTree`]: accumulates path components into a trie.
#[derive(Default)]
struct PathTreeBuilder {
    pub subentries: HashMap<String, Self>,
}

impl PathTreeBuilder {
    /// Inserts a path, given as a sequence of components, creating any missing
    /// intermediate nodes along the way.
    pub fn insert(&mut self, path: Vec<String>) {
        let mut node = self;
        for element in path {
            // Descend one level, materializing the child node if absent.
            node = node.subentries.entry(element).or_insert_with(Self::default);
        }
    }

    /// Consumes the builder and produces the immutable tree, wrapping every
    /// subtree in an `Arc` so subtrees can be shared cheaply.
    pub fn freeze(self) -> PathTree {
        let mut subentries = HashMap::with_capacity(self.subentries.len());
        for (element, subtree) in self.subentries {
            subentries.insert(element, Arc::new(subtree.freeze()));
        }
        PathTree { subentries }
    }
}

/// Immutable trie of path components; subtrees are `Arc`-shared.
struct PathTree {
    pub subentries: HashMap<String, Arc<Self>>,
}

View File

@ -17,8 +17,7 @@ use assert_matches::assert_matches;
use blobrepo::BlobRepo;
use blobrepo_errors::ErrorKind;
use blobrepo_hg::{
check_case_conflict_in_manifest,
repo_commit::{compute_changed_files, UploadEntries},
repo_commit::{check_case_conflicts, compute_changed_files, UploadEntries},
BlobRepoHg,
};
use blobstore::{Loadable, Storable};
@ -850,7 +849,7 @@ fn test_case_conflict_in_manifest(fb: FacebookInit) {
.unwrap()
.unwrap();
for (path, result) in &[
for (path, expect_conflict) in &[
("dir1/file_1_in_dir1", false),
("dir1/file_1_IN_dir1", true),
("DiR1/file_1_in_dir1", true),
@ -878,23 +877,18 @@ fn test_case_conflict_in_manifest(fb: FacebookInit) {
.await
.unwrap()
.manifestid();
assert_eq!(
(check_case_conflict_in_manifest(
repo.clone(),
ctx.clone(),
mf,
child_mf,
MPath::new(path).unwrap()
))
.compat()
.await
.unwrap()
.is_some(),
*result,
"{} expected to {} cause conflict",
path,
if *result { "" } else { "not" },
);
let conflicts = check_case_conflicts(&ctx, &repo, child_mf, Some(mf)).await;
if *expect_conflict {
assert_matches!(
conflicts
.as_ref()
.map_err(|e| e.downcast_ref::<ErrorKind>()),
Err(Some(ErrorKind::ExternalCaseConflict(..)))
);
} else {
assert_matches!(conflicts, Ok(()));
}
}
});
}

View File

@ -1026,7 +1026,7 @@ impl CaseConflictTrie {
});
}
if let Ok(lower) = lowercase_mpath_element(&element) {
if let Some(lower) = lowercase_mpath_element(&element) {
if let Some(conflict) = self.lowercase_to_original.get(&lower) {
return Err(ReverseMPath {
elements: vec![conflict.clone()],
@ -1059,7 +1059,7 @@ impl CaseConflictTrie {
if remove {
self.children.remove(&element);
if let Ok(lower) = lowercase_mpath_element(&element) {
if let Some(lower) = lowercase_mpath_element(&element) {
self.lowercase_to_original.remove(&lower);
}
}
@ -1156,10 +1156,10 @@ where
}
}
pub fn lowercase_mpath_element(e: &MPathElement) -> Result<String, Error> {
let s = std::str::from_utf8(e.as_ref())?;
pub fn lowercase_mpath_element(e: &MPathElement) -> Option<String> {
let s = std::str::from_utf8(e.as_ref()).ok()?;
let s = s.to_lowercase();
Ok(s)
Some(s)
}
#[cfg(test)]