mononoke/warm_bookmarks_cache: use a BookmarksSubscription to update

Summary:
Like it says in the title. This adds support for using a BookmarksSubscription
to update bookmarks. This is controlled by a killswitch.

One little downside here is that if the subscription falls too far behind, we
might have too much data to fetch from MySQL, so turning this on via a
killswitch isn't optimal.

In the next diff in this stack, I'll update the BookmarksSubscription to
support a max age on last refresh and have it rebuild from scratch when we hit
this. I still want to have a killswitch here for now though, because
re-creating a subscription is more expensive than our current bookmarks
fetching (it's 2 queries in a transaction instead of 1). The long-term
intention is to remove this, though.

Reviewed By: StanislavGlebik

Differential Revision: D28505642

fbshipit-source-id: eefd23bea095c9d0351795ab599beb51bfac373f
This commit is contained in:
Thomas Orozco 2021-05-20 07:06:27 -07:00 committed by Facebook GitHub Bot
parent 5a4469b13e
commit 99847de6bb
2 changed files with 61 additions and 25 deletions

View File

@@ -7,6 +7,7 @@
#![deny(warnings)]
use std::borrow::Cow;
use std::collections::{HashMap, HashSet, VecDeque};
use std::convert::TryInto;
use std::ops::RangeBounds;
@@ -227,6 +228,7 @@ impl WarmBookmarksCache {
BookmarksCoordinator::new(
bookmarks.clone(),
sub,
repo.clone(),
warmers.clone(),
bookmark_update_delay,
@@ -507,6 +509,7 @@ pub async fn find_all_underived_and_latest_derived(
struct BookmarksCoordinator {
bookmarks: Arc<RwLock<HashMap<BookmarkName, (ChangesetId, BookmarkKind)>>>,
sub: Box<dyn BookmarksSubscription>,
repo: BlobRepo,
warmers: Arc<Vec<Warmer>>,
bookmark_update_delay: BookmarkUpdateDelay,
@@ -516,12 +519,14 @@ struct BookmarksCoordinator {
impl BookmarksCoordinator {
fn new(
bookmarks: Arc<RwLock<HashMap<BookmarkName, (ChangesetId, BookmarkKind)>>>,
sub: Box<dyn BookmarksSubscription>,
repo: BlobRepo,
warmers: Arc<Vec<Warmer>>,
bookmark_update_delay: BookmarkUpdateDelay,
) -> Self {
Self {
bookmarks,
sub,
repo,
warmers,
bookmark_update_delay,
@@ -529,27 +534,38 @@ impl BookmarksCoordinator {
}
}
async fn update(&self, ctx: &CoreContext) -> Result<(), Error> {
async fn update(&mut self, ctx: &CoreContext) -> Result<(), Error> {
// Report delay and remove finished updaters
report_delay_and_remove_finished_updaters(&ctx, &self.live_updaters, &self.repo.name());
let cur_bookmarks = self.bookmarks.with_read(|bookmarks| bookmarks.clone());
let new_bookmarks = self
.repo
.get_bonsai_publishing_bookmarks_maybe_stale(ctx.clone())
.map_ok(|(book, cs_id)| {
let kind = *book.kind();
(book.into_name(), (cs_id, kind))
})
.try_collect::<HashMap<_, _>>()
.await
.context("Error fetching bookmarks")?;
let new_bookmarks = if tunables().get_warm_bookmark_cache_disable_subscription() {
let books = self
.repo
.get_bonsai_publishing_bookmarks_maybe_stale(ctx.clone())
.map_ok(|(book, cs_id)| {
let kind = *book.kind();
(book.into_name(), (cs_id, kind))
})
.try_collect::<HashMap<_, _>>()
.await
.context("Error fetching bookmarks")?;
Cow::Owned(books)
} else {
self.sub
.refresh(ctx)
.await
.context("Error refreshing subscription")?;
Cow::Borrowed(self.sub.bookmarks())
};
let mut changed_bookmarks = vec![];
// Find bookmarks that were moved/created and spawn an updater
// for them
for (key, new_value) in &new_bookmarks {
for (key, new_value) in new_bookmarks.iter() {
let cur_value = cur_bookmarks.get(key);
if Some(new_value) != cur_value {
let book = Bookmark::new(key.clone(), new_value.1);
@@ -633,7 +649,7 @@ impl BookmarksCoordinator {
}
// Loop that finds bookmarks that were modified and spawns separate bookmark updaters for them
pub fn spawn(self, ctx: CoreContext, terminate: oneshot::Receiver<()>) {
pub fn spawn(mut self, ctx: CoreContext, terminate: oneshot::Receiver<()>) {
let fut = async move {
info!(ctx.logger(), "Started warm bookmark cache updater");
let infinite_loop = async {
@@ -1065,8 +1081,11 @@ mod tests {
.await?;
bookmark(&ctx, &repo, "master").set_to(master).await?;
let coordinator = BookmarksCoordinator::new(
let mut coordinator = BookmarksCoordinator::new(
bookmarks.clone(),
repo.bookmarks()
.create_subscription(&ctx, Freshness::MostRecent)
.await?,
repo.clone(),
warmers,
BookmarkUpdateDelay::Disallow,
@@ -1075,14 +1094,14 @@ mod tests {
let master_book = BookmarkName::new("master")?;
update_and_wait_for_bookmark(
&ctx,
&coordinator,
&mut coordinator,
&master_book,
Some((master, BookmarkKind::PullDefaultPublishing)),
)
.await?;
bookmark(&ctx, &repo, "master").delete().await?;
update_and_wait_for_bookmark(&ctx, &coordinator, &master_book, None).await?;
update_and_wait_for_bookmark(&ctx, &mut coordinator, &master_book, None).await?;
Ok(())
}
@@ -1144,12 +1163,14 @@ mod tests {
async fn update_and_wait_for_bookmark(
ctx: &CoreContext,
coordinator: &BookmarksCoordinator,
coordinator: &mut BookmarksCoordinator,
book: &BookmarkName,
expected_value: Option<(ChangesetId, BookmarkKind)>,
) -> Result<(), Error> {
coordinator.update(ctx).await?;
let coordinator = &coordinator;
wait(|| async move {
let val = coordinator
.bookmarks
@@ -1240,8 +1261,11 @@ mod tests {
warmers.push(warmer);
let warmers = Arc::new(warmers);
let coordinator = BookmarksCoordinator::new(
let mut coordinator = BookmarksCoordinator::new(
bookmarks.clone(),
repo.bookmarks()
.create_subscription(&ctx, Freshness::MostRecent)
.await?,
repo.clone(),
warmers,
BookmarkUpdateDelay::Disallow,
@@ -1250,7 +1274,7 @@ mod tests {
let master_book = BookmarkName::new("master")?;
update_and_wait_for_bookmark(
&ctx,
&coordinator,
&mut coordinator,
&master_book,
Some((master, BookmarkKind::PullDefaultPublishing)),
)
@@ -1268,8 +1292,11 @@ mod tests {
warmers.push(create_derived_data_warmer::<RootUnodeManifestId>(&ctx));
let warmers = Arc::new(warmers);
let coordinator = BookmarksCoordinator::new(
let mut coordinator = BookmarksCoordinator::new(
bookmarks.clone(),
repo.bookmarks()
.create_subscription(&ctx, Freshness::MostRecent)
.await?,
repo.clone(),
warmers,
BookmarkUpdateDelay::Disallow,
@@ -1277,7 +1304,7 @@ mod tests {
update_and_wait_for_bookmark(
&ctx,
&coordinator,
&mut coordinator,
&failing_book,
Some((failing_cs_id, BookmarkKind::PullDefaultPublishing)),
)
@@ -1349,8 +1376,11 @@ mod tests {
.await?;
bookmark(&ctx, &repo, "master").set_to(master).await?;
let coordinator = BookmarksCoordinator::new(
let mut coordinator = BookmarksCoordinator::new(
bookmarks.clone(),
repo.bookmarks()
.create_subscription(&ctx, Freshness::MostRecent)
.await?,
repo.clone(),
warmers.clone(),
BookmarkUpdateDelay::Disallow,
@@ -1406,8 +1436,11 @@ mod tests {
.create_publishing(new_cs_id)
.await?;
let coordinator = BookmarksCoordinator::new(
let mut coordinator = BookmarksCoordinator::new(
bookmarks.clone(),
repo.bookmarks()
.create_subscription(&ctx, Freshness::MostRecent)
.await?,
repo.clone(),
warmers,
BookmarkUpdateDelay::Disallow,
@@ -1416,7 +1449,7 @@ mod tests {
let publishing_book = BookmarkName::new("publishing")?;
update_and_wait_for_bookmark(
&ctx,
&coordinator,
&mut coordinator,
&publishing_book,
Some((new_cs_id, BookmarkKind::Publishing)),
)
@@ -1430,7 +1463,7 @@ mod tests {
update_and_wait_for_bookmark(
&ctx,
&coordinator,
&mut coordinator,
&publishing_book,
Some((new_cs_id, BookmarkKind::PullDefaultPublishing)),
)

View File

@@ -70,6 +70,9 @@ pub struct MononokeTunables {
mutation_generate_for_draft: AtomicBool,
warm_bookmark_cache_delay: AtomicI64,
warm_bookmark_cache_poll_interval_ms: AtomicI64,
/// Don't read from the BookmarksSubscription when updating the WBC, and instead poll for the
/// entire list of bookmarks on every iteration.
warm_bookmark_cache_disable_subscription: AtomicBool,
max_scuba_msg_length: AtomicI64,
wishlist_read_qps: AtomicI64,
wishlist_write_qps: AtomicI64,