mononoke/blobstore_healer: add comments and type annotations

Summary: Basically notes I took for myself to truly understand the code.

Reviewed By: StanislavGlebik

Differential Revision: D15908406

fbshipit-source-id: 3f21f7a1ddce8e15ceeeffdb5518fd7f5b1749c4
Author: Alex Hornby
Date: 2019-08-05 03:46:26 -07:00
Committed by: Facebook Github Bot
Parent: 978242fb35
Commit: 3d27faba08
2 changed files with 52 additions and 36 deletions


@@ -55,13 +55,15 @@ impl Healer {
}
}
/// Heal one batch of entries. It selects a set of entries which are not too young (bounded
/// by ENTRY_HEALING_MIN_AGE), up to `blobstore_sync_queue_limit` at once.
pub fn heal(&self, ctx: CoreContext) -> impl Future<Item = (), Error = Error> {
cloned!(
self.logger,
self.blobstore_sync_queue_limit,
self.rate_limiter,
self.sync_queue,
self.blobstores
self.blobstores,
);
let now = DateTime::now().into_chrono();
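
Aside: the `healing_deadline` derived from `now` is the cutoff the doc comment above
describes: only entries older than ENTRY_HEALING_MIN_AGE are selected. A minimal
standalone sketch of that age check, using chrono directly (the two-minute value is a
made-up stand-in for the real constant):

use chrono::{DateTime, Duration, Utc};

// Hypothetical stand-in for the real ENTRY_HEALING_MIN_AGE constant.
fn entry_healing_min_age() -> Duration {
    Duration::seconds(120)
}

// An entry is eligible for healing only if it is older than the deadline.
fn is_eligible(entry_timestamp: DateTime<Utc>, now: DateTime<Utc>) -> bool {
    let healing_deadline = now - entry_healing_min_age();
    entry_timestamp <= healing_deadline
}

fn main() {
    let now = Utc::now();
    assert!(is_eligible(now - Duration::seconds(300), now));
    assert!(!is_eligible(now - Duration::seconds(10), now));
}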
@@ -73,7 +75,7 @@ impl Healer {
healing_deadline.clone(),
blobstore_sync_queue_limit,
)
.and_then(move |queue_entries| {
.and_then(move |queue_entries: Vec<BlobstoreSyncQueueEntry>| {
cloned!(rate_limiter);
let healing_futures: Vec<_> = queue_entries
@@ -100,17 +102,20 @@ impl Healer {
healing_futures.len()
);
futures::stream::futures_unordered(healing_futures.into_iter())
futures::stream::futures_unordered(healing_futures)
.collect()
.and_then(move |cleaned_entries| {
let v = cleaned_entries.into_iter().flatten().collect();
cleanup_after_healing(ctx, sync_queue, v)
.and_then(move |cleaned_entries: Vec<Vec<BlobstoreSyncQueueEntry>>| {
let cleaned = cleaned_entries.into_iter().flatten().collect();
cleanup_after_healing(ctx, sync_queue, cleaned)
})
})
.map(|_| ())
}
}
/// Heal an individual blob. The `entries` identify the blobstores which have successfully
/// stored this blob; we need to replicate it to the remaining `blobstores`. If the blob is
/// not yet eligible (too young), just return None; otherwise return the healed entries,
/// which have now been dealt with.
fn heal_blob(
ctx: CoreContext,
sync_queue: Arc<dyn BlobstoreSyncQueue>,
@@ -122,7 +127,7 @@ fn heal_blob(
let seen_blobstores: HashSet<_> = entries
.iter()
.filter_map(|entry| {
let id = entry.blobstore_id.clone();
let id = entry.blobstore_id;
if blobstores.contains_key(&id) {
Some(id)
} else {
@@ -167,21 +172,25 @@ fn heal_blob(
.map(|bid| {
let blobstore = blobstores
.get(&bid)
.expect("missing_blobstores contains only existing blobstores");
.expect("missing_blobstores contains unknown blobstore?");
blobstore
.put(ctx.clone(), key.clone(), blob.clone())
.then(move |result| Ok((bid, result.is_ok())))
})
.collect();
// XXX(jsgf) Don't really understand this. I'd expect it to filter the missing_blobstores
// by put_ok, and then return only those (i.e. leave the entries which didn't store
// correctly in the queue). Instead, this logic reports success if everything was
// successful; otherwise it re-puts the successes into the queue (via report_partial_heal)
// and returns an empty "to be cleaned" vector.
join_all(heal_blobstores).and_then(move |heal_results| {
if heal_results.iter().all(|(_, result)| *result) {
if heal_results.iter().all(|(_, put_ok)| *put_ok) {
futures::future::ok(entries).left_future()
} else {
let healed_blobstores =
heal_results
.into_iter()
.filter_map(|(id, result)| if result { Some(id) } else { None });
let healed_blobstores = heal_results
.into_iter()
.filter_map(|(id, put_ok)| Some(id).filter(|_| put_ok));
report_partial_heal(ctx, sync_queue, key, healed_blobstores)
.map(|_| vec![])
.right_future()
@@ -192,6 +201,11 @@ fn heal_blob(
Some(heal_future.right_future())
}
/// Fetch a blob by `key` from one of the `seen_blobstores`. This tries them one at a time,
/// sequentially, until either it finds the entry or it fails.
/// TODO: if one of the blobstores returns "not found" (None) rather than an error (or success),
/// we should add that blobstore to the missing set. (Currently it just fails, which is not
/// recoverable.)
fn fetch_blob(
ctx: CoreContext,
blobstores: Arc<HashMap<BlobstoreId, Arc<dyn Blobstore>>>,
@@ -224,7 +238,7 @@ fn fetch_blob(
Err(_) => return Ok(Loop::Continue(blobstores_to_fetch)),
Ok(None) => {
return Err(format_err!(
"Blobstore {:?} retruned None even though it should contain data",
"Blobstore {:?} returned None even though it should contain data",
bid
));
}
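
The match above is the heart of fetch_blob's retry loop. A simplified synchronous sketch
of the same try-one-at-a-time logic (the `Store` trait and integer ids are illustrative
stand-ins for the async `Blobstore` trait and `BlobstoreId`):

use std::collections::HashMap;

// Hypothetical synchronous stand-in for the async Blobstore trait.
trait Store {
    fn get(&self, key: &str) -> Result<Option<Vec<u8>>, String>;
}

// Try each store in turn: an Err falls through to the next store, Ok(None) is a
// hard failure (the store was expected to hold the key), Ok(Some(..)) succeeds.
fn fetch_blob_sketch(
    stores: &HashMap<u32, Box<dyn Store>>,
    key: &str,
) -> Result<Vec<u8>, String> {
    for (id, store) in stores {
        match store.get(key) {
            Err(_) => continue, // transient failure: move on to the next store
            Ok(None) => {
                return Err(format!(
                    "Blobstore {} returned None even though it should contain data",
                    id
                ));
            }
            Ok(Some(blob)) => return Ok(blob),
        }
    }
    Err(format!("none of the blobstores contain {}", key))
}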
@@ -236,6 +250,7 @@ fn fetch_blob(
.from_err()
}
/// Remove healed entries from the queue.
fn cleanup_after_healing(
ctx: CoreContext,
sync_queue: Arc<dyn BlobstoreSyncQueue>,
@@ -244,6 +259,7 @@ fn cleanup_after_healing(
sync_queue.del(ctx, entries)
}
/// ??? Don't understand this. This is putting the entries we healed back into the queue?
fn report_partial_heal(
ctx: CoreContext,
sync_queue: Arc<dyn BlobstoreSyncQueue>,

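Before moving to the second file: to make the final join_all step of heal_blob (the part
the XXX comment above puzzles over) concrete, here is a simplified synchronous sketch of
the same decision; all names are illustrative, not the real API. Full success returns the
original entries for deletion from the queue; partial success re-queues the stores whose
put succeeded and returns nothing for cleanup:

// entries stands in for Vec<BlobstoreSyncQueueEntry>; heal_results pairs a
// blobstore id with whether its put succeeded; requeue stands in for
// report_partial_heal.
fn after_heal_sketch(
    entries: Vec<u64>,
    heal_results: Vec<(u32, bool)>,
    requeue: impl Fn(Vec<u32>),
) -> Vec<u64> {
    if heal_results.iter().all(|(_, put_ok)| *put_ok) {
        // Full success: every entry has been dealt with and can be cleaned up.
        entries
    } else {
        // Partial success: re-queue the stores whose put succeeded and leave
        // the cleanup list empty, so the queue entries stay pending.
        let healed: Vec<u32> = heal_results
            .into_iter()
            .filter_map(|(id, put_ok)| if put_ok { Some(id) } else { None })
            .collect();
        requeue(healed);
        vec![]
    }
}

fn main() {
    // One failed put: the successful store is re-queued, nothing is cleaned up.
    let cleaned = after_heal_sketch(vec![1, 2], vec![(0, true), (1, false)], |h| {
        assert_eq!(h, vec![0u32]);
    });
    assert!(cleaned.is_empty());
}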

@@ -57,9 +57,9 @@ fn maybe_schedule_healer_for_storage(
_ => bail_msg!("Repo doesn't use Multiplexed blobstore"),
};
let blobstores = {
let blobstores: HashMap<_, BoxFuture<Arc<dyn Blobstore + 'static>, _>> = {
let mut blobstores = HashMap::new();
for (id, args) in blobstores_args.into_iter() {
for (id, args) in blobstores_args {
match args {
BlobConfig::Manifold { bucket, prefix } => {
let blobstore = ThriftManifoldBlob::new(bucket)
@@ -138,23 +138,24 @@ fn maybe_schedule_healer_for_storage(
replication_lag_db_conns.push(conn_builder.build_read_only());
}
let heal = blobstores.and_then(move |blobstores| {
let repo_healer = Healer::new(
logger.clone(),
blobstore_sync_queue_limit,
rate_limiter,
sync_queue,
Arc::new(blobstores),
);
let heal = blobstores.and_then(
move |blobstores: HashMap<_, Arc<dyn Blobstore + 'static>>| {
let repo_healer = Healer::new(
logger.clone(),
blobstore_sync_queue_limit,
rate_limiter,
sync_queue,
Arc::new(blobstores),
);
if dry_run {
// TODO(luk) use a proper context here and put the logger inside of it
let ctx = CoreContext::test_mock();
repo_healer.heal(ctx).boxify()
} else {
schedule_everlasting_healing(logger, repo_healer, replication_lag_db_conns)
}
});
if dry_run {
let ctx = CoreContext::new_with_logger(logger);
repo_healer.heal(ctx).boxify()
} else {
schedule_everlasting_healing(logger, repo_healer, replication_lag_db_conns)
}
},
);
Ok(myrouter::wait_for_myrouter(myrouter_port, db_address)
.and_then(|_| heal)
.boxify())
@@ -168,8 +169,7 @@ fn schedule_everlasting_healing(
let replication_lag_db_conns = Arc::new(replication_lag_db_conns);
let fut = loop_fn((), move |()| {
// TODO(luk) use a proper context here and put the logger inside of it
let ctx = CoreContext::test_mock();
let ctx = CoreContext::new_with_logger(logger.clone());
cloned!(logger, replication_lag_db_conns);
repo_healer.heal(ctx).and_then(move |()| {
@@ -236,7 +236,7 @@ fn setup_app<'a, 'b>() -> App<'a, 'b> {
--sync-queue-limit=[LIMIT] 'set limit for how many queue entries to process'
--dry-run 'performs a single healing and prints what it would do without doing it'
--db-regions=[REGIONS] 'comma-separated list of db regions where db replication lag is monitored'
--storage-id=[STORAGE_ID] 'id of storage to be healed'
--storage-id=[STORAGE_ID] 'id of storage to be healed, e.g. manifold_xdb_multiplex'
"#,
)
}
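
For reference, the everlasting loop in schedule_everlasting_healing is built from futures
0.1's loop_fn, which re-runs a future-producing closure until it yields Loop::Break. A
minimal standalone sketch of that pattern (the healing work is faked with a pass counter;
the real loop never breaks, and also waits on db replication lag between passes):

use futures::future::{loop_fn, ok, Future, Loop};

fn main() {
    // Run a fake "healing pass" five times, then stop.
    let everlasting = loop_fn(0u32, |passes| {
        // In the real healer this step would be repo_healer.heal(ctx),
        // followed by a wait for db replication lag to settle.
        ok::<u32, ()>(passes + 1).and_then(|passes| {
            if passes < 5 {
                Ok(Loop::Continue(passes))
            } else {
                Ok(Loop::Break(passes))
            }
        })
    });

    assert_eq!(everlasting.wait(), Ok(5));
}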