From 3d27faba08c50c7937a1c1f46bf04a3bc95f46cf Mon Sep 17 00:00:00 2001
From: Alex Hornby
Date: Mon, 5 Aug 2019 03:46:26 -0700
Subject: [PATCH] mononoke/blobstore_healer: add comments and type annotations

Summary: Basically notes I took for myself to truly understand the code.

Reviewed By: StanislavGlebik

Differential Revision: D15908406

fbshipit-source-id: 3f21f7a1ddce8e15ceeeffdb5518fd7f5b1749c4
---
 cmds/blobstore_healer/healer.rs | 46 ++++++++++++++++++++++-----------
 cmds/blobstore_healer/main.rs   | 42 +++++++++++++++---------------
 2 files changed, 52 insertions(+), 36 deletions(-)

diff --git a/cmds/blobstore_healer/healer.rs b/cmds/blobstore_healer/healer.rs
index 9a3dbcb500..3489dc3a33 100644
--- a/cmds/blobstore_healer/healer.rs
+++ b/cmds/blobstore_healer/healer.rs
@@ -55,13 +55,15 @@ impl Healer {
         }
     }
 
+    /// Heal one batch of entries. It selects up to `blobstore_sync_queue_limit` entries at once,
+    /// skipping any that are too young (bounded by ENTRY_HEALING_MIN_AGE).
     pub fn heal(&self, ctx: CoreContext) -> impl Future {
         cloned!(
             self.logger,
             self.blobstore_sync_queue_limit,
             self.rate_limiter,
             self.sync_queue,
-            self.blobstores
+            self.blobstores,
         );
 
         let now = DateTime::now().into_chrono();
@@ -73,7 +75,7 @@ impl Healer {
                 healing_deadline.clone(),
                 blobstore_sync_queue_limit,
             )
-            .and_then(move |queue_entries| {
+            .and_then(move |queue_entries: Vec| {
                 cloned!(rate_limiter);
 
                 let healing_futures: Vec<_> = queue_entries
@@ -100,17 +102,20 @@ impl Healer {
                     healing_futures.len()
                 );
 
-                futures::stream::futures_unordered(healing_futures.into_iter())
+                futures::stream::futures_unordered(healing_futures)
                     .collect()
-                    .and_then(move |cleaned_entries| {
-                        let v = cleaned_entries.into_iter().flatten().collect();
-                        cleanup_after_healing(ctx, sync_queue, v)
+                    .and_then(move |cleaned_entries: Vec>| {
+                        let cleaned = cleaned_entries.into_iter().flatten().collect();
+                        cleanup_after_healing(ctx, sync_queue, cleaned)
                     })
             })
-            .map(|_| ())
     }
 }
 
+/// Heal an individual blob. The `entries` identify the blobstores which have successfully stored
+/// this blob; we need to replicate it onto the remaining `blobstores`. If the blob is not
+/// yet eligible (too young), then just return None; otherwise we return the healed entries
+/// which have now been dealt with.
 fn heal_blob(
     ctx: CoreContext,
     sync_queue: Arc,
@@ -122,7 +127,7 @@ fn heal_blob(
     let seen_blobstores: HashSet<_> = entries
         .iter()
         .filter_map(|entry| {
-            let id = entry.blobstore_id.clone();
+            let id = entry.blobstore_id;
             if blobstores.contains_key(&id) {
                 Some(id)
             } else {
@@ -167,21 +172,25 @@ fn heal_blob(
         .map(|bid| {
             let blobstore = blobstores
                 .get(&bid)
-                .expect("missing_blobstores contains only existing blobstores");
+                .expect("missing_blobstores contains unknown blobstore?");
             blobstore
                 .put(ctx.clone(), key.clone(), blob.clone())
                 .then(move |result| Ok((bid, result.is_ok())))
         })
         .collect();
 
+    // XXX(jsgf) Don't really understand this. I'd expect it to filter the missing_blobstores
+    // by put_ok, and then return only those (i.e., leave the entries which didn't store
+    // correctly in the queue). This logic seems to report success if everything was successful;
+    // otherwise it re-puts the successes into the queue (via report_partial_heal) and returns
+    // an empty "to be cleaned" vector.
     join_all(heal_blobstores).and_then(move |heal_results| {
-        if heal_results.iter().all(|(_, result)| *result) {
+        if heal_results.iter().all(|(_, put_ok)| *put_ok) {
             futures::future::ok(entries).left_future()
         } else {
-            let healed_blobstores =
-                heal_results
-                    .into_iter()
-                    .filter_map(|(id, result)| if result { Some(id) } else { None });
+            let healed_blobstores = heal_results
+                .into_iter()
+                .filter_map(|(id, put_ok)| Some(id).filter(|_| put_ok));
             report_partial_heal(ctx, sync_queue, key, healed_blobstores)
                 .map(|_| vec![])
                 .right_future()
@@ -192,6 +201,11 @@ fn heal_blob(
     Some(heal_future.right_future())
 }
 
+/// Fetch a blob by `key` from one of the `seen_blobstores`. This tries them one at a time,
+/// sequentially, until it either finds the entry or fails.
+/// TODO: if one of the blobstores returns "not found" (None) rather than an error (or success),
+/// we should add that blobstore to the missing set. (Currently it just fails, which will not
+/// be recoverable.)
 fn fetch_blob(
     ctx: CoreContext,
     blobstores: Arc>>,
@@ -224,7 +238,7 @@ fn fetch_blob(
                     Err(_) => return Ok(Loop::Continue(blobstores_to_fetch)),
                     Ok(None) => {
                         return Err(format_err!(
-                            "Blobstore {:?} retruned None even though it should contain data",
+                            "Blobstore {:?} returned None even though it should contain data",
                             bid
                         ));
                     }
@@ -236,6 +250,7 @@ fn fetch_blob(
     .from_err()
 }
 
+/// Remove healed entries from the queue.
 fn cleanup_after_healing(
     ctx: CoreContext,
     sync_queue: Arc,
@@ -244,6 +259,7 @@ fn cleanup_after_healing(
     sync_queue.del(ctx, entries)
 }
 
+/// ??? Don't understand this. This is putting the entries we healed back into the queue?
 fn report_partial_heal(
     ctx: CoreContext,
     sync_queue: Arc,
diff --git a/cmds/blobstore_healer/main.rs b/cmds/blobstore_healer/main.rs
index ae80794c00..1b7f4d6e3c 100644
--- a/cmds/blobstore_healer/main.rs
+++ b/cmds/blobstore_healer/main.rs
@@ -57,9 +57,9 @@ fn maybe_schedule_healer_for_storage(
         _ => bail_msg!("Repo doesn't use Multiplexed blobstore"),
     };
 
-    let blobstores = {
+    let blobstores: HashMap<_, BoxFuture, _>> = {
         let mut blobstores = HashMap::new();
-        for (id, args) in blobstores_args.into_iter() {
+        for (id, args) in blobstores_args {
             match args {
                 BlobConfig::Manifold { bucket, prefix } => {
                     let blobstore = ThriftManifoldBlob::new(bucket)
@@ -138,23 +138,24 @@ fn maybe_schedule_healer_for_storage(
         replication_lag_db_conns.push(conn_builder.build_read_only());
     }
 
-    let heal = blobstores.and_then(move |blobstores| {
-        let repo_healer = Healer::new(
-            logger.clone(),
-            blobstore_sync_queue_limit,
-            rate_limiter,
-            sync_queue,
-            Arc::new(blobstores),
-        );
+    let heal = blobstores.and_then(
+        move |blobstores: HashMap<_, Arc>| {
+            let repo_healer = Healer::new(
+                logger.clone(),
+                blobstore_sync_queue_limit,
+                rate_limiter,
+                sync_queue,
+                Arc::new(blobstores),
+            );
 
-        if dry_run {
-            // TODO(luk) use a proper context here and put the logger inside of it
-            let ctx = CoreContext::test_mock();
-            repo_healer.heal(ctx).boxify()
-        } else {
-            schedule_everlasting_healing(logger, repo_healer, replication_lag_db_conns)
-        }
-    });
+            if dry_run {
+                let ctx = CoreContext::new_with_logger(logger);
+                repo_healer.heal(ctx).boxify()
+            } else {
+                schedule_everlasting_healing(logger, repo_healer, replication_lag_db_conns)
+            }
+        },
+    );
     Ok(myrouter::wait_for_myrouter(myrouter_port, db_address)
         .and_then(|_| heal)
         .boxify())
@@ -168,8 +169,7 @@ fn schedule_everlasting_healing(
     let replication_lag_db_conns = Arc::new(replication_lag_db_conns);
 
     let fut = loop_fn((), move |()| {
-        // TODO(luk) use a proper context here and put the logger inside of it
-        let ctx = CoreContext::test_mock();
+        let ctx = CoreContext::new_with_logger(logger.clone());
         cloned!(logger, replication_lag_db_conns);
 
         repo_healer.heal(ctx).and_then(move |()| {
@@ -236,7 +236,7 @@ fn setup_app<'a, 'b>() -> App<'a, 'b> {
             --sync-queue-limit=[LIMIT] 'set limit for how many queue entries to process'
             --dry-run 'performs a single healing and prints what would it do without doing it'
             --db-regions=[REGIONS] 'comma-separated list of db regions where db replication lag is monitored'
-            --storage-id=[STORAGE_ID] 'id of storage to be healed'
+            --storage-id=[STORAGE_ID] 'id of storage to be healed, e.g. manifold_xdb_multiplex'
         "#,
     )
 }
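The XXX(jsgf) note in heal_blob asks why a partial failure does not simply keep the successful puts and leave the failed ones on the queue. As a minimal, self-contained sketch of the bookkeeping that note describes (partition the per-blobstore put results, treat only the successful stores as healed, leave the rest for a later pass), the standalone Rust below uses placeholder types (a plain u64 standing in for BlobstoreId, and bool put outcomes) rather than the real Mononoke types; it illustrates the idea only and is not the actual heal_blob logic.

use std::collections::HashSet;

// Placeholder for Mononoke's BlobstoreId; assumed here to be a plain integer.
type BlobstoreId = u64;

// Split per-blobstore put results into the stores that now hold the blob and
// the stores that still need healing on a later pass.
fn partition_put_results(
    results: Vec<(BlobstoreId, bool)>,
) -> (HashSet<BlobstoreId>, HashSet<BlobstoreId>) {
    let mut healed = HashSet::new();
    let mut still_missing = HashSet::new();
    for (id, put_ok) in results {
        if put_ok {
            healed.insert(id);
        } else {
            still_missing.insert(id);
        }
    }
    (healed, still_missing)
}

fn main() {
    // Example: blobstore 1 accepted the blob, blobstore 2 did not.
    let (healed, still_missing) = partition_put_results(vec![(1, true), (2, false)]);
    assert!(healed.contains(&1));
    assert!(still_missing.contains(&2));
    println!("healed: {:?}, still missing: {:?}", healed, still_missing);
}

By contrast, the code in the patch handles a partial failure as the comment above describes: it reports the successful ids through report_partial_heal, returns an empty "to be cleaned" vector, and so retries the whole group later.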