use the SegmentedChangelogManager in the test

Summary:
We use a completely different codepath for testing; let's make the tests go
through the same codepath as we do in prod.

Reviewed By: farnz

Differential Revision: D35408141

fbshipit-source-id: 05f3f026c0c15b7b460c554828089247f8680eaf
This commit is contained in:
Mateusz Kwapich 2022-04-14 02:16:25 -07:00 committed by Facebook GitHub Bot
parent 33d9d3dd25
commit 98d1c489ff
3 changed files with 77 additions and 31 deletions

View File

@ -22,7 +22,7 @@ use sql_ext::SqlConnections;
use crate::iddag::IdDagSaveStore;
use crate::idmap::{CacheHandlers, ConcurrentMemIdMap, IdMapFactory};
use crate::manager::SegmentedChangelogManager;
use crate::manager::{SegmentedChangelogManager, SegmentedChangelogType};
use crate::on_demand::OnDemandUpdateSegmentedChangelog;
use crate::periodic_reload::PeriodicReloadSegmentedChangelog;
use crate::version_store::SegmentedChangelogVersionStore;
@ -119,7 +119,9 @@ pub async fn new_server_segmented_changelog<'a>(
changeset_fetcher,
bookmarks,
seed_heads,
config.update_to_master_bookmark_period,
SegmentedChangelogType::OnDemand {
update_to_master_bookmark_period: config.update_to_master_bookmark_period,
},
Some(clone_hints),
);
let name = repo_identity.name().to_string();

View File

