upload_changesets: migrate the main fn to async/await

Summary:
See bottom diff of this stack for overview.

This diff in particular asyncifies the `upload_changeset` fn. Apart from that,
it also makes sure it can accept `&RevlogChangeset` instead of
`RevlogChangeset`, which helps us to get rid of cloning.

Reviewed By: krallin

Differential Revision: D20693932

fbshipit-source-id: b0e5e1604cbfb6f6b6e269c85a79208115325734
This commit is contained in:
Kostia Balytskyi 2020-03-30 12:17:03 -07:00 committed by Facebook GitHub Bot
parent 19fff610d7
commit d7af87342b
2 changed files with 45 additions and 44 deletions

View File

@ -990,14 +990,13 @@ impl Bundle2Resolver {
self.repo.clone(),
self.ctx.scuba().clone(),
*node,
revlog_cs.clone(),
revlog_cs,
uploaded_changesets,
&filelogs,
&manifests,
&content_blobs,
self.pushrebase_flags.casefolding_check,
)
.compat()
.await
.with_context(err_context)?;
}

View File

@ -7,17 +7,18 @@
use crate::stats::*;
use crate::upload_blobs::UploadableHgBlob;
use anyhow::{bail, Error, Result};
use anyhow::{bail, Context, Error, Result};
use blobrepo::{BlobRepo, ChangesetHandle, CreateChangeset};
use context::CoreContext;
use failure_ext::{Compat, FutureFailureErrorExt, StreamFailureErrorExt};
use failure_ext::{Compat, StreamFailureErrorExt};
use futures_ext::{
try_boxfuture as try_oldboxfuture, BoxFuture as OldBoxFuture, BoxStream as OldBoxStream,
FutureExt as OldFutureExt, StreamExt as OldStreamExt,
BoxFuture as OldBoxFuture, BoxStream as OldBoxStream, FutureExt as OldFutureExt,
StreamExt as OldStreamExt,
};
use futures_old::future::{self as old_future, ok, Shared};
use futures_old::Future as OldFuture;
use futures_old::{stream as old_stream, Stream as OldStream};
use futures_util::compat::Future01CompatExt;
use mercurial_revlog::{
changeset::RevlogChangeset,
manifest::{Details, ManifestContent},
@ -276,64 +277,65 @@ fn get_parent(
ok(res)
}
pub fn upload_changeset(
pub async fn upload_changeset(
ctx: CoreContext,
repo: BlobRepo,
scuba_logger: ScubaSampleBuilder,
node: HgChangesetId,
revlog_cs: RevlogChangeset,
revlog_cs: &RevlogChangeset,
mut uploaded_changesets: UploadedChangesets,
filelogs: &Filelogs,
manifests: &Manifests,
content_blobs: &ContentBlobs,
must_check_case_conflicts: bool,
) -> OldBoxFuture<UploadedChangesets, Error> {
let (p1, p2) = {
(
get_parent(ctx.clone(), &repo, &uploaded_changesets, revlog_cs.p1),
get_parent(ctx.clone(), &repo, &uploaded_changesets, revlog_cs.p2),
)
};
) -> Result<UploadedChangesets, Error> {
let NewBlobs {
root_manifest,
sub_entries,
// XXX use these content blobs in the future
content_blobs: _content_blobs,
} = try_oldboxfuture!(NewBlobs::new(
} = NewBlobs::new(
revlog_cs.manifestid(),
&manifests,
&filelogs,
&content_blobs,
repo.clone(),
));
)?;
// DO NOT replace and_then() with join() or futures_ordered()!
let cs_metadata = ChangesetMetadata {
user: String::from_utf8(revlog_cs.user().into())?,
time: revlog_cs.time().clone(),
extra: revlog_cs.extra().clone(),
comments: String::from_utf8(revlog_cs.comments().into())?,
};
// DO NOT try to comute p1 and p2 concurrently!
// It may result in a combinatoral explosion in mergy repos (see D14100259)
p1.and_then(|p1| p2.map(|p2| (p1, p2)))
.with_context(move || format!("While fetching parents for Changeset {}", node))
.from_err()
.and_then(move |(p1, p2)| {
let cs_metadata = ChangesetMetadata {
user: String::from_utf8(revlog_cs.user().into())?,
time: revlog_cs.time().clone(),
extra: revlog_cs.extra().clone(),
comments: String::from_utf8(revlog_cs.comments().into())?,
};
let create_changeset = CreateChangeset {
expected_nodeid: Some(node.into_nodehash()),
expected_files: Some(Vec::from(revlog_cs.files())),
p1,
p2,
root_manifest,
sub_entries,
// XXX pass content blobs to CreateChangeset here
cs_metadata,
must_check_case_conflicts,
};
let scheduled_uploading = create_changeset.create(ctx, &repo, scuba_logger);
uploaded_changesets.insert(node, scheduled_uploading);
Ok(uploaded_changesets)
})
let p1 = get_parent(ctx.clone(), &repo, &uploaded_changesets, revlog_cs.p1)
.boxify()
.compat()
.await
.with_context(move || format!("While fetching parents for Changeset {}", node))?;
let p2 = get_parent(ctx.clone(), &repo, &uploaded_changesets, revlog_cs.p2)
.boxify()
.compat()
.await
.with_context(move || format!("While fetching parents for Changeset {}", node))?;
let create_changeset = CreateChangeset {
expected_nodeid: Some(node.into_nodehash()),
expected_files: Some(Vec::from(revlog_cs.files())),
p1,
p2,
root_manifest,
sub_entries,
// XXX pass content blobs to CreateChangeset here
cs_metadata,
must_check_case_conflicts,
};
let scheduled_uploading = create_changeset.create(ctx, &repo, scuba_logger);
uploaded_changesets.insert(node, scheduled_uploading);
Ok(uploaded_changesets)
}