mononoke/scs: add method to look up commit origin over pushrebase mutations

Reviewed By: StanislavGlebik

Differential Revision: D30495086

fbshipit-source-id: 5fe659033b5e0e8a557f173d677caa0dfd531b05
This commit is contained in:
Ilia Medianikov 2021-09-10 13:33:25 -07:00 committed by Facebook GitHub Bot
parent 424f210a29
commit f394b86528
30 changed files with 537 additions and 101 deletions

View File

@ -24,6 +24,10 @@ test = false
name = "check_git_wc"
path = "cmds/check_git_wc/main.rs"
[[bin]]
name = "compute_commit_stats"
path = "cmds/compute_commit_stats/src/main.rs"
[[bin]]
name = "configlint"
path = "cmds/configlint.rs"
@ -101,6 +105,7 @@ context = { version = "0.1.0", path = "server/context" }
criterion = "=0.3.1"
dashmap = { version = "4.0.2", features = ["serde"] }
derive_more = "0.99.3"
derived_data = { version = "0.1.0", path = "derived_data" }
environment = { version = "0.1.0", path = "cmdlib/environment" }
failure_ext = { version = "0.1.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
fbinit = { version = "0.1.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
@ -111,6 +116,7 @@ futures-util = "0.3.7"
git2 = "0.13"
itertools = "0.10.1"
lock_ext = { version = "0.1.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
manifest = { version = "0.1.0", path = "manifest" }
mercurial_derived_data = { version = "0.1.0", path = "derived_data/mercurial_derived_data" }
mercurial_revlog = { version = "0.1.0", path = "mercurial/revlog" }
mercurial_types = { version = "0.1.0", path = "mercurial/types" }
@ -125,6 +131,8 @@ scuba_ext = { version = "0.1.0", path = "common/scuba_ext" }
segmented_changelog = { version = "0.1.0", path = "segmented_changelog" }
serde = { version = "1.0.126", features = ["derive", "rc"] }
serde_derive = "1.0"
serde_json = { version = "1.0", features = ["float_roundtrip"] }
skeleton_manifest = { version = "0.1.0", path = "derived_data/skeleton_manifest" }
slog = { version = "2.5", features = ["max_level_trace", "nested-values"] }
sql_construct = { version = "0.1.0", path = "common/sql_construct" }
sql_ext = { version = "0.1.0", path = "common/rust/sql_ext" }

View File

@ -32,11 +32,6 @@ fn main() {
conf.options(options);
}
let include_srcs = vec![
];
conf.include_srcs(include_srcs);
conf
};

View File

@ -32,11 +32,6 @@ fn main() {
conf.options(options);
}
let include_srcs = vec![
];
conf.include_srcs(include_srcs);
conf
};

View File

@ -33,11 +33,6 @@ mononoke_types_thrift mononoke_types_thrift",
conf.options(options);
}
let include_srcs = vec![
];
conf.include_srcs(include_srcs);
conf
};

View File

@ -34,11 +34,6 @@ mononoke_types_thrift mononoke_types_thrift",
conf.options(options);
}
let include_srcs = vec![
];
conf.include_srcs(include_srcs);
conf
};

View File

@ -33,11 +33,6 @@ mononoke_types_thrift mononoke_types_thrift",
conf.options(options);
}
let include_srcs = vec![
];
conf.include_srcs(include_srcs);
conf
};

View File

@ -33,11 +33,6 @@ mononoke_types_thrift mononoke_types_thrift",
conf.options(options);
}
let include_srcs = vec![
];
conf.include_srcs(include_srcs);
conf
};

View File

