mononoke: pushrebase for 1 commit

Summary:
Initial implementation of pushrebase. At the moment it processes just one commit, but once the remaining stub functions are implemented it should work for every case.

Note that there is a special PushrebaseError type. It is necessary to distinguish between infra errors (e.g. MySQL is unavailable) and expected errors like conflicts.
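For illustration only, a minimal sketch of how a caller might branch on the two cases. `do_pushrebase` and `PushrebaseError` are the items added in this diff; `handle_push_result` and `report_conflicts_to_client` are hypothetical helpers:

    fn handle_push_result(
        res: ::std::result::Result<(), PushrebaseError>,
    ) -> ::std::result::Result<(), Error> {
        match res {
            Ok(()) => Ok(()),
            // Expected failure: surface to the client as an ordinary conflict
            // (report_conflicts_to_client is a hypothetical helper).
            Err(PushrebaseError::Conflicts) => report_conflicts_to_client(),
            // Infra failure: propagate so it is logged like any other error.
            Err(PushrebaseError::Error(err)) => Err(err),
        }
    }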

Reviewed By: aslpavel

Differential Revision: D9306815

fbshipit-source-id: 7c3f91b17c6270537d63e8c9dba8116f96840ece
Stanislau Hlebik 2018-08-17 06:51:01 -07:00 committed by Facebook Github Bot
parent 6bb09bbb98
commit 9e24fff2fd
5 changed files with 470 additions and 29 deletions


@ -6,11 +6,17 @@
pub use failure::prelude::*;
use mercurial_types::HgNodeHash;
use bookmarks::Bookmark;
use mercurial_types::{HgChangesetId, HgNodeHash};
#[derive(Debug, Fail)]
pub enum ErrorKind {
#[fail(display = "Bonsai not found for hg changeset: {:?}", _0)]
BonsaiNotFoundForHgChangeset(HgChangesetId),
#[fail(display = "Malformed treemanifest part: {}", _0)] MalformedTreemanifestPart(String),
#[fail(display = "Pushrebase onto bookmark not found: {:?}", _0)]
PushrebaseBookmarkNotFound(Bookmark),
#[fail(display = "Only one head is allowed in pushed set")] PushrebaseTooManyHeads,
#[fail(display = "Error while uploading data for changesets, hashes: {:?}", _0)]
WhileUploadingData(Vec<HgNodeHash>),
}


@ -7,9 +7,15 @@
#![deny(warnings)]
extern crate ascii;
#[cfg(test)]
extern crate async_unit;
extern crate bytes;
#[macro_use]
extern crate cloned;
#[macro_use]
extern crate failure_ext as failure;
#[cfg(test)]
extern crate fixtures;
#[macro_use]
extern crate futures;
#[macro_use]


@ -4,19 +4,379 @@
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2 or any later version.
use blobrepo::BlobRepo;
/// Mononoke pushrebase implementation. The main goal of pushrebase is to decrease push contention.
/// Commits that the client pushed are rebased on top of `onto_bookmark` on the server
///
/// Client
///
/// O <- `onto` on client, potentially outdated
/// |
/// O O <- pushed set (in this case just one commit)
/// | /
/// O <- root
///
/// Server
///
/// O <- update `onto` bookmark, pointing at the pushed commit
/// |
/// O <- `onto` bookmark on the server before the push
/// |
/// O
/// |
/// O
/// |
/// O <- root
///
/// Terminology:
/// *onto bookmark* - bookmark that is the destination of the rebase, for example "master"
///
/// *pushed set* - a set of commits that the client has sent us.
/// Note: the whole pushed set MUST be committed before doing pushrebase
/// Note: the pushed set MUST contain only one head
/// Note: not all commits from the pushed set may be rebased on top of the onto bookmark. See *rebased set*
///
/// *root* - parents of pushed set that are not in the pushed set (see graphs above)
///
/// *rebased set* - the subset of the pushed set that will be rebased on top of the onto bookmark
/// Note: usually rebased set == pushed set; however, in the case of merges it may differ
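///
/// Implementation outline (matching the code below): fetch the bonsai changesets
/// for the pushed set and compute its single head and its root, then loop:
/// read the current value of the onto bookmark, check that the files changed
/// on the server since the root do not intersect the files changed by the
/// client, create the rebased changesets on top of the bookmark, and try to
/// update the bookmark. If a concurrent push moved the bookmark first, retry
/// from its new position.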
use blobrepo::{save_bonsai_changeset, BlobRepo};
use bookmarks::Bookmark;
use errors::*;
use failure::err_msg;
use futures::Future;
use futures::future::err;
use mercurial_types::HgChangesetId;
use futures::future::{join_all, loop_fn, ok, Loop};
use futures_ext::{BoxFuture, FutureExt};
use mercurial_types::{HgChangesetId, MPath};
use mononoke_types::{BonsaiChangeset, ChangesetId};
use std::collections::HashSet;
use std::iter::FromIterator;
use std::sync::Arc;
pub fn do_pushrebase(
_repo: Arc<BlobRepo>,
_onto_bookmark: Bookmark,
_changesets: Vec<HgChangesetId>,
) -> impl Future<Item = (), Error = Error> {
err(err_msg("not implementd"))
#[allow(dead_code)]
#[derive(Debug)]
pub enum PushrebaseError {
Conflicts,
Error(Error),
}
impl From<Error> for PushrebaseError {
fn from(error: Error) -> Self {
PushrebaseError::Error(error)
}
}
/// Does a pushrebase of a list of commits `pushed_set` onto `onto_bookmark`
/// The commits from the pushed set should already be committed to the blobrepo
pub fn do_pushrebase(
repo: Arc<BlobRepo>,
onto_bookmark: Bookmark,
pushed_set: Vec<HgChangesetId>,
) -> impl Future<Item = (), Error = PushrebaseError> {
fetch_bonsai_changesets(repo.clone(), pushed_set)
.and_then(|pushed| {
let head = find_only_head_or_fail(&pushed)?;
let roots = find_roots(&pushed)?;
Ok((head, roots))
})
.and_then({
let repo = repo.clone();
let onto_bookmark = onto_bookmark.clone();
move |(head, roots)| {
find_closest_root(&repo, onto_bookmark, roots).map(move |root| (head, root))
}
})
.and_then({
let repo = repo.clone();
move |(head, root)| {
// Calculate client changed files only once, since they won't change
find_changed_files(&repo, root, head).and_then(move |client_cf| {
rebase_in_loop(repo, onto_bookmark, head, root, client_cf)
})
}
})
}
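// One iteration per attempt: if the bookmark moved under us between reading its
// value and committing the update, `do_rebase` resolves to false and the loop
// retries from the bookmark's new position instead of failing the push.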
fn rebase_in_loop(
repo: Arc<BlobRepo>,
onto_bookmark: Bookmark,
head: ChangesetId,
root: ChangesetId,
client_cf: Vec<MPath>,
) -> BoxFuture<(), PushrebaseError> {
loop_fn(root, move |root| {
get_bookmark_value(&repo, &onto_bookmark).and_then({
cloned!(client_cf, onto_bookmark, repo);
move |bookmark_val| {
find_changed_files(&repo, root.clone(), bookmark_val)
.and_then(|server_cf| intersect_changed_files(server_cf, client_cf))
.and_then(move |()| {
do_rebase(repo, root, head, bookmark_val, onto_bookmark).map(
move |update_res| {
if update_res {
Loop::Break(())
} else {
Loop::Continue(bookmark_val)
}
},
)
})
}
})
}).boxify()
}
fn do_rebase(
repo: Arc<BlobRepo>,
root: ChangesetId,
head: ChangesetId,
bookmark_val: ChangesetId,
onto_bookmark: Bookmark,
) -> impl Future<Item = bool, Error = PushrebaseError> {
create_rebased_changesets(repo.clone(), root, head, bookmark_val).and_then({
move |new_head| try_update_bookmark(&repo, &onto_bookmark, bookmark_val, new_head)
})
}
fn fetch_bonsai_changesets(
repo: Arc<BlobRepo>,
commit_ids: Vec<HgChangesetId>,
) -> impl Future<Item = Vec<BonsaiChangeset>, Error = PushrebaseError> {
join_all(commit_ids.into_iter().map(move |hg_cs| {
repo.get_bonsai_from_hg(&hg_cs)
.and_then({
cloned!(hg_cs);
move |bcs_cs| bcs_cs.ok_or(ErrorKind::BonsaiNotFoundForHgChangeset(hg_cs).into())
})
.and_then({
cloned!(repo);
move |bcs_id| repo.get_bonsai_changeset(bcs_id).from_err()
})
.with_context(move |_| format!("While fetching initial bonsai changesets"))
.map_err(Error::from)
.from_err()
}))
}
// There should only be one head in the pushed set
fn find_only_head_or_fail(
commits: &Vec<BonsaiChangeset>,
) -> ::std::result::Result<ChangesetId, PushrebaseError> {
let mut commits_set: HashSet<_> =
HashSet::from_iter(commits.iter().map(|commit| commit.get_changeset_id()));
for commit in commits {
for p in commit.parents() {
commits_set.remove(p);
}
}
if commits_set.len() == 1 {
Ok(commits_set.iter().next().unwrap().clone())
} else {
Err(PushrebaseError::Error(
ErrorKind::PushrebaseTooManyHeads.into(),
))
}
}
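// Roots are the parents of the pushed set that are not themselves in the
// pushed set (see the module-level docs above).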
fn find_roots(
commits: &Vec<BonsaiChangeset>,
) -> ::std::result::Result<Vec<ChangesetId>, PushrebaseError> {
let commits_set: HashSet<_> =
HashSet::from_iter(commits.iter().map(|commit| commit.get_changeset_id()));
let mut roots = vec![];
for commit in commits {
for p in commit.parents() {
if !commits_set.contains(p) {
roots.push(p.clone());
}
}
}
Ok(roots)
}
fn find_closest_root(
_repo: &Arc<BlobRepo>,
_bookmark: Bookmark,
roots: Vec<ChangesetId>,
) -> impl Future<Item = ChangesetId, Error = PushrebaseError> {
// TODO(stash, aslpavel): actually find closest root
if roots.len() == 1 {
ok(roots.get(0).unwrap().clone())
} else {
unimplemented!()
}
}
fn find_changed_files(
_repo: &Arc<BlobRepo>,
_ancestor: ChangesetId,
_descendant: ChangesetId,
) -> impl Future<Item = Vec<MPath>, Error = PushrebaseError> {
// TODO(stash, aslpavel) actually find changed files
ok(vec![])
}
fn intersect_changed_files(
_left: Vec<MPath>,
_right: Vec<MPath>,
) -> ::std::result::Result<(), PushrebaseError> {
// TODO(stash, aslpavel) actually find intersection
Ok(())
}
fn get_bookmark_value(
repo: &Arc<BlobRepo>,
bookmark_name: &Bookmark,
) -> impl Future<Item = ChangesetId, Error = PushrebaseError> {
repo.get_bookmark(bookmark_name)
.and_then({
cloned!(bookmark_name);
move |bookmark| {
bookmark.ok_or(ErrorKind::PushrebaseBookmarkNotFound(bookmark_name).into())
}
})
.and_then({
cloned!(repo);
move |hg_bookmark_value| {
repo.get_bonsai_from_hg(&hg_bookmark_value).and_then({
cloned!(hg_bookmark_value);
move |bonsai| {
bonsai.ok_or(
ErrorKind::BonsaiNotFoundForHgChangeset(hg_bookmark_value).into(),
)
}
})
}
})
.with_context(move |_| format!("While getting bookmark value"))
.map_err(Error::from)
.from_err()
}
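// Rebase `head` onto `onto` by swapping its single parent (the root) for the
// current bookmark position; the multi-commit and merge cases are still stubbed out.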
fn create_rebased_changesets(
repo: Arc<BlobRepo>,
root: ChangesetId,
head: ChangesetId,
onto: ChangesetId,
) -> impl Future<Item = ChangesetId, Error = PushrebaseError> {
// TODO(stash, aslpavel) at the moment it rebases just one commit
repo.get_bonsai_changeset(head)
.and_then(move |bcs| {
{
let parents: Vec<_> = bcs.parents().collect();
if parents.len() != 1 {
unimplemented!()
}
if parents != vec![&root] {
unimplemented!()
}
}
let mut bcs = bcs.into_mut();
bcs.parents[0] = onto;
bcs.freeze()
})
.and_then({
cloned!(repo);
move |bcs| {
// TODO(stash): avoid .deref().clone(), get rid of Arc<BlobRepo>
let repo: &BlobRepo = &repo;
let bcs_id = bcs.get_changeset_id();
save_bonsai_changeset(bcs, repo.clone()).map(move |()| bcs_id)
}
})
.with_context(move |_| format!("While creating rebased changesets"))
.map_err(Error::from)
.from_err()
}
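// Compare-and-swap update: the transaction commits only if the bookmark still
// points at `old_value`, and resolves to false when it does not, signalling
// the caller to retry.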
fn try_update_bookmark(
repo: &Arc<BlobRepo>,
bookmark_name: &Bookmark,
old_value: ChangesetId,
new_value: ChangesetId,
) -> BoxFuture<bool, PushrebaseError> {
let mut txn = repo.update_bookmark_transaction();
try_boxfuture!(txn.update(bookmark_name, &new_value, &old_value));
txn.commit().from_err().boxify()
}
#[cfg(test)]
mod tests {
use super::*;
use async_unit;
use bytes::Bytes;
use fixtures::linear;
use mononoke_types::{BonsaiChangesetMut, DateTime, FileChange, FileContents, FileType};
use std::collections::BTreeMap;
use std::str::FromStr;
fn store_files(
files: BTreeMap<&str, Option<&str>>,
repo: BlobRepo,
) -> BTreeMap<MPath, Option<FileChange>> {
let mut res = btreemap!{};
for (path, content) in files {
let path = MPath::new(path).unwrap();
match content {
Some(content) => {
let size = content.len();
let content = FileContents::Bytes(Bytes::from(content));
let content_id = repo.unittest_store(content).wait().unwrap();
let file_change =
FileChange::new(content_id, FileType::Regular, size as u64, None);
res.insert(path, Some(file_change));
}
None => {
res.insert(path, None);
}
}
}
res
}
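// Create a commit on top of the repo's bottom commit, point master at a
// different head, and check that pushrebase lands the new commit onto it.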
#[test]
fn pushrebase_one_commit() {
async_unit::tokio_unit_test(|| {
let repo = linear::getrepo(None);
let file_changes = store_files(btreemap!{"file" => Some("content")}, repo.clone());
// Bottom commit of the repo
let root = HgChangesetId::from_str("2d7d4ba9ce0a6ffd222de7785b249ead9c51c536").unwrap();
let p = repo.get_bonsai_from_hg(&root).wait().unwrap().unwrap();
let parents = vec![p];
let bcs = BonsaiChangesetMut {
parents: parents,
author: "author".to_string(),
author_date: DateTime::from_timestamp(0, 0).unwrap(),
committer: None,
committer_date: None,
message: "message".to_string(),
extra: btreemap!{},
file_changes,
}.freeze()
.unwrap();
let bcs_id = bcs.get_changeset_id();
save_bonsai_changeset(bcs, repo.clone()).wait().unwrap();
let hg_cs = repo.get_hg_from_bonsai_changeset(bcs_id).wait().unwrap();
let book = Bookmark::new("master").unwrap();
let head = HgChangesetId::from_str("a5ffa77602a066db7d5cfb9fb5823a0895717c5a").unwrap();
let head = repo.get_bonsai_from_hg(&head).wait().unwrap().unwrap();
let mut txn = repo.update_bookmark_transaction();
txn.force_set(&book, &head).unwrap();
txn.commit().wait().unwrap();
do_pushrebase(Arc::new(repo), book, vec![hg_cs])
.wait()
.expect("pushrebase failed");
});
}
}


@ -188,17 +188,40 @@ fn resolve_pushrebase(
resolver
.resolve_b2xtreegroup2(bundle2)
.and_then({
let resolver = resolver.clone();
move |(_, bundle2)| resolver.maybe_resolve_changegroup(bundle2)
cloned!(resolver);
move |(manifests, bundle2)| {
resolver
.maybe_resolve_changegroup(bundle2)
.map(move |(cg_push, bundle2)| (cg_push, manifests, bundle2))
}
})
.and_then(|(cg_push, manifests, bundle2)| {
cg_push
.ok_or(err_msg("Empty pushrebase"))
.into_future()
.map(|cg_push| (cg_push, manifests, bundle2))
})
.and_then({
let resolver = resolver.clone();
move |(cg_push, bundle2)| resolver.ensure_stream_finished(bundle2).map(|_| cg_push)
cloned!(resolver);
move |(cg_push, manifests, bundle2)| {
let changesets = cg_push.changesets.clone();
let mparams = cg_push.mparams.clone();
resolver
.upload_changesets(cg_push, manifests)
.map(move |()| (changesets, mparams, bundle2))
}
})
.and_then(|cg_push| cg_push.ok_or(err_msg("Empty pushrebase")))
.and_then({
let resolver = resolver.clone();
move |cg_push| resolver.pushrebase(cg_push)
cloned!(resolver);
move |(changesets, mparams, bundle2)| {
resolver
.ensure_stream_finished(bundle2)
.map(|_| (changesets, mparams))
}
})
.and_then({
cloned!(resolver);
move |(changesets, mparams)| resolver.pushrebase(changesets, mparams)
})
.and_then(|_| {
let writer = Cursor::new(Vec::new());
@ -698,19 +721,25 @@ impl Bundle2Resolver {
}).boxify()
}
fn pushrebase(&self, cg_push: ChangegroupPush) -> impl Future<Item = (), Error = Error> {
let changesets: Vec<_> = cg_push
.changesets
fn pushrebase(
&self,
changesets: Changesets,
mparams: HashMap<String, Bytes>,
) -> impl Future<Item = (), Error = Error> {
let changesets: Vec<_> = changesets
.into_iter()
.map(|(node, _)| HgChangesetId::new(node))
.collect();
match cg_push.mparams.get("onto").cloned() {
match mparams.get("onto").cloned() {
Some(onto_bookmark) => {
let v = Vec::from(onto_bookmark.as_ref());
let onto_bookmark = try_boxfuture!(String::from_utf8(v));
let onto_bookmark = try_boxfuture!(Bookmark::new(onto_bookmark));
pushrebase::do_pushrebase(self.repo.clone(), onto_bookmark, changesets).boxify()
pushrebase::do_pushrebase(self.repo.clone(), onto_bookmark, changesets)
.map(|_| ())
.map_err(|err| err_msg(format!("pushrebase failed {:?}", err)))
.boxify()
}
None => Err(err_msg("onto is not specified")).into_future().boxify(),
}


@ -43,16 +43,56 @@ Clone the repo
> pushrebase =
> EOF
$ hg up -q tip
$ hg up -q 0
$ echo 1 > 1 && hg add 1 && hg ci -m 1
$ hgmn push -r . --to master_bookmark
pushing to ssh://user@dummy/repo
remote: * DEBG Session with Mononoke started with uuid: * (glob)
searching for changes
remote: * ERRO Command failed, remote: true, error: not implementd, root_cause: ErrorMessage { (glob)
remote: msg: "not implementd"
remote: }, backtrace: , session_uuid: * (glob)
abort: stream ended unexpectedly (got 0 bytes, expected 4)
[255]
TODO(stash): pushrebase of a merge commit, pushrebase over a merge commit
$ hgmn pull -q
$ hgmn up master_bookmark
remote: * DEBG Session with Mononoke started with uuid: * (glob)
2 files updated, 0 files merged, 0 files removed, 0 files unresolved
(activating bookmark master_bookmark)
$ hg sl -r ":"
@ changeset: 4:c2e526aacb51
| bookmark: master_bookmark
| tag: tip
| parent: 2:26805aba1e60
| user: test
| date: Thu Jan 01 00:00:00 1970 +0000
| summary: 1
|
o changeset: 2:26805aba1e60
| user: test
| date: Thu Jan 01 00:00:00 1970 +0000
| summary: C
|
o changeset: 1:112478962961
| user: test
| date: Thu Jan 01 00:00:00 1970 +0000
| summary: B
|
| o changeset: 3:a0c9c5791058
|/ parent: 0:426bada5c675
| user: test
| date: Thu Jan 01 00:00:00 1970 +0000
| summary: 1
|
o changeset: 0:426bada5c675
user: test
date: Thu Jan 01 00:00:00 1970 +0000
summary: A
TODO(stash, aslpavel) This push should fail because of conflicts
$ hg up -q 0
$ echo 1 > 1 && hg add 1 && hg ci -m 1
$ hgmn push -r . --to master_bookmark
pushing to ssh://user@dummy/repo
remote: * Session with Mononoke started with uuid: * (glob)
searching for changes