mononoke/blobstore_healer: limit concurrency of healing

Summary: Let's not heal 10000 blobs in parallel; that's a little too much data.

Reviewed By: farnz

Differential Revision: D22186543

fbshipit-source-id: 939fb5bc83b283090e979ac5fe3efc96191826d3
Authored by Thomas Orozco on 2020-06-23 08:58:49 -07:00; committed by Facebook GitHub Bot
parent e288354caf
commit edf93f8676
2 changed files with 19 additions and 3 deletions
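
The heart of the change is the heal() batch in the first file: instead of handing every healing future to futures_old::stream::futures_unordered, which drives the whole batch (up to 10000 futures) at once, the futures now go through iter_ok(...).buffered(heal_concurrency), which caps how many are polled concurrently. Below is a minimal, self-contained sketch of that buffering pattern, assuming the futures 0.1 crate (the one the diff imports as futures_old); heal_one, the counts, and the error type are illustrative only, not Mononoke code.

use futures::{future, stream, Future, Stream};

// Stand-in for the real per-blob healing future.
fn heal_one(key: usize) -> impl Future<Item = usize, Error = ()> {
    future::ok(key)
}

fn main() {
    let healing_futures: Vec<_> = (0..10_000).map(heal_one).collect();

    // `iter_ok` turns the Vec into a stream of futures; `buffered(100)` keeps
    // at most 100 of them in flight at a time, instead of the all-at-once
    // behaviour of `futures_unordered`, and yields results in input order.
    let healed = stream::iter_ok::<_, ()>(healing_futures)
        .buffered(100)
        .collect()
        .wait()
        .expect("these sketch futures never fail");

    assert_eq!(healed.len(), 10_000);
}

buffered(n) preserves input order, so one slow heal can delay later results, but the bound on in-flight futures is exactly what the summary is after: fewer blobs healed at once means less data held in memory at a time.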

@@ -25,6 +25,7 @@ use std::sync::Arc;
pub struct Healer {
blobstore_sync_queue_limit: usize,
+ heal_concurrency: usize,
sync_queue: Arc<dyn BlobstoreSyncQueue>,
blobstores: Arc<HashMap<BlobstoreId, Arc<dyn Blobstore>>>,
multiplex_id: MultiplexId,
@@ -35,6 +36,7 @@ pub struct Healer {
impl Healer {
pub fn new(
blobstore_sync_queue_limit: usize,
+ heal_concurrency: usize,
sync_queue: Arc<dyn BlobstoreSyncQueue>,
blobstores: Arc<HashMap<BlobstoreId, Arc<dyn Blobstore>>>,
multiplex_id: MultiplexId,
@@ -43,6 +45,7 @@ impl Healer {
) -> Self {
Self {
blobstore_sync_queue_limit,
+ heal_concurrency,
sync_queue,
blobstores,
multiplex_id,
@@ -65,6 +68,7 @@ impl Healer {
);
let max_batch_size = self.blobstore_sync_queue_limit;
+ let heal_concurrency = self.heal_concurrency;
let drain_only = self.drain_only;
let multiplex_id = self.multiplex_id;
@@ -137,7 +141,8 @@ impl Healer {
"Found {} blobs to be healed... Doing it", last_batch_size
);
- futures_old::stream::futures_unordered(healing_futures)
+ futures_old::stream::iter_ok(healing_futures)
+ .buffered(heal_concurrency)
.collect()
.and_then(
move |heal_res: Vec<(HealStats, Vec<BlobstoreSyncQueueEntry>)>| {
@@ -1179,7 +1184,7 @@ mod tests {
.compat()
.await?;
- let healer = Healer::new(1000, sync_queue.clone(), stores, mp, None, false);
+ let healer = Healer::new(1000, 10, sync_queue.clone(), stores, mp, None, false);
healer.heal(ctx.clone(), DateTime::now()).compat().await?;
@@ -1238,7 +1243,7 @@
// We aren't healing blobs for old_mp, so expect to only have 1 blob in each
// blobstore at the end of the test.
- let healer = Healer::new(1000, sync_queue.clone(), stores, mp, None, false);
+ let healer = Healer::new(1000, 10, sync_queue.clone(), stores, mp, None, false);
healer.heal(ctx.clone(), DateTime::now()).compat().await?;
assert_eq!(0, sync_queue.len(ctx.clone(), mp).compat().await?);

@@ -44,6 +44,7 @@ const CONFIGERATOR_REGIONS_CONFIG: &str = "myrouter/regions.json";
const QUIET_ARG: &'static str = "quiet";
const ITER_LIMIT_ARG: &'static str = "iteration-limit";
const HEAL_MIN_AGE_ARG: &'static str = "heal-min-age-secs";
+ const HEAL_CONCURRENCY_ARG: &str = "heal-concurrency";
lazy_static! {
/// Minimal age of entry to consider if it has to be healed
@@ -100,6 +101,7 @@ async fn maybe_schedule_healer_for_storage(
dry_run: bool,
drain_only: bool,
blobstore_sync_queue_limit: usize,
+ heal_concurrency: usize,
storage_config: StorageConfig,
mysql_options: MysqlOptions,
source_blobstore_key: Option<String>,
@@ -205,6 +207,7 @@ async fn maybe_schedule_healer_for_storage(
let multiplex_healer = Healer::new(
blobstore_sync_queue_limit,
+ heal_concurrency,
sync_queue,
Arc::new(blobstores),
multiplex_id,
@@ -297,6 +300,12 @@ fn setup_app<'a, 'b>(app_name: &str) -> App<'a, 'b> {
.takes_value(true)
.required(false)
.help("Seconds. If specified, override default minimum age to heal of 120 seconds"),
+ ).arg(
+ Arg::with_name(HEAL_CONCURRENCY_ARG)
+ .long(HEAL_CONCURRENCY_ARG)
+ .takes_value(true)
+ .required(false)
+ .help("How many blobs to heal concurrently."),
);
args::add_fb303_args(app)
}
@@ -319,6 +328,7 @@ fn main(fb: FacebookInit) -> Result<()> {
.ok_or(format_err!("Storage id `{}` not found", storage_id))?;
let source_blobstore_key = matches.value_of("blobstore-key-like");
let blobstore_sync_queue_limit = value_t!(matches, "sync-queue-limit", usize).unwrap_or(10000);
+ let heal_concurrency = value_t!(matches, HEAL_CONCURRENCY_ARG, usize).unwrap_or(100);
let dry_run = matches.is_present("dry-run");
let drain_only = matches.is_present("drain-only");
if drain_only && source_blobstore_key.is_none() {
@@ -344,6 +354,7 @@ fn main(fb: FacebookInit) -> Result<()> {
dry_run,
drain_only,
blobstore_sync_queue_limit,
+ heal_concurrency,
storage_config,
mysql_options,
source_blobstore_key.map(|s| s.to_string()),
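
The second file wires the knob through the CLI in the usual clap 2.x style: declare a --heal-concurrency argument in setup_app, then read it in main with value_t! and fall back to a default of 100. Below is a small stand-alone sketch of that pattern, assuming clap 2.x (which Arg::with_name and value_t! imply); the app name and argv are made up for illustration, not the real blobstore_healer invocation.

#[macro_use]
extern crate clap;

use clap::{App, Arg};

const HEAL_CONCURRENCY_ARG: &str = "heal-concurrency";

fn main() {
    let app = App::new("healer_sketch").arg(
        Arg::with_name(HEAL_CONCURRENCY_ARG)
            .long(HEAL_CONCURRENCY_ARG)
            .takes_value(true)
            .required(false)
            .help("How many blobs to heal concurrently."),
    );

    // `--heal-concurrency 25` parses to 25; omitting the flag (or passing a
    // non-numeric value) falls back to 100 via `unwrap_or`, mirroring the
    // default chosen in the diff.
    let matches = app.get_matches_from(vec!["healer_sketch", "--heal-concurrency", "25"]);
    let heal_concurrency = value_t!(matches, HEAL_CONCURRENCY_ARG, usize).unwrap_or(100);
    assert_eq!(heal_concurrency, 25);
}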