@ -17,7 +17,6 @@ async-trait = "0.1.51"
cached_config = { version = "0.1.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
commitsync = { version = "0.1.0", path = "../../../../configerator/structs/scm/mononoke/repos/commitsync" }
context = { version = "0.1.0", path = "../../server/context" }
iterhelpers = { version = "0.1.0", path = "../../common/iterhelpers" }
metaconfig_parser = { version = "0.1.0", path = "../../metaconfig/parser" }
metaconfig_types = { version = "0.1.0", path = "../../metaconfig/types" }
mononoke_types = { version = "0.1.0", path = "../../mononoke_types" }

View File

@ -12,7 +12,6 @@ use async_trait::async_trait;
use cached_config::{ConfigHandle, ConfigStore};
use commitsync::types::{RawCommitSyncAllVersions, RawCommitSyncCurrentVersions};
use context::CoreContext;
use iterhelpers::get_only_item;
use metaconfig_parser::Convert;
use metaconfig_types::{CommitSyncConfig, CommitSyncConfigVersion};
use mononoke_types::RepositoryId;
@ -76,6 +75,17 @@ pub trait LiveCommitSyncConfig: Send + Sync {
Ok(version_name)
}
/// Return current version of `CommitSyncConfig` struct
/// for a given repository. Returns None if no config was found
///
/// NOTE: two subsequent calls may return different results
/// as this queries config source
async fn get_current_commit_sync_config_if_exists(
&self,
ctx: &CoreContext,
repo_id: RepositoryId,
) -> Result<Option<CommitSyncConfig>>;
/// Return current version of `CommitSyncConfig` struct
/// for a given repository
///
@ -85,7 +95,11 @@ pub trait LiveCommitSyncConfig: Send + Sync {
&self,
ctx: &CoreContext,
repo_id: RepositoryId,
) -> Result<CommitSyncConfig>;
) -> Result<CommitSyncConfig> {
self.get_current_commit_sync_config_if_exists(ctx, repo_id)
.await?
.ok_or(ErrorKind::NotPartOfAnyCommitSyncConfig(repo_id).into())
}
/// Return all historical versions of `CommitSyncConfig`
/// structs for a given repository
@ -186,17 +200,17 @@ impl LiveCommitSyncConfig for CfgrLiveCommitSyncConfig {
}
/// Return current version of `CommitSyncConfig` struct
/// for a given repository
/// for a given repository. Returns None if no config was found
///
/// NOTE: two subsequent calls may return different results
/// as this queries config source
async fn get_current_commit_sync_config(
async fn get_current_commit_sync_config_if_exists(
&self,
_ctx: &CoreContext,
repo_id: RepositoryId,
) -> Result<CommitSyncConfig> {
) -> Result<Option<CommitSyncConfig>> {
let config = self.config_handle_for_current_versions.get();
let raw_commit_sync_config = {
let maybe_raw_commit_sync_config = {
let interesting_top_level_configs = config
.repos
.iter()
@ -205,16 +219,19 @@ impl LiveCommitSyncConfig for CfgrLiveCommitSyncConfig {
})
.map(|(_, commit_sync_config)| commit_sync_config);
let res: Result<_, Error> = get_only_item(
interesting_top_level_configs,
|| ErrorKind::NotPartOfAnyCommitSyncConfig(repo_id),
|_, _| ErrorKind::PartOfMultipleCommitSyncConfigs(repo_id),
);
res?.clone()
let mut iter = interesting_top_level_configs;
match (iter.next(), iter.next()) {
(None, _) => Ok(None),
(Some(config), None) => Ok(Some(config)),
(Some(_), Some(_)) => Err(ErrorKind::PartOfMultipleCommitSyncConfigs(repo_id)),
}?
};
let commit_sync_config = raw_commit_sync_config.convert()?;
Ok(commit_sync_config)
if let Some(config) = maybe_raw_commit_sync_config {
Ok(Some(config.clone().convert()?))
} else {
Ok(None)
}
}
/// Return all historical versions of `CommitSyncConfig`
@ -334,10 +351,10 @@ impl TestLiveCommitSyncConfigSource {
.insert(repo_id, true);
}
pub fn get_commit_sync_config_for_repo(
pub fn get_commit_sync_config_for_repo_if_exists(
&self,
repo_id: RepositoryId,
) -> Result<CommitSyncConfig> {
) -> Result<Option<CommitSyncConfig>> {
let mut configs = vec![];
let current_versions = { self.0.current_versions.lock().unwrap().clone() };
@ -359,17 +376,25 @@ impl TestLiveCommitSyncConfigSource {
let mut iter = configs.into_iter();
match (iter.next(), iter.next()) {
(Some(config), None) => Ok(config.clone()),
(Some(config), None) => Ok(Some(config.clone())),
(Some(first), Some(second)) => Err(anyhow!(
"too many configs for {}: {:?} and {:?}",
repo_id,
first,
second
)),
(None, _) => Err(anyhow!("No config for {}", repo_id)),
(None, _) => Ok(None),
}
}
pub fn get_commit_sync_config_for_repo(
&self,
repo_id: RepositoryId,
) -> Result<CommitSyncConfig> {
self.get_commit_sync_config_for_repo_if_exists(repo_id)?
.ok_or(anyhow!("No config for {}", repo_id))
}
fn push_redirector_enabled_for_draft(&self, repo_id: RepositoryId) -> bool {
*self
.0
@ -390,6 +415,14 @@ impl TestLiveCommitSyncConfigSource {
.unwrap_or(&false)
}
fn get_current_commit_sync_config_if_exists(
&self,
_ctx: &CoreContext,
repo_id: RepositoryId,
) -> Result<Option<CommitSyncConfig>> {
self.get_commit_sync_config_for_repo_if_exists(repo_id)
}
fn get_current_commit_sync_config(
&self,
_ctx: &CoreContext,
@ -478,6 +511,15 @@ impl LiveCommitSyncConfig for TestLiveCommitSyncConfig {
self.source.push_redirector_enabled_for_public(repo_id)
}
async fn get_current_commit_sync_config_if_exists(
&self,
ctx: &CoreContext,
repo_id: RepositoryId,
) -> Result<Option<CommitSyncConfig>> {
self.source
.get_current_commit_sync_config_if_exists(ctx, repo_id)
}
async fn get_current_commit_sync_config(
&self,
ctx: &CoreContext,

View File

@ -33,11 +33,6 @@ mononoke_types_thrift mononoke_types_thrift",
conf.options(options);
}
let include_srcs = vec![
];
conf.include_srcs(include_srcs);
conf
};

View File

@ -34,11 +34,6 @@ mononoke_types_thrift mononoke_types_thrift",
conf.options(options);
}
let include_srcs = vec![
];
conf.include_srcs(include_srcs);
conf
};

View File

@ -33,11 +33,6 @@ mononoke_types_thrift mononoke_types_thrift",
conf.options(options);
}
let include_srcs = vec![
];
conf.include_srcs(include_srcs);
conf
};

