pass job type to seed heads configuration

Summary:
We want to be able to return different heads for segmented changelog for
backgorund jobs vs live server. The seedheads function needs to be aware of
that distinction.

For now all SC abstraction generated by repo factor will be for live jobs
because they're used there (the tailer and repo imported construct the SC
explicitly).

Reviewed By: farnz

Differential Revision: D35934779

fbshipit-source-id: 732027d1db5e5f31237879dded3eed6d651edbe9
This commit is contained in:
Mateusz Kwapich 2022-04-28 04:02:41 -07:00 committed by Facebook GitHub Bot
parent cdb7ed47c5
commit 2235e09960
5 changed files with 27 additions and 12 deletions

View File

@ -24,7 +24,7 @@ use cmdlib::{
};
use context::{CoreContext, SessionContainer};
use fbinit::FacebookInit;
use segmented_changelog::{seedheads_from_config, SegmentedChangelogTailer};
use segmented_changelog::{self, seedheads_from_config, SegmentedChangelogTailer};
const ONCE_ARG: &str = "once";
const REPO_ARG: &str = "repo";
@ -169,7 +169,11 @@ async fn run<'a>(ctx: CoreContext, matches: &'a MononokeMatches<'a>) -> Result<(
let head_args = matches.values_of(HEAD_ARG);
let head_args_len = head_args.as_ref().map_or(0, |a| a.len());
let mut heads = if head_args.is_none() || matches.is_present(CONFIG_HEADS_ARG) {
let mut heads = seedheads_from_config(&ctx, &config.segmented_changelog_config)?;
let mut heads = seedheads_from_config(
&ctx,
&config.segmented_changelog_config,
segmented_changelog::JobType::Background,
)?;
heads.reserve(head_args_len);
heads
} else {

View File

@ -45,7 +45,7 @@ use movers::{DefaultAction, Mover};
use pushrebase::do_pushrebase_bonsai;
use question::{Answer, Question};
use repo_blobstore::RepoBlobstoreRef;
use segmented_changelog::{seedheads_from_config, SeedHead, SegmentedChangelogTailer};
use segmented_changelog::{self, seedheads_from_config, SeedHead, SegmentedChangelogTailer};
use serde::{Deserialize, Serialize};
use serde_json;
use slog::info;
@ -1274,7 +1274,11 @@ async fn tail_segmented_changelog(
mysql_options: &MysqlOptions,
segmented_changelog_config: &SegmentedChangelogConfig,
) -> Result<(), Error> {
let mut seed_heads = seedheads_from_config(&ctx, &segmented_changelog_config)?;
let mut seed_heads = seedheads_from_config(
ctx,
segmented_changelog_config,
segmented_changelog::JobType::Background,
)?;
seed_heads.push(SeedHead::from(imported_cs_id));
let segmented_changelog_tailer = SegmentedChangelogTailer::build_from(

View File

@ -27,7 +27,7 @@ use crate::on_demand::OnDemandUpdateSegmentedChangelog;
use crate::periodic_reload::PeriodicReloadSegmentedChangelog;
use crate::version_store::SegmentedChangelogVersionStore;
use crate::{
seedheads_from_config, CloneHints, DisabledSegmentedChangelog, InProcessIdDag,
seedheads_from_config, CloneHints, DisabledSegmentedChangelog, InProcessIdDag, JobType,
SegmentedChangelog,
};
@ -56,8 +56,8 @@ pub fn new_test_segmented_changelog(
if !config.enabled {
return Ok(Arc::new(DisabledSegmentedChangelog::new()));
}
let seed_heads =
seedheads_from_config(&ctx, config).context("finding segmented changelog heads")?;
let seed_heads = seedheads_from_config(&ctx, config, JobType::Server)
.context("finding segmented changelog heads")?;
Ok(Arc::new(OnDemandUpdateSegmentedChangelog::new(
ctx,
repo_id,
@ -82,8 +82,8 @@ pub async fn new_server_segmented_changelog_manager<'a>(
cache_pool: Option<cachelib::VolatileLruCachePool>,
) -> Result<SegmentedChangelogManager> {
let repo_id = repo_identity.id();
let seed_heads =
seedheads_from_config(ctx, &config).context("finding segmented changelog heads")?;
let seed_heads = seedheads_from_config(ctx, &config, JobType::Server)
.context("finding segmented changelog heads")?;
let replica_lag_monitor = Arc::new(NoReplicaLagMonitor());
let mut idmap_factory = IdMapFactory::new(connections.0.clone(), replica_lag_monitor, repo_id);
if let Some(pool) = cache_pool {
@ -124,8 +124,8 @@ pub async fn new_server_segmented_changelog<'a>(
}
if config.skip_dag_load_at_startup {
let repo_id = repo_identity.id();
let seed_heads =
seedheads_from_config(ctx, &config).context("finding segmented changelog heads")?;
let seed_heads = seedheads_from_config(ctx, &config, JobType::Server)
.context("finding segmented changelog heads")?;
// This is a special case. We build Segmented Changelog using an in process iddag and idmap
// and update then on demand.
// All other configuration is ignored, for example there won't be periodic updates

View File

@ -54,7 +54,7 @@ pub use crate::clone_hints::CloneHints;
pub use crate::copy::copy_segmented_changelog;
pub use crate::manager::{ArcSegmentedChangelogManager, SegmentedChangelogManager};
pub use crate::tailer::SegmentedChangelogTailer;
pub use crate::update::{seedheads_from_config, SeedHead};
pub use crate::update::{seedheads_from_config, JobType, SeedHead};
// public for benchmarking
pub use crate::idmap::{ConcurrentMemIdMap, IdMap};

View File

@ -52,9 +52,16 @@ impl IntoVertexList for SeedHead {
}
}
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum JobType {
Background,
Server,
}
pub fn seedheads_from_config(
ctx: &CoreContext,
config: &SegmentedChangelogConfig,
_job_type: JobType,
) -> Result<Vec<SeedHead>> {
let head = config
.master_bookmark