Convert changeset creation to new-style futures

Summary: Convert changeset creation to new-style (0.3) futures, bridging the remaining futures-0.1 code through compat adapters.
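
The change replaces futures-0.1 combinator chains in changeset creation with async/await on futures 0.3. Remaining 0.1 APIs on either side of the boundary are bridged with compat adapters, and the shared futures in ChangesetHandle move from the old Shared<BoxFuture<_, Compat<Error>>> form to boxed().try_shared(). A minimal sketch of the general pattern, not code from this diff (fetch_old and both wrapper functions are made-up stand-ins; crate names follow the Cargo.toml changes below):

    // Illustrative only: `fetch_old` stands in for any remaining futures-0.1 API.
    use anyhow::Error;
    use futures::compat::Future01CompatExt; // bridges a 0.1 future into 0.3
    use futures_old::future::Future as OldFuture; // futures-old = futures 0.1

    fn fetch_old() -> impl OldFuture<Item = u64, Error = Error> {
        futures_old::future::ok(42)
    }

    // Before: a futures-0.1 combinator chain.
    fn count_old() -> impl OldFuture<Item = u64, Error = Error> {
        fetch_old().and_then(|n| Ok(n + 1))
    }

    // After: async/await on futures 0.3; the leftover 0.1 call is bridged with .compat().
    async fn count_new() -> Result<u64, Error> {
        let n = fetch_old().compat().await?;
        Ok(n + 1)
    }

The bridge also runs the other way where 0.1 callers still consume the converted code, e.g. the blobimport call site below gains a .compat() after get_completed_changeset().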

Reviewed By: krallin

Differential Revision: D25430405

fbshipit-source-id: 64eb6dbc324846408e60c77e273c5d5edfd59318
Pavel Aslanov 2020-12-11 13:54:07 -08:00 committed by Facebook GitHub Bot
parent 49de273c77
commit 32585287f1
11 changed files with 439 additions and 571 deletions


@@ -315,7 +315,7 @@ impl UploadChangesets {
changesets: impl Stream<Item = (RevIdx, HgNodeHash), Error = Error> + Send + 'static,
is_import_from_beggining: bool,
origin_repo: Option<BlobRepo>,
) -> BoxStream<(RevIdx, SharedItem<(BonsaiChangeset, HgBlobChangeset)>), Error> {
) -> BoxStream<(RevIdx, (BonsaiChangeset, HgBlobChangeset)), Error> {
let Self {
ctx,
blobrepo,
@@ -531,6 +531,7 @@ impl UploadChangesets {
oneshot::spawn(
cshandle
.get_completed_changeset()
.compat()
.with_context(move || format!("While uploading changeset: {}", csid))
.from_err(),
&executor,


@@ -26,12 +26,12 @@ mononoke_types = { path = "../../mononoke_types" }
repo_blobstore = { path = "../repo_blobstore" }
scuba_ext = { path = "../../common/scuba_ext" }
cloned = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
failure_ext = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
futures_ext = { package = "futures_01_ext", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
futures_ext_old = { package = "futures_01_ext", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
futures_ext = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
futures_stats = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
stats = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
tracing = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
anyhow = "1.0"
futures = { version = "0.3.5", features = ["async-await", "compat"] }
futures-old = { package = "futures", version = "0.1" }
tokio = { version = "=0.2.13", features = ["full"] }
uuid = { version = "0.8.1", features = ["v4"] }


@@ -9,22 +9,20 @@ use crate::bonsai_generation::{create_bonsai_changeset_object, save_bonsai_chang
use crate::repo_commit::*;
use crate::{BlobRepoHg, ErrorKind};
use ::manifest::Entry;
use anyhow::{format_err, Error, Result};
use anyhow::{format_err, Context, Error, Result};
use blobrepo::BlobRepo;
use bonsai_hg_mapping::{BonsaiHgMapping, BonsaiHgMappingEntry};
use changesets::{ChangesetInsert, Changesets};
use cloned::cloned;
use context::CoreContext;
use failure_ext::{Compat, FutureFailureErrorExt, FutureFailureExt};
use futures::{
future::{BoxFuture, FutureExt, TryFutureExt},
channel::oneshot,
compat::Future01CompatExt,
future::{self, BoxFuture, FutureExt, TryFutureExt},
stream::BoxStream,
};
use futures_ext::{spawn_future, BoxFuture as OldBoxFuture, FutureExt as _};
use futures_old::future::{self, Future};
use futures_old::sync::oneshot;
use futures_old::IntoFuture;
use futures_stats::Timed;
use futures_ext::FbFutureExt;
use futures_stats::TimedTryFutureExt;
use mercurial_types::{
blobs::{ChangesetMetadata, HgBlobChangeset},
HgFileNodeId, HgManifestId, HgNodeHash, RepoPath,
@@ -32,11 +30,7 @@ use mercurial_types::{
use mononoke_types::{BlobstoreValue, BonsaiChangeset, ChangesetId, MPath};
use scuba_ext::MononokeScubaSampleBuilder;
use stats::prelude::*;
use std::{
convert::From,
sync::{Arc, Mutex},
};
use tracing::{trace_args, EventId, Traced};
use std::sync::{Arc, Mutex};
use uuid::Uuid;
define_stats! {
@@ -85,7 +79,6 @@ impl CreateChangeset {
// the final commit hash
let uuid = Uuid::new_v4();
scuba_logger.add("changeset_uuid", format!("{}", uuid));
let event_id = EventId::new();
let entry_processor = UploadEntries::new(repo.get_blobstore(), scuba_logger.clone());
let (signal_parent_ready, can_be_parent) = oneshot::channel();
@@ -96,22 +89,30 @@ impl CreateChangeset {
cloned!(ctx, entry_processor);
let root_manifest = self.root_manifest;
let sub_entries = self.sub_entries;
async move { process_entries(&ctx, &entry_processor, root_manifest, sub_entries).await }
}
.boxed()
.compat()
.context("While processing entries")
.traced_with_id(&ctx.trace(), "uploading entries", trace_args!(), event_id);
async move {
process_entries(&ctx, &entry_processor, root_manifest, sub_entries)
.await
.context("While processing entries")
}
};
let parents_complete = extract_parents_complete(&self.p1, &self.p2);
let parents_complete = extract_parents_complete(&self.p1, &self.p2)
.try_timed()
.map({
let mut scuba_logger = scuba_logger.clone();
move |result| match result {
Err(err) => Err(err.context("While waiting for parents to complete")),
Ok((stats, result)) => {
scuba_logger
.add_future_stats(&stats)
.log_with_msg("Parents completed", None);
Ok(result)
}
}
});
let parents_data = handle_parents(scuba_logger.clone(), self.p1, self.p2)
.context("While waiting for parents to upload data")
.traced_with_id(
&ctx.trace(),
"waiting for parents data",
trace_args!(),
event_id,
);
.map_err(|err| err.context("While waiting for parents to upload data"));
let must_check_case_conflicts = self.must_check_case_conflicts.clone();
let create_bonsai_changeset_object = match self.create_bonsai_changeset_hook {
Some(hook) => Arc::clone(&hook),
@@ -137,285 +138,193 @@ impl CreateChangeset {
},
),
};
let changeset = {
let mut scuba_logger = scuba_logger.clone();
upload_entries
.join(parents_data)
.from_err()
.and_then({
cloned!(ctx, repo, mut scuba_logger, signal_parent_ready);
let expected_files = self.expected_files;
let cs_metadata = self.cs_metadata;
let blobstore = repo.get_blobstore();
cloned!(ctx, repo, signal_parent_ready, mut scuba_logger);
let expected_files = self.expected_files;
let cs_metadata = self.cs_metadata;
let blobstore = repo.get_blobstore();
move |(root_mf_id, (parents, parent_manifest_hashes, bonsai_parents))| {
let files = if let Some(expected_files) = expected_files {
STATS::create_changeset_expected_cf.add_value(1);
// We are trusting the callee to provide a list of changed files, used
// by the import job
future::ok(expected_files).boxify()
} else {
STATS::create_changeset_compute_cf.add_value(1);
compute_changed_files(
ctx.clone(),
repo.clone(),
root_mf_id,
parent_manifest_hashes.get(0).cloned(),
parent_manifest_hashes.get(1).cloned(),
)
.boxed()
.compat()
.boxify()
};
let p1_mf = parent_manifest_hashes.get(0).cloned();
let check_case_conflicts = if must_check_case_conflicts {
cloned!(ctx, repo);
async move {
check_case_conflicts(&ctx, &repo, root_mf_id, p1_mf).await
}
.boxed()
.compat()
.left_future()
} else {
future::ok(()).right_future()
};
let changesets = files
.join(check_case_conflicts)
.and_then(move |(files, ())| {
STATS::create_changeset_cf_count.add_value(files.len() as i64);
make_new_changeset(parents, root_mf_id, cs_metadata, files)
})
.and_then({
cloned!(ctx, parent_manifest_hashes);
move |hg_cs| {
create_bonsai_changeset_object(
ctx,
hg_cs.clone(),
parent_manifest_hashes,
bonsai_parents,
repo.clone(),
)
.map_ok(|bonsai_cs| (hg_cs, bonsai_cs))
.compat()
}
});
changesets
.context("While computing changed files")
.and_then({
cloned!(ctx);
move |(blobcs, bonsai_cs)| {
let fut: OldBoxFuture<
(HgBlobChangeset, BonsaiChangeset),
Error,
> = (move || {
let bonsai_blob = bonsai_cs.clone().into_blob();
let bcs_id = bonsai_blob.id().clone();
let cs_id = blobcs.get_changeset_id().into_nodehash();
let manifest_id = blobcs.manifestid();
if let Some(expected_nodeid) = expected_nodeid {
if cs_id != expected_nodeid {
return future::err(
ErrorKind::InconsistentChangesetHash(
expected_nodeid,
cs_id,
blobcs,
)
.into(),
)
.boxify();
}
}
scuba_logger
.add("changeset_id", format!("{}", cs_id))
.log_with_msg("Changeset uuid to hash mapping", None);
// NOTE(luk): an attempt was made in D8187210 to split the
// upload_entries signal into upload_entries and
// processed_entries and to signal_parent_ready after
// upload_entries, so that one doesn't need to wait for the
// entries to be processed. There were no performance gains
// from that experiment
//
// We deliberately eat this error - this is only so that
// another changeset can start verifying data in the blob
// store while we verify this one
let _ = signal_parent_ready
.lock()
.expect("poisoned lock")
.take()
.expect("signal_parent_ready cannot be taken yet")
.send(Ok((bcs_id, cs_id, manifest_id)));
let bonsai_cs_fut = {
cloned!(ctx, blobstore, bonsai_cs);
async move {
save_bonsai_changeset_object(
&ctx,
&blobstore,
bonsai_cs.clone(),
)
.await
}
}
.boxed()
.compat();
{
cloned!(ctx, blobcs);
async move { blobcs.save(&ctx, &blobstore).await }
}
.boxed()
.compat()
.join(bonsai_cs_fut)
.context("While writing to blobstore")
.join(
entry_processor
.finalize(ctx, root_mf_id, parent_manifest_hashes)
.context("While finalizing processing"),
)
.from_err()
.map(move |_| (blobcs, bonsai_cs))
.boxify()
})();
fut.context(
"While creating and verifying Changeset for blobstore",
)
}
})
.traced_with_id(
&ctx.trace(),
"uploading changeset",
trace_args!(),
event_id,
)
.from_err()
async move {
let (root_mf_id, (parents, parent_manifest_hashes, bonsai_parents)) =
future::try_join(upload_entries, parents_data).await?;
let files = async {
if let Some(expected_files) = expected_files {
STATS::create_changeset_expected_cf.add_value(1);
// We are trusting the callee to provide a list of changed files, used
// by the import job
Ok(expected_files)
} else {
STATS::create_changeset_compute_cf.add_value(1);
compute_changed_files(
ctx.clone(),
repo.clone(),
root_mf_id,
parent_manifest_hashes.get(0).cloned(),
parent_manifest_hashes.get(1).cloned(),
)
.await
}
})
.timed(move |stats, result| {
if result.is_ok() {
};
let p1_mf = parent_manifest_hashes.get(0).cloned();
let check_case_conflicts = async {
if must_check_case_conflicts {
check_case_conflicts(&ctx, &repo, root_mf_id, p1_mf).await?;
}
Ok::<_, Error>(())
};
let (files, ()) = future::try_join(files, check_case_conflicts).await?;
STATS::create_changeset_cf_count.add_value(files.len() as i64);
let hg_cs = make_new_changeset(parents, root_mf_id, cs_metadata, files)?;
let bonsai_cs = create_bonsai_changeset_object(
ctx.clone(),
hg_cs.clone(),
parent_manifest_hashes.clone(),
bonsai_parents,
repo.clone(),
)
.await?;
let bonsai_blob = bonsai_cs.clone().into_blob();
let bcs_id = bonsai_blob.id().clone();
let cs_id = hg_cs.get_changeset_id().into_nodehash();
let manifest_id = hg_cs.manifestid();
if let Some(expected_nodeid) = expected_nodeid {
if cs_id != expected_nodeid {
return Err(ErrorKind::InconsistentChangesetHash(
expected_nodeid,
cs_id,
hg_cs,
)
.into());
}
}
scuba_logger
.add("changeset_id", format!("{}", cs_id))
.log_with_msg("Changeset uuid to hash mapping", None);
// NOTE(luk): an attempt was made in D8187210 to split the
// upload_entries signal into upload_entries and
// processed_entries and to signal_parent_ready after
// upload_entries, so that one doesn't need to wait for the
// entries to be processed. There were no performance gains
// from that experiment
//
// We deliberately eat this error - this is only so that
// another changeset can start verifying data in the blob
// store while we verify this one
let _ = signal_parent_ready
.lock()
.expect("poisoned lock")
.take()
.expect("signal_parent_ready cannot be taken yet")
.send(Ok((bcs_id, cs_id, manifest_id)));
futures::try_join!(
save_bonsai_changeset_object(&ctx, &blobstore, bonsai_cs.clone()),
hg_cs.save(&ctx, &blobstore),
entry_processor
.finalize(ctx.clone(), root_mf_id, parent_manifest_hashes)
.map_err(|err| err.context("While finalizing processing")),
)?;
Ok::<_, Error>((hg_cs, bonsai_cs))
}
}
.try_timed()
.map({
cloned!(mut scuba_logger);
move |result| {
match result {
Ok((stats, result)) => {
scuba_logger
.add_future_stats(&stats)
.log_with_msg("Changeset created", None);
Ok(result)
}
Ok(())
})
.inspect_err({
cloned!(signal_parent_ready);
move |e| {
Err(err) => {
let err =
err.context("While creating and verifying Changeset for blobstore");
let trigger = signal_parent_ready.lock().expect("poisoned lock").take();
if let Some(trigger) = trigger {
// Ignore errors if the receiving end has gone away.
let e = format_err!("signal_parent_ready failed: {:?}", e);
let e = format_err!("signal_parent_ready failed: {:?}", err);
let _ = trigger.send(Err(e));
}
Err(err)
}
})
};
let parents_complete = parents_complete
.context("While waiting for parents to complete")
.traced_with_id(
&ctx.trace(),
"waiting for parents complete",
trace_args!(),
event_id,
)
.timed({
let mut scuba_logger = scuba_logger.clone();
move |stats, result| {
if result.is_ok() {
scuba_logger
.add_future_stats(&stats)
.log_with_msg("Parents completed", None);
}
Ok(())
}
});
}
});
let complete_changesets = repo.get_changesets_object();
cloned!(repo);
let repoid = repo.get_repoid();
let changeset_complete_fut = changeset
.join(parents_complete)
.and_then({
cloned!(ctx);
let bonsai_hg_mapping = repo.get_bonsai_hg_mapping().clone();
move |((hg_cs, bonsai_cs), _)| {
let bcs_id = bonsai_cs.get_changeset_id();
let bonsai_hg_entry = BonsaiHgMappingEntry {
repo_id: repoid.clone(),
hg_cs_id: hg_cs.get_changeset_id(),
bcs_id,
};
let complete_changesets = repo.get_changesets_object();
let bonsai_hg_mapping = repo.get_bonsai_hg_mapping().clone();
cloned!(repo);
let changeset_complete_fut = async move {
let ((hg_cs, bonsai_cs), _) = future::try_join(changeset, parents_complete).await?;
bonsai_hg_mapping
.add(ctx.clone(), bonsai_hg_entry)
.map(move |_| (hg_cs, bonsai_cs))
.context("While inserting mapping")
.traced_with_id(
&ctx.trace(),
"uploading bonsai hg mapping",
trace_args!(),
event_id,
)
// update bonsai mapping
let bcs_id = bonsai_cs.get_changeset_id();
let bonsai_hg_entry = BonsaiHgMappingEntry {
repo_id: repoid.clone(),
hg_cs_id: hg_cs.get_changeset_id(),
bcs_id,
};
bonsai_hg_mapping
.add(ctx.clone(), bonsai_hg_entry)
.compat()
.await
.context("While inserting mapping")?;
// update changeset mapping
let completion_record = ChangesetInsert {
repo_id: repo.get_repoid(),
cs_id: bonsai_cs.get_changeset_id(),
parents: bonsai_cs.parents().into_iter().collect(),
};
complete_changesets
.add(ctx.clone(), completion_record)
.compat()
.await
.context("While inserting into changeset table")?;
Ok::<_, Error>((bonsai_cs, hg_cs))
}
.try_timed()
.map({
cloned!(mut scuba_logger);
move |result| match result {
Ok((stats, result)) => {
scuba_logger
.add_future_stats(&stats)
.log_with_msg("CreateChangeset Finished", None);
Ok(result)
}
})
.and_then(move |(hg_cs, bonsai_cs)| {
let completion_record = ChangesetInsert {
repo_id: repo.get_repoid(),
cs_id: bonsai_cs.get_changeset_id(),
parents: bonsai_cs.parents().into_iter().collect(),
};
complete_changesets
.add(ctx.clone(), completion_record)
.map(|_| (bonsai_cs, hg_cs))
.context("While inserting into changeset table")
.traced_with_id(
&ctx.trace(),
"uploading final changeset",
trace_args!(),
event_id,
)
})
.with_context(move || {
format!(
Err(err) => Err(err.context(format!(
"While creating Changeset {:?}, uuid: {}",
expected_nodeid, uuid
)
})
.timed({
move |stats, result| {
if result.is_ok() {
scuba_logger
.add_future_stats(&stats)
.log_with_msg("CreateChangeset Finished", None);
}
Ok(())
}
});
))),
}
});
let can_be_parent = can_be_parent
.into_future()
.then(|r| match r {
.map(|r| match r {
Ok(res) => res,
Err(e) => Err(format_err!("can_be_parent: {:?}", e)),
})
.map_err(Compat)
.boxify()
.shared();
.boxed()
.try_shared();
ChangesetHandle::new_pending(
can_be_parent,
spawn_future(changeset_complete_fut)
.map_err(Compat)
.boxify()
.shared(),
)
let completion_future = tokio::spawn(changeset_complete_fut)
.map(|result| result?)
.boxed()
.try_shared();
ChangesetHandle::new_pending(can_be_parent, completion_future)
}
}


@@ -34,7 +34,7 @@ use cloned::cloned;
use context::CoreContext;
use filenodes::{FilenodeInfo, FilenodeRangeResult, FilenodeResult, Filenodes};
use futures::{FutureExt, TryFutureExt, TryStreamExt};
use futures_ext::{BoxFuture, BoxStream, FutureExt as _, StreamExt};
use futures_ext_old::{BoxFuture, BoxStream, FutureExt as _, StreamExt};
use futures_old::{
future,
stream::{self, futures_unordered},


@@ -8,19 +8,14 @@
use crate::BlobRepoHg;
use anyhow::{format_err, Context, Error, Result};
use cloned::cloned;
use failure_ext::{Compat, FutureFailureErrorExt};
use futures::{
channel::oneshot,
compat::Future01CompatExt,
future::{self, BoxFuture, FutureExt, TryFutureExt},
stream::{self, BoxStream, StreamExt, TryStreamExt},
stream::{self, BoxStream, TryStreamExt},
};
use futures_ext::{BoxFuture as OldBoxFuture, FutureExt as OldFutureExt};
use futures_old::future::{
self as old_future, result, Future as OldFuture, Shared, SharedError, SharedItem,
};
use futures_old::stream::Stream as OldStream;
use futures_old::sync::oneshot;
use futures_old::IntoFuture;
use futures_stats::{Timed, TimedTryFutureExt};
use futures_ext::{future::TryShared, FbFutureExt};
use futures_stats::TimedTryFutureExt;
use scuba_ext::MononokeScubaSampleBuilder;
use stats::prelude::*;
use std::collections::{HashMap, HashSet};
@@ -46,7 +41,6 @@ define_stats! {
prefix = "mononoke.blobrepo_commit";
process_file_entry: timeseries(Rate, Sum),
process_tree_entry: timeseries(Rate, Sum),
finalize_required: timeseries(Rate, Average, Sum),
finalize_parent: timeseries(Rate, Average, Sum),
finalize_uploaded: timeseries(Rate, Average, Sum),
finalize_uploaded_filenodes: timeseries(Rate, Average, Sum),
@@ -61,19 +55,25 @@ define_stats! {
/// See `get_completed_changeset()` for the public API you can use to extract the final changeset
#[derive(Clone)]
pub struct ChangesetHandle {
can_be_parent: Shared<OldBoxFuture<(ChangesetId, HgNodeHash, HgManifestId), Compat<Error>>>,
can_be_parent:
TryShared<BoxFuture<'static, Result<(ChangesetId, HgNodeHash, HgManifestId), Error>>>,
// * Shared is required here because a single changeset can have more than one child, and
// all of those children will want to refer to the corresponding future for their parents.
// * The Compat<Error> here is because the error type for Shared (a cloneable wrapper called
// SharedError) doesn't implement Fail, and only implements Error if the wrapped type
// implements Error.
completion_future: Shared<OldBoxFuture<(BonsaiChangeset, HgBlobChangeset), Compat<Error>>>,
completion_future:
TryShared<BoxFuture<'static, Result<(BonsaiChangeset, HgBlobChangeset), Error>>>,
}
impl ChangesetHandle {
pub fn new_pending(
can_be_parent: Shared<OldBoxFuture<(ChangesetId, HgNodeHash, HgManifestId), Compat<Error>>>,
completion_future: Shared<OldBoxFuture<(BonsaiChangeset, HgBlobChangeset), Compat<Error>>>,
can_be_parent: TryShared<
BoxFuture<'static, Result<(ChangesetId, HgNodeHash, HgManifestId), Error>>,
>,
completion_future: TryShared<
BoxFuture<'static, Result<(BonsaiChangeset, HgBlobChangeset), Error>>,
>,
) -> Self {
Self {
can_be_parent,
@@ -82,47 +82,40 @@ impl ChangesetHandle {
}
pub fn ready_cs_handle(ctx: CoreContext, repo: BlobRepo, hg_cs: HgChangesetId) -> Self {
let bonsai_cs = repo
.get_bonsai_from_hg(ctx.clone(), hg_cs)
.and_then(move |bonsai_id| {
bonsai_id.ok_or(ErrorKind::BonsaiMappingNotFound(hg_cs).into())
})
.and_then({
cloned!(ctx, repo);
move |csid| {
async move { csid.load(&ctx, repo.blobstore()).await }
.boxed()
.compat()
.from_err()
}
});
let (trigger, can_be_parent) = oneshot::channel();
let can_be_parent = can_be_parent
.into_future()
.map_err(|e| format_err!("can_be_parent: {:?}", e))
.map_err(Compat)
.boxify()
.shared();
.boxed()
.try_shared();
let completion_future = bonsai_cs
.join(
async move { hg_cs.load(&ctx, repo.blobstore()).await }
.boxed()
let bonsai_cs = {
cloned!(ctx, repo);
async move {
let csid = repo
.get_bonsai_from_hg(ctx.clone(), hg_cs)
.compat()
.from_err(),
.await?
.ok_or(ErrorKind::BonsaiMappingNotFound(hg_cs))?;
let bonsai_cs = csid.load(&ctx, repo.blobstore()).await?;
Ok::<_, Error>(bonsai_cs)
}
};
let completion_future = async move {
let (bonsai_cs, hg_cs) = future::try_join(
bonsai_cs,
hg_cs.load(&ctx, repo.blobstore()).map_err(Error::from),
)
.map_err(Compat)
.inspect(move |(bonsai_cs, hg_cs)| {
let _ = trigger.send((
bonsai_cs.get_changeset_id(),
hg_cs.get_changeset_id().into_nodehash(),
hg_cs.manifestid(),
));
})
.boxify()
.shared();
.await?;
let _ = trigger.send((
bonsai_cs.get_changeset_id(),
hg_cs.get_changeset_id().into_nodehash(),
hg_cs.manifestid(),
));
Ok((bonsai_cs, hg_cs))
}
.boxed()
.try_shared();
Self {
can_be_parent,
@@ -132,7 +125,7 @@ impl ChangesetHandle {
pub fn get_completed_changeset(
self,
) -> Shared<OldBoxFuture<(BonsaiChangeset, HgBlobChangeset), Compat<Error>>> {
) -> TryShared<BoxFuture<'static, Result<(BonsaiChangeset, HgBlobChangeset), Error>>> {
self.completion_future
}
}
@@ -241,14 +234,14 @@ impl UploadEntries {
}
// Check the blobstore to see whether a particular node is present.
fn assert_in_blobstore(
async fn assert_in_blobstore(
ctx: CoreContext,
blobstore: RepoBlobstore,
node_id: HgNodeHash,
is_tree: bool,
) -> OldBoxFuture<(), Error> {
) -> Result<(), Error> {
if node_id == NULL_HASH {
return result(Ok(())).boxify();
return Ok(());
}
let key = if is_tree {
HgManifestId::new(node_id).blobstore_key()
@@ -256,67 +249,59 @@ impl UploadEntries {
HgFileNodeId::new(node_id).blobstore_key()
};
async move {
if blobstore.is_present(&ctx, &key).await? {
Ok(())
} else {
Err(BlobstoreError::NotFound(key).into())
}
if blobstore.is_present(&ctx, &key).await? {
Ok(())
} else {
Err(BlobstoreError::NotFound(key).into())
}
.boxed()
.compat()
.boxify()
}
pub fn finalize(
pub async fn finalize(
self,
ctx: CoreContext,
mf_id: HgManifestId,
parent_manifest_ids: Vec<HgManifestId>,
) -> OldBoxFuture<(), Error> {
) -> Result<(), Error> {
let required_checks = {
let blobstore = self.blobstore.clone();
let boxed_blobstore = blobstore.boxed();
find_intersection_of_diffs(
ctx.clone(),
boxed_blobstore.clone(),
mf_id,
parent_manifest_ids,
)
.boxed()
.compat()
.map({
cloned!(ctx);
move |(path, entry)| {
let (node, is_tree) = match entry {
Entry::Tree(mf_id) => (mf_id.into_nodehash(), true),
Entry::Leaf((_, file_id)) => (file_id.into_nodehash(), false),
};
let assert =
Self::assert_in_blobstore(ctx.clone(), blobstore.clone(), node, is_tree);
assert
.with_context(move || format!("While checking for path: {:?}", path))
.map_err(Error::from)
}
})
.buffer_unordered(100)
.collect()
.map(|checks| {
STATS::finalize_required.add_value(checks.len() as i64);
})
.timed({
let mut scuba_logger = self.scuba_logger();
move |stats, result| {
if result.is_ok() {
scuba_logger
.add_future_stats(&stats)
.log_with_msg("Required checks", None);
let mut scuba_logger = self.scuba_logger();
cloned!(ctx);
async move {
let (stats, ()) = find_intersection_of_diffs(
ctx.clone(),
boxed_blobstore.clone(),
mf_id,
parent_manifest_ids,
)
.try_for_each_concurrent(100, {
cloned!(ctx);
move |(path, entry)| {
cloned!(ctx, blobstore);
async move {
let (node, is_tree) = match entry {
Entry::Tree(mf_id) => (mf_id.into_nodehash(), true),
Entry::Leaf((_, file_id)) => (file_id.into_nodehash(), false),
};
Self::assert_in_blobstore(ctx, blobstore, node, is_tree)
.await
.with_context(move || {
format!("While checking for path: {:?}", path)
})?;
Ok(())
}
.boxed()
}
Ok(())
}
})
})
.try_timed()
.await?;
scuba_logger
.add_future_stats(&stats)
.log_with_msg("Required checks", None);
Ok::<_, Error>(())
}
};
let parent_checks = {
@@ -326,32 +311,31 @@ impl UploadEntries {
.parents
.iter()
.map(|node_key| {
let assert = Self::assert_in_blobstore(
ctx.clone(),
blobstore.clone(),
node_key.hash,
node_key.path.is_tree(),
);
let node_key = node_key.clone();
assert
cloned!(ctx, blobstore, node_key);
async move {
Self::assert_in_blobstore(
ctx,
blobstore,
node_key.hash,
node_key.path.is_tree(),
)
.await
.with_context(move || {
format!("While checking for a parent node: {}", node_key)
})
.from_err()
})?;
Ok(())
}
})
.collect();
STATS::finalize_parent.add_value(checks.len() as i64);
old_future::join_all(checks).timed({
future::try_join_all(checks).try_timed().map_ok({
let mut scuba_logger = self.scuba_logger();
move |stats, result| {
if result.is_ok() {
scuba_logger
.add_future_stats(&stats)
.log_with_msg("Parent checks", None);
}
Ok(())
move |(stats, _)| {
scuba_logger
.add_future_stats(&stats)
.log_with_msg("Parent checks", None);
}
})
};
@@ -379,7 +363,8 @@ impl UploadEntries {
.log_with_msg("Size of changeset", None);
}
parent_checks.join(required_checks).map(|_| ()).boxify()
future::try_join(parent_checks, required_checks).await?;
Ok(())
}
}
@@ -436,62 +421,7 @@ pub async fn process_entries<'a>(
pub fn extract_parents_complete(
p1: &Option<ChangesetHandle>,
p2: &Option<ChangesetHandle>,
) -> OldBoxFuture<SharedItem<()>, SharedError<Compat<Error>>> {
match (p1.as_ref(), p2.as_ref()) {
(None, None) => old_future::ok(()).shared().boxify(),
(Some(p), None) | (None, Some(p)) => p
.completion_future
.clone()
.and_then(|_| old_future::ok(()).shared())
.boxify(),
(Some(p1), Some(p2)) => p1
.completion_future
.clone()
// DO NOT replace and_then() with join() or futures_ordered()!
// It may result in a combinatoral explosion in mergy repos, like the following:
// o
// |\
// | o
// |/|
// o |
// |\|
// | o
// |/|
// o |
// |\|
// ...
// |/|
// | ~
// o
// |\
// ~ ~
//
.and_then({
let p2_completion_future = p2.completion_future.clone();
move |_| p2_completion_future
})
.and_then(|_| old_future::ok(()).shared())
.boxify(),
}
.boxify()
}
pub fn handle_parents(
mut scuba_logger: MononokeScubaSampleBuilder,
p1: Option<ChangesetHandle>,
p2: Option<ChangesetHandle>,
) -> OldBoxFuture<(HgParents, Vec<HgManifestId>, Vec<ChangesetId>), Error> {
let p1 = p1.map(|cs| cs.can_be_parent);
let p2 = p2.map(|cs| cs.can_be_parent);
let p1 = match p1 {
Some(p1) => p1.map(Some).boxify(),
None => old_future::ok(None).boxify(),
};
let p2 = match p2 {
Some(p2) => p2.map(Some).boxify(),
None => old_future::ok(None).boxify(),
};
) -> BoxFuture<'static, Result<(), Error>> {
// DO NOT replace and_then() with join() or futures_ordered()!
// It may result in a combinatoral explosion in mergy repos, like the following:
// o
@@ -511,50 +441,74 @@ pub fn handle_parents(
// |\
// ~ ~
//
p1.and_then(|p1| p2.map(|p2| (p1, p2)))
.and_then(|(p1, p2)| {
let mut bonsai_parents = vec![];
let p1 = match p1 {
Some(item) => {
let (bonsai_cs_id, hash, manifest) = *item;
bonsai_parents.push(bonsai_cs_id);
(Some(hash), Some(manifest))
}
None => (None, None),
};
let p2 = match p2 {
Some(item) => {
let (bonsai_cs_id, hash, manifest) = *item;
bonsai_parents.push(bonsai_cs_id);
(Some(hash), Some(manifest))
}
None => (None, None),
};
Ok((p1, p2, bonsai_parents))
})
.map_err(|e| Error::from(e))
.map(
move |((p1_hash, p1_manifest), (p2_hash, p2_manifest), bonsai_parents)| {
let parents = HgParents::new(p1_hash, p2_hash);
let mut parent_manifest_hashes = vec![];
if let Some(p1_manifest) = p1_manifest {
parent_manifest_hashes.push(p1_manifest);
}
if let Some(p2_manifest) = p2_manifest {
parent_manifest_hashes.push(p2_manifest);
}
(parents, parent_manifest_hashes, bonsai_parents)
},
)
.timed(move |stats, result| {
if result.is_ok() {
scuba_logger
.add_future_stats(&stats)
.log_with_msg("Wait for parents ready", None);
let p1 = p1.as_ref().map(|p1| p1.completion_future.clone());
let p2 = p2.as_ref().map(|p2| p2.completion_future.clone());
async move {
if let Some(p1) = p1 {
p1.await?;
}
if let Some(p2) = p2 {
p2.await?;
}
Ok::<(), Error>(())
}
.boxed()
}
pub async fn handle_parents(
mut scuba_logger: MononokeScubaSampleBuilder,
p1: Option<ChangesetHandle>,
p2: Option<ChangesetHandle>,
) -> Result<(HgParents, Vec<HgManifestId>, Vec<ChangesetId>), Error> {
// DO NOT replace and_then() with join() or futures_ordered()!
// It may result in a combinatoral explosion in mergy repos, like the following:
// o
// |\
// | o
// |/|
// o |
// |\|
// | o
// |/|
// o |
// |\|
// ...
// |/|
// | ~
// o
// |\
// ~ ~
//
let (stats, result) = async move {
let mut bonsai_parents = Vec::new();
let mut parent_manifest_hashes = Vec::new();
let p1_hash = match p1 {
Some(p1) => {
let (bonsai_cs_id, hash, manifest) = p1.can_be_parent.await?;
bonsai_parents.push(bonsai_cs_id);
parent_manifest_hashes.push(manifest);
Some(hash)
}
Ok(())
})
.boxify()
None => None,
};
let p2_hash = match p2 {
Some(p2) => {
let (bonsai_cs_id, hash, manifest) = p2.can_be_parent.await?;
bonsai_parents.push(bonsai_cs_id);
parent_manifest_hashes.push(manifest);
Some(hash)
}
None => None,
};
let parents = HgParents::new(p1_hash, p2_hash);
Ok::<_, Error>((parents, parent_manifest_hashes, bonsai_parents))
}
.try_timed()
.await?;
scuba_logger
.add_future_stats(&stats)
.log_with_msg("Wait for parents ready", None);
Ok(result)
}
pub fn make_new_changeset(


@@ -26,7 +26,7 @@ use cloned::cloned;
use context::CoreContext;
use fbinit::FacebookInit;
use fixtures::{create_bonsai_changeset, many_files_dirs, merge_uneven};
use futures::{compat::Future01CompatExt, FutureExt, TryFutureExt};
use futures::{compat::Future01CompatExt, future, FutureExt, TryFutureExt};
use futures_ext::{BoxFuture, FutureExt as _};
use futures_old::Future;
use maplit::btreemap;
@@ -167,7 +167,7 @@ async fn create_one_changeset(fb: FacebookInit) {
vec![to_leaf(file_future), to_tree(manifest_dir_future)],
);
let bonsai_hg = commit.get_completed_changeset().compat().await.unwrap();
let bonsai_hg = commit.get_completed_changeset().await.unwrap();
let cs = &bonsai_hg.1;
assert!(cs.manifestid() == root_mfid);
assert!(cs.user() == author.as_bytes());
@@ -236,7 +236,8 @@ async fn create_two_changesets(fb: FacebookInit) {
let (commit1, commit2) = (commit1
.get_completed_changeset()
.join(commit2.get_completed_changeset()))
.compat()
.join(commit2.get_completed_changeset().compat()))
.compat()
.await
.unwrap();
@@ -289,7 +290,7 @@ async fn check_bonsai_creation(fb: FacebookInit) {
vec![to_leaf(file_future), to_tree(manifest_dir_future)],
);
let commit = commit.get_completed_changeset().compat().await.unwrap();
let commit = commit.get_completed_changeset().await.unwrap();
let commit = &commit.1;
let bonsai_cs_id = repo
.get_bonsai_from_hg(ctx.clone(), commit.get_changeset_id())
@@ -363,9 +364,9 @@ async fn check_bonsai_creation_with_rename(fb: FacebookInit) {
)
};
let parent_cs = parent.get_completed_changeset().compat().await.unwrap();
let parent_cs = parent.get_completed_changeset().await.unwrap();
let parent_cs = &parent_cs.1;
let child_cs = child.get_completed_changeset().compat().await.unwrap();
let child_cs = child.get_completed_changeset().await.unwrap();
let child_cs = &child_cs.1;
let parent_bonsai_cs_id = repo
@@ -414,7 +415,6 @@ async fn create_bad_changeset(fb: FacebookInit) {
commit
.get_completed_changeset()
.compat()
.await
.expect_err("This should fail");
}
@@ -453,7 +453,6 @@ async fn upload_entries_finalize_success(fb: FacebookInit) {
.unwrap();
(entries.finalize(ctx.clone(), roothash, vec![]))
.compat()
.await
.unwrap();
}
@@ -481,9 +480,7 @@ async fn upload_entries_finalize_fail(fb: FacebookInit) {
.await
.unwrap();
let res = (entries.finalize(ctx.clone(), root_mfid, vec![]))
.compat()
.await;
let res = (entries.finalize(ctx.clone(), root_mfid, vec![])).await;
assert!(res.is_err());
}
@@ -871,12 +868,12 @@ async fn test_case_conflict_two_changeset(fb: FacebookInit) {
};
assert!(
commit1
.get_completed_changeset()
.join(commit2.get_completed_changeset())
.compat()
.await
.is_err()
future::try_join(
commit1.get_completed_changeset(),
commit2.get_completed_changeset(),
)
.await
.is_err()
);
}
@@ -906,7 +903,7 @@ async fn test_case_conflict_inside_one_changeset(fb: FacebookInit) {
vec![to_leaf(file_future_1), to_leaf(file_future_2)],
);
assert!((commit1.get_completed_changeset()).compat().await.is_err());
assert!((commit1.get_completed_changeset()).await.is_err());
}
#[fbinit::compat_test]
@@ -953,10 +950,10 @@ async fn test_no_case_conflict_removal(fb: FacebookInit) {
};
assert!(
(commit1
.get_completed_changeset()
.join(commit2.get_completed_changeset()))
.compat()
future::try_join(
commit1.get_completed_changeset(),
commit2.get_completed_changeset()
)
.await
.is_ok()
);
@@ -1025,10 +1022,10 @@ async fn test_no_case_conflict_removal_dir(fb: FacebookInit) {
};
assert!(
(commit1
.get_completed_changeset()
.join(commit2.get_completed_changeset()))
.compat()
future::try_join(
commit1.get_completed_changeset(),
commit2.get_completed_changeset()
)
.await
.is_ok()
);


@@ -20,7 +20,7 @@ use futures::{
compat::{Future01CompatExt, Stream01CompatExt},
future::{self, try_join_all},
stream::{self, BoxStream},
try_join, Future, StreamExt, TryFutureExt, TryStreamExt,
try_join, Future, StreamExt, TryStreamExt,
};
use hooks::HookRejectionInfo;
use lazy_static::lazy_static;
@@ -1246,11 +1246,7 @@ impl<'r> Bundle2Resolver<'r> {
let uploaded: Vec<(BonsaiChangeset, HgChangesetId)> = stream::iter(uploaded_changesets)
.map(move |(hg_cs_id, handle): (HgChangesetId, _)| async move {
let shared_item_bcs_and_something = handle
.get_completed_changeset()
.compat()
.map_err(Error::from)
.await?;
let shared_item_bcs_and_something = handle.get_completed_changeset().await?;
let bcs = shared_item_bcs_and_something.0.clone();
Result::<_, Error>::Ok((bcs, hg_cs_id))


@@ -49,6 +49,7 @@
* lfs_upload: importing blob Sha256(cc216c8df3beca4da80c551d178260b2cb844e04f7f7aa943d8c665162abca14) (glob)
* failed to blobimport: While uploading changeset: 527169d71e0eac8abd0a25d18520cb3b8371edb5 (glob)
* cause: While creating Changeset Some(HgNodeHash(Sha1(527169d71e0eac8abd0a25d18520cb3b8371edb5))), uuid: * (glob)
* [main] eden/mononoke/blobimport_lib/src/lib.rs:163] cause: While creating and verifying Changeset for blobstore (glob)
* cause: While processing entries (glob)
* cause: While uploading child entries (glob)
* cause: Error starting lfs_helper: "$TESTTMP/lfs" (glob)


@@ -75,6 +75,8 @@ filenode won't be send at all
remote: Caused by:
remote: While creating Changeset Some(HgNodeHash(Sha1(cb67355f234869bb9bf94787d5a69e21e23a8c9b))), uuid: * (glob)
remote: Caused by:
remote: While creating and verifying Changeset for blobstore
remote: Caused by:
remote: While processing entries
remote: Caused by:
remote: While uploading child entries
@@ -90,23 +92,26 @@ filenode won't be send at all
remote: error: Error {
remote: context: "While creating Changeset Some(HgNodeHash(Sha1(cb67355f234869bb9bf94787d5a69e21e23a8c9b))), uuid: *", (glob)
remote: source: Error {
remote: context: "While processing entries",
remote: context: "While creating and verifying Changeset for blobstore",
remote: source: Error {
remote: context: "While uploading child entries",
remote: context: "While processing entries",
remote: source: Error {
remote: context: "While walking dependencies of Root Manifest with id HgManifestId(HgNodeHash(Sha1(314550e1ace48fe6245515c137b38ea8aeb04c7d)))",
remote: source: SharedError {
remote: error: InconsistentEntryHashForPath(
remote: FilePath(
remote: MPath("file"),
remote: context: "While uploading child entries",
remote: source: Error {
remote: context: "While walking dependencies of Root Manifest with id HgManifestId(HgNodeHash(Sha1(314550e1ace48fe6245515c137b38ea8aeb04c7d)))",
remote: source: SharedError {
remote: error: InconsistentEntryHashForPath(
remote: FilePath(
remote: MPath("file"),
remote: ),
remote: HgNodeHash(
remote: Sha1(979d39e9dea4d1f3f1fea701fd4d3bae43eef76b),
remote: ),
remote: HgNodeHash(
remote: Sha1(d159b93d975921924ad128d6a46ef8b1b8f28ba5),
remote: ),
remote: ),
remote: HgNodeHash(
remote: Sha1(979d39e9dea4d1f3f1fea701fd4d3bae43eef76b),
remote: ),
remote: HgNodeHash(
remote: Sha1(d159b93d975921924ad128d6a46ef8b1b8f28ba5),
remote: ),
remote: ),
remote: },
remote: },
remote: },
remote: },


@@ -212,6 +212,8 @@ Change "sha256:oid" to an another valid oid to check sha1 consisnency
remote: Caused by:
remote: While creating Changeset Some(HgNodeHash(Sha1(77f499cb064550703c65d943b8ce1b982a1293cd))), uuid: * (glob)
remote: Caused by:
remote: While creating and verifying Changeset for blobstore
remote: Caused by:
remote: While processing entries
remote: Caused by:
remote: While uploading child entries
@@ -227,23 +229,26 @@ Change "sha256:oid" to an another valid oid to check sha1 consisnency
remote: error: Error {
remote: context: "While creating Changeset Some(HgNodeHash(Sha1(77f499cb064550703c65d943b8ce1b982a1293cd))), uuid: *", (glob)
remote: source: Error {
remote: context: "While processing entries",
remote: context: "While creating and verifying Changeset for blobstore",
remote: source: Error {
remote: context: "While uploading child entries",
remote: context: "While processing entries",
remote: source: Error {
remote: context: "While walking dependencies of Root Manifest with id HgManifestId(HgNodeHash(Sha1(a1da9053000e0fb9217762d82ba5db793cfb26ce)))",
remote: source: SharedError {
remote: error: InconsistentEntryHashForPath(
remote: FilePath(
remote: MPath("inconsistent_file"),
remote: context: "While uploading child entries",
remote: source: Error {
remote: context: "While walking dependencies of Root Manifest with id HgManifestId(HgNodeHash(Sha1(a1da9053000e0fb9217762d82ba5db793cfb26ce)))",
remote: source: SharedError {
remote: error: InconsistentEntryHashForPath(
remote: FilePath(
remote: MPath("inconsistent_file"),
remote: ),
remote: HgNodeHash(
remote: Sha1(ef5953d600ca68bacb539eab8dffb415441213bb),
remote: ),
remote: HgNodeHash(
remote: Sha1(232ec9b974a9df3d48c2b740396691fb8939976c),
remote: ),
remote: ),
remote: HgNodeHash(
remote: Sha1(ef5953d600ca68bacb539eab8dffb415441213bb),
remote: ),
remote: HgNodeHash(
remote: Sha1(232ec9b974a9df3d48c2b740396691fb8939976c),
remote: ),
remote: ),
remote: },
remote: },
remote: },
remote: },


@@ -37,7 +37,7 @@ Push the commit
remote: Caused by:
remote: While creating Changeset Some(HgNodeHash(Sha1(143fbdc73580e33c8432457df2a10e1038936a72))), uuid: * (glob)
remote: Caused by:
remote: While computing changed files
remote: While creating and verifying Changeset for blobstore
remote: Caused by:
remote: CaseConflict: the changes introduced by this commit have conflicting case. The first offending path is 'Foo.txt', and conflicted with 'foo.txt'. Resolve the conflict.
remote:
@@ -48,7 +48,7 @@ Push the commit
remote: error: Error {
remote: context: "While creating Changeset Some(HgNodeHash(Sha1(143fbdc73580e33c8432457df2a10e1038936a72))), uuid: *", (glob)
remote: source: Error {
remote: context: "While computing changed files",
remote: context: "While creating and verifying Changeset for blobstore",
remote: source: InternalCaseConflict(
remote: MPath("Foo.txt"),
remote: MPath("foo.txt"),