mononoke: add push redirector logic

Summary:
This diff adds some meat to the backbone introduced in D18370903. Tests are to come in later diffs.

Copied from the parent diff:

The push redirector is one of the core components of cross-repo sync in Mononoke. It comes into play when the large repository serves writes. Here's the schematic flow of the `unbundle` pipeline (a minimal code sketch follows the table):

| Step | Small repo | Push redirector/Backsyncer | Large repo |
|------|------------|----------------------------|------------|
| 1 | Parse the `unbundle` body, decide whether it's a push, pushrebase, etc. | | |
| 2 | Upload all of the changesets provided in the `unbundle` body | | |
| 3 | | (small-to-large direction) -> | |
| 4 | | Convert the parsing result (`PostResolveAction`) to be usable in the large repo. This involves syncing uploaded changesets, renaming bookmarks if needed, etc. | |
| 5 | | | Process the converted `PostResolveAction` (i.e. perform push, pushrebase or infinitepush). Create an `UnbundleResponse` struct, which contains all the information necessary to generate response bytes to send to the user. |
| 6 | | <- (large-to-small direction) | |
| 7 | | Call the Backsyncer to sync all the commits created in the large repo into the small repo. Then convert the `UnbundleResponse` struct (by replacing commits with their equivalents and renaming bookmarks) to be suitable for use in the small repo. | |
| 8 | Generate response bytes from the `UnbundleResponse` struct, and send those bytes to the user. | | |
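
To make the flow concrete, here is a minimal, self-contained sketch of the round trip. All types and function names below are illustrative stand-ins (the real pipeline works with `PostResolveAction` and `UnbundleResponse`), so treat this as a model of the control flow, not the actual API:

```rust
// Stand-in types: the real pipeline works with `PostResolveAction` and
// `UnbundleResponse`; everything here is illustrative only.
struct SmallRepoAction;   // steps 1-2: parsed `unbundle` body, in small-repo terms
struct LargeRepoAction;   // step 4: the same action, rewritten for the large repo
struct LargeRepoResponse; // step 5: result of running the action in the large repo
struct SmallRepoResponse; // step 7: response rewritten back into small-repo terms

fn convert_small_to_large(_action: SmallRepoAction) -> LargeRepoAction {
    // step 4: sync uploaded changesets, rename bookmarks, remap changeset ids
    LargeRepoAction
}

fn run_in_large_repo(_action: LargeRepoAction) -> LargeRepoResponse {
    // step 5: perform push, pushrebase or infinitepush against the large repo
    LargeRepoResponse
}

fn convert_large_to_small(_response: LargeRepoResponse) -> SmallRepoResponse {
    // steps 6-7: backsync new large-repo commits, then rewrite the response
    SmallRepoResponse
}

fn main() {
    // Step 8 (turning the response into bytes for the client) is left out.
    let action = SmallRepoAction;
    let _response: SmallRepoResponse =
        convert_large_to_small(run_in_large_repo(convert_small_to_large(action)));
}
```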

Reviewed By: StanislavGlebik

Differential Revision: D18288854

fbshipit-source-id: 36eb78fcc03ca5249776237ef9dda2b4747ecc68
Kostia Balytskyi 2019-11-11 03:08:56 -08:00 committed by Facebook Github Bot
parent 2979cb564a
commit d2de3470af
3 changed files with 468 additions and 26 deletions


@@ -20,4 +20,5 @@ pub use resolver::{
resolve, BundleResolverError, Changesets, CommonHeads, InfiniteBookmarkPush,
NonFastForwardPolicy, PlainBookmarkPush, PostResolveAction, PostResolveBookmarkOnlyPushRebase,
PostResolveInfinitePush, PostResolvePush, PostResolvePushRebase, PushrebaseBookmarkSpec,
UploadedHgBonsaiMap,
};


@@ -7,6 +7,7 @@
*/
#![deny(warnings)]
#![feature(async_closure)]
//! State for a single source control Repo


@@ -15,21 +15,35 @@ use crate::unbundle::response::{
};
use crate::unbundle::run_post_resolve_action;
use backsyncer::backsync_all_latest;
use backsyncer::TargetRepoDbs;
use bundle2_resolver::InfiniteBookmarkPush;
use bundle2_resolver::PlainBookmarkPush;
use bundle2_resolver::PushrebaseBookmarkSpec;
use bundle2_resolver::{
BundleResolverError, PostResolveAction, PostResolveBookmarkOnlyPushRebase,
PostResolveInfinitePush, PostResolvePush, PostResolvePushRebase,
PostResolveInfinitePush, PostResolvePush, PostResolvePushRebase, UploadedHgBonsaiMap,
};
use cloned::cloned;
use context::CoreContext;
use cross_repo_sync::CommitSyncer;
use failure::Error;
use cross_repo_sync::{CommitSyncOutcome, CommitSyncer};
use failure::{format_err, Error};
use futures::Future;
use futures_preview::compat::Future01CompatExt;
use futures_util::{future::FutureExt, try_future::TryFutureExt};
use futures_preview::future::try_join_all;
use futures_util::{future::FutureExt, try_future::TryFutureExt, try_join};
use mercurial_types::HgChangesetId;
use mononoke_types::ChangesetId;
use pushrebase::PushrebaseChangesetPair;
use std::collections::HashMap;
use std::sync::Arc;
use synced_commit_mapping::SyncedCommitMapping;
use topo_sort::sort_topological;
#[derive(Clone)]
/// Core push redirector struct. Performs conversions of pushes
/// to be processed by the large repo, and conversions of results
/// to be presented as if the pushes were processed by the small repo
pub struct RepoSyncTarget {
// target (large) repo to sync into
pub repo: MononokeRepo,
@@ -133,10 +147,39 @@ impl RepoSyncTarget {
/// the large repo
async fn convert_post_resolve_push_action(
&self,
_ctx: CoreContext,
_orig: PostResolvePush,
ctx: CoreContext,
orig: PostResolvePush,
) -> Result<PostResolvePush, Error> {
unimplemented!("convert_post_resolve_push_action")
// Note: the `maybe_raw_bundle2_id` field here refers to a bundle, which
// was uploaded in the small repo (and is stored in the small repo's blobstore).
// However, once the `bookmarks_update_log` transaction is successful, we
// will mention this bundle id in the table entry. In essence, the table
// entry for the large repo will point to a blobstore key which does not
// exist in that large repo.
let PostResolvePush {
changegroup_id,
bookmark_pushes,
maybe_raw_bundle2_id,
non_fast_forward_policy,
uploaded_hg_bonsai_map,
} = orig;
let uploaded_hg_bonsai_map = self
.sync_uploaded_changesets(ctx.clone(), uploaded_hg_bonsai_map)
.await?;
let bookmark_pushes = try_join_all(bookmark_pushes.into_iter().map(|bookmark_push| {
self.convert_plain_bookmark_push_small_to_large(ctx.clone(), bookmark_push)
}))
.await?;
Ok(PostResolvePush {
changegroup_id,
bookmark_pushes,
maybe_raw_bundle2_id,
non_fast_forward_policy,
uploaded_hg_bonsai_map,
})
}
/// Convert `PostResolvePushRebase` struct in the small-to-large direction
@@ -144,10 +187,41 @@ impl RepoSyncTarget {
/// the large repo
async fn convert_post_resolve_pushrebase_action(
&self,
_ctx: CoreContext,
_orig: PostResolvePushRebase,
ctx: CoreContext,
orig: PostResolvePushRebase,
) -> Result<PostResolvePushRebase, Error> {
unimplemented!("convert_post_resolve_pushrebase_action")
// Note: the `maybe_raw_bundle2_id` field here refers to a bundle, which
// was uploaded in the small repo (and is stored in the small repo's blobstore).
// However, once the `bookmarks_update_log` transaction is successful, we
// will mention this bundle id in the table entry. In essence, the table
// entry for the large repo will point to a blobstore key which does not
// exist in that large repo.
let PostResolvePushRebase {
any_merges,
bookmark_push_part_id,
bookmark_spec,
maybe_raw_bundle2_id,
maybe_pushvars,
commonheads,
uploaded_hg_bonsai_map,
} = orig;
let uploaded_hg_bonsai_map = self
.sync_uploaded_changesets(ctx.clone(), uploaded_hg_bonsai_map)
.await?;
let bookmark_spec = self
.convert_pushrebase_bookmark_spec(ctx.clone(), bookmark_spec)
.await?;
Ok(PostResolvePushRebase {
any_merges,
bookmark_push_part_id,
bookmark_spec,
maybe_raw_bundle2_id,
maybe_pushvars,
commonheads,
uploaded_hg_bonsai_map,
})
}
/// Convert `PostResolveInfinitePush` struct in the small-to-large direction
@@ -155,20 +229,51 @@ impl RepoSyncTarget {
/// the large repo
async fn convert_post_resolve_infinitepush_action(
&self,
_ctx: CoreContext,
_orig: PostResolveInfinitePush,
ctx: CoreContext,
orig: PostResolveInfinitePush,
) -> Result<PostResolveInfinitePush, Error> {
unimplemented!("convert_post_resolve_infinitepush_action")
let PostResolveInfinitePush {
changegroup_id,
bookmark_push,
maybe_raw_bundle2_id,
uploaded_hg_bonsai_map,
} = orig;
let uploaded_hg_bonsai_map = self
.sync_uploaded_changesets(ctx.clone(), uploaded_hg_bonsai_map)
.await?;
let bookmark_push = self
.convert_infinite_bookmark_push_small_to_large(ctx.clone(), bookmark_push)
.await?;
Ok(PostResolveInfinitePush {
changegroup_id,
bookmark_push,
maybe_raw_bundle2_id,
uploaded_hg_bonsai_map,
})
}
/// Convert a `PostResolveBookmarkOnlyPushRebase` in a small-to-large
/// direction, to be suitable for processing in a large repo
async fn convert_post_resolve_bookmark_only_pushrebase_action(
&self,
_ctx: CoreContext,
_orig: PostResolveBookmarkOnlyPushRebase,
ctx: CoreContext,
orig: PostResolveBookmarkOnlyPushRebase,
) -> Result<PostResolveBookmarkOnlyPushRebase, Error> {
unimplemented!("convert_post_resolve_bookmark_only_pushrebase_action")
let PostResolveBookmarkOnlyPushRebase {
bookmark_push,
maybe_raw_bundle2_id,
non_fast_forward_policy,
} = orig;
let bookmark_push = self
.convert_plain_bookmark_push_small_to_large(ctx.clone(), bookmark_push)
.await?;
Ok(PostResolveBookmarkOnlyPushRebase {
bookmark_push,
maybe_raw_bundle2_id,
non_fast_forward_policy,
})
}
/// Convert `UnbundleResponse` enum in a large-to-small direction
@@ -199,30 +304,94 @@ impl RepoSyncTarget {
/// direction to be suitable for response generation in the small repo
async fn convert_unbundle_pushrebase_response(
&self,
_ctx: CoreContext,
_orig: UnbundlePushRebaseResponse,
ctx: CoreContext,
orig: UnbundlePushRebaseResponse,
) -> Result<UnbundlePushRebaseResponse, Error> {
unimplemented!("convert_unbundle_pushrebase_response")
let UnbundlePushRebaseResponse {
commonheads,
pushrebased_rev,
pushrebased_changesets,
onto,
bookmark_push_part_id,
} = orig;
// Let's make sure all the public pushes to the large repo
// are backsynced to the small repo, by tailing the `bookmarks_update_log`
// of the large repo
backsync_all_latest(
ctx.clone(),
self.large_to_small_commit_syncer.clone(),
self.target_repo_dbs.clone(),
)
.await?;
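// Remap the pushrebase outcome (the new head and the rewritten changeset
// pairs) from large-repo ids back into small-repo ids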
let (pushrebased_rev, pushrebased_changesets) = try_join!(
self.remap_changeset_expect_rewritten_or_preserved(
ctx.clone(),
&self.large_to_small_commit_syncer,
pushrebased_rev,
),
self.convert_pushrebased_changesets(ctx.clone(), pushrebased_changesets),
)?;
let onto = self
.large_to_small_commit_syncer
.rename_bookmark(&onto)
.ok_or(format_err!(
"bookmark_renamer unexpectedly dropped {} in {:?}",
onto,
self.large_to_small_commit_syncer
))?;
Ok(UnbundlePushRebaseResponse {
commonheads,
pushrebased_rev,
pushrebased_changesets,
onto,
bookmark_push_part_id,
})
}
/// Convert `UnbundleBookmarkOnlyPushRebaseResponse` struct in a large-to-small
/// direction to be suitable for response generation in the small repo
async fn convert_unbundle_bookmark_only_pushrebase_response(
&self,
_ctx: CoreContext,
_orig: UnbundleBookmarkOnlyPushRebaseResponse,
ctx: CoreContext,
orig: UnbundleBookmarkOnlyPushRebaseResponse,
) -> Result<UnbundleBookmarkOnlyPushRebaseResponse, Error> {
unimplemented!("convert_unbundle_bookmark_only_pushrebase_response")
// `UnbundleBookmarkOnlyPushRebaseResponse` consists of only one field:
// `bookmark_push_part_id`, which does not need to be converted.
// We do, however, need to wait until the backsyncer catches up
// with the `bookmarks_update_log` tailing
backsync_all_latest(
ctx.clone(),
self.large_to_small_commit_syncer.clone(),
self.target_repo_dbs.clone(),
)
.await?;
Ok(orig)
}
/// Convert `UnbundlePushResponse` struct in a large-to-small
/// direction to be suitable for response generation in the small repo
async fn convert_unbundle_push_response(
&self,
_ctx: CoreContext,
_orig: UnbundlePushResponse,
ctx: CoreContext,
orig: UnbundlePushResponse,
) -> Result<UnbundlePushResponse, Error> {
unimplemented!("convert_unbundle_push_response")
// `UnbundlePushResponse` consists of only two fields:
// `changegroup_id` and `bookmark_ids`, which do not need to be converted.
// We do, however, need to wait until the backsyncer catches up
// with the `bookmarks_update_log` tailing
backsync_all_latest(
ctx.clone(),
self.large_to_small_commit_syncer.clone(),
self.target_repo_dbs.clone(),
)
.await?;
Ok(orig)
}
/// Convert `UnbundleInfinitePushResponse` struct in a large-to-small
@@ -232,6 +401,277 @@ impl RepoSyncTarget {
_ctx: CoreContext,
_orig: UnbundleInfinitePushResponse,
) -> Result<UnbundleInfinitePushResponse, Error> {
unimplemented!("convert_unbundle_infinite_push_response")
// TODO: this can only be implemented once we have a way
// to catch up on non-public commits created in the
// large repo. One proposal is to include those in
// `UnbundleInfinitePushResponse` and make this
// method call some `CommitSyncer` method to sync
// those commits.
Err(format_err!(
"convert_unbundle_infinite_push_response is not implemented"
))
}
/// Given the `source_cs_id` in the small repo, get its equivalent
/// in a large repo. See `remap_changeset_expect_rewritten_or_preserved`
/// for details
async fn get_small_to_large_commit_equivalent(
&self,
ctx: CoreContext,
source_cs_id: ChangesetId,
) -> Result<ChangesetId, Error> {
self.remap_changeset_expect_rewritten_or_preserved(
ctx,
&self.small_to_large_commit_syncer,
source_cs_id,
)
.await
}
/// Query the changeset mapping from the provided `syncer`.
/// Error out if the `CommitSyncOutcome` is not `RewrittenAs` or `Preserved`.
/// The logic of this method is to express an expectation that `cs_id`
/// from the source repo MUST be properly present in the target repo,
/// either with paths moved, or preserved. What is unacceptable is that
/// the changeset is not yet synced, or rewritten into nothingness, or
/// preserved from a different repo.
async fn remap_changeset_expect_rewritten_or_preserved(
&self,
ctx: CoreContext,
syncer: &CommitSyncer<Arc<dyn SyncedCommitMapping>>,
cs_id: ChangesetId,
) -> Result<ChangesetId, Error> {
let maybe_commit_sync_outcome = syncer.get_commit_sync_outcome(ctx.clone(), cs_id).await?;
maybe_commit_sync_outcome
.ok_or(format_err!(
"Unexpected absence of CommitSyncOutcome for {} in {:?}",
cs_id,
syncer
))
.and_then(|commit_sync_outcome| match commit_sync_outcome {
CommitSyncOutcome::Preserved => Ok(cs_id),
CommitSyncOutcome::RewrittenAs(rewritten) => Ok(rewritten),
cso => Err(format_err!(
"Unexpected CommitSyncOutcome for {} in {:?}: {:?}",
cs_id,
syncer,
cso
)),
})
}
/// Convert `InfiniteBookmarkPush<ChangesetId>` in the small-to-large direction
/// Note: this does not cause any changesets to be synced; it just converts the struct.
/// All the syncing is expected to be done prior to calling this fn.
async fn convert_infinite_bookmark_push_small_to_large(
&self,
ctx: CoreContext,
orig: InfiniteBookmarkPush<ChangesetId>,
) -> Result<InfiniteBookmarkPush<ChangesetId>, Error> {
let maybe_old = orig.old.clone();
let new = orig.new.clone();
let (old, new) = try_join!(
async {
match maybe_old {
None => Ok(None),
Some(old) => self
.get_small_to_large_commit_equivalent(ctx.clone(), old)
.await
.map(Some),
}
},
self.get_small_to_large_commit_equivalent(ctx.clone(), new),
)?;
Ok(InfiniteBookmarkPush { old, new, ..orig })
}
/// Convert `PlainBookmarkPush<ChangesetId>` in the small-to-large direction
/// Note: this does not cause any changesets to be synced; it just converts the struct.
/// All the syncing is expected to be done prior to calling this fn.
async fn convert_plain_bookmark_push_small_to_large(
&self,
ctx: CoreContext,
orig: PlainBookmarkPush<ChangesetId>,
) -> Result<PlainBookmarkPush<ChangesetId>, Error> {
let PlainBookmarkPush {
part_id,
name,
old: maybe_old,
new: maybe_new,
} = orig;
let (old, new) = try_join!(
async {
match maybe_old {
None => Ok(None),
Some(old) => self
.get_small_to_large_commit_equivalent(ctx.clone(), old)
.await
.map(Some),
}
},
async {
match maybe_new {
None => Ok(None),
Some(new) => self
.get_small_to_large_commit_equivalent(ctx.clone(), new)
.await
.map(Some),
}
},
)?;
let name = self
.small_to_large_commit_syncer
.rename_bookmark(&name)
.ok_or(format_err!(
"Bookmark {} unexpectedly dropped in {:?}",
name,
self.small_to_large_commit_syncer
))?;
Ok(PlainBookmarkPush {
part_id,
name,
old,
new,
})
}
/// Convert the `PushrebaseBookmarkSpec` struct in the small-to-large direction
async fn convert_pushrebase_bookmark_spec(
&self,
ctx: CoreContext,
pushrebase_bookmark_spec: PushrebaseBookmarkSpec<ChangesetId>,
) -> Result<PushrebaseBookmarkSpec<ChangesetId>, Error> {
match pushrebase_bookmark_spec {
PushrebaseBookmarkSpec::NormalPushrebase(onto_params) => {
Ok(PushrebaseBookmarkSpec::NormalPushrebase(onto_params))
}
PushrebaseBookmarkSpec::ForcePushrebase(plain_push) => {
let converted = self
.convert_plain_bookmark_push_small_to_large(ctx.clone(), plain_push)
.await?;
Ok(PushrebaseBookmarkSpec::ForcePushrebase(converted))
}
}
}
/// Convert `PushrebaseChangesetPair` struct in the large-to-small direction
async fn convert_pushrebase_changeset_pair(
&self,
ctx: CoreContext,
pushrebase_changeset_pair: PushrebaseChangesetPair,
) -> Result<PushrebaseChangesetPair, Error> {
let PushrebaseChangesetPair { id_old, id_new } = pushrebase_changeset_pair;
let (id_old, id_new) = try_join!(
self.remap_changeset_expect_rewritten_or_preserved(
ctx.clone(),
&self.large_to_small_commit_syncer,
id_old
),
self.remap_changeset_expect_rewritten_or_preserved(
ctx.clone(),
&self.large_to_small_commit_syncer,
id_new
),
)?;
Ok(PushrebaseChangesetPair { id_old, id_new })
}
/// Convert all the produced `PushrebaseChangesetPair` structs in the
/// large-to-small direction
async fn convert_pushrebased_changesets(
&self,
ctx: CoreContext,
pushrebased_changesets: Vec<PushrebaseChangesetPair>,
) -> Result<Vec<PushrebaseChangesetPair>, Error> {
try_join_all(pushrebased_changesets.into_iter().map({
|pushrebase_changeset_pair| {
self.convert_pushrebase_changeset_pair(ctx.clone(), pushrebase_changeset_pair)
}
}))
.await
}
/// Take a map representing the changesets uploaded during the `unbundle`
/// resolution and sync all of the changesets into the large repo, while
/// remembering which `HgChangesetId` corresponds to which synced changeset
async fn sync_uploaded_changesets(
&self,
ctx: CoreContext,
uploaded_map: UploadedHgBonsaiMap,
) -> Result<UploadedHgBonsaiMap, Error> {
let target_repo = self.small_to_large_commit_syncer.get_target_repo();
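// Build a reverse index (bonsai id -> hg id) of the commits uploaded in this push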
let bonsai_hg_map: HashMap<ChangesetId, HgChangesetId> = uploaded_map
.iter()
.map(|(hg_cs_id, bcs)| (bcs.get_changeset_id(), hg_cs_id.clone()))
.collect();
let to_sync: HashMap<ChangesetId, Vec<ChangesetId>> = uploaded_map
.iter()
.map(|(_, bcs)| {
// For toposort purposes, let's only collect parents uploaded
// as part of this push
let uploaded_parents: Vec<ChangesetId> = bcs
.parents()
.filter(|bcs_id| bonsai_hg_map.contains_key(bcs_id))
.collect();
(bcs.get_changeset_id(), uploaded_parents)
})
.collect();
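// Order the uploaded commits so that every commit is synced after its
// parents; the `.rev()` below accounts for `sort_topological`'s output order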
let to_sync: Vec<ChangesetId> = sort_topological(&to_sync)
.ok_or(format_err!("Cycle in the uploaded changeset DAG!"))?
.into_iter()
.rev()
.collect();
let mut synced_ids: HashMap<ChangesetId, ChangesetId> = HashMap::new();
for bcs_id in to_sync.iter() {
let synced_bcs_id = self
.small_to_large_commit_syncer
.sync_commit(ctx.clone(), *bcs_id)
.await?
.ok_or(format_err!(
"{} was rewritten into nothingness during uploaded changesets sync",
bcs_id
))?;
synced_ids.insert(*bcs_id, synced_bcs_id);
}
try_join_all(
synced_ids
.into_iter()
.map(|(source_repo_bcs_id, target_repo_bcs_id)| {
let maybe_hg_cs_id = bonsai_hg_map.get(&source_repo_bcs_id);
(source_repo_bcs_id, target_repo_bcs_id, maybe_hg_cs_id)
})
.map(
move |(source_repo_bcs_id, target_repo_bcs_id, maybe_hg_cs_id)| {
cloned!(ctx, target_repo);
async move {
let hg_cs_id = maybe_hg_cs_id.ok_or(format_err!(
"ProgrammerError: we synced a commit we weren't supposed to: {}",
source_repo_bcs_id
))?;
let bcs = target_repo
.get_bonsai_changeset(ctx, target_repo_bcs_id)
.compat()
.await?;
let res: Result<_, Error> = Ok((hg_cs_id.clone(), bcs));
res
}
},
),
)
.await
.map(|v| v.into_iter().collect())
}
}
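
The top-level dispatch that ties the `convert_unbundle_*_response` methods together is elided from this hunk. For orientation, here is a hedged, self-contained sketch of its shape; the `UnbundleResponse` variant names and the identity-style arms below are assumptions for illustration, not the real enum or method:

```rust
// Hypothetical sketch only: the real dispatch method and the exact
// `UnbundleResponse` variants are not visible in this diff.
enum UnbundleResponse {
    Push,
    PushRebase,
    BookmarkOnlyPushRebase,
    InfinitePush,
}

fn convert_unbundle_response(orig: UnbundleResponse) -> Result<UnbundleResponse, String> {
    // Each arm would delegate to the matching converter above
    // (e.g. `convert_unbundle_push_response` for `Push`).
    match orig {
        UnbundleResponse::Push => Ok(UnbundleResponse::Push),
        UnbundleResponse::PushRebase => Ok(UnbundleResponse::PushRebase),
        UnbundleResponse::BookmarkOnlyPushRebase => Ok(UnbundleResponse::BookmarkOnlyPushRebase),
        // Mirrors `convert_unbundle_infinite_push_response`, which this
        // diff deliberately leaves unimplemented.
        UnbundleResponse::InfinitePush => Err("not implemented".to_string()),
    }
}

fn main() {
    assert!(convert_unbundle_response(UnbundleResponse::Push).is_ok());
}
```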