mononoke: return bundle to the client in pushrebase

Summary:
Pushrebase should send back the newly created commits. This diff adds this
functionality.

Note that it fetches both the pushrebased commit and the current "onto" bookmark.
Normally they should be the same; however, they may be different if the bookmark
suddenly moved before the current pushrebase finished.

Reviewed By: lukaspiatkowski

Differential Revision: D9635433

fbshipit-source-id: 12a076cc95f55b1af49690d236cee567429aef93
This commit is contained in:
Stanislau Hlebik 2018-09-06 06:40:38 -07:00 committed by Facebook Github Bot
parent 045623e7c7
commit cab68edc75
2 changed files with 109 additions and 63 deletions

View File

@ -19,6 +19,7 @@ use futures::{Future, IntoFuture, Stream};
use futures::future::{self, err, ok, Shared};
use futures::stream;
use futures_ext::{BoxFuture, BoxStream, FutureExt, StreamExt};
use getbundle_response;
use mercurial::changeset::RevlogChangeset;
use mercurial::manifest::{Details, ManifestContent};
use mercurial_bundles::{parts, Bundle2EncodeBuilder, Bundle2Item};
@ -175,7 +176,7 @@ fn resolve_push(
}
})
.and_then(move |(changegroup_id, bookmark_ids)| {
resolver.prepare_response(changegroup_id, bookmark_ids)
resolver.prepare_push_response(changegroup_id, bookmark_ids)
})
.context("bundle2-resolver error")
.from_err()
@ -183,7 +184,7 @@ fn resolve_push(
}
fn resolve_pushrebase(
_commonheads: CommonHeads,
commonheads: CommonHeads,
resolver: Bundle2Resolver,
bundle2: BoxStream<Bundle2Item, Error>,
) -> BoxFuture<Bytes, Error> {
@ -203,19 +204,30 @@ fn resolve_pushrebase(
.into_future()
.map(|cg_push| (cg_push, manifests, bundle2))
})
.and_then(
|(cg_push, manifests, bundle2)| match cg_push.mparams.get("onto").cloned() {
Some(onto_bookmark) => {
let v = Vec::from(onto_bookmark.as_ref());
let onto_bookmark = String::from_utf8(v)?;
let onto_bookmark = Bookmark::new(onto_bookmark)?;
Ok((onto_bookmark, cg_push, manifests, bundle2))
}
None => Err(err_msg("onto is not specified")),
},
)
.and_then({
cloned!(resolver);
move |(cg_push, manifests, bundle2)| {
move |(onto, cg_push, manifests, bundle2)| {
let changesets = cg_push.changesets.clone();
let mparams = cg_push.mparams.clone();
resolver
.upload_changesets(cg_push, manifests)
.map(move |()| (changesets, mparams, bundle2))
.map(move |()| (changesets, onto, bundle2))
}
})
.and_then({
cloned!(resolver);
move |(changesets, mparams, bundle2)| {
move |(changesets, onto, bundle2)| {
resolver
.resolve_multiple_parts(bundle2, Bundle2Resolver::maybe_resolve_pushkey)
.and_then({
@ -231,30 +243,24 @@ fn resolve_pushrebase(
resolver
.ensure_stream_finished(bundle2)
.map(move |()| (changesets, bookmark_pushes, mparams))
.map(move |()| (changesets, bookmark_pushes, onto))
}
})
}
})
.and_then({
cloned!(resolver);
move |(changesets, bookmark_pushes, mparams)| {
resolver.pushrebase(changesets, bookmark_pushes, mparams)
move |(changesets, bookmark_pushes, onto)| {
resolver
.pushrebase(changesets, bookmark_pushes, &onto)
.map(|pushrebased_rev| (pushrebased_rev, onto))
}
})
.and_then(|_| {
let writer = Cursor::new(Vec::new());
let mut bundle = Bundle2EncodeBuilder::new(writer);
// Mercurial currently hangs while trying to read compressed bundles over the wire:
// https://bz.mercurial-scm.org/show_bug.cgi?id=5646
// TODO: possibly enable compression support once this is fixed.
bundle.set_compressor_type(None);
bundle
.build()
.map(|cursor| Bytes::from(cursor.into_inner()))
.context("While preparing response")
.from_err()
.boxify()
.and_then({
cloned!(resolver);
move |(pushrebased_rev, onto)| {
resolver.prepare_pushrebase_response(commonheads, pushrebased_rev, onto)
}
})
.boxify()
}
@ -274,7 +280,7 @@ struct ChangegroupPush {
}
struct CommonHeads {
_heads: Vec<HgChangesetId>,
heads: Vec<HgChangesetId>,
}
enum Pushkey {
@ -384,7 +390,7 @@ impl Bundle2Resolver {
Some(Bundle2Item::B2xCommonHeads(_header, heads)) => heads
.collect()
.map(|heads| {
let heads = CommonHeads { _heads: heads };
let heads = CommonHeads { heads };
(Some(heads), bundle2)
})
.boxify(),
@ -692,7 +698,7 @@ impl Bundle2Resolver {
/// Takes a changegroup id and prepares a Bytes response containing Bundle2 with reply to
/// changegroup part saying that the push was successful
fn prepare_response(
fn prepare_push_response(
&self,
changegroup_id: Option<PartId>,
bookmark_ids: Vec<PartId>,
@ -720,6 +726,42 @@ impl Bundle2Resolver {
.boxify()
}
/// Prepare the Bytes response sent to the client after a successful pushrebase.
///
/// Builds a getbundle-style bundle2 containing the newly created commits,
/// using the client-advertised common heads as the "common" set and the
/// pushrebased commit (plus the current "onto" bookmark position, when it
/// exists) as the "heads" set.
fn prepare_pushrebase_response(
&self,
commonheads: CommonHeads,
pushrebased_rev: ChangesetId,
onto: Bookmark,
) -> BoxFuture<Bytes, Error> {
// Send to the client both the pushrebased commit and the current "onto" bookmark.
// Normally they should be the same; however, they might be different if the
// bookmark suddenly moved before the current pushrebase finished.
let repo: BlobRepo = (*self.repo).clone();
let common = commonheads.heads;
// Bookmark lookup may yield None (e.g. if the bookmark was deleted
// concurrently); the join below tolerates that case.
let maybe_onto_head = repo.get_bookmark(&onto);
// Pushrebase produced a bonsai changeset id; translate it back to an hg
// changeset id before building the mercurial bundle.
let pushrebased_rev = repo.get_hg_from_bonsai_changeset(pushrebased_rev);
maybe_onto_head
.join(pushrebased_rev)
.and_then(move |(maybe_onto_head, pushrebased_rev)| {
// Heads = current "onto" position (if any) + the pushrebased commit.
let mut heads = vec![];
if let Some(onto_head) = maybe_onto_head {
heads.push(onto_head);
}
heads.push(pushrebased_rev);
getbundle_response::create_getbundle_response(repo, common, heads)
})
.and_then(|bundle2_builder| {
// Serialize the bundle2 into raw Bytes for the wire response.
bundle2_builder
.build()
.map(|cursor| Bytes::from(cursor.into_inner()))
.context("While preparing response")
.from_err()
.boxify()
})
.boxify()
}
/// A method that can use any of the above maybe_resolve_* methods to return
/// a Vec of (potentially multiple) Part rather than an Option of Part.
/// The original use case is to parse multiple pushkey Parts since bundle2 gets
@ -752,42 +794,32 @@ impl Bundle2Resolver {
&self,
changesets: Changesets,
bookmark_pushes: Vec<BookmarkPush>,
mparams: HashMap<String, Bytes>,
) -> impl Future<Item = (), Error = Error> {
onto_bookmark: &Bookmark,
) -> impl Future<Item = ChangesetId, Error = Error> {
let changesets: Vec<_> = changesets
.into_iter()
.map(|(node, _)| HgChangesetId::new(node))
.collect();
match mparams.get("onto").cloned() {
Some(onto_bookmark) => {
let v = Vec::from(onto_bookmark.as_ref());
let onto_bookmark = try_boxfuture!(String::from_utf8(v));
let onto_bookmark = try_boxfuture!(Bookmark::new(onto_bookmark));
let incorrect_bookmark_pushes: Vec<_> = bookmark_pushes
.iter()
.filter(|bp| &bp.name != onto_bookmark)
.collect();
let incorrect_bookmark_pushes: Vec<_> = bookmark_pushes
.iter()
.filter(|bp| bp.name != onto_bookmark)
.collect();
if !incorrect_bookmark_pushes.is_empty() {
try_boxfuture!(Err(err_msg(format!(
"allowed only pushes of {} bookmark: {:?}",
onto_bookmark, bookmark_pushes
))))
}
pushrebase::do_pushrebase(
self.repo.clone(),
self.pushrebase.clone(),
onto_bookmark,
changesets,
).map(|_| ())
.map_err(|err| err_msg(format!("pushrebase failed {:?}", err)))
.boxify()
}
None => Err(err_msg("onto is not specified")).into_future().boxify(),
if !incorrect_bookmark_pushes.is_empty() {
try_boxfuture!(Err(err_msg(format!(
"allowed only pushes of {} bookmark: {:?}",
onto_bookmark, bookmark_pushes
))))
}
pushrebase::do_pushrebase(
self.repo.clone(),
self.pushrebase.clone(),
onto_bookmark.clone(),
changesets,
).map_err(|err| err_msg(format!("pushrebase failed {:?}", err)))
.boxify()
}
}

View File

@ -35,32 +35,38 @@ start mononoke
$ wait_for_mononoke $TESTTMP/repo
Clone the repo
$ hgclone_treemanifest ssh://user@dummy/repo-hg repo2 --noupdate -q
$ hgclone_treemanifest ssh://user@dummy/repo-hg repo2 --noupdate --config extensions.remotenames= -q
$ cd repo2
$ setup_hg_client
$ cat >> .hg/hgrc <<EOF
> [extensions]
> pushrebase =
> remotenames =
> EOF
$ hg up -q 0
$ echo 1 > 1 && hg add 1 && hg ci -m 1
$ hgmn push -r . --to master_bookmark
pushing to ssh://user@dummy/repo
remote: * DEBG Session with Mononoke started with uuid: * (glob)
pushing rev a0c9c5791058 to destination ssh://user@dummy/repo bookmark master_bookmark
searching for changes
adding changesets
adding manifests
adding file changes
added 1 changesets with 0 changes to 0 files
server ignored bookmark master_bookmark update
remote: * DEBG Session with Mononoke started with uuid: * (glob)
TODO(stash): pushrebase of a merge commit, pushrebase over a merge commit
$ hgmn pull -q
$ hgmn up master_bookmark
remote: * DEBG Session with Mononoke started with uuid: * (glob)
2 files updated, 0 files merged, 0 files removed, 0 files unresolved
(activating bookmark master_bookmark)
$ hg sl -r ":"
@ changeset: 4:c2e526aacb51
| bookmark: master_bookmark
| tag: tip
| bookmark: default/master_bookmark
| hoistedname: master_bookmark
| parent: 2:26805aba1e60
| user: test
| date: Thu Jan 01 00:00:00 1970 +0000
@ -87,12 +93,14 @@ TODO(stash): pushrebase of a merge commit, pushrebase over a merge commit
date: Thu Jan 01 00:00:00 1970 +0000
summary: A
Push rebase fails with conflict
$ hg up -q 0
$ echo 1 > 1 && hg add 1 && hg ci -m 1
$ hgmn push -r . --to master_bookmark
pushing to ssh://user@dummy/repo
remote: * Session with Mononoke started with uuid: * (glob)
pushing rev a0c9c5791058 to destination ssh://user@dummy/repo bookmark master_bookmark
searching for changes
remote: * pushrebase failed * (glob)
remote: msg: "pushrebase failed Conflicts([PushrebaseConflict { left: MPath([49] \"1\"), right: MPath([49] \"1\") }])"
@ -105,15 +113,20 @@ Push stack
$ echo 2 > 2 && hg add 2 && hg ci -m 2
$ echo 3 > 3 && hg add 3 && hg ci -m 3
$ hgmn push -r . --to master_bookmark
pushing to ssh://user@dummy/repo
remote: * DEBG Session with Mononoke started with uuid: * (glob)
pushing rev 3953a5b36168 to destination ssh://user@dummy/repo bookmark master_bookmark
searching for changes
$ hgmn pull -q
adding changesets
adding manifests
adding file changes
added 2 changesets with 0 changes to 0 files
server ignored bookmark master_bookmark update
$ hgmn up -q master_bookmark
$ hg sl -r ":"
@ changeset: 8:6398085ceb9d
| bookmark: master_bookmark
| tag: tip
| bookmark: default/master_bookmark
| hoistedname: master_bookmark
| user: test
| date: Thu Jan 01 00:00:00 1970 +0000
| summary: 3
@ -162,3 +175,4 @@ Push stack
date: Thu Jan 01 00:00:00 1970 +0000
summary: A