View File

@ -34,11 +34,6 @@ source_control source_control",
conf.options(options);
}
let include_srcs = vec![
];
conf.include_srcs(include_srcs);
conf
};

View File

@ -33,11 +33,6 @@ mononoke_types_thrift mononoke_types_thrift",
conf.options(options);
}
let include_srcs = vec![
];
conf.include_srcs(include_srcs);
conf
};

View File

@ -34,11 +34,6 @@ mononoke_types_thrift mononoke_types_thrift",
conf.options(options);
}
let include_srcs = vec![
];
conf.include_srcs(include_srcs);
conf
};

View File

@ -195,6 +195,13 @@ impl Mononoke {
self.repos.values()
}
/// Look up the configured name of a repo by its `RepositoryId`.
///
/// Returns `None` when no repo with that id is known to this `Mononoke`
/// instance. The name is borrowed from the stored repo, so the returned
/// reference is tied to `&self`.
pub fn repo_name_from_id(&self, repo_id: RepositoryId) -> Option<&String> {
    match self.repos_by_ids.get(&repo_id) {
        None => None,
        Some(repo) => Some(&repo.name()),
    }
}
/// Report configured monitoring stats
pub async fn report_monitoring_stats(&self, ctx: &CoreContext) -> Result<(), MononokeError> {
for (_, repo) in self.repos.iter() {

View File

@ -32,11 +32,6 @@ fn main() {
conf.options(options);
}
let include_srcs = vec![
];
conf.include_srcs(include_srcs);
conf
};

View File

@ -12,6 +12,9 @@ mod sql_queries;
#[cfg(test)]
mod test;
use anyhow::Result;
use async_trait::async_trait;
use context::CoreContext;
use mononoke_types::{ChangesetId, RepositoryId};
use pushrebase_hook::PushrebaseHook;
@ -40,7 +43,13 @@ impl PushrebaseMutationMappingEntry {
}
}
#[async_trait]
#[facet::facet]
/// Facet for the pushrebase mutation mapping: records and queries the
/// relationship between a pushrebased (successor) commit and the commit(s)
/// it was pushrebased from.
pub trait PushrebaseMutationMapping: Send + Sync {
    /// Return a pushrebase hook to install during pushrebase, if any.
    /// `None` means no mapping entry should be saved for this repo.
    fn get_hook(&self) -> Option<Box<dyn PushrebaseHook>>;

    /// Look up the pre-pushrebase changeset ids recorded for the given
    /// successor changeset. An empty `Vec` means no mapping entry exists.
    async fn get_prepushrebase_ids(
        &self,
        ctx: &CoreContext,
        successor_bcs_id: ChangesetId,
    ) -> Result<Vec<ChangesetId>>;
}

View File

