Log pushrebase commits to scribe

Summary:
Currently we log new commits from BlobRepo. This will break once CommitCloud starts using Mononoke, because at the BlobRepo level we cannot differentiate between phases (draft vs. public). The solution is to log commits when they are pushrebased, since pushrebase guarantees that they are public commits.

Note: this only introduces the new logic; cleaning up the existing implementation is part of D14279210.

Reviewed By: StanislavGlebik

Differential Revision: D14279065

fbshipit-source-id: d714fae7164a8af815fc7716379ff0b7eb4826fb
Author: David Budischek 2019-03-12 04:47:49 -07:00 (committed by Facebook Github Bot)
Parent: b02e32e02e
Commit: a76d7c1cdd
6 changed files with 221 additions and 24 deletions


@@ -126,6 +126,7 @@ type RebasedChangesets = HashMap<ChangesetId, (ChangesetId, Timestamp)>;
 pub struct PushrebaseSuccessResult {
     pub head: ChangesetId,
     pub retry_num: usize,
+    pub rebased_changesets: Vec<ChangesetId>,
 }

 #[derive(Clone)]
@@ -247,8 +248,12 @@ fn rebase_in_loop(
             maybe_raw_bundle2_id,
         )
         .and_then(move |update_res| match update_res {
-            Some(head) => {
-                ok(Loop::Break(PushrebaseSuccessResult { head, retry_num }))
+            Some((head, rebased_changesets)) => {
+                ok(Loop::Break(PushrebaseSuccessResult {
+                    head,
+                    retry_num,
+                    rebased_changesets,
+                }))
             }
             None => {
                 if retry_num < MAX_REBASE_ATTEMPTS {
@@ -278,7 +283,7 @@ fn do_rebase(
     bookmark_val: Option<ChangesetId>,
     onto_bookmark: Bookmark,
     maybe_raw_bundle2_id: Option<RawBundle2Id>,
-) -> impl Future<Item = Option<ChangesetId>, Error = PushrebaseError> {
+) -> impl Future<Item = Option<(ChangesetId, Vec<ChangesetId>)>, Error = PushrebaseError> {
     create_rebased_changesets(
         ctx.clone(),
         repo.clone(),
@@ -823,10 +828,14 @@ fn try_update_bookmark(
     new_value: ChangesetId,
     maybe_raw_bundle2_id: Option<RawBundle2Id>,
     rebased_changesets: RebasedChangesets,
-) -> BoxFuture<Option<ChangesetId>, PushrebaseError> {
+) -> BoxFuture<Option<(ChangesetId, Vec<ChangesetId>)>, PushrebaseError> {
     let mut txn = repo.update_bookmark_transaction(ctx.clone());
-    let bookmark_update_reason =
-        create_bookmark_update_reason(ctx, repo.clone(), maybe_raw_bundle2_id, rebased_changesets);
+    let bookmark_update_reason = create_bookmark_update_reason(
+        ctx,
+        repo.clone(),
+        maybe_raw_bundle2_id,
+        rebased_changesets.clone(),
+    );
     bookmark_update_reason
         .from_err()
         .and_then({
@@ -834,7 +843,17 @@ fn try_update_bookmark(
         move |reason| {
             try_boxfuture!(txn.update(&bookmark_name, new_value, old_value, reason));
             txn.commit()
-                .map(move |success| if success { Some(new_value) } else { None })
+                .map(move |success| {
+                    if success {
+                        let changesets = rebased_changesets
+                            .into_iter()
+                            .map(|(_, (cs_id, _))| cs_id)
+                            .collect();
+                        Some((new_value, changesets))
+                    } else {
+                        None
+                    }
+                })
                 .from_err()
                 .boxify()
         }
@@ -849,10 +868,14 @@ fn try_create_bookmark(
     new_value: ChangesetId,
     maybe_raw_bundle2_id: Option<RawBundle2Id>,
     rebased_changesets: RebasedChangesets,
-) -> BoxFuture<Option<ChangesetId>, PushrebaseError> {
+) -> BoxFuture<Option<(ChangesetId, Vec<ChangesetId>)>, PushrebaseError> {
     let mut txn = repo.update_bookmark_transaction(ctx.clone());
-    let bookmark_update_reason =
-        create_bookmark_update_reason(ctx, repo.clone(), maybe_raw_bundle2_id, rebased_changesets);
+    let bookmark_update_reason = create_bookmark_update_reason(
+        ctx,
+        repo.clone(),
+        maybe_raw_bundle2_id,
+        rebased_changesets.clone(),
+    );
     bookmark_update_reason
         .from_err()
@@ -861,7 +884,17 @@ fn try_create_bookmark(
         move |reason| {
             try_boxfuture!(txn.create(&bookmark_name, new_value, reason));
             txn.commit()
-                .map(move |success| if success { Some(new_value) } else { None })
+                .map(move |success| {
+                    if success {
+                        let changesets = rebased_changesets
+                            .into_iter()
+                            .map(|(_, (cs_id, _))| cs_id)
+                            .collect();
+                        Some((new_value, changesets))
+                    } else {
+                        None
+                    }
+                })
                 .from_err()
                 .boxify()
         }
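Both transaction callbacks above reduce the RebasedChangesets map (old changeset id to a (new changeset id, timestamp) pair, per the type alias in the first hunk) to just the new ids. A minimal standalone sketch of that step, with u64 and i64 standing in for the real ChangesetId and Timestamp types:

use std::collections::HashMap;

fn main() {
    // old changeset id -> (rebased changeset id, rebase timestamp)
    let mut rebased: HashMap<u64, (u64, i64)> = HashMap::new();
    rebased.insert(1, (101, 1_552_388_869));
    rebased.insert(2, (102, 1_552_388_870));

    // Keep only the new (now public) changeset ids; these are what gets logged.
    let mut changesets: Vec<u64> = rebased
        .into_iter()
        .map(|(_, (cs_id, _))| cs_id)
        .collect();
    changesets.sort();
    assert_eq!(changesets, vec![101, 102]);
}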


@@ -0,0 +1,100 @@
+// Copyright (c) 2018-present, Facebook, Inc.
+// All Rights Reserved.
+//
+// This software may be used and distributed according to the terms of the
+// GNU General Public License version 2 or any later version.
+
+use std::sync::Arc;
+
+use failure_ext::Error;
+use futures::{Future, IntoFuture};
+use futures_ext::{try_boxfuture, BoxFuture, FutureExt};
+use scribe::ScribeClient;
+use scribe_cxx::ScribeCxxClient;
+use serde_derive::Serialize;
+use serde_json;
+
+use mononoke_types::{ChangesetId, Generation, RepositoryId};
+
+#[derive(Serialize)]
+pub struct CommitInfo {
+    repo_id: RepositoryId,
+    generation: Generation,
+    changeset_id: ChangesetId,
+    parents: Vec<ChangesetId>,
+}
+
+impl CommitInfo {
+    pub fn new(
+        repo_id: RepositoryId,
+        generation: Generation,
+        changeset_id: ChangesetId,
+        parents: Vec<ChangesetId>,
+    ) -> Self {
+        Self {
+            repo_id,
+            generation,
+            changeset_id,
+            parents,
+        }
+    }
+}
+
+pub trait ScribeCommitQueue: Send + Sync {
+    fn queue_commit(&self, commit: CommitInfo) -> BoxFuture<(), Error>;
+}
+
+pub struct LogToScribe<C>
+where
+    C: ScribeClient + Sync + Send + 'static,
+{
+    client: Option<Arc<C>>,
+    category: String,
+}
+
+impl LogToScribe<ScribeCxxClient> {
+    pub fn new_with_default_scribe(category: String) -> Self {
+        Self {
+            client: Some(Arc::new(ScribeCxxClient::new())),
+            category,
+        }
+    }
+
+    pub fn new_with_discard() -> Self {
+        Self {
+            client: None,
+            category: String::new(),
+        }
+    }
+}
+
+impl<C> LogToScribe<C>
+where
+    C: ScribeClient + Sync + Send + 'static,
+{
+    pub fn new(client: C, category: String) -> Self {
+        Self {
+            client: Some(Arc::new(client)),
+            category,
+        }
+    }
+}
+
+impl<C> ScribeCommitQueue for LogToScribe<C>
+where
+    C: ScribeClient + Sync + Send + 'static,
+{
+    fn queue_commit(&self, commit: CommitInfo) -> BoxFuture<(), Error> {
+        match &self.client {
+            Some(client) => {
+                let commit = try_boxfuture!(serde_json::to_string(&commit));
+                client
+                    .offer(&self.category, &commit)
+                    .into_future()
+                    .from_err()
+                    .boxify()
+            }
+            None => Ok(()).into_future().boxify(),
+        }
+    }
+}
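queue_commit serializes the CommitInfo with serde_json::to_string and offers the resulting string to scribe. A standalone sketch of the payload shape; primitive types stand in for the real RepositoryId, Generation, and ChangesetId here, and their actual Serialize impls determine the real on-wire encoding:

#[macro_use]
extern crate serde_derive;
extern crate serde_json;

#[derive(Serialize)]
struct CommitInfo {
    repo_id: i32,
    generation: u64,
    changeset_id: String,
    parents: Vec<String>,
}

fn main() {
    let info = CommitInfo {
        repo_id: 0,
        generation: 42,
        changeset_id: "c0ffee".to_string(),
        parents: vec!["deadbeef".to_string()],
    };
    println!("{}", serde_json::to_string(&info).unwrap());
    // {"repo_id":0,"generation":42,"changeset_id":"c0ffee","parents":["deadbeef"]}
}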


@@ -12,6 +12,7 @@ extern crate ascii;
 extern crate async_unit;
 extern crate blobstore;
 extern crate bytes;
+extern crate scribe_commit_queue;
 #[macro_use]
 extern crate cloned;
 #[macro_use]


@@ -35,6 +35,7 @@ use metaconfig_types::{PushrebaseParams, RepoReadOnly};
 use mononoke_types::{BlobstoreValue, ChangesetId, RawBundle2, RawBundle2Id};
 use pushrebase;
 use reachabilityindex::LeastCommonAncestorsHint;
+use scribe_commit_queue::{self, ScribeCommitQueue};
 use scuba_ext::{ScubaSampleBuilder, ScubaSampleBuilderExt};
 use stats::*;
@@ -339,7 +340,12 @@ fn resolve_pushrebase(
             cloned!(ctx, resolver);
             move |(changesets, bookmark_push_part_id, onto_params, maybe_raw_bundle2_id)| {
                 resolver
-                    .run_hooks(ctx.clone(), changesets.clone(), maybe_pushvars, &onto_params.bookmark)
+                    .run_hooks(
+                        ctx.clone(),
+                        changesets.clone(),
+                        maybe_pushvars,
+                        &onto_params.bookmark,
+                    )
                     .map_err(|err| match err {
                         RunHooksError::Failures((cs_hook_failures, file_hook_failures)) => {
                             let mut err_msgs = vec![];
@@ -372,17 +378,28 @@ fn resolve_pushrebase(
                 })
             }
         })
-        .and_then(move |(pushrebased_rev, onto_params, bookmark_push_part_id)| {
-            resolver.prepare_pushrebase_response(
-                ctx,
-                commonheads,
-                pushrebased_rev,
-                onto_params.bookmark,
-                lca_hint,
-                phases_hint,
+        .and_then(
+            move |(
+                (pushrebased_rev, pushrebased_changesets),
+                onto_params,
                 bookmark_push_part_id,
-            )
-        })
+            )| {
+                // TODO: (dbudischek) T41565649 log pushed changesets as well, not only pushrebased
+                resolver
+                    .log_commits_to_scribe(ctx.clone(), pushrebased_changesets)
+                    .and_then(move |_| {
+                        resolver.prepare_pushrebase_response(
+                            ctx,
+                            commonheads,
+                            pushrebased_rev,
+                            onto_params.bookmark,
+                            lca_hint,
+                            phases_hint,
+                            bookmark_push_part_id,
+                        )
+                    })
+            },
+        )
         .boxify()
 }
@@ -532,6 +549,7 @@ struct Bundle2Resolver {
     repo: BlobRepo,
     pushrebase: PushrebaseParams,
     hook_manager: Arc<HookManager>,
+    scribe_commit_queue: Arc<ScribeCommitQueue>,
 }

 impl Bundle2Resolver {
@@ -541,11 +559,19 @@ impl Bundle2Resolver {
         pushrebase: PushrebaseParams,
         hook_manager: Arc<HookManager>,
     ) -> Self {
+        let scribe_commit_queue = match pushrebase.commit_scribe_category.clone() {
+            Some(category) => Arc::new(scribe_commit_queue::LogToScribe::new_with_default_scribe(
+                category,
+            )),
+            None => Arc::new(scribe_commit_queue::LogToScribe::new_with_discard()),
+        };
+
         Self {
             ctx,
             repo,
             pushrebase,
             hook_manager,
+            scribe_commit_queue,
         }
     }
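Bundle2Resolver::new picks the queue implementation once, at construction time: a real scribe client when commit_scribe_category is configured, a discard queue otherwise, both stored behind the same trait object. A standalone sketch of that selection pattern; the trait and types here are simplified stand-ins, not the real ones:

use std::sync::Arc;

trait ScribeCommitQueue {
    fn describe(&self) -> String;
}

struct LogToScribe {
    category: String,
}

struct Discard;

impl ScribeCommitQueue for LogToScribe {
    fn describe(&self) -> String {
        format!("logging to {}", self.category)
    }
}

impl ScribeCommitQueue for Discard {
    fn describe(&self) -> String {
        "discarding".to_string()
    }
}

fn main() {
    let commit_scribe_category: Option<String> = Some("some_category".to_string());
    // Both arms coerce to the same Arc<dyn ScribeCommitQueue> trait object.
    let queue: Arc<dyn ScribeCommitQueue> = match commit_scribe_category {
        Some(category) => Arc::new(LogToScribe { category }),
        None => Arc::new(Discard),
    };
    println!("{}", queue.describe());
}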
@@ -1004,6 +1030,37 @@ impl Bundle2Resolver {
             .boxify()
     }

+    fn log_commits_to_scribe(
+        &self,
+        ctx: CoreContext,
+        changesets: Vec<ChangesetId>,
+    ) -> BoxFuture<(), Error> {
+        let repo = self.repo.clone();
+        let queue = self.scribe_commit_queue.clone();
+        let futs = changesets.into_iter().map(move |changeset_id| {
+            cloned!(ctx, repo, queue, changeset_id);
+            let generation = repo
+                .get_generation_number_by_bonsai(ctx.clone(), changeset_id)
+                .and_then(|maybe_gen| maybe_gen.ok_or(err_msg("No generation number found")));
+            let parents = repo.get_changeset_parents_by_bonsai(ctx, changeset_id);
+            let repo_id = repo.get_repoid();
+            let queue = queue;
+
+            generation
+                .join(parents)
+                .and_then(move |(generation, parents)| {
+                    let ci = scribe_commit_queue::CommitInfo::new(
+                        repo_id,
+                        generation,
+                        changeset_id,
+                        parents,
+                    );
+                    queue.queue_commit(ci)
+                })
+        });
+        future::join_all(futs).map(|_| ()).boxify()
+    }
+
     /// Ensures that the next item in stream is None
     fn ensure_stream_finished(
         &self,
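log_commits_to_scribe fans out one future per changeset and joins them. Note that futures 0.1 join_all resolves only once every queue_commit succeeds and fails fast with the first error, so a single failed offer fails the whole logging future (and, since resolve_pushrebase chains the response preparation on it, the pushrebase response as well). A minimal sketch of that behaviour; Result implements IntoFuture in futures 0.1, so plain Results can be joined directly:

extern crate futures; // futures = "0.1"

use futures::{future, Future};

fn main() {
    // All inner futures succeed: the joined future yields all results.
    let all_ok = future::join_all(vec![Ok::<u32, &str>(1), Ok(2)]).wait();
    assert_eq!(all_ok, Ok(vec![1, 2]));

    // One inner future fails: the joined future fails with that error.
    let one_err = future::join_all(vec![Ok::<u32, &str>(1), Err("offer failed")]).wait();
    assert_eq!(one_err, Err("offer failed"));
}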
@@ -1201,7 +1258,7 @@
         changesets: Changesets,
         onto_bookmark: &pushrebase::OntoBookmarkParams,
         maybe_raw_bundle2_id: Option<RawBundle2Id>,
-    ) -> impl Future<Item = ChangesetId, Error = Error> {
+    ) -> impl Future<Item = (ChangesetId, Vec<ChangesetId>), Error = Error> {
         pushrebase::do_pushrebase(
             ctx,
             self.repo.clone(),
@@ -1226,7 +1283,7 @@
                     Ok(())
                 }
             })
-            .map(|res| res.head)
+            .map(|res| (res.head, res.rebased_changesets))
             .boxify()
     }


@@ -353,6 +353,7 @@ impl RepoConfigs {
                 PushrebaseParams {
                     rewritedates: raw.rewritedates.unwrap_or(default.rewritedates),
                     recursion_limit: raw.recursion_limit.unwrap_or(default.recursion_limit),
+                    commit_scribe_category: raw.commit_scribe_category,
                 }
             })
             .unwrap_or_default();
@@ -516,6 +517,7 @@ enum RawBlobstoreType {
 struct RawPushrebaseParams {
     rewritedates: Option<bool>,
     recursion_limit: Option<usize>,
+    commit_scribe_category: Option<String>,
 }

 #[derive(Clone, Debug, Deserialize)]
@@ -726,6 +728,7 @@ mod test {
             pushrebase: PushrebaseParams {
                 rewritedates: false,
                 recursion_limit: 1024,
+                commit_scribe_category: None,
             },
             lfs: LfsParams {
                 threshold: Some(1000),
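Because the raw field is an Option, existing repo configs that do not set commit_scribe_category keep parsing as before. A standalone sketch of how the optional TOML key maps onto the raw params, assuming serde's usual handling of missing optional keys; the struct is mirrored locally and the category value is made up:

#[macro_use]
extern crate serde_derive;
extern crate toml;

#[derive(Debug, Deserialize)]
struct RawPushrebaseParams {
    rewritedates: Option<bool>,
    recursion_limit: Option<usize>,
    commit_scribe_category: Option<String>,
}

fn main() {
    // Key present: logging category is picked up.
    let with_key: RawPushrebaseParams =
        toml::from_str("commit_scribe_category = \"some_category\"").unwrap();
    assert_eq!(
        with_key.commit_scribe_category.as_ref().map(String::as_str),
        Some("some_category")
    );

    // Key absent: deserializes to None, i.e. no logging.
    let without_key: RawPushrebaseParams = toml::from_str("").unwrap();
    assert!(without_key.commit_scribe_category.is_none());
}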


@@ -243,6 +243,8 @@ pub struct PushrebaseParams {
     pub rewritedates: bool,
     /// How far will we go from bookmark to find rebase root
     pub recursion_limit: usize,
+    /// Scribe category we log new commits to
+    pub commit_scribe_category: Option<String>,
 }
impl Default for PushrebaseParams {
@ -250,6 +252,7 @@ impl Default for PushrebaseParams {
PushrebaseParams {
rewritedates: true,
recursion_limit: 16384, // this number is fairly arbirary
commit_scribe_category: None,
}
}
}
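Putting it together, enabling the new logging only requires overriding this one field; everything else keeps its default. A hedged sketch of the struct-update pattern using a locally mirrored struct, with a made-up category name:

#[derive(Debug)]
struct PushrebaseParams {
    rewritedates: bool,
    recursion_limit: usize,
    commit_scribe_category: Option<String>,
}

impl Default for PushrebaseParams {
    fn default() -> Self {
        PushrebaseParams {
            rewritedates: true,
            recursion_limit: 16384,
            commit_scribe_category: None,
        }
    }
}

fn main() {
    let params = PushrebaseParams {
        commit_scribe_category: Some("some_category".to_string()),
        ..Default::default()
    };
    assert!(params.rewritedates); // other defaults are preserved
    println!("{:?}", params.commit_scribe_category);
}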