diff --git a/eden/mononoke/cmds/segmented_changelog_seeder.rs b/eden/mononoke/cmds/segmented_changelog_seeder.rs index d8f038c3c7..8c42406927 100644 --- a/eden/mononoke/cmds/segmented_changelog_seeder.rs +++ b/eden/mononoke/cmds/segmented_changelog_seeder.rs @@ -23,7 +23,7 @@ use context::CoreContext; use fbinit::FacebookInit; use metaconfig_types::MetadataDatabaseConfig; use segmented_changelog::types::IdMapVersion; -use segmented_changelog::{SegmentedChangelogBuilder, SegmentedChangelogSqlConnections}; +use segmented_changelog::{SegmentedChangelogSeeder, SegmentedChangelogSqlConnections}; use sql_ext::facebook::MyAdmin; use sql_ext::replication::{NoReplicaLagMonitor, ReplicaLagMonitor}; @@ -111,12 +111,14 @@ async fn run<'a>(ctx: CoreContext, matches: &'a MononokeMatches<'a>) -> Result<( .await .context("error opening segmented changelog sql connections")?; - let segmented_changelog_seeder = SegmentedChangelogBuilder::new() - .with_sql_connections(segmented_changelog_sql_connections) - .with_blobrepo(&repo) - .with_replica_lag_monitor(replica_lag_monitor) - .build_seeder() - .context("building SegmentedChangelogSeeder")?; + let segmented_changelog_seeder = SegmentedChangelogSeeder::new( + repo.get_repoid(), + segmented_changelog_sql_connections, + replica_lag_monitor, + repo.get_changesets_object(), + repo.get_phases(), + Arc::new(repo.get_blobstore()), + ); info!( ctx.logger(), diff --git a/eden/mononoke/segmented_changelog/Cargo.toml b/eden/mononoke/segmented_changelog/Cargo.toml index 5bbbee9c2f..124e4f634e 100644 --- a/eden/mononoke/segmented_changelog/Cargo.toml +++ b/eden/mononoke/segmented_changelog/Cargo.toml @@ -11,7 +11,6 @@ abomonation_derive = "0.5" anyhow = "1.0" arc-swap = "1.1" async-trait = "0.1.45" -blobrepo = { version = "0.1.0", path = "../blobrepo" } blobstore = { version = "0.1.0", path = "../blobstore" } bookmarks = { version = "0.1.0", path = "../bookmarks" } bulkops = { version = "0.1.0", path = "../bulkops" } @@ -42,6 +41,7 @@ stats = { version = "0.1.0", git = "https://github.com/facebookexperimental/rust tokio = { version = "0.2.25", features = ["full", "test-util"] } [dev-dependencies] +blobrepo = { version = "0.1.0", path = "../blobrepo" } fbinit-tokio-02 = { version = "0.1.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" } fixtures = { version = "0.1.0", path = "../tests/fixtures" } maplit = "1.0" diff --git a/eden/mononoke/segmented_changelog/src/builder.rs b/eden/mononoke/segmented_changelog/src/builder.rs index c98c6f7369..7e7e04d6bb 100644 --- a/eden/mononoke/segmented_changelog/src/builder.rs +++ b/eden/mononoke/segmented_changelog/src/builder.rs @@ -6,33 +6,24 @@ */ use std::sync::Arc; -use std::time::Duration; -use anyhow::{format_err, Context, Result}; -use blobrepo::BlobRepo; +use anyhow::{Context, Result}; use blobstore::Blobstore; use bookmarks::{BookmarkName, Bookmarks}; -use bulkops::PublicChangesetBulkFetch; use changeset_fetcher::ChangesetFetcher; use context::CoreContext; use fbinit::FacebookInit; use metaconfig_types::SegmentedChangelogConfig; use mononoke_types::RepositoryId; use sql_construct::{SqlConstruct, SqlConstructFromMetadataDatabaseConfig}; -use sql_ext::replication::{NoReplicaLagMonitor, ReplicaLagMonitor}; +use sql_ext::replication::NoReplicaLagMonitor; use sql_ext::SqlConnections; use crate::iddag::IdDagSaveStore; -use crate::idmap::{ - CacheHandlers, CachedIdMap, ConcurrentMemIdMap, IdMap, IdMapFactory, SqlIdMap, - SqlIdMapVersionStore, -}; +use crate::idmap::{CacheHandlers, ConcurrentMemIdMap, IdMapFactory}; use crate::manager::SegmentedChangelogManager; -use crate::on_demand::{OnDemandUpdateSegmentedChangelog, PeriodicUpdateSegmentedChangelog}; -use crate::owned::OwnedSegmentedChangelog; +use crate::on_demand::OnDemandUpdateSegmentedChangelog; use crate::periodic_reload::PeriodicReloadSegmentedChangelog; -use crate::seeder::SegmentedChangelogSeeder; -use crate::types::IdMapVersion; use crate::version_store::SegmentedChangelogVersionStore; use crate::{DisabledSegmentedChangelog, InProcessIdDag, SegmentedChangelog}; @@ -114,352 +105,3 @@ pub async fn new_server_segmented_changelog( }; Ok(sc) } - -/// SegmentedChangelog instatiation helper. -/// It works together with SegmentedChangelogConfig and BlobRepoFactory to produce a -/// SegmentedChangelog. -/// Config options: -/// Enabled = false -> DisabledSegmentedChangelog -/// Enabled = true -/// update_algorithm = 'ondemand' -> OnDemandUpdateSegmentedChangelog -#[derive(Default, Clone)] -pub struct SegmentedChangelogBuilder { - connections: Option, - repo_id: Option, - idmap_version: Option, - replica_lag_monitor: Option>, - changeset_fetcher: Option>, - changeset_bulk_fetch: Option>, - blobstore: Option>, - bookmarks: Option>, - bookmark_name: Option, - cache_handlers: Option, - update_to_bookmark_period: Option, - reload_dag_period: Option, -} - -impl SegmentedChangelogBuilder { - pub fn new() -> Self { - Self::default() - } - - pub fn build_manager(mut self) -> Result { - Ok(SegmentedChangelogManager::new( - self.repo_id()?, - self.build_segmented_changelog_version_store()?, - self.build_iddag_save_store()?, - self.build_idmap_factory()?, - self.changeset_fetcher()?, - self.bookmarks()?, - self.bookmark_name()?, - self.update_to_bookmark_period.take(), - )) - } - - pub fn build_disabled(self) -> DisabledSegmentedChangelog { - DisabledSegmentedChangelog::new() - } - - pub fn build_on_demand_update(mut self) -> Result { - let owned = self.new_owned()?; - Ok(OnDemandUpdateSegmentedChangelog::new( - self.repo_id()?, - owned.iddag, - owned.idmap, - self.changeset_fetcher()?, - self.bookmarks()?, - self.bookmark_name()?, - )) - } - - pub async fn build_on_demand_update_start_from_save( - mut self, - ctx: &CoreContext, - ) -> Result { - let repo_id = self.repo_id()?; - let changeset_fetcher = self.changeset_fetcher()?; - let bookmarks = self.bookmarks()?; - let bookmark_name = self.bookmark_name()?; - let manager = self.build_manager()?; - let owned = manager.load_owned(ctx).await?; - Ok(OnDemandUpdateSegmentedChangelog::new( - repo_id, - owned.iddag, - owned.idmap, - changeset_fetcher, - bookmarks, - bookmark_name, - )) - } - - pub fn build_periodic_update( - mut self, - ctx: &CoreContext, - ) -> Result { - let owned = self.new_owned()?; - let dag = PeriodicUpdateSegmentedChangelog::new( - ctx, - Arc::new(OnDemandUpdateSegmentedChangelog::new( - self.repo_id()?, - owned.iddag, - owned.idmap, - self.changeset_fetcher()?, - self.bookmarks()?, - self.bookmark_name()?, - )), - self.update_to_bookmark_period()?, - ); - Ok(dag) - } - - pub async fn build_periodic_reload( - mut self, - ctx: &CoreContext, - ) -> Result { - let reload_dag_period = self.reload_dag_period()?; - let manager = self.build_manager()?; - PeriodicReloadSegmentedChangelog::start_from_manager(ctx, reload_dag_period, manager).await - } - - pub fn build_seeder(mut self) -> Result { - let seeder = SegmentedChangelogSeeder::new_from_built_dependencies( - self.build_sql_idmap_version_store()?, - self.changeset_bulk_fetch()?, - self.build_segmented_changelog_version_store()?, - self.build_iddag_save_store()?, - self.build_idmap_factory()?, - ); - Ok(seeder) - } - - pub fn with_sql_connections(mut self, connections: SegmentedChangelogSqlConnections) -> Self { - self.connections = Some(connections); - self - } - - pub fn with_repo_id(mut self, repo_id: RepositoryId) -> Self { - self.repo_id = Some(repo_id); - self - } - - pub fn with_idmap_version(mut self, version: u64) -> Self { - self.idmap_version = Some(IdMapVersion(version)); - self - } - - pub fn with_replica_lag_monitor( - mut self, - replica_lag_monitor: Arc, - ) -> Self { - self.replica_lag_monitor = Some(replica_lag_monitor); - self - } - - pub fn with_changeset_fetcher(mut self, changeset_fetcher: Arc) -> Self { - self.changeset_fetcher = Some(changeset_fetcher); - self - } - - pub fn with_changeset_bulk_fetch( - mut self, - changeset_bulk_fetch: Arc, - ) -> Self { - self.changeset_bulk_fetch = Some(changeset_bulk_fetch); - self - } - - pub fn with_blobstore(mut self, blobstore: Arc) -> Self { - self.blobstore = Some(blobstore); - self - } - - pub fn with_bookmarks(mut self, bookmarks: Arc) -> Self { - self.bookmarks = Some(bookmarks); - self - } - - pub fn with_bookmark_name(mut self, bookmark_name: BookmarkName) -> Self { - self.bookmark_name = Some(bookmark_name); - self - } - - pub fn with_caching( - mut self, - fb: FacebookInit, - cache_pool: cachelib::VolatileLruCachePool, - ) -> Self { - self.cache_handlers = Some(CacheHandlers::prod(fb, cache_pool)); - self - } - - pub fn with_cache_handlers(mut self, cache_handlers: CacheHandlers) -> Self { - self.cache_handlers = Some(cache_handlers); - self - } - - pub fn with_update_to_bookmark_period(mut self, period: Duration) -> Self { - self.update_to_bookmark_period = Some(period); - self - } - - pub fn with_reload_dag_period(mut self, period: Duration) -> Self { - self.reload_dag_period = Some(period); - self - } - - pub fn with_blobrepo(self, repo: &BlobRepo) -> Self { - let repo_id = repo.get_repoid(); - let changesets = repo.get_changesets_object(); - let phases = repo.get_phases(); - let bulk_fetch = PublicChangesetBulkFetch::new(repo_id, changesets, phases); - self.with_repo_id(repo_id) - .with_changeset_fetcher(repo.get_changeset_fetcher()) - .with_bookmarks(repo.bookmarks().clone()) - .with_blobstore(Arc::new(repo.get_blobstore())) - .with_changeset_bulk_fetch(Arc::new(bulk_fetch)) - } - - pub(crate) fn new_owned(&mut self) -> Result { - let iddag = InProcessIdDag::new_in_process(); - let idmap: Arc = Arc::new(ConcurrentMemIdMap::new()); - Ok(OwnedSegmentedChangelog::new(iddag, idmap)) - } - - #[allow(dead_code)] - pub(crate) fn build_idmap(&mut self) -> Result> { - let mut idmap: Arc = Arc::new(self.build_sql_idmap()?); - if let Some(cache_handlers) = self.cache_handlers.take() { - idmap = Arc::new(CachedIdMap::new( - idmap, - cache_handlers, - self.repo_id()?, - self.idmap_version(), - )); - } - Ok(idmap) - } - - #[allow(dead_code)] - pub(crate) fn build_sql_idmap(&mut self) -> Result { - let connections = self.connections_clone()?; - let replica_lag_monitor = self.replica_lag_monitor(); - let repo_id = self.repo_id()?; - let idmap_version = self.idmap_version(); - Ok(SqlIdMap::new( - connections, - replica_lag_monitor, - repo_id, - idmap_version, - )) - } - - pub(crate) fn build_idmap_factory(&mut self) -> Result { - let connections = self.connections_clone()?; - let replica_lag_monitor = self.replica_lag_monitor(); - let repo_id = self.repo_id()?; - let mut idmap_factory = IdMapFactory::new(connections, replica_lag_monitor, repo_id); - if let Some(cache_handlers) = self.cache_handlers.take() { - idmap_factory = idmap_factory.with_cache_handlers(cache_handlers); - } - Ok(idmap_factory) - } - - pub(crate) fn build_sql_idmap_version_store(&self) -> Result { - let connections = self.connections_clone()?; - let repo_id = self.repo_id()?; - Ok(SqlIdMapVersionStore::new(connections, repo_id)) - } - - pub(crate) fn build_segmented_changelog_version_store( - &self, - ) -> Result { - let connections = self.connections_clone()?; - let repo_id = self.repo_id()?; - Ok(SegmentedChangelogVersionStore::new(connections, repo_id)) - } - - pub(crate) fn build_iddag_save_store(&mut self) -> Result { - let blobstore = self.blobstore()?; - let repo_id = self.repo_id()?; - Ok(IdDagSaveStore::new(repo_id, blobstore)) - } - - fn repo_id(&self) -> Result { - self.repo_id.ok_or_else(|| { - format_err!("SegmentedChangelog cannot be built without RepositoryId being specified.") - }) - } - - fn idmap_version(&self) -> IdMapVersion { - self.idmap_version.unwrap_or_default() - } - - fn replica_lag_monitor(&mut self) -> Arc { - self.replica_lag_monitor - .take() - .unwrap_or_else(|| Arc::new(NoReplicaLagMonitor())) - } - - fn changeset_fetcher(&mut self) -> Result> { - self.changeset_fetcher.clone().ok_or_else(|| { - format_err!( - "SegmentedChangelog cannot be built without ChangesetFetcher being specified." - ) - }) - } - - fn changeset_bulk_fetch(&mut self) -> Result> { - self.changeset_bulk_fetch.take().ok_or_else(|| { - format_err!( - "SegmentedChangelog cannot be built without ChangesetBulkFetch being specified." - ) - }) - } - - fn connections_clone(&self) -> Result { - let connections = self.connections.as_ref().ok_or_else(|| { - format_err!( - "SegmentedChangelog cannot be built without SqlConnections being specified." - ) - })?; - Ok(connections.0.clone()) - } - - fn blobstore(&mut self) -> Result> { - self.blobstore.take().ok_or_else(|| { - format_err!("SegmentedChangelog cannot be built without Blobstore being specified.") - }) - } - - fn bookmarks(&mut self) -> Result> { - self.bookmarks.take().ok_or_else(|| { - format_err!("SegmentedChangelog cannot be built without Bookmarks being specified.") - }) - } - - fn bookmark_name(&mut self) -> Result { - if let Some(name) = &self.bookmark_name { - Ok(name.clone()) - } else { - BookmarkName::new("master") - } - } - - fn update_to_bookmark_period(&mut self) -> Result { - self.update_to_bookmark_period.take().ok_or_else(|| { - format_err!( - "SegmentedChangelog cannot be built without \ - update_to_bookmark_period being specified." - ) - }) - } - - fn reload_dag_period(&mut self) -> Result { - self.reload_dag_period.take().ok_or_else(|| { - format_err!( - "SegmentedChangelog cannot be built without \ - reload_dag_period being specified." - ) - }) - } -} diff --git a/eden/mononoke/segmented_changelog/src/lib.rs b/eden/mononoke/segmented_changelog/src/lib.rs index 356dd14993..24cec55948 100644 --- a/eden/mononoke/segmented_changelog/src/lib.rs +++ b/eden/mononoke/segmented_changelog/src/lib.rs @@ -42,9 +42,8 @@ pub use segmented_changelog_types::{ InProcessIdDag, Location, PreparedFlatSegments, SegmentedChangelog, StreamCloneData, Vertex, }; -pub use crate::builder::{ - new_server_segmented_changelog, SegmentedChangelogBuilder, SegmentedChangelogSqlConnections, -}; +pub use crate::builder::{new_server_segmented_changelog, SegmentedChangelogSqlConnections}; +pub use crate::seeder::SegmentedChangelogSeeder; pub use crate::tailer::SegmentedChangelogTailer; // public for benchmarking