mirror of
https://github.com/facebook/sapling.git
synced 2024-10-10 00:45:18 +03:00
mononoke/bookmarks: age out BookmarksSubscription if it's too old
Summary: We're expecting this to be refreshed every few seconds. If that does not happen, there's a point at which querying the log is just not efficient anymore, and if we get there we should just rebuild from scratch. This does that. Reviewed By: farnz Differential Revision: D28505644 fbshipit-source-id: 18e6cec8d8534de742781e658005a09301d3aa5f
This commit is contained in:
parent
99847de6bb
commit
5af44dad8e
@ -20,10 +20,12 @@ futures = { version = "0.3.13", features = ["async-await", "compat"] }
|
||||
futures_watchdog = { version = "0.1.0", path = "../../common/futures_watchdog" }
|
||||
mononoke_types = { version = "0.1.0", path = "../../mononoke_types" }
|
||||
rand = { version = "0.7", features = ["small_rng"] }
|
||||
slog = { version = "2.5", features = ["max_level_trace"] }
|
||||
sql = { version = "0.1.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
|
||||
sql_construct = { version = "0.1.0", path = "../../common/sql_construct" }
|
||||
sql_ext = { version = "0.1.0", path = "../../common/rust/sql_ext" }
|
||||
stats = { version = "0.1.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
|
||||
tunables = { version = "0.1.0", path = "../../tunables" }
|
||||
|
||||
[dev-dependencies]
|
||||
ascii = "1.0"
|
||||
|
@ -12,17 +12,30 @@ use context::CoreContext;
|
||||
use mononoke_types::ChangesetId;
|
||||
use mononoke_types::RepositoryId;
|
||||
use rand::Rng;
|
||||
use slog::warn;
|
||||
use sql::queries;
|
||||
use stats::prelude::*;
|
||||
use std::collections::HashMap;
|
||||
use std::convert::TryInto;
|
||||
use std::time::{Duration, Instant};
|
||||
use tunables::tunables;
|
||||
|
||||
use crate::store::{GetLargestLogId, SelectAllUnordered, SqlBookmarks};
|
||||
|
||||
define_stats! {
|
||||
prefix = "mononoke.dbbookmarks.subscription";
|
||||
aged_out: timeseries(Sum),
|
||||
}
|
||||
|
||||
const DEFAULT_SUBSCRIPTION_MAX_AGE_MS: u64 = 600_000; // 10 minutes.
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct SqlBookmarksSubscription {
|
||||
sql_bookmarks: SqlBookmarks,
|
||||
freshness: Freshness,
|
||||
log_id: u64,
|
||||
bookmarks: HashMap<BookmarkName, (ChangesetId, BookmarkKind)>,
|
||||
last_refresh: Instant,
|
||||
}
|
||||
|
||||
impl SqlBookmarksSubscription {
|
||||
@ -80,13 +93,38 @@ impl SqlBookmarksSubscription {
|
||||
freshness,
|
||||
log_id,
|
||||
bookmarks,
|
||||
last_refresh: Instant::now(),
|
||||
})
|
||||
}
|
||||
|
||||
fn has_aged_out(&self) -> bool {
|
||||
let max_age_ms = match tunables().get_bookmark_subscription_max_age_ms().try_into() {
|
||||
Ok(duration) if duration > 0 => duration,
|
||||
_ => DEFAULT_SUBSCRIPTION_MAX_AGE_MS,
|
||||
};
|
||||
|
||||
self.last_refresh.elapsed() > Duration::from_millis(max_age_ms)
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl BookmarksSubscription for SqlBookmarksSubscription {
|
||||
async fn refresh(&mut self, ctx: &CoreContext) -> Result<()> {
|
||||
if self.has_aged_out() {
|
||||
warn!(
|
||||
ctx.logger(),
|
||||
"BookmarksSubscription has aged out! Last refresh was {:?} ago.",
|
||||
self.last_refresh.elapsed()
|
||||
);
|
||||
|
||||
STATS::aged_out.add_value(1);
|
||||
|
||||
*self = Self::create(ctx, self.sql_bookmarks.clone(), self.freshness)
|
||||
.await
|
||||
.context("Failed to re-initialize expired BookmarksSubscription")?;
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let conn = self.sql_bookmarks.connection(ctx, self.freshness);
|
||||
|
||||
let changes =
|
||||
@ -131,6 +169,8 @@ impl BookmarksSubscription for SqlBookmarksSubscription {
|
||||
self.log_id = max_log_id;
|
||||
}
|
||||
|
||||
self.last_refresh = Instant::now();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@ -155,3 +195,52 @@ queries! {
|
||||
"
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
|
||||
use fbinit::FacebookInit;
|
||||
use maplit::hashmap;
|
||||
use mononoke_types_mocks::{changesetid::ONES_CSID, repo::REPO_ZERO};
|
||||
use sql_construct::SqlConstruct;
|
||||
|
||||
use crate::builder::SqlBookmarksBuilder;
|
||||
|
||||
#[fbinit::test]
|
||||
async fn test_age_out(fb: FacebookInit) -> Result<()> {
|
||||
let ctx = CoreContext::test_mock(fb);
|
||||
let bookmarks = SqlBookmarksBuilder::with_sqlite_in_memory()?.with_repo_id(REPO_ZERO);
|
||||
let conn = bookmarks.connections.write_connection.clone();
|
||||
|
||||
let mut sub =
|
||||
SqlBookmarksSubscription::create(&ctx, bookmarks.clone(), Freshness::MostRecent)
|
||||
.await?;
|
||||
|
||||
// Insert a bookmark, but without going throguh the log. This won't happen in prod.
|
||||
// However, for the purposes of this test, it makes the update invisible to the
|
||||
// subscription, and only a full refresh will find it.
|
||||
let book = BookmarkName::new("book")?;
|
||||
let rows = vec![(
|
||||
&bookmarks.repo_id,
|
||||
&book,
|
||||
&ONES_CSID,
|
||||
&BookmarkKind::Publishing,
|
||||
)];
|
||||
crate::transaction::insert_bookmarks(&conn, rows).await?;
|
||||
|
||||
sub.refresh(&ctx).await?;
|
||||
assert_eq!(*sub.bookmarks(), hashmap! {});
|
||||
|
||||
sub.last_refresh =
|
||||
Instant::now() - Duration::from_millis(DEFAULT_SUBSCRIPTION_MAX_AGE_MS + 1);
|
||||
|
||||
sub.refresh(&ctx).await?;
|
||||
assert_eq!(
|
||||
*sub.bookmarks(),
|
||||
hashmap! { book => (ONES_CSID, BookmarkKind::Publishing)}
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
@ -73,6 +73,8 @@ pub struct MononokeTunables {
|
||||
/// Don't read from the BookmarksSubscription when updating the WBC, and instead poll for the
|
||||
/// entire list of bookmarks on every iteration.
|
||||
warm_bookmark_cache_disable_subscription: AtomicBool,
|
||||
/// Maximum age of bookmarks subscriptions.
|
||||
bookmark_subscription_max_age_ms: AtomicI64,
|
||||
max_scuba_msg_length: AtomicI64,
|
||||
wishlist_read_qps: AtomicI64,
|
||||
wishlist_write_qps: AtomicI64,
|
||||
|
Loading…
Reference in New Issue
Block a user