@ -29,6 +29,14 @@ use crate::{
segmented_changelog_delegate, CloneData, CloneHints, Location, SeedHead, SegmentedChangelog,
};
/// Selects which `SegmentedChangelog` implementation the manager loads.
pub enum SegmentedChangelogType {
/// Production path: an on-demand updating segmented changelog, optionally
/// refreshed toward the master bookmark on a periodic schedule.
OnDemand {
/// Period for updating to the master bookmark; `None` disables the
/// periodic update wrapper.
update_to_master_bookmark_period: Option<Duration>,
},
/// Test-only path: loads a fully owned (in-process) segmented changelog,
/// so tests exercise the same manager codepath as production.
#[cfg(test)]
Owned,
}
pub struct SegmentedChangelogManager {
repo_id: RepositoryId,
sc_version_store: SegmentedChangelogVersionStore,
@ -37,7 +45,7 @@ pub struct SegmentedChangelogManager {
changeset_fetcher: ArcChangesetFetcher,
bookmarks: Arc<dyn Bookmarks>,
seed_heads: Vec<SeedHead>,
update_to_master_bookmark_period: Option<Duration>,
segmented_changelog_type: SegmentedChangelogType,
clone_hints: Option<CloneHints>,
}
@ -50,7 +58,7 @@ impl SegmentedChangelogManager {
changeset_fetcher: ArcChangesetFetcher,
bookmarks: Arc<dyn Bookmarks>,
seed_heads: Vec<SeedHead>,
update_to_master_bookmark_period: Option<Duration>,
segmented_changelog_type: SegmentedChangelogType,
clone_hints: Option<CloneHints>,
) -> Self {
Self {
@ -61,7 +69,7 @@ impl SegmentedChangelogManager {
changeset_fetcher,
bookmarks,
seed_heads,
update_to_master_bookmark_period,
segmented_changelog_type,
clone_hints,
}
}
@ -74,12 +82,25 @@ impl SegmentedChangelogManager {
SegmentedChangelogVersion,
)> {
let monitored = async {
let (on_demand, sc_version) = self.load_ondemand_update(ctx).await?;
let asc: Arc<dyn SegmentedChangelog + Send + Sync> =
match self.update_to_master_bookmark_period {
None => on_demand,
Some(period) => {
Arc::new(on_demand.with_periodic_update_to_master_bookmark(ctx, period))
let (asc, sc_version): (Arc<dyn SegmentedChangelog + Send + Sync>, _) =
match self.segmented_changelog_type {
SegmentedChangelogType::OnDemand {
update_to_master_bookmark_period,
} => {
let (on_demand, sc_version) = self.load_ondemand_update(ctx).await?;
let on_demand: Arc<dyn SegmentedChangelog + Send + Sync> =
match update_to_master_bookmark_period {
None => on_demand,
Some(period) => Arc::new(
on_demand.with_periodic_update_to_master_bookmark(ctx, period),
),
};
(on_demand, sc_version)
}
#[cfg(test)]
SegmentedChangelogType::Owned => {
let (sc, sc_version) = self.load_owned(ctx).await?;
(Arc::new(sc), sc_version)
}
};
Ok((asc, sc_version))

View File

@ -38,13 +38,16 @@ use tunables::with_tunables_async;
use crate::builder::SegmentedChangelogSqlConnections;
use crate::iddag::IdDagSaveStore;
use crate::idmap::{CacheHandlers, ConcurrentMemIdMap, IdMap, IdMapFactory, SqlIdMap};
use crate::manager::{SegmentedChangelogManager, SegmentedChangelogType};
use crate::on_demand::OnDemandUpdateSegmentedChangelog;
use crate::owned::OwnedSegmentedChangelog;
use crate::periodic_reload::PeriodicReloadSegmentedChangelog;
use crate::tailer::SegmentedChangelogTailer;
use crate::types::{IdDagVersion, IdMapVersion, SegmentedChangelogVersion};
use crate::version_store::SegmentedChangelogVersionStore;
use crate::{InProcessIdDag, Location, SeedHead, SegmentedChangelog, SegmentedChangelogRef};
use crate::{
CloneHints, InProcessIdDag, Location, SeedHead, SegmentedChangelog, SegmentedChangelogRef,
};
#[async_trait::async_trait]
trait SegmentedChangelogExt {
@ -206,6 +209,36 @@ async fn load_owned(
Ok(OwnedSegmentedChangelog::new(iddag, idmap))
}
/// Build a `SegmentedChangelogManager` for tests, wired with the same stores
/// as production: a version store and iddag save store, an idmap factory, and
/// clone hints — so tests go through the prod manager codepath.
///
/// `segmented_changelog_type` picks the implementation the manager will load
/// (e.g. `SegmentedChangelogType::Owned` for tests).
async fn get_manager(
blobrepo: &BlobRepo,
connections: &SegmentedChangelogSqlConnections,
seed_heads: Vec<SeedHead>,
segmented_changelog_type: SegmentedChangelogType,
) -> Result<SegmentedChangelogManager> {
let repo_id = blobrepo.get_repoid();
let blobstore = Arc::new(blobrepo.get_blobstore());
// Backing stores keyed by repo: version pointer in SQL, iddag blobs in the
// repo blobstore.
let sc_version_store = SegmentedChangelogVersionStore::new(connections.0.clone(), repo_id);
let iddag_save_store = IdDagSaveStore::new(repo_id, blobstore.clone());
let clone_hints = CloneHints::new(connections.0.clone(), repo_id, blobstore);
// NOTE(review): NoReplicaLagMonitor is presumably acceptable here because
// tests run against in-memory SQLite with no replicas — confirm.
let idmap_factory = IdMapFactory::new(
connections.0.clone(),
Arc::new(NoReplicaLagMonitor()),
repo_id,
);
let manager = SegmentedChangelogManager::new(
repo_id,
sc_version_store,
iddag_save_store,
idmap_factory,
blobrepo.get_changeset_fetcher(),
Arc::clone(blobrepo.bookmarks()) as Arc<dyn Bookmarks>,
seed_heads,
segmented_changelog_type,
Some(clone_hints),
);
Ok(manager)
}
async fn validate_build_idmap(
ctx: CoreContext,
blobrepo: BlobRepo,
@ -836,6 +869,7 @@ async fn test_caching(fb: FacebookInit) -> Result<()> {
// It's easier to reason about cache hits and sets when the dag is already built
let head = resolve_cs_id(&ctx, &blobrepo, "79a13814c5ce7330173ec04d279bf95ab3f652fb").await?;
seed(&ctx, &blobrepo, &conns, head).await?;
let iddag = load_iddag(&ctx, &blobrepo, &conns).await?;
let sc_version = load_sc_version(&ctx, blobrepo.get_repoid(), &conns).await?;
let idmap = IdMapFactory::new(
@ -959,38 +993,27 @@ async fn test_periodic_reload(fb: FacebookInit) -> Result<()> {
let blobrepo = Arc::new(Linear::getrepo(fb).await);
let conns = SegmentedChangelogSqlConnections::with_sqlite_in_memory()?;
let load_fn = {
let ctx = ctx.clone();
let conns = conns.clone();
let blobrepo = blobrepo.clone();
move || {
let ctx = ctx.clone();
let conns = conns.clone();
let blobrepo = blobrepo.clone();
async move {
let asc: Arc<dyn SegmentedChangelog + Send + Sync> =
Arc::new(load_owned(&ctx, &blobrepo, &conns).await?);
Ok(Some(asc))
}
.boxed()
}
};
let start_hg_id = "607314ef579bd2407752361ba1b0c1729d08b281"; // commit 4
let start_cs_id = resolve_cs_id(&ctx, &blobrepo, start_hg_id).await?;
seed(&ctx, &blobrepo, &conns, start_cs_id).await?;
tokio::time::pause();
let sc = PeriodicReloadSegmentedChangelog::start(
let manager = get_manager(&blobrepo, &conns, vec![], SegmentedChangelogType::Owned).await?;
let sc = PeriodicReloadSegmentedChangelog::start_from_manager(
&ctx,
Duration::from_secs(10),
load_fn,
manager,
blobrepo.name().to_string(),
)
.await?;
assert_eq!(sc.head(&ctx).await?, start_cs_id);
// Try waiting for segmented changelog update without tailer running
tokio::time::advance(Duration::from_secs(15)).await;
sc.wait_for_update().await; // Update happens even if there's nothing new to load.
let tailer = new_tailer_for_tailing(&blobrepo, &conns).await?;
let _ = tailer.once(&ctx, false).await?;
tokio::time::advance(Duration::from_secs(15)).await;