mononoke: blobstore_healer performance improvements

Summary:
blobstore_healer performance improvements

* speed up delete by using bulk delete (see the sketch after this list)
* speed up replication lag check by only doing it once if the lag is within bounds
* log some stats from each healer chunk run to help diagnose issues
* don't log the full storage config on each log line (this was making the logs hard to read)

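As a rough, hedged sketch of the first bullet (not Mononoke's actual code; in the diff below the sql crate's queries! macro with a `>list` parameter does the expansion): the win is issuing a single DELETE ... WHERE id IN (...) statement per chunk instead of one DELETE per queue entry.

    // Self-contained illustration only: the table name comes from the diff, but this
    // string-building helper is hypothetical and stands in for the generated query.
    fn bulk_delete_sql(ids: &[u64]) -> String {
        let id_list: Vec<String> = ids.iter().map(|id| id.to_string()).collect();
        format!(
            "DELETE FROM blobstore_sync_queue WHERE id IN ({})",
            id_list.join(", ")
        )
    }

    fn main() {
        // One statement for the whole chunk; previously this was ids.len() statements.
        println!("{}", bulk_delete_sql(&[1, 2, 3]));
    }
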
Reviewed By: StanislavGlebik

Differential Revision: D17093809

fbshipit-source-id: 34299ad1835c5548b9502ad9d5a3e18eff1ab95b
Alex Hornby 2019-08-29 07:58:32 -07:00 committed by Facebook Github Bot
parent 774399c6b4
commit 0dcf6e695a
3 changed files with 119 additions and 42 deletions


@@ -17,6 +17,7 @@ use mononoke_types::{DateTime, Timestamp};
use sql::{queries, Connection};
pub use sql_ext::SqlConstructors;
use stats::{define_stats, Timeseries};
use std::iter::IntoIterator;
use std::sync::Arc;
define_stats! {
@@ -127,10 +128,9 @@ queries! {
VALUES {values}"
}
write DeleteEntry(id: u64) {
write DeleteEntries(>list ids: u64) {
none,
"DELETE FROM blobstore_sync_queue
WHERE id = {id}"
"DELETE FROM blobstore_sync_queue WHERE id in {ids}"
}
read GetRangeOfEntries(older_than: Timestamp, limit: usize) -> (
@@ -378,11 +378,9 @@ impl BlobstoreSyncQueue for SqlBlobstoreSyncQueue {
ids.into_future()
.and_then({
cloned!(self.write_connection);
move |ids| {
future::join_all(ids.into_iter().map({
cloned!(write_connection);
move |id| DeleteEntry::query(&write_connection, &id)
}))
move |ids: Vec<u64>| {
let ids: Vec<&u64> = ids.iter().collect();
DeleteEntries::query(&write_connection, &ids[..])
}
})
.map(|_| ())


@@ -18,6 +18,8 @@ use metaconfig_types::BlobstoreId;
use mononoke_types::{BlobstoreBytes, DateTime};
use slog::{info, warn, Logger};
use std::collections::{HashMap, HashSet};
use std::iter::Sum;
use std::ops::Add;
use std::sync::Arc;
lazy_static! {
@@ -80,26 +82,69 @@ impl Healer {
let entries: Vec<_> = entries.collect();
let heal_opt =
heal_blob(ctx, sync_queue, blobstores, healing_deadline, key, &entries);
heal_opt.map(|fut| fut.map(|()| entries))
heal_opt.map(|fut| fut.map(|heal_stats| (heal_stats, entries)))
})
.collect();
if healing_futures.len() == 0 {
info!(logger, "All caught up, nothing to do");
return futures::future::ok(()).left_future();
}
info!(
logger,
"Found {} blobs to be healed... Doing it",
healing_futures.len()
);
futures::stream::futures_unordered(healing_futures)
.collect()
.and_then(move |cleaned_entries: Vec<Vec<BlobstoreSyncQueueEntry>>| {
let cleaned = cleaned_entries.into_iter().flatten().collect();
cleanup_after_healing(ctx, sync_queue, cleaned)
})
.and_then(
move |heal_res: Vec<(HealStats, Vec<BlobstoreSyncQueueEntry>)>| {
let (chunk_stats, processed_entries): (Vec<_>, Vec<_>) =
heal_res.into_iter().unzip();
let summary_stats: HealStats = chunk_stats.into_iter().sum();
info!(
logger,
"For {} blobs did {:?}",
processed_entries.len(),
summary_stats
);
let entries_to_remove =
processed_entries.into_iter().flatten().collect();
cleanup_after_healing(ctx, sync_queue, entries_to_remove)
},
)
.right_future()
})
}
}
#[derive(Default, Debug, PartialEq)]
struct HealStats {
queue_add: usize,
queue_del: usize,
put_success: usize,
put_failure: usize,
}
impl Add for HealStats {
type Output = Self;
fn add(self, other: Self) -> Self {
Self {
queue_add: self.queue_add + other.queue_add,
queue_del: self.queue_del + other.queue_del,
put_success: self.put_success + other.put_success,
put_failure: self.put_failure + other.put_failure,
}
}
}
impl Sum for HealStats {
fn sum<I: Iterator<Item = HealStats>>(iter: I) -> HealStats {
iter.fold(Default::default(), Add::add)
}
}
/// Heal an individual blob. The `entries` are the blobstores which have successfully stored
/// this blob; we need to replicate them onto the remaining `blobstores`. If the blob is not
/// yet eligible (too young), then just return None, otherwise we return the healed entries
@@ -111,13 +156,15 @@ fn heal_blob(
healing_deadline: DateTime,
key: String,
entries: &[BlobstoreSyncQueueEntry],
) -> Option<impl Future<Item = (), Error = Error>> {
) -> Option<impl Future<Item = HealStats, Error = Error>> {
// This is needed as we load by key, and a given key may have entries both before and after
// the deadline. We leave the key rather than re-add to avoid entries always being too new.
if !entries.iter().all(|e| e.timestamp < healing_deadline) {
return None;
}
let num_entries: usize = entries.len();
let (seen_blobstores, unknown_seen_blobstores): (HashSet<_>, HashSet<_>) =
entries.iter().partition_map(|entry| {
let id = entry.blobstore_id;
@@ -128,6 +175,8 @@ fn heal_blob(
}
});
let num_unknown_entries: usize = unknown_seen_blobstores.len();
if !unknown_seen_blobstores.is_empty() {
warn!(
ctx.logger(),
@@ -149,7 +198,14 @@ fn heal_blob(
if stores_to_heal.is_empty() || seen_blobstores.is_empty() {
// All blobstores have been synchronized, or all entries are from unknown blobstores and will be requeued
return Some(
requeue_partial_heal(ctx, sync_queue, key, unknown_seen_blobstores).left_future(),
requeue_partial_heal(ctx, sync_queue, key, unknown_seen_blobstores)
.map(move |()| HealStats {
queue_del: num_entries,
queue_add: num_unknown_entries,
put_success: 0,
put_failure: 0,
})
.left_future(),
);
}
@@ -196,6 +252,7 @@ fn heal_blob(
Either::Right(id)
}
});
if !unhealed_stores.is_empty() || !unknown_seen_blobstores.is_empty() {
// Add good_sources to the healed_stores as we should write all
// known good blobstores so that the stores_to_heal logic run on read
@@ -206,6 +263,14 @@ fn heal_blob(
for b in fetch_data.good_sources {
healed_stores.insert(b);
}
let heal_stats = HealStats {
queue_del: num_entries,
queue_add: healed_stores.len() + num_unknown_entries,
put_success: healed_stores.len(),
put_failure: unhealed_stores.len(),
};
// Add unknown stores to queue as well so we try them later
for b in unknown_seen_blobstores {
healed_stores.insert(b);
@@ -217,9 +282,17 @@ fn heal_blob(
healed_stores,
unhealed_stores
);
requeue_partial_heal(ctx, sync_queue, key, healed_stores).left_future()
requeue_partial_heal(ctx, sync_queue, key, healed_stores)
.map(|()| heal_stats)
.left_future()
} else {
futures::future::ok(()).right_future()
let heal_stats = HealStats {
queue_del: num_entries,
queue_add: num_unknown_entries,
put_success: healed_stores.len(),
put_failure: unhealed_stores.len(),
};
futures::future::ok(heal_stats).right_future()
}
})
});
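
A minimal usage sketch of the HealStats aggregation above (assumes the HealStats struct and its Add/Sum impls from this hunk are in scope; the values are hypothetical): each healing future yields per-blob stats, and the healer collapses them with .sum() into the chunk summary reported by the "For {} blobs did {:?}" log line.

    // Field meanings follow the diff: queue_add/queue_del count sync-queue rows added
    // and removed, put_success/put_failure count blobstore writes.
    let per_blob = vec![
        HealStats { queue_add: 1, queue_del: 2, put_success: 1, put_failure: 0 },
        HealStats { queue_add: 0, queue_del: 3, put_success: 2, put_failure: 1 },
    ];
    // Sum folds with Add, so an iterator of per-blob stats becomes one summary.
    let summary: HealStats = per_blob.into_iter().sum();
    assert_eq!(
        summary,
        HealStats { queue_add: 1, queue_del: 5, put_success: 3, put_failure: 1 }
    );
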
@@ -300,6 +373,11 @@ fn cleanup_after_healing(
sync_queue: Arc<dyn BlobstoreSyncQueue>,
entries: Vec<BlobstoreSyncQueueEntry>,
) -> impl Future<Item = (), Error = Error> {
info!(
ctx.logger(),
"Deleting {} actioned queue entries",
entries.len()
);
sync_queue.del(ctx, entries)
}
@@ -332,6 +410,7 @@ fn requeue_partial_heal(
#[cfg(test)]
mod tests {
use super::*;
use assert_matches::assert_matches;
use futures::Future;
use futures_ext::BoxFuture;
use std::iter::FromIterator;
@@ -570,7 +649,7 @@ mod tests {
);
let r = fut.wait();
assert!(r.is_ok());
assert_eq!(Some(()), r.unwrap(), "expecting to delete entries");
assert_matches!(r.unwrap(), Some(_), "expecting to delete entries");
assert_eq!(1, underlying_stores.get(&bids[0]).unwrap().len());
assert_eq!(1, underlying_stores.get(&bids[1]).unwrap().len());
assert_eq!(
@@ -659,7 +738,7 @@ mod tests {
);
let r = fut.wait();
assert!(r.is_ok());
assert_eq!(Some(()), r.unwrap(), "expecting to delete entries");
assert_matches!(r.unwrap(), Some(_), "expecting to delete entries");
assert_eq!(0, sync_queue.len());
assert_eq!(1, underlying_stores.get(&bids[0]).unwrap().len());
assert_eq!(1, underlying_stores.get(&bids[1]).unwrap().len());
@@ -690,7 +769,7 @@ mod tests {
);
let r = fut.wait();
assert!(r.is_ok());
assert_eq!(Some(()), r.unwrap(), "expecting to delete entries");
assert_matches!(r.unwrap(), Some(_), "expecting to delete entries");
assert_eq!(1, sync_queue.len(), "expecting 1 new entry on queue");
assert_eq!(
0,
@@ -722,7 +801,7 @@ mod tests {
);
let r = fut.wait();
assert!(r.is_ok());
assert_eq!(Some(()), r.unwrap(), "expecting to delete entries");
assert_matches!(r.unwrap(), Some(_), "expecting to delete entries");
assert_eq!(3, sync_queue.len(), "expecting 3 new entries on queue, i.e. all sources for known stores, plus the unknown store");
assert_eq!(
1,
@@ -775,7 +854,7 @@ mod tests {
);
let r = fut.wait();
assert!(r.is_ok());
assert_eq!(Some(()), r.unwrap(), "expecting to delete entries");
assert_matches!(r.unwrap(), Some(_), "expecting to delete entries");
assert_eq!(1, underlying_stores.get(&bids[0]).unwrap().len());
assert_eq!(
1,
@@ -819,7 +898,7 @@ mod tests {
);
let r = fut.wait();
assert!(r.is_ok());
assert_eq!(Some(()), r.unwrap(), "expecting to delete entries");
assert_matches!(r.unwrap(), Some(_), "expecting to delete entries");
assert_eq!(1, underlying_stores.get(&bids[0]).unwrap().len());
assert_eq!(
1,
@@ -868,7 +947,7 @@ mod tests {
);
let r = fut.wait();
assert!(r.is_ok());
assert_eq!(Some(()), r.unwrap(), "expecting to delete entries");
assert_matches!(r.unwrap(), Some(_), "expecting to delete entries");
assert_eq!(1, underlying_stores.get(&bids[0]).unwrap().len());
assert_eq!(
1,


@@ -173,9 +173,14 @@ fn ensure_small_db_replication_lag(
replication_lag_db_conns: Arc<Vec<Connection>>,
) -> impl Future<Item = (), Error = Error> {
// Make sure we've slept at least once before continuing
let already_slept = false;
let last_max_lag: Option<usize> = None;
loop_fn(last_max_lag, move |last_max_lag| {
if last_max_lag.is_some() && last_max_lag.unwrap() < MAX_ALLOWED_REPLICATION_LAG_SECS {
// No need to check replication lag again, it was within bounds on the last loop
return ok(Loop::Break(())).left_future();
}
loop_fn(already_slept, move |already_slept| {
// Check the max replication lag on the replicas, and sleep for that long.
// This is done in order to avoid overloading the db.
let mut lag_secs_futs = vec![];
@@ -189,23 +194,20 @@ fn ensure_small_db_replication_lag(
}
cloned!(logger);
join_all(lag_secs_futs).and_then(move |lags| {
let max_lag = lags.into_iter().max().unwrap_or(0);
info!(logger, "Replication lag is {} secs", max_lag);
if max_lag < MAX_ALLOWED_REPLICATION_LAG_SECS && already_slept {
ok(Loop::Break(())).left_future()
} else {
let max_lag = Duration::from_secs(max_lag as u64);
join_all(lag_secs_futs)
.and_then(move |lags| {
let max_lag_secs: usize = lags.into_iter().max().unwrap_or(0);
info!(logger, "Max replication lag is {} secs", max_lag_secs);
let max_lag = Duration::from_secs(max_lag_secs as u64);
let start = Instant::now();
let next_iter_deadline = start + max_lag;
Delay::new(next_iter_deadline)
.map(|()| Loop::Continue(true))
.map(move |()| Loop::Continue(Some(max_lag_secs)))
.from_err()
.right_future()
}
})
})
.right_future()
})
}
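
A minimal synchronous sketch of the control flow added above (the real code drives this with futures' loop_fn and Delay; get_max_lag_secs and the bound value here are hypothetical stand-ins): the replicas are re-queried only while the last observed lag is unknown or out of bounds, and at least one sleep always happens before the healer proceeds.

    use std::thread::sleep;
    use std::time::Duration;

    const MAX_ALLOWED_REPLICATION_LAG_SECS: usize = 5; // assumed bound for the sketch

    // Hypothetical stand-in for querying the max replication lag across replicas.
    fn get_max_lag_secs() -> usize {
        3
    }

    fn ensure_small_replication_lag() {
        // None means we have not yet checked (and therefore not yet slept).
        let mut last_max_lag: Option<usize> = None;
        loop {
            if let Some(lag) = last_max_lag {
                if lag < MAX_ALLOWED_REPLICATION_LAG_SECS {
                    // Lag was within bounds on the previous pass: skip re-querying.
                    break;
                }
            }
            let max_lag = get_max_lag_secs();
            println!("Max replication lag is {} secs", max_lag);
            // Sleep for the observed lag before re-evaluating, to avoid overloading the db.
            sleep(Duration::from_secs(max_lag as u64));
            last_max_lag = Some(max_lag);
        }
    }

    fn main() {
        ensure_small_replication_lag();
    }
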
@@ -248,11 +250,9 @@ fn main() -> Result<()> {
let blobstore_sync_queue_limit = value_t!(matches, "sync-queue-limit", usize).unwrap_or(10000);
let dry_run = matches.is_present("dry-run");
let healer = {
let logger = logger.new(o!(
"storage" => format!("{:#?}", storage_config),
));
info!(logger, "Using storage_config {:#?}", storage_config);
let healer = {
let scheduled = maybe_schedule_healer_for_storage(
dry_run,
blobstore_sync_queue_limit,