resolver.rs: migrate resolve_pushrebase to async/await

Summary: See bottom diff of the stack for overview.

Reviewed By: StanislavGlebik

Differential Revision: D20633227

fbshipit-source-id: 16b3f3a764a75261da0585c9724a17853e865681
This commit is contained in:
Kostia Balytskyi 2020-03-27 03:56:35 -07:00 committed by Facebook GitHub Bot
parent 0d4a315fa6
commit eda4632327

View File

@ -31,7 +31,7 @@ use futures_ext::{
};
use futures_old::future::{self as old_future, err, Shared};
use futures_old::stream as old_stream;
use futures_old::{Future as OldFuture, IntoFuture, Stream as OldStream};
use futures_old::{Future as OldFuture, Stream as OldStream};
use futures_util::{
compat::Future01CompatExt, try_join, FutureExt, StreamExt, TryFutureExt, TryStreamExt,
};
@ -301,15 +301,22 @@ pub fn resolve(
fn changegroup_always_unacceptable() -> bool {
false
};
resolve_pushrebase(
ctx,
commonheads,
resolver,
bundle2,
maybe_pushvars,
maybe_full_content,
changegroup_always_unacceptable,
)
async move {
resolve_pushrebase(
ctx,
commonheads,
resolver,
bundle2,
maybe_pushvars,
maybe_full_content,
changegroup_always_unacceptable,
)
.await
}
.boxed()
.compat()
.from_err()
.boxify()
}
} else {
async move {
@ -426,7 +433,7 @@ impl<T: Copy> PushrebaseBookmarkSpec<T> {
}
}
fn resolve_pushrebase(
async fn resolve_pushrebase(
ctx: CoreContext,
commonheads: CommonHeads,
resolver: Bundle2Resolver,
@ -434,198 +441,109 @@ fn resolve_pushrebase(
maybe_pushvars: Option<HashMap<String, Bytes>>,
maybe_full_content: Option<Arc<Mutex<BytesOld>>>,
changegroup_acceptable: impl FnOnce() -> bool + Send + Sync + 'static,
) -> OldBoxFuture<PostResolveAction, BundleResolverError> {
{
cloned!(ctx, resolver);
async move {
resolver
.resolve_b2xtreegroup2(ctx, bundle2)
.await
.context("While resolving B2xTreegroup2")
) -> Result<PostResolveAction, BundleResolverError> {
let (manifests, bundle2) = resolver
.resolve_b2xtreegroup2(ctx.clone(), bundle2)
.await
.context("While resolving B2xTreegroup2")?;
let (maybe_cg_push, bundle2) = resolver
.maybe_resolve_changegroup(ctx.clone(), bundle2, changegroup_acceptable)
.await
.context("While resolving Changegroup")?;
let cg_push = maybe_cg_push.ok_or(Error::msg("Empty pushrebase"))?;
let onto_params = match cg_push.mparams.get("onto").cloned() {
Some(onto_bookmark) => {
let v = Vec::from(onto_bookmark.as_ref());
let onto_bookmark = String::from_utf8(v).map_err(Error::from)?;
let onto_bookmark = BookmarkName::new(onto_bookmark)?;
let onto_bookmark = pushrebase::OntoBookmarkParams::new(onto_bookmark);
onto_bookmark
}
None => return Err(format_err!("onto is not specified").into()),
};
let changesets = cg_push.changesets.clone();
let will_rebase = onto_params.bookmark != *DONOTREBASEBOOKMARK;
// Mutation information must not be present in public commits
// See T54101162, S186586
if !will_rebase {
for (_, hg_cs) in &changesets {
for key in pushrebase::MUTATION_KEYS {
if hg_cs.extra.as_ref().contains_key(key.as_bytes()) {
return Err(Error::msg("Forced push blocked because it contains mutation metadata.\n\
You can remove the metadata from a commit with `hg amend --config mutation.record=false`.\n\
For more help, please contact the Source Control team at https://fburl.com/27qnuyl2").into());
}
}
}
}
.boxed()
.compat()
.and_then({
cloned!(ctx, resolver);
move |(manifests, bundle2)| {
async move {
resolver
.maybe_resolve_changegroup(ctx, bundle2, changegroup_acceptable)
.await
.context("While resolving Changegroup")
}
.boxed()
.compat()
.map(move |(cg_push, bundle2)| (cg_push, manifests, bundle2))
}
})
.and_then(|(cg_push, manifests, bundle2)| {
cg_push
.ok_or(Error::msg("Empty pushrebase"))
.into_future()
.map(move |cg_push| (cg_push, manifests, bundle2))
})
.and_then(
|(cg_push, manifests, bundle2)| match cg_push.mparams.get("onto").cloned() {
Some(onto_bookmark) => {
let v = Vec::from(onto_bookmark.as_ref());
let onto_bookmark = String::from_utf8(v)?;
let onto_bookmark = BookmarkName::new(onto_bookmark)?;
let onto_bookmark = pushrebase::OntoBookmarkParams::new(onto_bookmark);
Ok((onto_bookmark, cg_push, manifests, bundle2))
}
None => bail!("onto is not specified"),
},
)
.and_then({
cloned!(ctx, resolver);
move |(onto_params, cg_push, manifests, bundle2)| {
let changesets = cg_push.changesets.clone();
let will_rebase = onto_params.bookmark != *DONOTREBASEBOOKMARK;
// Mutation information must not be present in public commits
// See T54101162, S186586
if !will_rebase {
for (_, hg_cs) in &changesets {
for key in pushrebase::MUTATION_KEYS {
if hg_cs.extra.as_ref().contains_key(key.as_bytes()) {
return old_future::err(Error::msg("Forced push blocked because it contains mutation metadata.\n\
You can remove the metadata from a commit with `hg amend --config mutation.record=false`.\n\
For more help, please contact the Source Control team at https://fburl.com/27qnuyl2")).left_future();
}
}
}
}
async move {
resolver
.upload_changesets(
ctx,
cg_push,
manifests,
).await
}
.boxed()
.compat()
.map(move |(upload_map, uploaded_hg_changeset_ids)| (changesets, onto_params, bundle2, upload_map, uploaded_hg_changeset_ids)).right_future()
}
})
.and_then({
cloned!(resolver);
move |(changesets, onto_params, bundle2, upload_map, uploaded_hg_changeset_ids)| {
{
cloned!(resolver);
async move {
resolver.resolve_multiple_parts(bundle2, Bundle2Resolver::maybe_resolve_pushkey)
.await
.context("While resolving Pushkey")
}
}
.boxed()
.compat()
.and_then({
cloned!(resolver);
move |(pushkeys, bundle2)| {
let bookmark_pushes = collect_pushkey_bookmark_pushes(pushkeys);
if bookmark_pushes.len() > 1 {
return old_future::err(format_err!(
"Too many pushkey parts: {:?}",
bookmark_pushes
))
.boxify();
}
let (uploaded_bonsais, uploaded_hg_changeset_ids) = resolver
.upload_changesets(ctx.clone(), cg_push, manifests)
.await?;
let (bookmark_push_part_id, bookmark_spec) = match bookmark_pushes
.get(0)
{
Some(bk_push)
if bk_push.name != onto_params.bookmark
&& onto_params.bookmark != *DONOTREBASEBOOKMARK =>
{
return old_future::err(format_err!(
"allowed only pushes of {} bookmark: {:?}",
onto_params.bookmark,
bookmark_pushes
))
.boxify();
}
Some(bk_push) if onto_params.bookmark == *DONOTREBASEBOOKMARK => {
(
// This is a force pushrebase scenario. We need to ignore `onto_params`
// and run normal push (using bk_push), but generate a pushrebase
// response.
// See comment next to DONOTREBASEBOOKMARK definition
Some(bk_push.part_id),
PushrebaseBookmarkSpec::ForcePushrebase(bk_push.clone()),
)
}
Some(bk_push) => (
Some(bk_push.part_id),
PushrebaseBookmarkSpec::NormalPushrebase(onto_params),
),
None => {
(None, PushrebaseBookmarkSpec::NormalPushrebase(onto_params))
}
};
let (pushkeys, bundle2) = resolver
.resolve_multiple_parts(bundle2, Bundle2Resolver::maybe_resolve_pushkey)
.await
.context("While resolving Pushkey")?;
async move {
resolver
.ensure_stream_finished(bundle2, maybe_full_content)
.await
}
.boxed()
.compat()
.map(move |maybe_raw_bundle2_id| {
(
changesets,
bookmark_push_part_id,
bookmark_spec,
maybe_raw_bundle2_id,
upload_map,
uploaded_hg_changeset_ids,
)
})
.boxify()
}
})
}
})
.and_then({
cloned!(ctx, resolver);
move |(changesets, bookmark_push_part_id, bookmark_spec, maybe_raw_bundle2_id, uploaded_bonsais, uploaded_hg_changeset_ids)| {
async move { hg_pushrebase_bookmark_spec_to_bonsai(ctx, &resolver.repo, bookmark_spec).await }.boxed().compat()
.map(move |bookmark_spec| (changesets, bookmark_push_part_id, bookmark_spec, maybe_raw_bundle2_id, uploaded_bonsais, uploaded_hg_changeset_ids))
}
})
.map({
move |(changesets, bookmark_push_part_id, bookmark_spec, maybe_raw_bundle2_id, uploaded_bonsais, uploaded_hg_changeset_ids)| {
let any_merges = changesets
.iter()
.any(|(_, revlog_cs)| revlog_cs.p1.is_some() && revlog_cs.p2.is_some());
let bookmark_pushes = collect_pushkey_bookmark_pushes(pushkeys);
if bookmark_pushes.len() > 1 {
return Err(format_err!("Too many pushkey parts: {:?}", bookmark_pushes).into());
}
let repo = resolver.repo.clone();
let maybe_hg_replay_data = maybe_raw_bundle2_id
.map(|raw_bundle2_id| {
HgReplayData::new_with_simple_convertor(
ctx.clone(),
raw_bundle2_id,
repo,
)
});
PostResolveAction::PushRebase(PostResolvePushRebase {
any_merges,
bookmark_push_part_id,
bookmark_spec,
maybe_hg_replay_data,
maybe_pushvars,
commonheads,
uploaded_bonsais,
uploaded_hg_changeset_ids,
})
}
})
.from_err()
.boxify()
let (bookmark_push_part_id, bookmark_spec) = match bookmark_pushes.get(0) {
Some(bk_push)
if bk_push.name != onto_params.bookmark
&& onto_params.bookmark != *DONOTREBASEBOOKMARK =>
{
return Err(format_err!(
"allowed only pushes of {} bookmark: {:?}",
onto_params.bookmark,
bookmark_pushes
)
.into());
}
Some(bk_push) if onto_params.bookmark == *DONOTREBASEBOOKMARK => {
(
// This is a force pushrebase scenario. We need to ignore `onto_params`
// and run normal push (using bk_push), but generate a pushrebase
// response.
// See comment next to DONOTREBASEBOOKMARK definition
Some(bk_push.part_id),
PushrebaseBookmarkSpec::ForcePushrebase(bk_push.clone()),
)
}
Some(bk_push) => (
Some(bk_push.part_id),
PushrebaseBookmarkSpec::NormalPushrebase(onto_params),
),
None => (None, PushrebaseBookmarkSpec::NormalPushrebase(onto_params)),
};
let maybe_raw_bundle2_id = resolver
.ensure_stream_finished(bundle2, maybe_full_content)
.await?;
let bookmark_spec =
hg_pushrebase_bookmark_spec_to_bonsai(ctx.clone(), &resolver.repo, bookmark_spec).await?;
let any_merges = changesets
.iter()
.any(|(_, revlog_cs)| revlog_cs.p1.is_some() && revlog_cs.p2.is_some());
let repo = resolver.repo.clone();
let maybe_hg_replay_data = maybe_raw_bundle2_id.map(|raw_bundle2_id| {
HgReplayData::new_with_simple_convertor(ctx.clone(), raw_bundle2_id, repo)
});
Ok(PostResolveAction::PushRebase(PostResolvePushRebase {
any_merges,
bookmark_push_part_id,
bookmark_spec,
maybe_hg_replay_data,
maybe_pushvars,
commonheads,
uploaded_bonsais,
uploaded_hg_changeset_ids,
}))
}
/// Do the right thing when pushrebase-enabled client only wants to manipulate bookmarks