blobstore_healer: use buffered_weight_limited to avoid OOMs

Summary:
`blobstore_healer` works by healing blobs in turn, with some level of concurrency. Healing each blob consumes at least `O(blob_size)` or memory, so healing multiple blobs consumes their combined size of memory. Because blob sizes are not distributed uniformly, we cannot just calculate the desired concurrency level once and for all. Prior to this diff, this is what we did and whenever a few multi-gb blobs ended up in the same concurrently-healed batch, the healer OOMed. To help with this problem, this diff starts using dynamic concurrency - it assigns weight to each healed blob and only concurrently heals up to a certain total weight of blobs. This way, we can limit the total amount of memory consumed by our healer.

This solution is not perfect for a variety of reasons:
- if a single blob is larger than the total allowed weight, we'll still let it through. It's better than never healing it, but it means that OOMs are still possible in theory.
- we do not yet know the sizes of all the blobs in the queue. To mitigate that, I took a look at the known sizes distribution and saw that between 0 and 2KB is the most common size range. I defaulted to 1KB size of the unknown blob

Note 1: I had to make `heal_blob` consume it's args instead of borrowing them because `buffered_weight_limited` needs `'static` lifetime for the futures.

Note 2: When using `futures_ext`, I explicitly rename them to `futures_03_ext`, despite the fact that `blobstore_healer` does not depend on the older version. This is because `Cargo.toml` uses the same `[dependencies]` section for the combined dependencies of all the targets in the same `TARGETS` file. As there are other targets that claim the name of `futures_ext` for 0.1 version, I decided that it's easier to just use `_03_` name here than fix in other places. We can always change that of course.

Reviewed By: krallin

Differential Revision: D26106044

fbshipit-source-id: 4931d86d6e85d055ed0eefdd357b9ba6266a1c37
This commit is contained in:
Kostia Balytskyi 2021-01-29 10:10:34 -08:00 committed by Facebook GitHub Bot
parent eb566b5157
commit 5bc36ed39c
6 changed files with 105 additions and 26 deletions

View File

@ -165,6 +165,7 @@ failure_ext = { git = "https://github.com/facebookexperimental/rust-shed.git", b
fbinit = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master", version = "0.1.0" }
fbthrift = { git = "https://github.com/facebook/fbthrift.git", branch = "master", version = "0.0.1+unstable" }
futures_ext = { package = "futures_01_ext", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master", version = "0.1.0" }
futures_03_ext = { package = "futures_ext", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master", version = "0.1.0" }
futures_stats = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master", version = "0.1.0" }
lock_ext = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master", version = "0.1.0" }
stats = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master", version = "0.1.0" }

View File

@ -11,8 +11,9 @@ use blobstore_sync_queue::{BlobstoreSyncQueue, BlobstoreSyncQueueEntry, Operatio
use context::CoreContext;
use futures::{
future::{join_all, FutureExt, TryFutureExt},
stream::{self, StreamExt, TryStreamExt},
stream::{self, TryStreamExt},
};
use futures_03_ext::{BufferedParams, FbStreamExt};
use itertools::{Either, Itertools};
use metaconfig_types::{BlobstoreId, MultiplexId};
use mononoke_types::{BlobstoreBytes, DateTime};
@ -35,11 +36,12 @@ mod tests;
const MIN_FETCH_FAILURE_DELAY: Duration = Duration::from_millis(1);
const MAX_FETCH_FAILURE_DELAY: Duration = Duration::from_millis(100);
const DEFAULT_BLOB_SIZE_BYTES: u64 = 1024;
pub struct Healer {
blobstore_sync_queue_limit: usize,
current_fetch_size: AtomicUsize,
heal_concurrency: usize,
buffered_params: BufferedParams,
sync_queue: Arc<dyn BlobstoreSyncQueue>,
blobstores: Arc<HashMap<BlobstoreId, Arc<dyn Blobstore>>>,
multiplex_id: MultiplexId,
@ -50,7 +52,7 @@ pub struct Healer {
impl Healer {
pub fn new(
blobstore_sync_queue_limit: usize,
heal_concurrency: usize,
buffered_params: BufferedParams,
sync_queue: Arc<dyn BlobstoreSyncQueue>,
blobstores: Arc<HashMap<BlobstoreId, Arc<dyn Blobstore>>>,
multiplex_id: MultiplexId,
@ -60,7 +62,7 @@ impl Healer {
Self {
blobstore_sync_queue_limit,
current_fetch_size: AtomicUsize::new(blobstore_sync_queue_limit),
heal_concurrency,
buffered_params,
sync_queue,
blobstores,
multiplex_id,
@ -120,13 +122,14 @@ impl Healer {
}
}
}
/// Heal one batch of entries. It selects a set of entries which are not too young (bounded
/// by healing_deadline) up to `blobstore_sync_queue_limit` at once.
/// Returns a tuple:
/// - first item indicates whether a full batch was fetcehd
/// - second item shows how many rows were deleted from the DB
pub async fn heal(&self, ctx: &CoreContext, healing_deadline: DateTime) -> Result<(bool, u64)> {
let heal_concurrency = self.heal_concurrency;
let buffered_params = self.buffered_params;
let drain_only = self.drain_only;
let multiplex_id = self.multiplex_id;
@ -168,7 +171,7 @@ impl Healer {
unique_operation_keys
);
let healing_futures: Vec<_> = queue_entries
let healing_futures: Vec<(_, u64)> = queue_entries
.into_iter()
.sorted_by_key(|entry| entry.blobstore_key.clone())
.group_by(|entry| entry.blobstore_key.clone())
@ -176,7 +179,7 @@ impl Healer {
.filter_map(|(key, entries)| {
let entries: Vec<_> = entries.collect();
if drain_only {
Some(
Some((
async move {
Ok((
HealStats {
@ -188,9 +191,23 @@ impl Healer {
entries,
))
}
.left_future(),
)
.boxed(),
1,
))
} else {
// The following relies on the fact that after a `.group_by` above
// all `entries` refer to the same blob, as well as that entries
// are not empty. If each of those is not true, it won't crash, but
// rather just assign an incorrect value to `healing_weight`, which
// is not critical, though undesired.
// In addition, if the blob does not have its size recorded, we'll
// just use the avg blob size as a healing weight.
let healing_weight = entries
.iter()
.filter_map(|entry| entry.blob_size)
.next()
.unwrap_or(DEFAULT_BLOB_SIZE_BYTES);
let heal_opt = heal_blob(
ctx,
&self.sync_queue,
@ -201,8 +218,10 @@ impl Healer {
&entries,
);
heal_opt.map(|heal| {
heal.map_ok(|heal_stats| (heal_stats, entries))
.right_future()
(
heal.map_ok(|heal_stats| (heal_stats, entries)).boxed(),
healing_weight,
)
})
}
})
@ -217,13 +236,14 @@ impl Healer {
info!(
ctx.logger(),
"Found {} blobs to be healed... Doing it with concurrency {}",
"Found {} blobs to be healed... Doing it with weight limit {}, max concurrency: {}",
last_batch_size,
heal_concurrency
buffered_params.weight_limit,
buffered_params.buffer_size,
);
let heal_res: Vec<_> = stream::iter(healing_futures)
.buffered(heal_concurrency)
.buffered_weight_limited(buffered_params)
.try_collect()
.await?;
let (chunk_stats, processed_entries): (Vec<_>, Vec<_>) = heal_res.into_iter().unzip();
@ -269,7 +289,7 @@ impl Sum for HealStats {
/// Heal an individual blob. The `entries` are the blobstores which have successfully stored
/// this blob; we need to replicate them onto the remaining `blobstores`. If the blob is not
/// yet eligable (too young), then just return None, otherwise we return the healed entries
/// yet eligible (too young), then just return None, otherwise we return the healed entries
/// which have now been dealt with and can be deleted.
fn heal_blob<'out>(
ctx: &'out CoreContext,

View File

@ -643,7 +643,18 @@ async fn healer_heal_with_failing_blobstore(fb: FacebookInit) -> Result<()> {
)];
sync_queue.add_many(&ctx, entries).await?;
let healer = Healer::new(1000, 10, sync_queue.clone(), stores, mp, None, false);
let healer = Healer::new(
1000,
BufferedParams {
buffer_size: 10,
weight_limit: 1_000_000_000,
},
sync_queue.clone(),
stores,
mp,
None,
false,
);
healer.heal(&ctx, DateTime::now()).await?;
@ -699,7 +710,18 @@ async fn healer_heal_with_default_multiplex_id(fb: FacebookInit) -> Result<()> {
// We aren't healing blobs for old_mp, so expect to only have 1 blob in each
// blobstore at the end of the test.
let healer = Healer::new(1000, 10, sync_queue.clone(), stores, mp, None, false);
let healer = Healer::new(
1000,
BufferedParams {
buffer_size: 10,
weight_limit: 1_000_000_000,
},
sync_queue.clone(),
stores,
mp,
None,
false,
);
healer.heal(&ctx, DateTime::now()).await?;
assert_eq!(0, sync_queue.len(&ctx, mp).await?);
@ -734,7 +756,18 @@ async fn healer_heal_complete_batch(fb: FacebookInit) -> Result<()> {
let sync_queue = Arc::new(SqlBlobstoreSyncQueue::with_sqlite_in_memory()?);
sync_queue.add_many(&ctx, entries).await?;
let healer = Healer::new(2, 10, sync_queue, stores, mp, None, false);
let healer = Healer::new(
2,
BufferedParams {
buffer_size: 10,
weight_limit: 1_000_000_000,
},
sync_queue,
stores,
mp,
None,
false,
);
let (complete_batch, _) = healer.heal(&ctx, DateTime::now()).await?;
assert!(complete_batch);
Ok(())
@ -763,7 +796,18 @@ async fn healer_heal_incomplete_batch(fb: FacebookInit) -> Result<()> {
let sync_queue = Arc::new(SqlBlobstoreSyncQueue::with_sqlite_in_memory()?);
sync_queue.add_many(&ctx, entries).await?;
let healer = Healer::new(20, 10, sync_queue, stores, mp, None, false);
let healer = Healer::new(
20,
BufferedParams {
buffer_size: 10,
weight_limit: 1_000_000_000,
},
sync_queue,
stores,
mp,
None,
false,
);
let (complete_batch, _) = healer.heal(&ctx, DateTime::now()).await?;
assert!(!complete_batch);
Ok(())

View File

@ -27,6 +27,7 @@ use context::{CoreContext, SessionContainer};
use dummy::{DummyBlobstore, DummyBlobstoreSyncQueue};
use fbinit::FacebookInit;
use futures::{compat::Future01CompatExt, future};
use futures_03_ext::BufferedParams;
use healer::Healer;
use lazy_static::lazy_static;
use metaconfig_types::{BlobConfig, DatabaseConfig, StorageConfig};
@ -47,6 +48,7 @@ const QUIET_ARG: &str = "quiet";
const ITER_LIMIT_ARG: &str = "iteration-limit";
const HEAL_MIN_AGE_ARG: &str = "heal-min-age-secs";
const HEAL_CONCURRENCY_ARG: &str = "heal-concurrency";
const HEAL_MAX_BYTES: &str = "heal-max-bytes";
lazy_static! {
/// Minimal age of entry to consider if it has to be healed
@ -59,7 +61,7 @@ async fn maybe_schedule_healer_for_storage(
dry_run: bool,
drain_only: bool,
blobstore_sync_queue_limit: usize,
heal_concurrency: usize,
buffered_params: BufferedParams,
storage_config: StorageConfig,
mysql_options: &MysqlOptions,
source_blobstore_key: Option<String>,
@ -166,7 +168,7 @@ async fn maybe_schedule_healer_for_storage(
let multiplex_healer = Healer::new(
blobstore_sync_queue_limit,
heal_concurrency,
buffered_params,
sync_queue,
Arc::new(blobstores),
multiplex_id,
@ -274,6 +276,13 @@ fn setup_app<'a, 'b>(app_name: &str) -> MononokeClapApp<'a, 'b> {
.takes_value(true)
.required(false)
.help("How maby blobs to heal concurrently."),
).arg(
Arg::with_name(HEAL_MAX_BYTES)
.long(HEAL_MAX_BYTES)
.takes_value(true)
.required(false)
.help("max combined size of concurrently healed blobs \
(approximate, will still let individual larger blobs through)")
)
}
@ -297,6 +306,7 @@ fn main(fb: FacebookInit) -> Result<()> {
let source_blobstore_key = matches.value_of("blobstore-key-like");
let blobstore_sync_queue_limit = value_t!(matches, "sync-queue-limit", usize).unwrap_or(10000);
let heal_concurrency = value_t!(matches, HEAL_CONCURRENCY_ARG, usize).unwrap_or(100);
let heal_max_bytes = value_t!(matches, HEAL_MAX_BYTES, u64).unwrap_or(10_000_000_000);
let dry_run = matches.is_present("dry-run");
let drain_only = matches.is_present("drain-only");
if drain_only && source_blobstore_key.is_none() {
@ -315,6 +325,10 @@ fn main(fb: FacebookInit) -> Result<()> {
let scuba = get_scuba_sample_builder(fb, &matches)?;
let ctx = SessionContainer::new_with_defaults(fb).new_context(logger.clone(), scuba);
let buffered_params = BufferedParams {
weight_limit: heal_max_bytes,
buffer_size: heal_concurrency,
};
let healer = maybe_schedule_healer_for_storage(
fb,
@ -322,7 +336,7 @@ fn main(fb: FacebookInit) -> Result<()> {
dry_run,
drain_only,
blobstore_sync_queue_limit,
heal_concurrency,
buffered_params,
storage_config,
&mysql_options,
source_blobstore_key.map(|s| s.to_string()),

View File

@ -26,7 +26,7 @@ Run the heal
Replication lag is * (glob)
Fetched 90 queue entires (before building healing futures)
Out of them 30 distinct blobstore keys, 30 distinct operation keys
Found 30 blobs to be healed... Doing it with concurrency 100
Found 30 blobs to be healed... Doing it with weight limit 10000000000, max concurrency: 100
For 30 blobs did HealStats { queue_add: 0, queue_del: 90, put_success: 0, put_failure: 0 }
Deleting 90 actioned queue entries
Iteration rows processed: * rows, *s; total: * rows, *s (glob)
@ -64,7 +64,7 @@ Run the heal, with write errors injected, simulating store still bad
1 Replication lag is * (glob)
1 Fetched 60 queue entires (before building healing futures)
1 Out of them 30 distinct blobstore keys, 30 distinct operation keys
1 Found 30 blobs to be healed... Doing it with concurrency 100
1 Found 30 blobs to be healed... Doing it with weight limit 10000000000, max concurrency: 100
30 Adding source blobstores [BlobstoreId(1), BlobstoreId(2)] to the queue so that failed destination blob stores [BlobstoreId(0)] will be retried later
1 For 30 blobs did HealStats { queue_add: 60, queue_del: 60, put_success: 60, put_failure: 30 }
1 Deleting 60 actioned queue entries
@ -80,7 +80,7 @@ Healer run again now store recovered
1 Replication lag is * (glob)
1 Fetched 60 queue entires (before building healing futures)
1 Out of them 30 distinct blobstore keys, 30 distinct operation keys
1 Found 30 blobs to be healed... Doing it with concurrency 100
1 Found 30 blobs to be healed... Doing it with weight limit 10000000000, max concurrency: 100
1 For 30 blobs did HealStats { queue_add: 0, queue_del: 60, put_success: 30, put_failure: 0 }
1 Deleting 60 actioned queue entries
1 Iteration rows processed: * rows, *s; total: * rows, *s (glob)

View File

@ -23,7 +23,7 @@ Drain the healer queue
Replication lag is * (glob)
Fetched 60 queue entires (before building healing futures)
Out of them 30 distinct blobstore keys, 30 distinct operation keys
Found 30 blobs to be healed... Doing it with concurrency 100
Found 30 blobs to be healed... Doing it with weight limit 10000000000, max concurrency: 100
For 30 blobs did HealStats { queue_add: 0, queue_del: 60, put_success: 0, put_failure: 0 }
Deleting 60 actioned queue entries
Iteration rows processed: * rows, *s; total: * rows, *s (glob)