Pass CoreContext, make async [2/n]

Summary: Small API change to simplify / split the rest of the stack.

Reviewed By: RajivTS

Differential Revision: D59328461

fbshipit-source-id: 60b195b5f92e3f82082261345083a679a27c4578
This commit is contained in:
Andrea Campi 2024-07-04 03:31:22 -07:00 committed by Facebook GitHub Bot
parent a6f269c2ff
commit b20ce36885
19 changed files with 186 additions and 66 deletions

View File

@ -169,7 +169,8 @@ impl<'op> PushrebaseOntoBookmarkOp<'op> {
self.bookmark,
&repo.repo_config().pushrebase,
None,
)?;
)
.await?;
// For pushrebase, we check the repo lock once at the beginning of the
// pushrebase operation, and then once more as part of the pushrebase

View File

@ -140,7 +140,7 @@ pub(crate) async fn ensure_ancestor_of(
}
pub async fn check_bookmark_sync_config(
_ctx: &CoreContext,
ctx: &CoreContext,
repo: &(impl RepoIdentityRef + RepoCrossRepoRef),
bookmark: &BookmarkKey,
kind: BookmarkKind,
@ -150,7 +150,8 @@ pub async fn check_bookmark_sync_config(
if repo
.repo_cross_repo()
.live_commit_sync_config()
.push_redirector_enabled_for_public(repo.repo_identity().id())
.push_redirector_enabled_for_public(ctx, repo.repo_identity().id())
.await
{
return Err(BookmarkMovementError::PushRedirectorEnabledForPublishing {
bookmark: bookmark.clone(),
@ -161,7 +162,8 @@ pub async fn check_bookmark_sync_config(
if repo
.repo_cross_repo()
.live_commit_sync_config()
.push_redirector_enabled_for_draft(repo.repo_identity().id())
.push_redirector_enabled_for_draft(ctx, repo.repo_identity().id())
.await
{
return Err(BookmarkMovementError::PushRedirectorEnabledForScratch {
bookmark: bookmark.clone(),

View File

@ -310,9 +310,13 @@ async fn run_pushredirection_subcommand<'a>(
)
.await?;
if live_commit_sync_config.push_redirector_enabled_for_public(
commit_syncer.get_small_repo().repo_identity().id(),
) {
if live_commit_sync_config
.push_redirector_enabled_for_public(
&ctx,
commit_syncer.get_small_repo().repo_identity().id(),
)
.await
{
return Err(format_err!(
"not allowed to run {} if pushredirection is enabled",
PREPARE_ROLLOUT_SUBCOMMAND
@ -377,9 +381,13 @@ async fn run_pushredirection_subcommand<'a>(
return Ok(());
}
if live_commit_sync_config.push_redirector_enabled_for_public(
commit_syncer.get_small_repo().repo_identity().id(),
) {
if live_commit_sync_config
.push_redirector_enabled_for_public(
&ctx,
commit_syncer.get_small_repo().repo_identity().id(),
)
.await
{
return Err(format_err!(
"not allowed to run {} if pushredirection is enabled",
CHANGE_MAPPING_VERSION_SUBCOMMAND
@ -486,7 +494,11 @@ async fn change_mapping_via_extras<'a>(
) -> Result<(), Error> {
// XXX(mitrandir): remove this check once this mode works regardless of sync direction
if !live_commit_sync_config
.push_redirector_enabled_for_public(commit_syncer.get_small_repo().repo_identity().id())
.push_redirector_enabled_for_public(
ctx,
commit_syncer.get_small_repo().repo_identity().id(),
)
.await
&& std::env::var("MONONOKE_ADMIN_ALWAYS_ALLOW_MAPPING_CHANGE_VIA_EXTRA").is_err()
{
return Err(format_err!(
@ -545,7 +557,8 @@ async fn change_mapping_via_extras<'a>(
&large_bookmark,
&repo_config.pushrebase,
None,
)?;
)
.await?;
let bcs = large_cs_id
.load(ctx, &large_repo.repo_blobstore().clone())

View File

@ -319,7 +319,9 @@ where
}
// We only care about public pushes because draft pushes are not in the bookmark
// update log at all.
let enabled = live_commit_sync_config.push_redirector_enabled_for_public(target_repo_id);
let enabled = live_commit_sync_config
.push_redirector_enabled_for_public(&ctx, target_repo_id)
.await;
if enabled {
let delay = calculate_delay(&ctx, &commit_syncer, &target_repo_dbs).await?;

View File

@ -150,6 +150,7 @@ impl<'a, R: Repo> CommitInMemorySyncer<'a, R> {
// TODO(T182311609): add docs
pub(crate) async fn unsafe_sync_commit_in_memory(
self,
ctx: &CoreContext,
cs: BonsaiChangeset,
commit_sync_context: CommitSyncContext,
expected_version: Option<CommitSyncConfigVersion>,
@ -159,14 +160,20 @@ impl<'a, R: Repo> CommitInMemorySyncer<'a, R> {
)?;
let commit_rewritten_to_empty = self
.get_empty_rewritten_commit_action(&maybe_mapping_change_version, commit_sync_context);
.get_empty_rewritten_commit_action(
ctx,
&maybe_mapping_change_version,
commit_sync_context,
)
.await;
// We are using the state of pushredirection to determine which repo is "source of truth" for the contents
// if it's the small repo we can't be rewriting the "mapping change" commits as even if we
// do they won't be synced back.
let pushredirection_disabled = !self
.live_commit_sync_config
.push_redirector_enabled_for_public(self.target_repo_id.0);
.push_redirector_enabled_for_public(ctx, self.target_repo_id.0)
.await;
// During backsyncing we provide an option to skip empty commits but we
// can only do that when they're not changing the mapping.
@ -205,7 +212,7 @@ impl<'a, R: Repo> CommitInMemorySyncer<'a, R> {
.await
} else {
// Syncing merge doesn't take rewrite_opts because merges are always rewritten.
self.sync_merge_in_memory(cs, commit_sync_context, expected_version)
self.sync_merge_in_memory(ctx, cs, commit_sync_context, expected_version)
.await
}
}
@ -418,6 +425,7 @@ impl<'a, R: Repo> CommitInMemorySyncer<'a, R> {
/// source of truth otherwise it would break forward syncer.
async fn sync_merge_in_memory(
self,
ctx: &CoreContext,
cs: BonsaiChangeset,
commit_sync_context: CommitSyncContext,
expected_version: Option<CommitSyncConfigVersion>,
@ -543,7 +551,8 @@ impl<'a, R: Repo> CommitInMemorySyncer<'a, R> {
let is_backsync_when_small_is_source_of_truth = !self.small_to_large
&& !self
.live_commit_sync_config
.push_redirector_enabled_for_public(self.target_repo_id.0);
.push_redirector_enabled_for_public(ctx, self.target_repo_id.0)
.await;
let rewrite_res = rewrite_commit(
self.ctx,
cs,
@ -619,14 +628,16 @@ impl<'a, R: Repo> CommitInMemorySyncer<'a, R> {
/// Determine what should happen to commits that would be empty when synced
/// to the target repo.
fn get_empty_rewritten_commit_action(
async fn get_empty_rewritten_commit_action(
&self,
ctx: &CoreContext,
maybe_mapping_change_version: &Option<CommitSyncConfigVersion>,
commit_sync_context: CommitSyncContext,
) -> CommitRewrittenToEmpty {
let pushredirection_disabled = !self
.live_commit_sync_config
.push_redirector_enabled_for_public(self.target_repo_id.0);
.push_redirector_enabled_for_public(ctx, self.target_repo_id.0)
.await;
// If a commit is changing mapping let's always rewrite it to
// small repo regardless if outcome is empty. This is to ensure
// that efter changing mapping there's a commit in small repo

View File

@ -851,7 +851,7 @@ where
submodule_deps,
large_repo: large_in_memory_repo,
}
.unsafe_sync_commit_in_memory(cs, commit_sync_context, expected_version)
.unsafe_sync_commit_in_memory(ctx, cs, commit_sync_context, expected_version)
.await?
.write(ctx, self)
.await
@ -1134,7 +1134,8 @@ where
large_repo_id: self.repos.get_target_repo().repo_identity().id(),
version_name: version.clone(),
}),
)?;
)
.await?;
debug!(ctx.logger(), "Starting pushrebase...");
let pushrebase_res = do_pushrebase_bonsai(

View File

@ -16,6 +16,7 @@ anyhow = "1.0.75"
async-trait = "0.1.71"
cached_config = { version = "0.1.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "main" }
commitsync = { version = "0.1.0", path = "../../../../configerator/structs/scm/mononoke/repos/commitsync" }
context = { version = "0.1.0", path = "../../server/context" }
metaconfig_parser = { version = "0.1.0", path = "../../metaconfig/parser" }
metaconfig_types = { version = "0.1.0", path = "../../metaconfig/types" }
mononoke_types = { version = "0.1.0", path = "../../mononoke_types" }
@ -24,7 +25,6 @@ slog = { version = "2.7", features = ["max_level_trace", "nested-values"] }
thiserror = "1.0.49"
[dev-dependencies]
context = { version = "0.1.0", path = "../../server/context" }
fbinit = { version = "0.1.2", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "main" }
fbinit-tokio = { version = "0.1.2", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "main" }
pretty_assertions = { version = "1.2", features = ["alloc"], default-features = false }

View File

@ -18,6 +18,7 @@ rust_library(
"//eden/mononoke/metaconfig:metaconfig_parser",
"//eden/mononoke/metaconfig:metaconfig_types",
"//eden/mononoke/mononoke_types:mononoke_types",
"//eden/mononoke/server/context:context",
],
)

View File

@ -17,6 +17,7 @@ use cached_config::ConfigHandle;
use cached_config::ConfigStore;
use commitsync::RawCommitSyncAllVersions;
use commitsync::RawCommitSyncConfigAllVersionsOneRepo;
use context::CoreContext;
use metaconfig_parser::Convert;
use metaconfig_types::CommitSyncConfig;
use metaconfig_types::CommitSyncConfigVersion;
@ -61,14 +62,22 @@ pub trait LiveCommitSyncConfig: Send + Sync {
///
/// NOTE: two subsequent calls may return different results
/// as this queries config source
fn push_redirector_enabled_for_draft(&self, repo_id: RepositoryId) -> bool;
async fn push_redirector_enabled_for_draft(
&self,
ctx: &CoreContext,
repo_id: RepositoryId,
) -> bool;
/// Return whether push redirection is currently
/// enabled for public commits in `repo_id`
///
/// NOTE: two subsequent calls may return different results
/// as this queries config source
fn push_redirector_enabled_for_public(&self, repo_id: RepositoryId) -> bool;
async fn push_redirector_enabled_for_public(
&self,
ctx: &CoreContext,
repo_id: RepositoryId,
) -> bool;
/// Return all historical versions of `CommitSyncConfig`
/// structs for a given repository
@ -140,8 +149,9 @@ impl CfgrLiveCommitSyncConfig {
})
}
fn get_push_redirection_repo_state(
async fn get_push_redirection_repo_state(
&self,
_ctx: &CoreContext,
repo_id: RepositoryId,
) -> Option<PushRedirectEnableState> {
let config = self.config_handle_for_push_redirection.get();
@ -167,8 +177,12 @@ impl LiveCommitSyncConfig for CfgrLiveCommitSyncConfig {
///
/// NOTE: two subsequent calls may return different results
/// as this queries config source
fn push_redirector_enabled_for_draft(&self, repo_id: RepositoryId) -> bool {
match self.get_push_redirection_repo_state(repo_id) {
async fn push_redirector_enabled_for_draft(
&self,
ctx: &CoreContext,
repo_id: RepositoryId,
) -> bool {
match self.get_push_redirection_repo_state(ctx, repo_id).await {
Some(config) => config.draft_push,
None => false,
}
@ -179,8 +193,12 @@ impl LiveCommitSyncConfig for CfgrLiveCommitSyncConfig {
///
/// NOTE: two subsequent calls may return different results
/// as this queries config source
fn push_redirector_enabled_for_public(&self, repo_id: RepositoryId) -> bool {
match self.get_push_redirection_repo_state(repo_id) {
async fn push_redirector_enabled_for_public(
&self,
ctx: &CoreContext,
repo_id: RepositoryId,
) -> bool {
match self.get_push_redirection_repo_state(ctx, repo_id).await {
Some(config) => config.public_push,
None => false,
}
@ -318,7 +336,7 @@ impl TestLiveCommitSyncConfigSource {
.insert(config.version_name.clone(), config);
}
pub fn set_draft_push_redirection_enabled(&self, repo_id: RepositoryId) {
pub fn set_draft_push_redirection_enabled(&self, _ctx: &CoreContext, repo_id: RepositoryId) {
self.0
.push_redirection_for_draft
.lock()
@ -326,7 +344,7 @@ impl TestLiveCommitSyncConfigSource {
.insert(repo_id, true);
}
pub fn set_public_push_redirection_enabled(&self, repo_id: RepositoryId) {
pub fn set_public_push_redirection_enabled(&self, _ctx: &CoreContext, repo_id: RepositoryId) {
self.0
.push_redirection_for_public
.lock()
@ -342,7 +360,11 @@ impl TestLiveCommitSyncConfigSource {
.push(config);
}
fn push_redirector_enabled_for_draft(&self, repo_id: RepositoryId) -> bool {
async fn push_redirector_enabled_for_draft(
&self,
_ctx: &CoreContext,
repo_id: RepositoryId,
) -> bool {
*self
.0
.push_redirection_for_draft
@ -352,7 +374,11 @@ impl TestLiveCommitSyncConfigSource {
.unwrap_or(&false)
}
fn push_redirector_enabled_for_public(&self, repo_id: RepositoryId) -> bool {
async fn push_redirector_enabled_for_public(
&self,
_ctx: &CoreContext,
repo_id: RepositoryId,
) -> bool {
*self
.0
.push_redirection_for_public
@ -454,12 +480,24 @@ impl TestLiveCommitSyncConfig {
#[async_trait]
impl LiveCommitSyncConfig for TestLiveCommitSyncConfig {
fn push_redirector_enabled_for_draft(&self, repo_id: RepositoryId) -> bool {
self.source.push_redirector_enabled_for_draft(repo_id)
async fn push_redirector_enabled_for_draft(
&self,
ctx: &CoreContext,
repo_id: RepositoryId,
) -> bool {
self.source
.push_redirector_enabled_for_draft(ctx, repo_id)
.await
}
fn push_redirector_enabled_for_public(&self, repo_id: RepositoryId) -> bool {
self.source.push_redirector_enabled_for_public(repo_id)
async fn push_redirector_enabled_for_public(
&self,
ctx: &CoreContext,
repo_id: RepositoryId,
) -> bool {
self.source
.push_redirector_enabled_for_public(ctx, repo_id)
.await
}
async fn get_all_commit_sync_config_versions(

View File

@ -15,9 +15,17 @@ use crate::EMTPY_COMMIT_SYNC_ALL;
#[fbinit::test]
async fn test_empty_configs(fb: FacebookInit) {
let (_ctx, _test_source, _store, live_commit_sync_config) =
let (ctx, _test_source, _store, live_commit_sync_config) =
get_ctx_source_store_and_live_config(fb, EMPTY_PUSHREDIRECTOR, EMTPY_COMMIT_SYNC_ALL);
let repo_1 = RepositoryId::new(1);
assert!(!live_commit_sync_config.push_redirector_enabled_for_draft(repo_1));
assert!(!live_commit_sync_config.push_redirector_enabled_for_public(repo_1));
assert!(
!live_commit_sync_config
.push_redirector_enabled_for_draft(&ctx, repo_1)
.await
);
assert!(
!live_commit_sync_config
.push_redirector_enabled_for_public(&ctx, repo_1)
.await
);
}

View File

@ -35,7 +35,7 @@ const PUSHREDIRECTOR_BOTH_ENABLED: &str = r#"{
#[fbinit::test]
async fn test_enabling_push_redirection(fb: FacebookInit) {
let (_ctx, test_source, _store, live_commit_sync_config) =
let (ctx, test_source, _store, live_commit_sync_config) =
get_ctx_source_store_and_live_config(fb, EMPTY_PUSHREDIRECTOR, EMTPY_COMMIT_SYNC_ALL);
let repo_1 = RepositoryId::new(1);
@ -48,8 +48,16 @@ async fn test_enabling_push_redirection(fb: FacebookInit) {
// Check that push-redirection of public commits has been picked up
ensure_all_updated();
assert!(!live_commit_sync_config.push_redirector_enabled_for_draft(repo_1));
assert!(live_commit_sync_config.push_redirector_enabled_for_public(repo_1));
assert!(
!live_commit_sync_config
.push_redirector_enabled_for_draft(&ctx, repo_1)
.await
);
assert!(
live_commit_sync_config
.push_redirector_enabled_for_public(&ctx, repo_1)
.await
);
// Enable push-redirection of public and draft commits
test_source.insert_config(
@ -60,6 +68,14 @@ async fn test_enabling_push_redirection(fb: FacebookInit) {
// Check that push-redirection of public and draft commits has been picked up
ensure_all_updated();
assert!(live_commit_sync_config.push_redirector_enabled_for_draft(repo_1));
assert!(live_commit_sync_config.push_redirector_enabled_for_public(repo_1));
assert!(
live_commit_sync_config
.push_redirector_enabled_for_draft(&ctx, repo_1)
.await
);
assert!(
live_commit_sync_config
.push_redirector_enabled_for_public(&ctx, repo_1)
.await
);
}

View File

@ -317,7 +317,8 @@ async fn push_merge_commit(
bookmark_to_merge_into,
&repo.repo_config().pushrebase,
None,
)?;
)
.await?;
let pushrebase_res = do_pushrebase_bonsai(
ctx,
repo,

View File

@ -274,8 +274,9 @@ async fn run_in_tailing_mode<M: SyncedCommitMapping + Clone + 'static>(
let scuba_sample = base_scuba_sample.clone();
// We only care about public pushes because draft pushes are not in the bookmark
// update log at all.
let enabled =
live_commit_sync_config.push_redirector_enabled_for_public(source_repo_id);
let enabled = live_commit_sync_config
.push_redirector_enabled_for_public(ctx, source_repo_id)
.await;
// Pushredirection is enabled - we need to disable forward sync in that case
if enabled {

View File

@ -302,7 +302,9 @@ async fn maybe_push_redirector(
Some(base) => base,
};
let live_commit_sync_config = repo.live_commit_sync_config();
let enabled = live_commit_sync_config.push_redirector_enabled_for_public(repo.repoid());
let enabled = live_commit_sync_config
.push_redirector_enabled_for_public(ctx, repo.repoid())
.await;
if enabled {
let large_repo_id = base.common_commit_sync_config.large_repo_id;
let large_repo = repos.get_by_id(large_repo_id.id()).ok_or_else(|| {

View File

@ -50,7 +50,7 @@ pub enum PushrebaseHooksError {
/// Get a Vec of the relevant pushrebase hooks for PushrebaseParams, using this repo when
/// required by those hooks.
pub fn get_pushrebase_hooks(
pub async fn get_pushrebase_hooks(
ctx: &CoreContext,
repo: &(
impl BonsaiGitMappingArc
@ -73,7 +73,8 @@ pub fn get_pushrebase_hooks(
// Only add hook if pushes are being redirected
repo.repo_cross_repo()
.live_commit_sync_config()
.push_redirector_enabled_for_public(small_repo_id)
.push_redirector_enabled_for_public(ctx, small_repo_id)
.await
} else {
true
};

View File

@ -871,7 +871,7 @@ impl RepoClient {
}
}
fn maybe_get_pushredirector_for_action(
async fn maybe_get_pushredirector_for_action(
&self,
ctx: &CoreContext,
action: &unbundle::PostResolveAction,
@ -893,9 +893,15 @@ impl RepoClient {
let repo_id = self.repo.blob_repo().repo_identity().id();
let redirect = match action {
InfinitePush(_) => live_commit_sync_config.push_redirector_enabled_for_draft(repo_id),
InfinitePush(_) => {
live_commit_sync_config
.push_redirector_enabled_for_draft(ctx, repo_id)
.await
}
Push(_) | PushRebase(_) | BookmarkOnlyPushRebase(_) => {
live_commit_sync_config.push_redirector_enabled_for_public(repo_id)
live_commit_sync_config
.push_redirector_enabled_for_public(ctx, repo_id)
.await
}
};
@ -1574,7 +1580,10 @@ impl HgCommands for RepoClient {
maybe_validate_pushed_bonsais(&ctx, repo.as_blob_repo(), &maybereplaydata)
.await?;
match client.maybe_get_pushredirector_for_action(&ctx, &action)? {
match client
.maybe_get_pushredirector_for_action(&ctx, &action)
.await?
{
Some(push_redirector) => {
// Push-redirection will cause
// hooks to be run in the large

View File

@ -676,7 +676,8 @@ async fn push_merge_commit(
bookmark_to_merge_into,
&repo_config.pushrebase,
None,
)?;
)
.await?;
let pushrebase_res = do_pushrebase_bonsai(
ctx,
@ -834,12 +835,15 @@ fn get_importing_bookmark(bookmark_suffix: &str) -> Result<BookmarkKey, Error> {
// Note: pushredirection only works from small repo to large repo.
async fn get_large_repo_config_if_pushredirected<'a>(
ctx: &CoreContext,
repo: &Repo,
live_commit_sync_config: &CfgrLiveCommitSyncConfig,
repos: &HashMap<String, RepoConfig>,
) -> Result<Option<RepoConfig>, Error> {
let repo_id = repo.repo_id();
let enabled = live_commit_sync_config.push_redirector_enabled_for_public(repo_id);
let enabled = live_commit_sync_config
.push_redirector_enabled_for_public(ctx, repo_id)
.await;
if enabled {
let common_commit_sync_config = match live_commit_sync_config.get_common_config(repo_id) {
@ -1099,9 +1103,13 @@ async fn repo_import(
// Check if the import target is a small repo that is pushredirected to a
// large repo. In that case we will import to the large repo and then
// backsync to the small repo.
let maybe_large_repo_config =
get_large_repo_config_if_pushredirected(&repo, &live_commit_sync_config, &configs.repos)
.await?;
let maybe_large_repo_config = get_large_repo_config_if_pushredirected(
&ctx,
&repo,
&live_commit_sync_config,
&configs.repos,
)
.await?;
let mut maybe_small_repo_back_sync_vars = None;
let mut movers = vec![movers::mover_factory(
HashMap::new(),
@ -1540,9 +1548,13 @@ async fn check_additional_setup_steps(
let live_commit_sync_config = CfgrLiveCommitSyncConfig::new(ctx.logger(), &env.config_store)?;
let maybe_large_repo_config =
get_large_repo_config_if_pushredirected(&repo, &live_commit_sync_config, &configs.repos)
.await?;
let maybe_large_repo_config = get_large_repo_config_if_pushredirected(
&ctx,
&repo,
&live_commit_sync_config,
&configs.repos,
)
.await?;
if let Some(large_repo_config) = maybe_large_repo_config {
let (large_repo, large_repo_import_setting, _syncers) = get_pushredirected_vars(
app,

View File

@ -499,7 +499,7 @@ mod tests {
insert_repo_config(0, &mut repos);
assert!(
get_large_repo_config_if_pushredirected(&repo0, &live_commit_sync_config, &repos)
get_large_repo_config_if_pushredirected(&ctx, &repo0, &live_commit_sync_config, &repos)
.await?
.is_none()
);
@ -508,7 +508,7 @@ mod tests {
insert_repo_config(1, &mut repos);
assert!(
get_large_repo_config_if_pushredirected(&repo1, &live_commit_sync_config, &repos)
get_large_repo_config_if_pushredirected(&ctx, &repo1, &live_commit_sync_config, &repos)
.await?
.is_some()
);
@ -517,7 +517,7 @@ mod tests {
insert_repo_config(2, &mut repos);
assert!(
get_large_repo_config_if_pushredirected(&repo2, &live_commit_sync_config, &repos)
get_large_repo_config_if_pushredirected(&ctx, &repo2, &live_commit_sync_config, &repos)
.await?
.is_none()
);

View File

@ -60,6 +60,7 @@ pub async fn pushrebase(
&repo.repo_config().pushrebase,
None,
)
.await
.map_err(Error::from)?;
let bonsais = stream::iter(csids)