@ -6,6 +6,8 @@
*/
use anyhow::Result;
use async_trait::async_trait;
use context::{CoreContext, PerfCounterType};
use mononoke_types::{ChangesetId, RepositoryId};
use pushrebase_hook::PushrebaseHook;
use sql::{queries, Connection, Transaction};
@ -62,8 +64,6 @@ pub async fn add_pushrebase_mapping(
Ok(transaction)
}
// This is only used in tests thus it is unnecessary to keep a SQL connection
// in the mapping. We can just pass the connection to the function.
pub async fn get_prepushrebase_ids(
connection: &Connection,
repo_id: RepositoryId,
@ -76,20 +76,45 @@ pub async fn get_prepushrebase_ids(
pub struct SqlPushrebaseMutationMapping {
repo_id: RepositoryId,
sql_conn: SqlPushrebaseMutationMappingConnection,
}
impl SqlPushrebaseMutationMapping {
pub fn new(repo_id: RepositoryId, _sql_conn: SqlPushrebaseMutationMappingConnection) -> Self {
Self { repo_id }
pub fn new(repo_id: RepositoryId, sql_conn: SqlPushrebaseMutationMappingConnection) -> Self {
Self { repo_id, sql_conn }
}
}
pub struct SqlPushrebaseMutationMappingConnection {}
#[derive(Clone)]
pub struct SqlPushrebaseMutationMappingConnection {
write_connection: Connection,
read_connection: Connection,
read_master_connection: Connection,
}
impl SqlPushrebaseMutationMappingConnection {
    /// Bind this SQL connection set to a specific repo, producing the
    /// repo-scoped mapping object.
    pub fn with_repo_id(self, repo_id: RepositoryId) -> SqlPushrebaseMutationMapping {
        SqlPushrebaseMutationMapping::new(repo_id, self)
    }

    /// Fetch pre-pushrebase ids for `successor_bcs_id`, trying the replica
    /// first and falling back to the master connection only when the replica
    /// returned nothing (the replica may lag behind a recent write).
    /// Perf counters track which tier served the read.
    async fn get_prepushrebase_ids(
        &self,
        ctx: &CoreContext,
        repo_id: RepositoryId,
        successor_bcs_id: ChangesetId,
    ) -> Result<Vec<ChangesetId>> {
        ctx.perf_counters()
            .increment_counter(PerfCounterType::SqlReadsReplica);
        let mut ids =
            get_prepushrebase_ids(&self.read_connection, repo_id, successor_bcs_id).await?;
        if ids.is_empty() {
            // Empty result may just mean replication lag — retry on master.
            ctx.perf_counters()
                .increment_counter(PerfCounterType::SqlReadsMaster);
            ids = get_prepushrebase_ids(&self.read_master_connection, repo_id, successor_bcs_id)
                .await?;
        }
        Ok(ids)
    }
}
impl SqlConstruct for SqlPushrebaseMutationMappingConnection {
@ -100,13 +125,18 @@ impl SqlConstruct for SqlPushrebaseMutationMappingConnection {
// We don't need the connections because we never use them.
// But we need SqlConstruct to get our SQL tables created in tests.
fn from_sql_connections(_connections: SqlConnections) -> Self {
Self {}
fn from_sql_connections(connections: SqlConnections) -> Self {
Self {
write_connection: connections.write_connection,
read_connection: connections.read_connection,
read_master_connection: connections.read_master_connection,
}
}
}
impl SqlConstructFromMetadataDatabaseConfig for SqlPushrebaseMutationMappingConnection {}
#[async_trait]
impl PushrebaseMutationMapping for SqlPushrebaseMutationMapping {
fn get_hook(&self) -> Option<Box<dyn PushrebaseHook>> {
if tunables().get_disable_save_mapping_pushrebase_hook() {
@ -115,4 +145,14 @@ impl PushrebaseMutationMapping for SqlPushrebaseMutationMapping {
Some(SaveMappingPushrebaseHook::new(self.repo_id))
}
}
async fn get_prepushrebase_ids(
&self,
ctx: &CoreContext,
successor_bcs_id: ChangesetId,
) -> Result<Vec<ChangesetId>> {
self.sql_conn
.get_prepushrebase_ids(ctx, self.repo_id, successor_bcs_id)
.await
}
}

View File

@ -33,11 +33,6 @@ skiplist crate",
conf.options(options);
}
let include_srcs = vec![
];
conf.include_srcs(include_srcs);
conf
};

View File

@ -0,0 +1,130 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* This software may be used and distributed according to the terms of the
* GNU General Public License version 2.
*/
use std::collections::BTreeMap;
use std::io::Write;
use anyhow::{bail, Result};
use clap::{App, AppSettings, ArgMatches, SubCommand};
use futures::stream::{self, StreamExt, TryStreamExt};
use serde_derive::Serialize;
use source_control::types as thrift;
use crate::args::commit_id::{
add_commit_id_args, add_scheme_args, get_commit_id, get_request_schemes, get_schemes,
map_commit_id, map_commit_ids, resolve_commit_id,
};
use crate::args::repo::{add_repo_args, get_repo_specifier};
use crate::connection::Connection;
use crate::lib::commit_id::render_commit_id;
use crate::render::{Render, RenderStream};
pub(super) const NAME: &str = "lookup-pushrebase-history";
/// Build the clap subcommand definition for `lookup-pushrebase-history`.
pub(super) fn make_subcommand<'a, 'b>() -> App<'a, 'b> {
    // Start from the base subcommand, then layer on the shared argument
    // groups (repo, scheme, commit id) via the helper builders.
    let mut cmd = SubCommand::with_name(NAME)
        .about("Find pushrebase history for a public commit by traversing mappings")
        .setting(AppSettings::ColoredHelp);
    cmd = add_repo_args(cmd);
    cmd = add_scheme_args(cmd);
    cmd = add_commit_id_args(cmd);
    cmd
}
/// Lookup result for one commit in the pushrebase history chain.
#[derive(Serialize)]
struct CommitLookupOutput {
    repo_name: String,
    // The commit id string the caller asked about; omitted from JSON output.
    #[serde(skip)]
    requested: String,
    exists: bool,
    // Rendered commit ids keyed by identity scheme name.
    ids: BTreeMap<String, String>,
}
impl Render for CommitLookupOutput {
    /// Human-readable rendering: repo name followed by the commit ids in
    /// the requested schemes. Fails if the commit does not exist.
    fn render(&self, matches: &ArgMatches, w: &mut dyn Write) -> Result<()> {
        if self.exists {
            // clippy::write_with_newline — use writeln! instead of
            // embedding "\n" in the format string; output is identical.
            writeln!(w, "repo={}", self.repo_name)?;
            let schemes = get_schemes(matches);
            render_commit_id(None, "\n", &self.requested, &self.ids, &schemes, w)?;
            writeln!(w)?;
        } else {
            bail!(
                "{} does not exist in repo {}\n",
                self.requested,
                self.repo_name
            );
        }
        Ok(())
    }

    /// JSON rendering: serialize the struct directly (`requested` is skipped
    /// via the serde attribute).
    fn render_json(&self, _matches: &ArgMatches, w: &mut dyn Write) -> Result<()> {
        Ok(serde_json::to_writer(w, self)?)
    }
}
/// Aggregate output: one `CommitLookupOutput` per entry in the pushrebase
/// history, rendered in order and separated by `--` lines.
#[derive(Serialize)]
struct PushrebaseLookupOutput {
    commit_lookups: Vec<CommitLookupOutput>,
}

impl Render for PushrebaseLookupOutput {
    /// Render each commit lookup, with a `--` separator between entries.
    fn render(&self, matches: &ArgMatches, w: &mut dyn Write) -> Result<()> {
        for (i, commit) in self.commit_lookups.iter().enumerate() {
            if i > 0 {
                // clippy::write_with_newline — writeln! over write!("--\n");
                // output is byte-identical.
                writeln!(w, "--")?;
            }
            commit.render(matches, w)?;
        }
        Ok(())
    }

    fn render_json(&self, _matches: &ArgMatches, w: &mut dyn Write) -> Result<()> {
        Ok(serde_json::to_writer(w, self)?)
    }
}
/// Entry point for the `lookup-pushrebase-history` subcommand.
///
/// Resolves the requested commit, asks the server for its pushrebase
/// history, then looks up each commit in the history to map its id into the
/// caller-requested identity schemes. Returns a single-item render stream.
pub(super) async fn run(matches: &ArgMatches<'_>, connection: Connection) -> Result<RenderStream> {
    let repo = get_repo_specifier(matches).expect("repository is required");
    let commit_id = get_commit_id(matches)?;
    let id = resolve_commit_id(&connection, &repo, &commit_id).await?;
    let commit = thrift::CommitSpecifier { repo, id };
    let pushrebase_history = connection
        .commit_lookup_pushrebase_history(&commit, &thrift::CommitLookupPushrebaseHistoryParams {})
        .await?;

    let lookup_params = thrift::CommitLookupParams {
        identity_schemes: get_request_schemes(&matches),
    };
    // Look up every commit in the history concurrently, at most 10 in
    // flight; try_collect preserves the original order.
    let commit_lookups: Vec<_> = stream::iter(pushrebase_history.history.clone())
        .map(|commit| connection.commit_lookup(&commit, &lookup_params))
        .buffered(10)
        .try_collect()
        .await?;

    // Pair each history entry with its lookup result and build the render
    // output. Entries whose id cannot be mapped are silently dropped.
    let commit_lookups: Vec<_> = pushrebase_history
        .history
        .into_iter()
        .zip(commit_lookups)
        .filter_map(|(commit, commit_lookup)| {
            let ids = match &commit_lookup.ids {
                Some(ids) => map_commit_ids(ids.values()),
                None => BTreeMap::new(),
            };
            if let Some((_, id)) = map_commit_id(&commit.id) {
                Some(CommitLookupOutput {
                    repo_name: commit.repo.name,
                    requested: id,
                    exists: commit_lookup.exists,
                    ids,
                })
            } else {
                None
            }
        })
        .collect();

    let output = Box::new(PushrebaseLookupOutput { commit_lookups });
    Ok(stream::once(async move { Ok(output as Box<dyn Render>) }).boxed())
}

View File

@ -73,6 +73,7 @@ commands! {
mod repos;
mod blame;
mod xrepo_lookup;
mod lookup_pushrebase_history;
}
#[derive(Copy, Clone, Debug)]

View File

@ -680,6 +680,9 @@ struct CommitLookupParams {
1: set<CommitIdentityScheme> identity_schemes;
}
struct CommitLookupPushrebaseHistoryParams {
}
struct CommitInfoParams {
// Commit identity schemes to return.
1: set<CommitIdentityScheme> identity_schemes;
@ -1156,6 +1159,12 @@ struct CommitLookupResponse {
2: optional map<CommitIdentityScheme, CommitId> ids;
}
struct CommitLookupPushrebaseHistoryResponse {
1: list<CommitSpecifier> history;
// Always equals the last element of history
2: CommitSpecifier origin;
}
struct CommitFindFilesResponse {
// The files that match.
1: list<string> files;
@ -1483,6 +1492,27 @@ service SourceControlService extends fb303.FacebookService {
2: CommitLookupParams params,
) throws (1: RequestError request_error, 2: InternalError internal_error);
// Look up commit history over Pushrebase mutations. The history ends at the
// commit version that was originally pushed. The provided commit must be public.
//
// Currently attempts to traverse over commit sync and pushrebase mappings.
// Returns an error if there is ambiguity in any mapping but this is not
// expected to ever happen.
//
// Method may also theoretically return inaccurate results because of the
// way commit sync mapping currently works. "Inaccurate" means it may
// go in the wrong direction and return a commit that was a successor
// of the given one. This should be fixed soon. In the meantime you can
// consider using full history from the response if you are fine with a set
// of origin "candidates" rather than the exact one which may be incorrect.
//
// NOTE: returns commit specifiers with bonsai hashes. Use commit_lookup
// on specifiers to obtain hashes in needed schemes.
CommitLookupPushrebaseHistoryResponse commit_lookup_pushrebase_history(
1: CommitSpecifier commit,
2: CommitLookupPushrebaseHistoryParams params,
) throws (1: RequestError request_error, 2: InternalError internal_error);
// Get commit info.
CommitInfo commit_info(
1: CommitSpecifier commit,

View File

@ -34,11 +34,6 @@ source_control crate",
conf.options(options);
}
let include_srcs = vec![
];
conf.include_srcs(include_srcs);
conf
};

View File

@ -190,6 +190,7 @@ impl_into_thrift_error!(service::RepoStackInfoExn);
impl_into_thrift_error!(service::CommitCommonBaseWithExn);
impl_into_thrift_error!(service::CommitFileDiffsExn);
impl_into_thrift_error!(service::CommitLookupExn);
impl_into_thrift_error!(service::CommitLookupPushrebaseHistoryExn);
impl_into_thrift_error!(service::CommitInfoExn);
impl_into_thrift_error!(service::CommitCompareExn);
impl_into_thrift_error!(service::CommitIsAncestorOfExn);

View File

@ -0,0 +1,234 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* This software may be used and distributed according to the terms of the
* GNU General Public License version 2.
*/
use std::iter::once;
use std::sync::Arc;
use context::CoreContext;
use futures::compat::Future01CompatExt;
use metaconfig_types::CommitSyncConfig;
use mononoke_api::{Mononoke, RepoContext};
use mononoke_types::ChangesetId;
use source_control as thrift;
use crate::errors;
use crate::source_control_impl::SourceControlServiceImpl;
/// A changeset identified by the name of the repo it lives in.
#[derive(Debug, Clone)]
struct RepoChangeset(String, ChangesetId);

impl RepoChangeset {
    /// Convert into the thrift commit specifier, reporting the id as a
    /// bonsai hash.
    fn into_thrift(self) -> thrift::CommitSpecifier {
        thrift::CommitSpecifier {
            repo: thrift::RepoSpecifier { name: self.0 },
            id: thrift::CommitId::bonsai(self.1.as_ref().to_vec()),
        }
    }
}
// Struct that represents current pushrebase history. Provides a simple
// interface which allows extending the history by looking into different
// mappings.
struct RepoChangesetsPushrebaseHistory {
    ctx: CoreContext,
    mononoke: Arc<Mononoke>,
    // The (public) commit the traversal started from.
    head: RepoChangeset,
    // Commits discovered so far, ordered from head outward; the last entry
    // is the current frontier of the traversal.
    changesets: Vec<RepoChangeset>,
}
impl RepoChangesetsPushrebaseHistory {
    /// Start a new traversal at `changeset` in repo `repo_name`.
    fn new(
        ctx: CoreContext,
        mononoke: Arc<Mononoke>,
        repo_name: String,
        changeset: ChangesetId,
    ) -> Self {
        Self {
            ctx,
            mononoke,
            head: RepoChangeset(repo_name, changeset),
            changesets: vec![],
        }
    }

    /// Current frontier of the traversal: the most recently discovered
    /// changeset, or the head if nothing has been discovered yet.
    fn last(&self) -> RepoChangeset {
        match self.changesets.last() {
            Some(rc) => rc.clone(),
            None => self.head.clone(),
        }
    }

    /// Resolve a repo name to a `RepoContext`, failing with repo-not-found
    /// if it is unknown.
    async fn repo(&self, repo_name: &String) -> Result<RepoContext, errors::ServiceError> {
        let repo = self
            .mononoke
            .repo(self.ctx.clone(), repo_name)
            .await?
            .ok_or(errors::repo_not_found(repo_name.clone()))?;
        Ok(repo)
    }

    /// Reject non-public heads with an invalid-request error: only public
    /// commits can have been pushrebased.
    async fn ensure_head_is_public(&self) -> Result<(), errors::ServiceError> {
        let RepoChangeset(repo_name, bcs_id) = &self.head;
        let repo = self.repo(repo_name).await?;
        let is_public = repo
            .blob_repo()
            .get_phases()
            .get_public(
                self.ctx.clone(),
                vec![*bcs_id],
                true, /* ephemeral_derive */
            )
            .await
            .map_err(errors::internal_error)?
            .contains(bcs_id);
        if !is_public {
            return Err(errors::invalid_request(format!(
                "changeset {} is not public, and only public commits could be pushrebased",
                bcs_id,
            ))
            .into());
        }
        Ok(())
    }

    /// Try to step backwards through the pushrebase mutation mapping.
    /// Returns `Ok(true)` and extends the history when exactly one
    /// pre-pushrebase commit is found, `Ok(false)` when none is found, and
    /// an internal error when the mapping is ambiguous.
    async fn try_traverse_pushrebase(&mut self) -> Result<bool, errors::ServiceError> {
        let RepoChangeset(repo_name, bcs_id) = self.last();
        let repo = self.repo(&repo_name).await?;
        let bcs_ids = repo
            .blob_repo()
            .pushrebase_mutation_mapping()
            .get_prepushrebase_ids(&self.ctx, bcs_id)
            .await
            .map_err(errors::internal_error)?;
        let mut iter = bcs_ids.iter();
        match (iter.next(), iter.next()) {
            (None, _) => Ok(false),
            (Some(bcs_id), None) => {
                self.changesets
                    .push(RepoChangeset(repo_name.clone(), *bcs_id));
                Ok(true)
            }
            (Some(_), Some(_)) => Err(errors::internal_error(format!(
                "pushrebase mapping is ambiguous in repo {} for {}: {:?} (expected only one)",
                repo_name, bcs_id, bcs_ids,
            ))
            .into()),
        }
    }

    /// Try to step through the commit sync (x-repo) mapping. Returns
    /// `Ok(false)` without error when the repo has no commit sync config.
    async fn try_traverse_commit_sync(&mut self) -> Result<bool, errors::ServiceError> {
        let RepoChangeset(repo_name, bcs_id) = self.last();
        let repo = self.repo(&repo_name).await?;
        let maybe_commit_sync_config = repo
            .live_commit_sync_config()
            .get_current_commit_sync_config_if_exists(&self.ctx, repo.repoid())
            .await
            .map_err(errors::internal_error)?;
        if let Some(config) = maybe_commit_sync_config {
            self.try_traverse_commit_sync_inner(repo, bcs_id, config)
                .await
        } else {
            Ok(false)
        }
    }

    // Helper function that wraps traversing logic from try_traverse_commit_sync.
    // Only needed for better code readability
    async fn try_traverse_commit_sync_inner(
        &mut self,
        repo: RepoContext,
        bcs_id: ChangesetId,
        config: CommitSyncConfig,
    ) -> Result<bool, errors::ServiceError> {
        let mut synced_changesets = vec![];
        // From the large repo we fan out to all small repos; from a small
        // repo the only sync target is the large repo.
        let target_repo_ids = if config.large_repo_id == repo.repoid() {
            config.small_repos.keys().copied().collect()
        } else {
            vec![config.large_repo_id]
        };
        for target_repo_id in target_repo_ids.into_iter() {
            let entries = repo
                .synced_commit_mapping()
                .get(self.ctx.clone(), repo.repoid(), bcs_id, target_repo_id)
                .compat()
                .await
                .map_err(errors::internal_error)?;
            // Target repos unknown to this Mononoke instance are skipped.
            if let Some(target_repo_name) = self.mononoke.repo_name_from_id(target_repo_id) {
                synced_changesets.extend(
                    entries
                        .into_iter()
                        .map(|(cs, _)| RepoChangeset(target_repo_name.clone(), cs)),
                );
            }
        }
        // Same one/none/many discipline as the pushrebase mapping: exactly
        // one synced changeset extends the history; many is an error.
        let mut iter = synced_changesets.iter();
        match (iter.next(), iter.next()) {
            (None, _) => Ok(false),
            (Some(rc), None) => {
                self.changesets.push(rc.clone());
                Ok(true)
            }
            (Some(_), Some(_)) => Err(errors::internal_error(format!(
                "commit sync mapping is ambiguous in repo {} for {}: {:?} (expected only one)",
                repo.name(),
                bcs_id,
                synced_changesets,
            ))
            .into()),
        }
    }

    /// Build the thrift response: `history` starts at the head and follows
    /// the discovered chain; `origin` duplicates the last history entry.
    fn into_thrift(self) -> thrift::CommitLookupPushrebaseHistoryResponse {
        let origin = self.last().into_thrift();
        let history: Vec<_> = once(self.head)
            .chain(self.changesets.into_iter())
            .map(|rc| rc.into_thrift())
            .collect();
        thrift::CommitLookupPushrebaseHistoryResponse { history, origin }
    }
}
impl SourceControlServiceImpl {
    /// Look up commit history over pushrebase mutations.
    ///
    /// The traversal alternates between two mappings: first the pushrebase
    /// mutation mapping, and — if that finds nothing — the commit sync
    /// mapping followed by another pushrebase attempt. When a pushrebase
    /// step succeeded, one final commit sync step is attempted so the
    /// result lands in the originating repo.
    pub(crate) async fn commit_lookup_pushrebase_history(
        &self,
        ctx: CoreContext,
        commit: thrift::CommitSpecifier,
        _params: thrift::CommitLookupPushrebaseHistoryParams,
    ) -> Result<thrift::CommitLookupPushrebaseHistoryResponse, errors::ServiceError> {
        let (repo, changeset) = self.repo_changeset(ctx.clone(), &commit).await?;
        let mut history = RepoChangesetsPushrebaseHistory::new(
            ctx,
            self.mononoke.clone(),
            repo.name().to_string(),
            changeset.id(),
        );

        // Ensure that commit is public
        history.ensure_head_is_public().await?;

        let mut pushrebased = false;
        if history.try_traverse_pushrebase().await? {
            pushrebased = true;
        } else if history.try_traverse_commit_sync().await? {
            // clippy::collapsible_else_if — flattened from `else { if … }`;
            // behavior is unchanged.
            pushrebased = history.try_traverse_pushrebase().await?;
        }
        if pushrebased {
            history.try_traverse_commit_sync().await?;
        }

        Ok(history.into_thrift())
    }
}

View File

@ -12,6 +12,7 @@ use crate::errors;
use crate::source_control_impl::SourceControlServiceImpl;
pub(crate) mod commit;
pub(crate) mod commit_lookup_pushrebase_history;
pub(crate) mod commit_path;
pub(crate) mod file;
pub(crate) mod megarepo;

View File

@ -199,6 +199,8 @@ impl AddScubaParams for thrift::CommitLookupParams {
}
}
impl AddScubaParams for thrift::CommitLookupPushrebaseHistoryParams {}
impl AddScubaParams for thrift::CommitHistoryParams {
fn add_scuba_params(&self, scuba: &mut MononokeScubaSampleBuilder) {
scuba.add("param_format", self.format.to_string());

View File

@ -54,6 +54,8 @@ impl AddScubaResponse for thrift::CommitInfo {}
impl AddScubaResponse for thrift::CommitLookupResponse {}
impl AddScubaResponse for thrift::CommitLookupPushrebaseHistoryResponse {}
impl AddScubaResponse for thrift::CommitHistoryResponse {}
impl AddScubaResponse for thrift::CommitListDescendantBookmarksResponse {}

View File

@ -522,6 +522,11 @@ impl SourceControlService for SourceControlServiceThriftImpl {
params: thrift::CommitLookupParams,
) -> Result<thrift::CommitLookupResponse, service::CommitLookupExn>;
async fn commit_lookup_pushrebase_history(
commit: thrift::CommitSpecifier,
params: thrift::CommitLookupPushrebaseHistoryParams,
) -> Result<thrift::CommitLookupPushrebaseHistoryResponse, service::CommitLookupPushrebaseHistoryExn>;
async fn commit_file_diffs(
commit: thrift::CommitSpecifier,
params: thrift::CommitFileDiffsParams,