diff --git a/eden/mononoke/bookmarks/warm_bookmarks_cache/lib.rs b/eden/mononoke/bookmarks/warm_bookmarks_cache/lib.rs index b2a29e82fa..13cab7bcb6 100644 --- a/eden/mononoke/bookmarks/warm_bookmarks_cache/lib.rs +++ b/eden/mononoke/bookmarks/warm_bookmarks_cache/lib.rs @@ -7,6 +7,7 @@ #![deny(warnings)] +use std::borrow::Cow; use std::collections::{HashMap, HashSet, VecDeque}; use std::convert::TryInto; use std::ops::RangeBounds; @@ -227,6 +228,7 @@ impl WarmBookmarksCache { BookmarksCoordinator::new( bookmarks.clone(), + sub, repo.clone(), warmers.clone(), bookmark_update_delay, @@ -507,6 +509,7 @@ pub async fn find_all_underived_and_latest_derived( struct BookmarksCoordinator { bookmarks: Arc>>, + sub: Box, repo: BlobRepo, warmers: Arc>, bookmark_update_delay: BookmarkUpdateDelay, @@ -516,12 +519,14 @@ struct BookmarksCoordinator { impl BookmarksCoordinator { fn new( bookmarks: Arc>>, + sub: Box, repo: BlobRepo, warmers: Arc>, bookmark_update_delay: BookmarkUpdateDelay, ) -> Self { Self { bookmarks, + sub, repo, warmers, bookmark_update_delay, @@ -529,27 +534,38 @@ impl BookmarksCoordinator { } } - async fn update(&self, ctx: &CoreContext) -> Result<(), Error> { + async fn update(&mut self, ctx: &CoreContext) -> Result<(), Error> { // Report delay and remove finished updaters report_delay_and_remove_finished_updaters(&ctx, &self.live_updaters, &self.repo.name()); let cur_bookmarks = self.bookmarks.with_read(|bookmarks| bookmarks.clone()); - let new_bookmarks = self - .repo - .get_bonsai_publishing_bookmarks_maybe_stale(ctx.clone()) - .map_ok(|(book, cs_id)| { - let kind = *book.kind(); - (book.into_name(), (cs_id, kind)) - }) - .try_collect::>() - .await - .context("Error fetching bookmarks")?; + let new_bookmarks = if tunables().get_warm_bookmark_cache_disable_subscription() { + let books = self + .repo + .get_bonsai_publishing_bookmarks_maybe_stale(ctx.clone()) + .map_ok(|(book, cs_id)| { + let kind = *book.kind(); + (book.into_name(), (cs_id, kind)) + }) + .try_collect::>() + .await + .context("Error fetching bookmarks")?; + + Cow::Owned(books) + } else { + self.sub + .refresh(ctx) + .await + .context("Error refreshing subscription")?; + + Cow::Borrowed(self.sub.bookmarks()) + }; let mut changed_bookmarks = vec![]; // Find bookmarks that were moved/created and spawn an updater // for them - for (key, new_value) in &new_bookmarks { + for (key, new_value) in new_bookmarks.iter() { let cur_value = cur_bookmarks.get(key); if Some(new_value) != cur_value { let book = Bookmark::new(key.clone(), new_value.1); @@ -633,7 +649,7 @@ impl BookmarksCoordinator { } // Loop that finds bookmarks that were modified and spawns separate bookmark updaters for them - pub fn spawn(self, ctx: CoreContext, terminate: oneshot::Receiver<()>) { + pub fn spawn(mut self, ctx: CoreContext, terminate: oneshot::Receiver<()>) { let fut = async move { info!(ctx.logger(), "Started warm bookmark cache updater"); let infinite_loop = async { @@ -1065,8 +1081,11 @@ mod tests { .await?; bookmark(&ctx, &repo, "master").set_to(master).await?; - let coordinator = BookmarksCoordinator::new( + let mut coordinator = BookmarksCoordinator::new( bookmarks.clone(), + repo.bookmarks() + .create_subscription(&ctx, Freshness::MostRecent) + .await?, repo.clone(), warmers, BookmarkUpdateDelay::Disallow, @@ -1075,14 +1094,14 @@ mod tests { let master_book = BookmarkName::new("master")?; update_and_wait_for_bookmark( &ctx, - &coordinator, + &mut coordinator, &master_book, Some((master, BookmarkKind::PullDefaultPublishing)), ) .await?; bookmark(&ctx, &repo, "master").delete().await?; - update_and_wait_for_bookmark(&ctx, &coordinator, &master_book, None).await?; + update_and_wait_for_bookmark(&ctx, &mut coordinator, &master_book, None).await?; Ok(()) } @@ -1144,12 +1163,14 @@ mod tests { async fn update_and_wait_for_bookmark( ctx: &CoreContext, - coordinator: &BookmarksCoordinator, + coordinator: &mut BookmarksCoordinator, book: &BookmarkName, expected_value: Option<(ChangesetId, BookmarkKind)>, ) -> Result<(), Error> { coordinator.update(ctx).await?; + let coordinator = &coordinator; + wait(|| async move { let val = coordinator .bookmarks @@ -1240,8 +1261,11 @@ mod tests { warmers.push(warmer); let warmers = Arc::new(warmers); - let coordinator = BookmarksCoordinator::new( + let mut coordinator = BookmarksCoordinator::new( bookmarks.clone(), + repo.bookmarks() + .create_subscription(&ctx, Freshness::MostRecent) + .await?, repo.clone(), warmers, BookmarkUpdateDelay::Disallow, @@ -1250,7 +1274,7 @@ mod tests { let master_book = BookmarkName::new("master")?; update_and_wait_for_bookmark( &ctx, - &coordinator, + &mut coordinator, &master_book, Some((master, BookmarkKind::PullDefaultPublishing)), ) @@ -1268,8 +1292,11 @@ mod tests { warmers.push(create_derived_data_warmer::(&ctx)); let warmers = Arc::new(warmers); - let coordinator = BookmarksCoordinator::new( + let mut coordinator = BookmarksCoordinator::new( bookmarks.clone(), + repo.bookmarks() + .create_subscription(&ctx, Freshness::MostRecent) + .await?, repo.clone(), warmers, BookmarkUpdateDelay::Disallow, @@ -1277,7 +1304,7 @@ mod tests { update_and_wait_for_bookmark( &ctx, - &coordinator, + &mut coordinator, &failing_book, Some((failing_cs_id, BookmarkKind::PullDefaultPublishing)), ) @@ -1349,8 +1376,11 @@ mod tests { .await?; bookmark(&ctx, &repo, "master").set_to(master).await?; - let coordinator = BookmarksCoordinator::new( + let mut coordinator = BookmarksCoordinator::new( bookmarks.clone(), + repo.bookmarks() + .create_subscription(&ctx, Freshness::MostRecent) + .await?, repo.clone(), warmers.clone(), BookmarkUpdateDelay::Disallow, @@ -1406,8 +1436,11 @@ mod tests { .create_publishing(new_cs_id) .await?; - let coordinator = BookmarksCoordinator::new( + let mut coordinator = BookmarksCoordinator::new( bookmarks.clone(), + repo.bookmarks() + .create_subscription(&ctx, Freshness::MostRecent) + .await?, repo.clone(), warmers, BookmarkUpdateDelay::Disallow, @@ -1416,7 +1449,7 @@ mod tests { let publishing_book = BookmarkName::new("publishing")?; update_and_wait_for_bookmark( &ctx, - &coordinator, + &mut coordinator, &publishing_book, Some((new_cs_id, BookmarkKind::Publishing)), ) @@ -1430,7 +1463,7 @@ mod tests { update_and_wait_for_bookmark( &ctx, - &coordinator, + &mut coordinator, &publishing_book, Some((new_cs_id, BookmarkKind::PullDefaultPublishing)), ) diff --git a/eden/mononoke/tunables/src/lib.rs b/eden/mononoke/tunables/src/lib.rs index 2c11a23e9b..e0e210b22f 100644 --- a/eden/mononoke/tunables/src/lib.rs +++ b/eden/mononoke/tunables/src/lib.rs @@ -70,6 +70,9 @@ pub struct MononokeTunables { mutation_generate_for_draft: AtomicBool, warm_bookmark_cache_delay: AtomicI64, warm_bookmark_cache_poll_interval_ms: AtomicI64, + /// Don't read from the BookmarksSubscription when updating the WBC, and instead poll for the + /// entire list of bookmarks on every iteration. + warm_bookmark_cache_disable_subscription: AtomicBool, max_scuba_msg_length: AtomicI64, wishlist_read_qps: AtomicI64, wishlist_write_qps: AtomicI64,