diff --git a/eden/mononoke/cmds/blobstore_healer/main.rs b/eden/mononoke/cmds/blobstore_healer/main.rs
index 300d4a313d..a43ef3e8e9 100644
--- a/eden/mononoke/cmds/blobstore_healer/main.rs
+++ b/eden/mononoke/cmds/blobstore_healer/main.rs
@@ -30,8 +30,9 @@ use futures::{
     future::{self, join_all, loop_fn, ok, Loop},
     prelude::*,
 };
-use futures_ext::{spawn_future, BoxFuture, FutureExt};
+use futures_ext::{BoxFuture, FutureExt};
 use futures_preview::compat::Future01CompatExt;
+use futures_preview::future::{FutureExt as _, TryFutureExt};
 use healer::Healer;
 use lazy_static::lazy_static;
 use metaconfig_types::{BlobConfig, MetadataDBConfig, StorageConfig};
@@ -273,36 +274,39 @@ fn schedule_healing(
     iter_limit: Option<u64>,
     heal_min_age: ChronoDuration,
 ) -> BoxFuture<(), Error> {
-    let regional_conns = Arc::new(regional_conns);
-    let fut = loop_fn(0, move |count: u64| {
-        match iter_limit {
-            Some(limit) => {
-                if count >= limit {
-                    return future::ok(Loop::Break(())).left_future();
+    async move {
+        let mut count = 0;
+        let mut last_batch_was_full_size = true;
+
+        let regional_conns = Arc::new(regional_conns);
+
+        loop {
+            count += 1;
+            if let Some(iter_limit) = iter_limit {
+                if count > iter_limit {
+                    break Ok(());
                 }
             }
-            _ => (),
-        }
-        let max_replication_lag_fn = max_replication_lag(regional_conns.clone());
-        let now = DateTime::now().into_chrono();
-        let healing_deadline = DateTime::new(now - heal_min_age);
-        multiplex_healer
-            .heal(ctx.clone(), healing_deadline)
-            .and_then({
-                let logger = ctx.logger().clone();
-                move |last_batch_full_sized| {
-                    ensure_small_db_replication_lag(
-                        logger,
-                        max_replication_lag_fn,
-                        last_batch_full_sized,
-                    )
-                    .map(move |_lag| Loop::Continue(count + 1))
-                }
-            })
-            .right_future()
-    });
-    spawn_future(fut).boxify()
+            ensure_small_db_replication_lag(
+                ctx.logger().clone(),
+                max_replication_lag(regional_conns.clone()),
+                last_batch_was_full_size,
+            )
+            .compat()
+            .await?;
+
+            let now = DateTime::now().into_chrono();
+            let healing_deadline = DateTime::new(now - heal_min_age);
+            last_batch_was_full_size = multiplex_healer
+                .heal(ctx.clone(), healing_deadline)
+                .compat()
+                .await?;
+        }
+    }
+    .boxed()
+    .compat()
+    .boxify()
 }
 
 fn max_replication_lag(
diff --git a/eden/mononoke/tests/integration/test-blobstore_healer.t b/eden/mononoke/tests/integration/test-blobstore_healer.t
index d31c59efe0..802e0451ae 100644
--- a/eden/mononoke/tests/integration/test-blobstore_healer.t
+++ b/eden/mononoke/tests/integration/test-blobstore_healer.t
@@ -23,11 +23,11 @@ Check that healer queue has all items
 
 Run the heal
   $ mononoke_blobstore_healer -q --iteration-limit=1 --heal-min-age-secs=0 --storage-id=blobstore --sync-queue-limit=100 2>&1 | strip_glog
+  Max replication lag is sqlite_region, 0s
+  As there are items remaining and lag < bound, carry on without pause
   Found 30 blobs to be healed... Doing it
   For 30 blobs did HealStats { queue_add: 0, queue_del: 90, put_success: 0, put_failure: 0 }
   Deleting 90 actioned queue entries
-  Max replication lag is sqlite_region, 0s
-  As the last batch was not full sized, wait at least one second
 
 Check that healer queue has drained
   $ sqlite3 "$TESTTMP/monsql/sqlite_dbs" "select count(*) FROM blobstore_sync_queue";
@@ -57,12 +57,12 @@ Run the heal, with write errors injected, simulating store still bad
   > uniq -c | sed 's/^ *//'
   > }
   $ mononoke_blobstore_healer --blobstore-write-chaos-rate 1 -q --iteration-limit=1 --heal-min-age-secs=0 --storage-id=blobstore --sync-queue-limit=100 2>&1 | strip_glog | count_log
+  1 Max replication lag is sqlite_region, 0s
+  1 As there are items remaining and lag < bound, carry on without pause
   1 Found 30 blobs to be healed... Doing it
   30 Adding source blobstores [BlobstoreId(1), BlobstoreId(2)] to the queue so that failed destination blob stores [BlobstoreId(0)] will be retried later
   1 For 30 blobs did HealStats { queue_add: 60, queue_del: 60, put_success: 60, put_failure: 30 }
   1 Deleting 60 actioned queue entries
-  1 Max replication lag is sqlite_region, 0s
-  1 As the last batch was not full sized, wait at least one second
 
 Check that healer queue still has the items, should not have drained
   $ sqlite3 "$TESTTMP/monsql/sqlite_dbs" "select count(*) FROM blobstore_sync_queue";
@@ -70,11 +70,11 @@ Check that healer queue still has the items, should not have drained
 
 Healer run again now store recovered
   $ mononoke_blobstore_healer -q --iteration-limit=1 --heal-min-age-secs=0 --storage-id=blobstore --sync-queue-limit=100 2>&1 | strip_glog | count_log
+  1 Max replication lag is sqlite_region, 0s
+  1 As there are items remaining and lag < bound, carry on without pause
   1 Found 30 blobs to be healed... Doing it
   1 For 30 blobs did HealStats { queue_add: 0, queue_del: 60, put_success: 30, put_failure: 0 }
   1 Deleting 60 actioned queue entries
-  1 Max replication lag is sqlite_region, 0s
-  1 As the last batch was not full sized, wait at least one second
 
 Check that healer queue has drained
   $ sqlite3 "$TESTTMP/monsql/sqlite_dbs" "select count(*) FROM blobstore_sync_queue";
diff --git a/eden/mononoke/tests/integration/test-mononoke-admin.t b/eden/mononoke/tests/integration/test-mononoke-admin.t
index fcb7074f4d..491946a003 100644
--- a/eden/mononoke/tests/integration/test-mononoke-admin.t
+++ b/eden/mononoke/tests/integration/test-mononoke-admin.t
@@ -19,11 +19,11 @@ setup configuration
 
 Drain the healer queue
   $ mononoke_blobstore_healer -q --iteration-limit=1 --heal-min-age-secs=0 --storage-id=blobstore --sync-queue-limit=100 2>&1 | strip_glog | sed -re '/^(Could not connect to a replica in)|(Monitoring regions:)/d'
+  Max replication lag is *, 0s (glob)
+  As there are items remaining and lag < bound, carry on without pause
   Found 30 blobs to be healed... Doing it
   For 30 blobs did HealStats { queue_add: 0, queue_del: 60, put_success: 0, put_failure: 0 }
   Deleting 60 actioned queue entries
-  Max replication lag is *, *s (glob)
-  As the last batch was not full sized, wait at least one second
 
 Check bookmarks
   $ mononoke_admin bookmarks log master_bookmark 2>&1 | grep master_bookmark
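
Note on the conversion pattern (commentary, not part of the patch): the main.rs hunk replaces a futures-0.1 loop_fn loop with an async block that awaits each legacy future through .compat(), then hands the whole block back to futures-0.1 callers via .boxed().compat().boxify(). Below is a minimal, self-contained sketch of that bridging pattern only, assuming the public futures 0.3 crate (with the "compat" feature) and futures 0.1 imported as futures01; schedule and legacy_step are illustrative stand-ins and not names from this diff, which uses the internal futures_ext and futures_preview crates instead.

    // Assumed Cargo.toml entries:
    //   futures = { version = "0.3", features = ["compat"] }
    //   futures01 = { package = "futures", version = "0.1" }
    use futures::compat::Future01CompatExt; // .compat(): 0.1 future -> 0.3 future
    use futures::future::{FutureExt, TryFutureExt}; // .boxed(), .compat(): 0.3 -> 0.1

    // Stand-in for a legacy futures-0.1 call such as the healer or the lag check.
    fn legacy_step() -> Box<dyn futures01::Future<Item = bool, Error = ()> + Send> {
        Box::new(futures01::future::ok::<bool, ()>(true))
    }

    // Same shape as the new schedule_healing: an async loop that awaits 0.1
    // futures via .compat(), returned to callers as a 0.1 boxed future.
    fn schedule(iter_limit: Option<u64>) -> Box<dyn futures01::Future<Item = (), Error = ()> + Send> {
        Box::new(
            async move {
                let mut count = 0;
                loop {
                    count += 1;
                    if let Some(limit) = iter_limit {
                        if count > limit {
                            break Ok::<(), ()>(());
                        }
                    }
                    // Drive the legacy future from async/await.
                    let _last_batch_was_full_size = legacy_step().compat().await?;
                }
            }
            .boxed() // pin the async block in a Box so it is Unpin...
            .compat(), // ...then expose it as a futures-0.1 future
        )
    }

    fn main() {
        // Adapt the 0.1 future back to 0.3 once more and block on it.
        futures::executor::block_on(schedule(Some(3)).compat()).unwrap();
    }

The .boxed() before .compat() mirrors the order used in schedule_healing: the 0.3-to-0.1 compat adapter requires an Unpin future, and pinning the async block in a Box satisfies that.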