Give the blobstore healer a way to cope with temporary falls in MySQL capacity

Summary:
The query we use to select blobs to heal is naturally expensive, due to the use of a subquery. This means that finding the perfect queue limit is hard, and we currently depend on task restarts to handle brief overloads of MySQL.

Give us a fast fall in batch size (halve on each failure), a slow climb back (grow by 10% on each success), and a random delay after each failure before retrying.
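
For illustration, the policy amounts to roughly the following standalone sketch; the `AdaptiveFetchSize` type and its method names are hypothetical (not the healer's real API), and it assumes rand 0.7-style `gen_range`:

use rand::{thread_rng, Rng};
use std::time::Duration;

/// Hypothetical helper sketching the adaptive batch-size policy.
struct AdaptiveFetchSize {
    limit: usize,
    current: usize,
}

impl AdaptiveFetchSize {
    fn new(limit: usize) -> Self {
        Self {
            limit,
            current: limit,
        }
    }

    /// Slow climb back: grow by 10% (at least 1), never above the configured limit.
    fn on_success(&mut self) {
        self.current = self.limit.min(self.current + self.current / 10 + 1);
    }

    /// Fast fall: halve the batch size; once it hits zero, give up (None).
    /// Otherwise return a small random delay to sleep before retrying.
    fn on_failure(&mut self) -> Option<Duration> {
        self.current /= 2;
        if self.current < 1 {
            None
        } else {
            Some(thread_rng().gen_range(Duration::from_millis(1), Duration::from_millis(100)))
        }
    }
}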

Reviewed By: StanislavGlebik

Differential Revision: D23028518

fbshipit-source-id: f2909fe792280f81d604be99fabb8b714c1e6999
Simon Farnsworth 2020-08-10 15:22:22 -07:00 committed by Facebook GitHub Bot
parent c5ce8557b5
commit 3086b241c6

@@ -16,6 +16,7 @@ use futures::{
use itertools::{Either, Itertools};
use metaconfig_types::{BlobstoreId, MultiplexId};
use mononoke_types::{BlobstoreBytes, DateTime};
use rand::{thread_rng, Rng};
use scuba_ext::ScubaSampleBuilderExt;
use slog::{info, warn};
use std::{
@@ -23,14 +24,22 @@ use std::{
    future::Future,
    iter::Sum,
    ops::Add,
    sync::Arc,
    sync::{
        atomic::{AtomicUsize, Ordering},
        Arc,
    },
    time::Duration,
};
#[cfg(test)]
mod tests;
const MIN_FETCH_FAILURE_DELAY: Duration = Duration::from_millis(1);
const MAX_FETCH_FAILURE_DELAY: Duration = Duration::from_millis(100);
pub struct Healer {
    blobstore_sync_queue_limit: usize,
    current_fetch_size: AtomicUsize,
    heal_concurrency: usize,
    sync_queue: Arc<dyn BlobstoreSyncQueue>,
    blobstores: Arc<HashMap<BlobstoreId, Arc<dyn Blobstore>>>,
@@ -51,6 +60,7 @@ impl Healer {
    ) -> Self {
        Self {
            blobstore_sync_queue_limit,
            current_fetch_size: AtomicUsize::new(blobstore_sync_queue_limit),
            heal_concurrency,
            sync_queue,
            blobstores,
@@ -60,27 +70,70 @@ impl Healer {
        }
    }
    async fn fetch_entries(
        &self,
        ctx: &CoreContext,
        healing_deadline: DateTime,
    ) -> Result<(usize, Vec<BlobstoreSyncQueueEntry>)> {
        let mut fetch_size = self.current_fetch_size.load(Ordering::Relaxed);
        loop {
            match self
                .sync_queue
                .iter(
                    ctx,
                    self.blobstore_key_like.as_ref(),
                    self.multiplex_id,
                    healing_deadline.clone(),
                    fetch_size,
                )
                .await
            {
                Ok(queue_entries) => {
                    // Success. Update fetch size for next loop
                    let new_fetch_size =
                        if fetch_size == self.current_fetch_size.load(Ordering::Relaxed) {
                            // Fetch size didn't change during the loop, which implies that we succeeded
                            // on the first attempt (since all failures decrease it) - increase it for next
                            // time if it's not yet at the limit. Growth is at least 1 each loop
                            // so that if fetch_size / 10 == 0, we still climb back
                            self.blobstore_sync_queue_limit
                                .min(fetch_size + fetch_size / 10 + 1)
                        } else {
                            fetch_size
                        };
                    self.current_fetch_size
                        .store(new_fetch_size, Ordering::Relaxed);
                    return Ok((fetch_size, queue_entries));
                }
                Err(e) => {
                    // Error, so fall in size fast
                    fetch_size = fetch_size / 2;
                    if fetch_size < 1 {
                        return Err(e);
                    }
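                    // Pause for a short random interval before retrying with the smaller batch,
                    // giving MySQL a chance to recover.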
                    let delay =
                        thread_rng().gen_range(MIN_FETCH_FAILURE_DELAY, MAX_FETCH_FAILURE_DELAY);
                    tokio::time::delay_for(delay).await;
                }
            }
        }
    }
    /// Heal one batch of entries. It selects a set of entries which are not too young (bounded
    /// by healing_deadline) up to `blobstore_sync_queue_limit` at once.
    /// Returns a tuple:
    /// - first item indicates whether a full batch was fetched
    /// - second item shows how many rows were deleted from the DB
    pub async fn heal(&self, ctx: &CoreContext, healing_deadline: DateTime) -> Result<(bool, u64)> {
        let max_batch_size = self.blobstore_sync_queue_limit;
        let heal_concurrency = self.heal_concurrency;
        let drain_only = self.drain_only;
        let multiplex_id = self.multiplex_id;
        let queue_entries = self
            .sync_queue
            .iter(
                ctx,
                self.blobstore_key_like.as_ref(),
                self.multiplex_id,
                healing_deadline.clone(),
                self.blobstore_sync_queue_limit,
            )
            .await?;
        let (max_batch_size, queue_entries) =
            self.fetch_entries(ctx, healing_deadline.clone()).await?;
        let entries = queue_entries
            .iter()
            .map(|e| format!("{:?}", e))