segmented_changelog: remove SegmentedChangelogManager::save

Summary:
The manager was added as a high level abstraction for storing and loading a
SegmentedChangelog. It worked well when we had one configuration for
SegmentedChangelog. The problem now is that SegmentedChangelog has various
configurations. Storing and loading are asymmetric operations.

In contexts where we do storing we want to use a specific configuration,
one that operates on an owned dag and has an IdMap that writes to the database.
When running on the server, by contrast, we never store; our writes to the
idmap are in process only and the iddag is wrapped in layers that keep it up
to date.

The manager would have to be too complicated to handle all these scenarios.
The solution here is to simplify the manager to cater to the server use case
and inline the logic for the saves where it is used (seeder and tailer).

Reviewed By: krallin

Differential Revision: D26921451

fbshipit-source-id: aedf4acf4bc8371a5d0b249f8bccd9447e85ae0a
This commit is contained in:
Stefan Filip 2021-03-10 12:12:55 -08:00 committed by Facebook GitHub Bot
parent b0c07b8206
commit e656047533
6 changed files with 105 additions and 79 deletions

View File

@ -55,7 +55,6 @@ pub struct SegmentedChangelogBuilder {
bookmarks: Option<Arc<dyn Bookmarks>>,
bookmark_name: Option<BookmarkName>,
cache_handlers: Option<CacheHandlers>,
with_in_memory_write_idmap: bool,
update_to_bookmark_period: Option<Duration>,
reload_dag_period: Option<Duration>,
}
@ -77,7 +76,6 @@ impl SqlConstruct for SegmentedChangelogBuilder {
bookmarks: None,
bookmark_name: None,
cache_handlers: None,
with_in_memory_write_idmap: false,
update_to_bookmark_period: None,
reload_dag_period: None,
}
@ -101,7 +99,6 @@ impl SegmentedChangelogBuilder {
sc_version_store,
iddag_save_store,
idmap_factory,
self.with_in_memory_write_idmap,
))
}
@ -123,7 +120,6 @@ impl SegmentedChangelogBuilder {
ctx: &CoreContext,
) -> Result<OnDemandUpdateSegmentedChangelog> {
let changeset_fetcher = self.changeset_fetcher()?;
self.with_in_memory_write_idmap = true;
let manager = self.build_manager()?;
let (_, owned) = manager.load(ctx).await?;
Ok(OnDemandUpdateSegmentedChangelog::from_owned(
@ -177,7 +173,9 @@ impl SegmentedChangelogBuilder {
self.idmap_version(),
idmap_version_store,
self.changeset_bulk_fetch()?,
self.build_manager()?,
self.build_segmented_changelog_version_store()?,
self.build_iddag_save_store()?,
self.build_idmap_factory()?,
);
Ok(seeder)
}
@ -188,7 +186,9 @@ impl SegmentedChangelogBuilder {
self.changeset_fetcher()?,
self.bookmarks()?,
self.bookmark_name()?,
self.build_manager()?,
self.build_segmented_changelog_version_store()?,
self.build_iddag_save_store()?,
self.build_idmap_factory()?,
);
Ok(tailer)
}

View File

@ -45,6 +45,13 @@ pub fn log_new_segmented_changelog_version(
repo_id: RepositoryId,
sc_version: SegmentedChangelogVersion,
) {
slog::info!(
ctx.logger(),
"repo {}: segmented changelog version saved, idmap_version: {}, iddag_version: {}",
repo_id,
sc_version.idmap_version,
sc_version.iddag_version,
);
MononokeScubaSampleBuilder::new(ctx.fb, SCUBA_TABLE)
.add_common_server_data()
.add("type", "segmented_changelog")

View File

@ -12,21 +12,19 @@ use std::time::Duration;
use anyhow::{format_err, Context, Result};
use arc_swap::ArcSwap;
use async_trait::async_trait;
use slog::{debug, info};
use tokio::sync::Notify;
use tokio::time::Instant;
use dag::{InProcessIdDag, Location};
use dag::Location;
use futures_ext::future::{spawn_controlled, ControlledHandle};
use context::CoreContext;
use mononoke_types::{ChangesetId, RepositoryId};
use crate::iddag::IdDagSaveStore;
use crate::idmap::{IdMap, IdMapFactory};
use crate::logging::log_new_segmented_changelog_version;
use crate::idmap::IdMapFactory;
use crate::owned::OwnedSegmentedChangelog;
use crate::types::{IdMapVersion, SegmentedChangelogVersion};
use crate::types::SegmentedChangelogVersion;
use crate::version_store::SegmentedChangelogVersionStore;
use crate::{segmented_changelog_delegate, CloneData, SegmentedChangelog, StreamCloneData};
@ -35,7 +33,6 @@ pub struct SegmentedChangelogManager {
sc_version_store: SegmentedChangelogVersionStore,
iddag_save_store: IdDagSaveStore,
idmap_factory: IdMapFactory,
with_in_memory_write_idmap: bool,
}
impl SegmentedChangelogManager {
@ -44,51 +41,15 @@ impl SegmentedChangelogManager {
sc_version_store: SegmentedChangelogVersionStore,
iddag_save_store: IdDagSaveStore,
idmap_factory: IdMapFactory,
with_in_memory_write_idmap: bool,
) -> Self {
Self {
repo_id,
sc_version_store,
iddag_save_store,
idmap_factory,
with_in_memory_write_idmap,
}
}
pub async fn save(
&self,
ctx: &CoreContext,
iddag: &InProcessIdDag,
idmap_version: IdMapVersion,
) -> Result<SegmentedChangelogVersion> {
// Save the IdDag
let iddag_version = self
.iddag_save_store
.save(&ctx, &iddag)
.await
.with_context(|| format!("repo {}: error saving iddag", self.repo_id))?;
// Update SegmentedChangelogVersion
let sc_version = SegmentedChangelogVersion::new(iddag_version, idmap_version);
self.sc_version_store
.set(&ctx, sc_version)
.await
.with_context(|| {
format!(
"repo {}: error updating segmented changelog version store",
self.repo_id
)
})?;
log_new_segmented_changelog_version(ctx, self.repo_id, sc_version);
info!(
ctx.logger(),
"repo {}: segmented changelog version saved, idmap_version: {}, iddag_version: {}",
self.repo_id,
idmap_version,
iddag_version,
);
Ok(sc_version)
}
pub async fn load(
&self,
ctx: &CoreContext,
@ -114,8 +75,8 @@ impl SegmentedChangelogManager {
.load(&ctx, sc_version.iddag_version)
.await
.with_context(|| format!("repo {}: failed to load iddag", self.repo_id))?;
let idmap = self.new_idmap(ctx, sc_version.idmap_version);
debug!(
let idmap = self.idmap_factory.for_server(ctx, sc_version.idmap_version);
slog::debug!(
ctx.logger(),
"segmented changelog dag successfully loaded - repo_id: {}, idmap_version: {}, \
iddag_version: {} ",
@ -126,14 +87,6 @@ impl SegmentedChangelogManager {
let owned = OwnedSegmentedChangelog::new(iddag, idmap);
Ok((sc_version, owned))
}
pub fn new_idmap(&self, ctx: &CoreContext, idmap_version: IdMapVersion) -> Arc<dyn IdMap> {
if self.with_in_memory_write_idmap {
self.idmap_factory.for_server(ctx, idmap_version)
} else {
self.idmap_factory.for_writer(ctx, idmap_version)
}
}
}
segmented_changelog_delegate!(SegmentedChangelogManager, |&self, ctx: &CoreContext| {

View File

@ -19,11 +19,13 @@ use changesets::ChangesetEntry;
use context::CoreContext;
use mononoke_types::ChangesetId;
use crate::iddag::IdDagSaveStore;
use crate::idmap::IdMapFactory;
use crate::idmap::SqlIdMapVersionStore;
use crate::manager::SegmentedChangelogManager;
use crate::owned::OwnedSegmentedChangelog;
use crate::types::IdMapVersion;
use crate::types::{IdMapVersion, SegmentedChangelogVersion};
use crate::update::StartState;
use crate::version_store::SegmentedChangelogVersionStore;
define_stats! {
prefix = "mononoke.segmented_changelog.seeder";
@ -34,7 +36,9 @@ pub struct SegmentedChangelogSeeder {
idmap_version: IdMapVersion,
idmap_version_store: SqlIdMapVersionStore,
changeset_bulk_fetch: Arc<PublicChangesetBulkFetch>,
manager: SegmentedChangelogManager,
sc_version_store: SegmentedChangelogVersionStore,
iddag_save_store: IdDagSaveStore,
idmap_factory: IdMapFactory,
}
impl SegmentedChangelogSeeder {
@ -42,13 +46,17 @@ impl SegmentedChangelogSeeder {
idmap_version: IdMapVersion,
idmap_version_store: SqlIdMapVersionStore,
changeset_bulk_fetch: Arc<PublicChangesetBulkFetch>,
manager: SegmentedChangelogManager,
sc_version_store: SegmentedChangelogVersionStore,
iddag_save_store: IdDagSaveStore,
idmap_factory: IdMapFactory,
) -> Self {
Self {
idmap_version,
idmap_version_store,
changeset_bulk_fetch,
manager,
sc_version_store,
iddag_save_store,
idmap_factory,
}
}
pub async fn run(&self, ctx: &CoreContext, head: ChangesetId) -> Result<()> {
@ -64,10 +72,18 @@ impl SegmentedChangelogSeeder {
ctx.logger(),
"finished building dag, head '{}' has assigned vertex '{}'", head, last_vertex
);
self.manager
.save(ctx, &owned.iddag, self.idmap_version)
// Save the IdDag
let iddag_version = self
.iddag_save_store
.save(&ctx, &owned.iddag)
.await
.context("failed to save dag")?;
.context("error saving iddag")?;
// Update SegmentedChangelogVersion
let sc_version = SegmentedChangelogVersion::new(iddag_version, self.idmap_version);
self.sc_version_store
.set(&ctx, sc_version)
.await
.context("error updating segmented changelog version store")?;
// Update IdMapVersion
self.idmap_version_store
.set(&ctx, self.idmap_version)
@ -103,7 +119,7 @@ impl SegmentedChangelogSeeder {
}
let low_vertex = dag::Group::MASTER.min_id();
let idmap = self.manager.new_idmap(ctx, self.idmap_version);
let idmap = self.idmap_factory.for_writer(ctx, self.idmap_version);
let mut owned = OwnedSegmentedChangelog::new(InProcessIdDag::new_in_process(), idmap);
let last_vertex = crate::update::build(
ctx,

View File

@ -20,9 +20,12 @@ use changeset_fetcher::ChangesetFetcher;
use context::CoreContext;
use mononoke_types::RepositoryId;
use crate::manager::SegmentedChangelogManager;
use crate::iddag::IdDagSaveStore;
use crate::idmap::IdMapFactory;
use crate::owned::OwnedSegmentedChangelog;
use crate::types::SegmentedChangelogVersion;
use crate::update::build_incremental;
use crate::version_store::SegmentedChangelogVersionStore;
define_stats! {
prefix = "mononoke.segmented_changelog.update";
@ -45,7 +48,9 @@ pub struct SegmentedChangelogTailer {
changeset_fetcher: Arc<dyn ChangesetFetcher>,
bookmarks: Arc<dyn Bookmarks>,
bookmark_name: BookmarkName,
manager: SegmentedChangelogManager,
sc_version_store: SegmentedChangelogVersionStore,
iddag_save_store: IdDagSaveStore,
idmap_factory: IdMapFactory,
}
impl SegmentedChangelogTailer {
@ -54,14 +59,18 @@ impl SegmentedChangelogTailer {
changeset_fetcher: Arc<dyn ChangesetFetcher>,
bookmarks: Arc<dyn Bookmarks>,
bookmark_name: BookmarkName,
manager: SegmentedChangelogManager,
sc_version_store: SegmentedChangelogVersionStore,
iddag_save_store: IdDagSaveStore,
idmap_factory: IdMapFactory,
) -> Self {
Self {
repo_id,
changeset_fetcher,
bookmarks,
bookmark_name,
manager,
sc_version_store,
iddag_save_store,
idmap_factory,
}
}
@ -107,11 +116,37 @@ impl SegmentedChangelogTailer {
"repo {}: starting incremental update to segmented changelog", self.repo_id,
);
let (sc_version, mut owned) = self
.manager
.load(&ctx)
let sc_version = self
.sc_version_store
.get(&ctx)
.await
.context("failed to load base segmented changelog")?;
.with_context(|| {
format!(
"repo {}: error loading segmented changelog version",
self.repo_id
)
})?
.ok_or_else(|| {
format_err!(
"repo {}: segmented changelog metadata not found, maybe repo is not seeded",
self.repo_id
)
})?;
let iddag = self
.iddag_save_store
.load(&ctx, sc_version.iddag_version)
.await
.with_context(|| format!("repo {}: failed to load iddag", self.repo_id))?;
let idmap = self.idmap_factory.for_writer(ctx, sc_version.idmap_version);
let mut owned = OwnedSegmentedChangelog::new(iddag, idmap);
debug!(
ctx.logger(),
"segmented changelog dag successfully loaded - repo_id: {}, idmap_version: {}, \
iddag_version: {} ",
self.repo_id,
sc_version.idmap_version,
sc_version.iddag_version,
);
let head = self
.bookmarks
@ -153,11 +188,24 @@ impl SegmentedChangelogTailer {
new_iddag.build_segments_volatile(head_vertex, &get_parents)?;
info!(ctx.logger(), "repo {}: IdDag rebuilt", self.repo_id);
// Save Segmented Changelog
self.manager
.save(&ctx, &new_iddag, sc_version.idmap_version)
// Save the IdDag
let iddag_version = self
.iddag_save_store
.save(&ctx, &new_iddag)
.await
.context("failed to save updated segmented changelog")?;
.with_context(|| format!("repo {}: error saving iddag", self.repo_id))?;
// Update SegmentedChangelogVersion
let sc_version = SegmentedChangelogVersion::new(iddag_version, sc_version.idmap_version);
self.sc_version_store
.set(&ctx, sc_version)
.await
.with_context(|| {
format!(
"repo {}: error updating segmented changelog version store",
self.repo_id
)
})?;
info!(
ctx.logger(),

View File

@ -14,6 +14,7 @@ use stats::prelude::*;
use context::{CoreContext, PerfCounterType};
use mononoke_types::RepositoryId;
use crate::logging::log_new_segmented_changelog_version;
use crate::types::{IdDagVersion, IdMapVersion, SegmentedChangelogVersion};
define_stats! {
@ -52,6 +53,7 @@ impl SegmentedChangelogVersionStore {
)
.await
.context("inserting segmented changelog version")?;
log_new_segmented_changelog_version(ctx, self.repo_id, version);
Ok(())
}