mononoke: change required_checks logic

Summary:
The UploadEntries::finalize() function is called during pushes and blobimports, i.e.
when a new commit is created from an hg changeset. It performs a few checks; one of
them makes sure that all entries referenced by a manifest are present in the
blobstore.

The problem is that this check is actually quite wasteful: if a single file was
modified in a directory, it verifies that all of the entries in that directory are
present.

Let's change the logic to check only that the entries added by this commit are
present in the blobstore (which is what find_intersection_of_diffs does). That
doesn't make the check less correct: if an entry is referenced in a manifest but was
not added by this commit, then it was already checked when one of the parent
commits was created.
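
Roughly, the new check in finalize() boils down to the sketch below (futures 0.1
combinators, mirroring the diff further down; stats counters and the filenode
handling that finalize() also performs are omitted):

    // Sketch: verify only the entries that are new in root_mf_id compared to its
    // parent manifests, instead of every entry listed in every touched manifest.
    let blobstore = inner.blobstore.clone(); // repo blobstore held by UploadEntriesState
    let boxed_blobstore = blobstore.boxed();
    let required_checks = find_intersection_of_diffs(
        ctx.clone(),
        boxed_blobstore,
        root_mf_id,
        parent_manifest_ids,
    )
    .map({
        cloned!(ctx);
        move |(path, entry)| {
            // Each new entry is either a tree (manifest) or a leaf (file node).
            let (node, is_tree) = match entry {
                Entry::Tree(mf_id) => (mf_id.into_nodehash(), true),
                Entry::Leaf((_, file_id)) => (file_id.into_nodehash(), false),
            };
            // Fail the future if the blob is missing, attaching the offending path.
            Self::assert_in_blobstore(ctx.clone(), blobstore.clone(), node, is_tree)
                .with_context(move |_| format!("While checking for path: {:?}", path))
                .map_err(Error::from)
        }
    })
    // Run up to 100 blobstore existence checks concurrently, then wait for all of them.
    .buffer_unordered(100)
    .collect()
    .map(|_| ());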

Reviewed By: farnz

Differential Revision: D17664062

fbshipit-source-id: 6e7e16084c9126bdb757793d441707fada5052ff
Stanislau Hlebik authored on 2019-09-30 07:19:45 -07:00, committed by Facebook Github Bot
parent 19331de1c1
commit 71378bb0c1
4 changed files with 141 additions and 83 deletions


@@ -24,7 +24,7 @@ pub use crate::repo::{save_bonsai_changesets, BlobRepo, CreateChangeset};
pub use crate::repo_commit::ChangesetHandle;
pub use changeset_fetcher::ChangesetFetcher;
// TODO: This is exported for testing - is this the right place for it?
pub use crate::repo_commit::compute_changed_files;
pub use crate::repo_commit::{compute_changed_files, UploadEntries};
pub use utils::DangerousOverride;
pub mod internal {


@@ -1854,7 +1854,7 @@ impl CreateChangeset {
make_new_changeset(parents, root_mf_id, cs_metadata, files)
})
.and_then({
cloned!(ctx);
cloned!(ctx, parent_manifest_hashes);
move |hg_cs| {
create_bonsai_changeset_object(
ctx,
@@ -1929,7 +1929,13 @@ impl CreateChangeset {
.context("While writing to blobstore")
.join(
entry_processor
.finalize(ctx, filenodes, cs_id)
.finalize(
ctx,
filenodes,
cs_id,
root_mf_id,
parent_manifest_hashes,
)
.context("While finalizing processing"),
)
.from_err()


@@ -10,8 +10,7 @@ use std::sync::{Arc, Mutex};
use cloned::cloned;
use failure_ext::{
err_msg, format_err, prelude::*, Compat, Error, FutureFailureErrorExt, Result,
StreamFailureErrorExt,
format_err, prelude::*, Compat, Error, FutureFailureErrorExt, Result, StreamFailureErrorExt,
};
use futures::future::{self, ok, Future, Shared, SharedError, SharedItem};
use futures::stream::{self, Stream};
@@ -23,16 +22,17 @@ use scuba_ext::{ScubaSampleBuilder, ScubaSampleBuilderExt};
use stats::Timeseries;
use tracing::{trace_args, Traced};
use ::manifest::{find_intersection_of_diffs, Entry};
use blobstore::Blobstore;
use context::CoreContext;
use filenodes::{FilenodeInfo, Filenodes};
use mercurial_types::{
blobs::{ChangesetMetadata, HgBlobChangeset, HgBlobEntry, HgBlobEnvelope, HgChangesetContent},
manifest::{self, Content},
manifest,
manifest_utils::{changed_entry_stream, ChangedEntry, EntryStatus},
nodehash::{HgFileNodeId, HgManifestId},
Changeset, HgChangesetId, HgEntry, HgEntryId, HgManifest, HgNodeHash, HgNodeKey, HgParents,
MPath, RepoPath, NULL_HASH,
Changeset, HgChangesetId, HgEntry, HgManifest, HgNodeHash, HgNodeKey, HgParents, MPath,
RepoPath, NULL_HASH,
};
use mononoke_types::{self, BonsaiChangeset, ChangesetId, RepositoryId};
use stats::define_stats;
@@ -46,8 +46,6 @@ define_stats! {
process_file_entry: timeseries(RATE, SUM),
process_tree_entry: timeseries(RATE, SUM),
finalize_required: timeseries(RATE, AVG, SUM),
finalize_required_found: timeseries(RATE, AVG, SUM),
finalize_required_uploading: timeseries(RATE, AVG, SUM),
finalize_parent: timeseries(RATE, AVG, SUM),
finalize_uploaded: timeseries(RATE, AVG, SUM),
finalize_uploaded_filenodes: timeseries(RATE, AVG, SUM),
@@ -133,9 +131,6 @@ impl ChangesetHandle {
/// State used while tracking uploaded entries, to ensure that a changeset ends up with the right
/// set of blobs uploaded, and all filenodes present.
struct UploadEntriesState {
/// Listing of blobs that we need, based on parsing the root manifest and all the newly
/// uploaded child manifests
required_entries: HashMap<RepoPath, HgEntryId>,
/// All the blobs that have been uploaded in this changeset
uploaded_entries: HashMap<RepoPath, HgBlobEntry>,
/// Parent hashes (if any) of the blobs that have been uploaded in this changeset. Used for
@@ -164,7 +159,6 @@ impl UploadEntries {
Self {
scuba_logger,
inner: Arc::new(Mutex::new(UploadEntriesState {
required_entries: HashMap::new(),
uploaded_entries: HashMap::new(),
parents: HashSet::new(),
blobstore,
@@ -186,40 +180,14 @@ impl UploadEntries {
entry: &HgBlobEntry,
path: RepoPath,
) -> BoxFuture<(), Error> {
let inner_mutex = self.inner.clone();
let parents_found = self.find_parents(ctx.clone(), entry, path.clone());
let entry_hash = entry.get_hash().into_nodehash();
let entry_type = entry.get_type();
entry
.get_content(ctx)
.and_then(move |content| match content {
Content::Tree(manifest) => {
for entry in manifest.list() {
let mpath = MPath::join_element_opt(path.mpath(), entry.get_name());
let mpath = match mpath {
Some(mpath) => mpath,
None => {
return future::err(err_msg(
"internal error: unexpected empty MPath",
))
.boxify();
}
};
let path = match entry.get_type() {
manifest::Type::File(_) => RepoPath::FilePath(mpath),
manifest::Type::Tree => RepoPath::DirectoryPath(mpath),
};
let mut inner = inner_mutex.lock().expect("Lock poisoned");
inner.required_entries.insert(path, entry.get_hash());
}
future::ok(()).boxify()
}
_ => future::err(ErrorKind::NotAManifest(entry_hash, entry_type).into()).boxify(),
})
.join(parents_found)
.map(|_| ())
if entry.get_type() != manifest::Type::Tree {
future::err(
ErrorKind::NotAManifest(entry.get_hash().into_nodehash(), entry.get_type()).into(),
)
.boxify()
} else {
self.find_parents(ctx.clone(), entry, path.clone())
}
}
fn find_parents(
@@ -261,12 +229,6 @@ impl UploadEntries {
)
.boxify();
}
{
let mut inner = self.inner.lock().expect("Lock poisoned");
inner
.required_entries
.insert(RepoPath::root(), entry.get_hash());
}
self.process_one_entry(ctx, entry, RepoPath::root())
}
@@ -326,39 +288,41 @@ impl UploadEntries {
ctx: CoreContext,
filenodes: Arc<dyn Filenodes>,
cs_id: HgNodeHash,
mf_id: HgManifestId,
parent_manifest_ids: Vec<HgManifestId>,
) -> BoxFuture<(), Error> {
let required_checks = {
let inner = self.inner.lock().expect("Lock poisoned");
let required_len = inner.required_entries.len();
let blobstore = inner.blobstore.clone();
let boxed_blobstore = blobstore.boxed();
find_intersection_of_diffs(
ctx.clone(),
boxed_blobstore.clone(),
mf_id,
parent_manifest_ids,
)
.map({
cloned!(ctx);
move |(path, entry)| {
let (node, is_tree) = match entry {
Entry::Tree(mf_id) => (mf_id.into_nodehash(), true),
Entry::Leaf((_, file_id)) => (file_id.into_nodehash(), false),
};
let checks: Vec<_> = inner
.required_entries
.iter()
.filter_map(|(path, entryid)| {
if inner.uploaded_entries.contains_key(path) {
None
} else {
let path = path.clone();
let assert = Self::assert_in_blobstore(
ctx.clone(),
inner.blobstore.clone(),
entryid.into_nodehash(),
path.is_tree(),
);
Some(
assert
.with_context(move |_| format!("While checking for path: {}", path))
.from_err(),
)
}
})
.collect();
let assert =
Self::assert_in_blobstore(ctx.clone(), blobstore.clone(), node, is_tree);
STATS::finalize_required.add_value(required_len as i64);
STATS::finalize_required_found.add_value((required_len - checks.len()) as i64);
STATS::finalize_required_uploading.add_value(checks.len() as i64);
future::join_all(checks).timed({
assert
.with_context(move |_| format!("While checking for path: {:?}", path))
.map_err(Error::from)
}
})
.buffer_unordered(100)
.collect()
.map(|checks| {
STATS::finalize_required.add_value(checks.len() as i64);
})
.timed({
let mut scuba_logger = self.scuba_logger();
move |stats, result| {
if result.is_ok() {


@@ -10,7 +10,7 @@ mod tracing_blobstore;
mod utils;
use benchmark_lib::{new_benchmark_repo, DelaySettings, GenManifest};
use blobrepo::{compute_changed_files, BlobRepo};
use blobrepo::{compute_changed_files, BlobRepo, UploadEntries};
use blobstore::Storable;
use cloned::cloned;
use context::CoreContext;
@@ -26,13 +26,16 @@ use mercurial_types::{
manifest, Changeset, FileType, HgChangesetId, HgEntry, HgFileNodeId, HgManifestId, HgParents,
MPath, MPathElement, RepoPath,
};
use mercurial_types_mocks::nodehash::ONES_FNID;
use mercurial_types_mocks::nodehash::{ONES_CSID, ONES_FNID};
use mononoke_types::bonsai_changeset::BonsaiChangesetMut;
use mononoke_types::{
blob::BlobstoreValue, BonsaiChangeset, ChangesetId, DateTime, FileChange, FileContents,
};
use rand::{distributions::Normal, SeedableRng};
use rand_xorshift::XorShiftRng;
use scuba_ext::ScubaSampleBuilder;
use sql_ext::SqlConstructors;
use sqlfilenodes::SqlFilenodes;
use std::{
collections::{BTreeMap, HashMap, HashSet},
iter::FromIterator,
@@ -432,6 +435,91 @@ test_both_repotypes!(
create_bad_changeset_eager
);
fn upload_entries_finalize_success(fb: FacebookInit, repo: BlobRepo) {
let ctx = CoreContext::test_mock(fb);
let fake_file_path = RepoPath::file("file").expect("Can't generate fake RepoPath");
let (filehash, file_future) =
upload_file_no_parents(ctx.clone(), &repo, "blob", &fake_file_path);
let (roothash, root_manifest_future) = upload_manifest_no_parents(
ctx.clone(),
&repo,
format!("file\0{}\n", filehash),
&RepoPath::root(),
);
let (file_blob, _) = run_future(file_future).unwrap();
let (root_mf_blob, _) = run_future(root_manifest_future).unwrap();
let entries = UploadEntries::new(
repo.get_blobstore(),
repo.get_repoid(),
ScubaSampleBuilder::with_discard(),
false, /* draft */
);
run_future(entries.process_root_manifest(ctx.clone(), &root_mf_blob)).unwrap();
run_future(entries.process_one_entry(ctx.clone(), &file_blob, fake_file_path)).unwrap();
let filenodes = Arc::new(SqlFilenodes::with_sqlite_in_memory().unwrap());
run_future(entries.finalize(
ctx.clone(),
filenodes,
ONES_CSID.into_nodehash(),
HgManifestId::new(roothash),
vec![],
))
.unwrap();
}
test_both_repotypes!(
upload_entries_finalize_success,
upload_entries_finalize_success_lazy,
upload_entries_finalize_success_eager
);
fn upload_entries_finalize_fail(fb: FacebookInit, repo: BlobRepo) {
let entries = UploadEntries::new(
repo.get_blobstore(),
repo.get_repoid(),
ScubaSampleBuilder::with_discard(),
false, /* draft */
);
let ctx = CoreContext::test_mock(fb);
let dirhash = string_to_nodehash("c2d60b35a8e7e034042a9467783bbdac88a0d219");
let (_, root_manifest_future) = upload_manifest_no_parents(
ctx.clone(),
&repo,
format!("dir\0{}t\n", dirhash),
&RepoPath::root(),
);
let (root_mf_blob, _) = run_future(root_manifest_future).unwrap();
run_future(entries.process_root_manifest(ctx.clone(), &root_mf_blob)).unwrap();
let filenodes = Arc::new(SqlFilenodes::with_sqlite_in_memory().unwrap());
let res = run_future(entries.finalize(
ctx.clone(),
filenodes,
ONES_CSID.into_nodehash(),
HgManifestId::new(root_mf_blob.get_hash().into_nodehash()),
vec![],
));
assert!(res.is_err());
}
test_both_repotypes!(
upload_entries_finalize_fail,
upload_entries_finalize_fail_lazy,
upload_entries_finalize_fail_eager
);
fn create_double_linknode(fb: FacebookInit, repo: BlobRepo) {
let ctx = CoreContext::test_mock(fb);
let fake_file_path = RepoPath::file("dir/file").expect("Can't generate fake RepoPath");