blobstore healer: minimalistic implementation

Summary:
This version still lacks:
- proper production-ready logging
- smarter handling of the case where queue entries that belong to the same blob do not all fit within the `limit` or the `older_than` cutoff; today the healer heals more entries than it should without realizing it (see the sketch below)

Reviewed By: aslpavel

Differential Revision: D13528686

fbshipit-source-id: 0245becea7e4f0ac69383a7885ff3746d81c4add
Author: Lukas Piatkowski, 2019-01-15 10:08:00 -08:00 (committed by Facebook Github Bot)
parent 74ed84b7c3
commit eba422a209
9 changed files with 763 additions and 37 deletions
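To make the second limitation concrete: iter() fetches a window of at most `limit` queue rows ordered by id, and the healer groups that window by blobstore_key, so the entries for one blob can be cut off at the window boundary. A minimal, self-contained sketch of the effect (hypothetical rows and a hypothetical limit of 2, not taken from this diff; uses itertools for grouping like the healer does):

use itertools::Itertools;

fn main() {
    // (id, blobstore_key, blobstore_id) rows ordered by id, as iter() returns them.
    let queue = vec![(1, "a", 1), (2, "b", 1), (3, "b", 2)];
    let limit = 2; // hypothetical --sync-queue-limit

    // Only the first `limit` rows are fetched; row (3, "b", 2) falls outside.
    let window = &queue[..limit];
    let groups = window.iter().cloned().group_by(|&(_, key, _)| key);
    for (key, entries) in &groups {
        let seen: Vec<_> = entries.map(|(_, _, bs)| bs).collect();
        println!("blob {:?} seen in blobstores {:?}", key, seen);
    }
    // Blob "b" is reported as seen only in blobstore 1, so blobstore 2 looks
    // missing and the healer re-copies "b" there, even though row id 3 shows
    // the copy already exists.
}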


@@ -359,10 +359,10 @@ impl BlobRepo {
                 .map(|blobstore| -> Arc<Blobstore> { Arc::new(blobstore) })
                 .boxify()
         }
-        RemoteBlobstoreArgs::Mysql(ref args) => {
+        RemoteBlobstoreArgs::Mysql(args) => {
             let blobstore: Arc<Blobstore> = Arc::new(Sqlblob::with_myrouter(
                 repoid,
-                &args.shardmap,
+                args.shardmap,
                 myrouter_port,
                 args.shard_num,
             ));


@@ -20,20 +20,17 @@ extern crate sql_ext;
 #[macro_use]
 extern crate stats;
-use std::sync::Arc;
-use context::CoreContext;
-use sql::Connection;
-pub use sql_ext::SqlConstructors;
 use cloned::cloned;
+use context::CoreContext;
 use failure::{format_err, Error};
 use futures::{future, Future, IntoFuture};
 use futures_ext::{BoxFuture, FutureExt};
 use metaconfig::BlobstoreId;
 use mononoke_types::{DateTime, RepositoryId, Timestamp};
+use sql::Connection;
+pub use sql_ext::SqlConstructors;
 use stats::Timeseries;
+use std::sync::Arc;
 
 define_stats! {
     prefix = "mononoke.blobstore_sync_queue";

@@ -74,6 +71,7 @@ pub trait BlobstoreSyncQueue: Send + Sync {
     fn iter(
         &self,
         ctx: CoreContext,
+        repo_id: RepositoryId,
         older_than: DateTime,
         limit: usize,
     ) -> BoxFuture<Vec<BlobstoreSyncQueueEntry>, Error>;

@@ -96,10 +94,11 @@ impl BlobstoreSyncQueue for Arc<BlobstoreSyncQueue> {
     fn iter(
         &self,
         ctx: CoreContext,
+        repo_id: RepositoryId,
         older_than: DateTime,
         limit: usize,
     ) -> BoxFuture<Vec<BlobstoreSyncQueueEntry>, Error> {
-        (**self).iter(ctx, older_than, limit)
+        (**self).iter(ctx, repo_id, older_than, limit)
     }
 
     fn del(&self, ctx: CoreContext, entries: Vec<BlobstoreSyncQueueEntry>) -> BoxFuture<(), Error> {

@@ -147,7 +146,7 @@ queries! {
         FROM blobstore_sync_queue"
     }
 
-    read GetRangeOfEntries(older_than: Timestamp, limit: usize) -> (
+    read GetRangeOfEntries(repo_id: RepositoryId, older_than: Timestamp, limit: usize) -> (
         RepositoryId,
         String,
         BlobstoreId,

@@ -156,7 +155,8 @@ queries! {
     ) {
         "SELECT repo_id, blobstore_key, blobstore_id, add_timestamp, id
         FROM blobstore_sync_queue
-        WHERE add_timestamp >= {older_than}
+        WHERE repo_id = {repo_id}
+          AND add_timestamp <= {older_than}
         ORDER BY id
         LIMIT {limit}"
     }

@@ -208,33 +208,40 @@ impl BlobstoreSyncQueue for SqlBlobstoreSyncQueue {
         InsertEntry::query(
             &self.write_connection,
             &[(&repo_id, &blobstore_key, &blobstore_id, &timestamp.into())],
-        ).map(|result| result.affected_rows() == 1)
-            .boxify()
+        )
+        .map(|result| result.affected_rows() == 1)
+        .boxify()
     }
 
     fn iter(
         &self,
         _ctx: CoreContext,
+        repo_id: RepositoryId,
         older_than: DateTime,
         limit: usize,
     ) -> BoxFuture<Vec<BlobstoreSyncQueueEntry>, Error> {
         STATS::iters.add_value(1);
         // query
-        GetRangeOfEntries::query(&self.read_master_connection, &older_than.into(), &limit)
-            .map(|rows| {
-                rows.into_iter()
-                    .map(|(repo_id, blobstore_key, blobstore_id, timestamp, id)| {
-                        BlobstoreSyncQueueEntry {
-                            repo_id,
-                            blobstore_key,
-                            blobstore_id,
-                            timestamp: timestamp.into(),
-                            id: Some(id),
-                        }
-                    })
-                    .collect()
-            })
-            .boxify()
+        GetRangeOfEntries::query(
+            &self.read_master_connection,
+            &repo_id,
+            &older_than.into(),
+            &limit,
+        )
+        .map(|rows| {
+            rows.into_iter()
+                .map(|(repo_id, blobstore_key, blobstore_id, timestamp, id)| {
+                    BlobstoreSyncQueueEntry {
+                        repo_id,
+                        blobstore_key,
+                        blobstore_id,
+                        timestamp: timestamp.into(),
+                        id: Some(id),
+                    }
+                })
+                .collect()
+        })
+        .boxify()
     }
 
     fn del(


@@ -53,10 +53,16 @@ fn test_simple() {
     assert_eq!(entries.len(), 2);
 
     // iter
-    let some_entries = rt.block_on(queue.iter(ctx.clone(), t0, 1))
+    let some_entries = rt
+        .block_on(queue.iter(ctx.clone(), repo_id, t1, 1))
         .expect("DateTime range iteration failed");
     assert_eq!(some_entries.len(), 1);
-    let entries = rt.block_on(queue.iter(ctx.clone(), t0, 100))
+    let some_entries = rt
+        .block_on(queue.iter(ctx.clone(), repo_id, t0, 100))
+        .expect("DateTime range iteration failed");
+    assert_eq!(some_entries.len(), 1);
+    let entries = rt
+        .block_on(queue.iter(ctx.clone(), repo_id, t1, 100))
         .expect("Iterating over entries failed");
     assert_eq!(entries.len(), 2);

@@ -67,7 +73,8 @@ fn test_simple() {
         .expect("Failed to remove entries");
 
     // iter
-    let entries = rt.block_on(queue.iter(ctx.clone(), t0, 100))
+    let entries = rt
+        .block_on(queue.iter(ctx.clone(), repo_id, t1, 100))
         .expect("Iterating over entries failed");
     assert_eq!(entries.len(), 0)
 }


@@ -511,14 +511,18 @@ pub fn init_cachelib<'a>(matches: &ArgMatches<'a>) {
         .unwrap();
 }
 
-fn find_repo_type<'a>(matches: &ArgMatches<'a>) -> Result<(String, RepoType)> {
-    let repo_id = get_repo_id(matches);
+pub fn read_configs<'a>(matches: &ArgMatches<'a>) -> Result<RepoConfigs> {
     let config_path = matches
         .value_of("mononoke-config-path")
         .expect("Mononoke config path must be specified");
-    let configs = RepoConfigs::read_configs(config_path)?;
+    RepoConfigs::read_configs(config_path)
+}
+
+fn find_repo_type<'a>(matches: &ArgMatches<'a>) -> Result<(String, RepoType)> {
+    let repo_id = get_repo_id(matches);
+    let configs = read_configs(matches)?;
     let repo_config = configs
         .repos
         .into_iter()


@@ -0,0 +1,99 @@
// Copyright (c) 2004-present, Facebook, Inc.
// All Rights Reserved.
//
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2 or any later version.
//! This dummy crate contains dummy implementations of traits that are used only in
//! --dry-run mode to test the healer.
use blobstore::Blobstore;
use blobstore_sync_queue::{BlobstoreSyncQueue, BlobstoreSyncQueueEntry};
use context::CoreContext;
use failure::Error;
use futures::prelude::*;
use futures_ext::{BoxFuture, FutureExt};
use mononoke_types::{BlobstoreBytes, DateTime, RepositoryId};
use slog::Logger;
#[derive(Debug)]
pub struct DummyBlobstore<B> {
inner: B,
logger: Logger,
}
impl<B: Blobstore> DummyBlobstore<B> {
pub fn new(inner: B, logger: Logger) -> Self {
Self { inner, logger }
}
}
impl<B: Blobstore> Blobstore for DummyBlobstore<B> {
fn get(&self, ctx: CoreContext, key: String) -> BoxFuture<Option<BlobstoreBytes>, Error> {
self.inner.get(ctx, key)
}
fn put(&self, _ctx: CoreContext, key: String, value: BlobstoreBytes) -> BoxFuture<(), Error> {
info!(
self.logger,
"I would have written blob {} of size {}",
key,
value.len()
);
Ok(()).into_future().boxify()
}
fn is_present(&self, ctx: CoreContext, key: String) -> BoxFuture<bool, Error> {
self.inner.is_present(ctx, key)
}
fn assert_present(&self, ctx: CoreContext, key: String) -> BoxFuture<(), Error> {
self.inner.assert_present(ctx, key)
}
}
pub struct DummyBlobstoreSyncQueue<Q> {
inner: Q,
logger: Logger,
}
impl<Q: BlobstoreSyncQueue> DummyBlobstoreSyncQueue<Q> {
pub fn new(inner: Q, logger: Logger) -> Self {
Self { inner, logger }
}
}
impl<Q: BlobstoreSyncQueue> BlobstoreSyncQueue for DummyBlobstoreSyncQueue<Q> {
fn add(&self, _ctx: CoreContext, entry: BlobstoreSyncQueueEntry) -> BoxFuture<bool, Error> {
info!(self.logger, "I would have written {:#?}", entry);
Ok(false).into_future().boxify()
}
fn iter(
&self,
ctx: CoreContext,
repo_id: RepositoryId,
older_than: DateTime,
limit: usize,
) -> BoxFuture<Vec<BlobstoreSyncQueueEntry>, Error> {
self.inner.iter(ctx, repo_id, older_than, limit)
}
fn del(
&self,
_ctx: CoreContext,
entries: Vec<BlobstoreSyncQueueEntry>,
) -> BoxFuture<(), Error> {
info!(self.logger, "I would have deleted {:#?}", entries);
Ok(()).into_future().boxify()
}
fn get(
&self,
ctx: CoreContext,
repo_id: RepositoryId,
key: String,
) -> BoxFuture<Vec<BlobstoreSyncQueueEntry>, Error> {
self.inner.get(ctx, repo_id, key)
}
}
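The wrappers above are plain decorators: reads are forwarded to the inner implementation, while mutations are replaced with log lines. A dependency-free analogue of the same pattern (hypothetical Store trait, illustration only, not the real Blobstore API):

// Hypothetical minimal trait standing in for Blobstore.
trait Store {
    fn get(&self, key: &str) -> Option<Vec<u8>>;
    fn put(&self, key: &str, value: Vec<u8>);
}

// Decorator that forwards reads and only logs writes, like DummyBlobstore.
struct DryRun<S> {
    inner: S,
}

impl<S: Store> Store for DryRun<S> {
    fn get(&self, key: &str) -> Option<Vec<u8>> {
        self.inner.get(key) // reads pass through untouched
    }

    fn put(&self, key: &str, value: Vec<u8>) {
        // writes are reported, never applied
        println!("I would have written blob {} of size {}", key, value.len());
    }
}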


@@ -0,0 +1,269 @@
// Copyright (c) 2004-present, Facebook, Inc.
// All Rights Reserved.
//
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2 or any later version.
use blobstore::Blobstore;
use blobstore_sync_queue::{BlobstoreSyncQueue, BlobstoreSyncQueueEntry};
use chrono::Duration as ChronoDuration;
use context::CoreContext;
use failure::{err_msg, prelude::*};
use futures::{
future::{join_all, loop_fn, Loop},
prelude::*,
};
use futures_ext::FutureExt;
use itertools::Itertools;
use metaconfig::BlobstoreId;
use mononoke_types::{BlobstoreBytes, DateTime, RepositoryId};
use rate_limiter::RateLimiter;
use slog::Logger;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
lazy_static! {
/// Minimal age an entry must reach before it is considered for healing
static ref ENTRY_HEALING_MIN_AGE: ChronoDuration = ChronoDuration::minutes(2);
}
pub struct RepoHealer {
logger: Logger,
blobstore_sync_queue_limit: usize,
repo_id: RepositoryId,
rate_limiter: RateLimiter,
sync_queue: Arc<BlobstoreSyncQueue>,
blobstores: Arc<HashMap<BlobstoreId, Arc<Blobstore>>>,
}
impl RepoHealer {
pub fn new(
logger: Logger,
blobstore_sync_queue_limit: usize,
repo_id: RepositoryId,
rate_limiter: RateLimiter,
sync_queue: Arc<BlobstoreSyncQueue>,
blobstores: Arc<HashMap<BlobstoreId, Arc<Blobstore>>>,
) -> Self {
Self {
logger,
blobstore_sync_queue_limit,
repo_id,
rate_limiter,
sync_queue,
blobstores,
}
}
pub fn heal(&self, ctx: CoreContext) -> impl Future<Item = (), Error = Error> {
cloned!(
self.logger,
self.blobstore_sync_queue_limit,
self.repo_id,
self.rate_limiter,
self.sync_queue,
self.blobstores
);
let now = DateTime::now().into_chrono();
let healing_deadline = DateTime::new(now - *ENTRY_HEALING_MIN_AGE);
sync_queue
.iter(
ctx.clone(),
repo_id,
healing_deadline.clone(),
blobstore_sync_queue_limit,
)
.and_then(move |queue_entries| {
cloned!(rate_limiter);
let healing_futures: Vec<_> = queue_entries
.into_iter()
.group_by(|entry| entry.blobstore_key.clone())
.into_iter()
.filter_map(|(key, entries)| {
cloned!(ctx, sync_queue, blobstores, healing_deadline);
heal_blob(
ctx,
repo_id,
sync_queue,
blobstores,
healing_deadline,
key,
entries.collect(),
)
})
.map(move |healing_future| rate_limiter.execute(healing_future))
.collect();
info!(
logger,
"Found {} blobs to be healed... Doing it",
healing_futures.len()
);
join_all(healing_futures)
})
.map(|_| ())
}
}
fn heal_blob(
ctx: CoreContext,
repo_id: RepositoryId,
sync_queue: Arc<BlobstoreSyncQueue>,
blobstores: Arc<HashMap<BlobstoreId, Arc<Blobstore>>>,
healing_deadline: DateTime,
key: String,
entries: Vec<BlobstoreSyncQueueEntry>,
) -> Option<impl Future<Item = (), Error = Error>> {
let seen_blobstores: HashSet<_> = entries
.iter()
.filter_map(|entry| {
let id = entry.blobstore_id.clone();
if blobstores.contains_key(&id) {
Some(id)
} else {
None
}
})
.collect();
let missing_blobstores: HashSet<_> = blobstores
.iter()
.filter_map(|(key, _)| {
if seen_blobstores.contains(key) {
None
} else {
Some(key.clone())
}
})
.collect();
if missing_blobstores.is_empty() {
// All blobstores have been synchronized
return Some(cleanup_after_healing(ctx, sync_queue, entries).left_future());
}
if !entries
.iter()
.any(|entry| entry.timestamp < healing_deadline)
{
// The oldest entry is not old enough to be eligible for healing
return None;
}
let heal_future = fetch_blob(
ctx.clone(),
blobstores.clone(),
key.clone(),
seen_blobstores,
)
.and_then(move |blob| {
let heal_blobstores: Vec<_> = missing_blobstores
.into_iter()
.map(|bid| {
let blobstore = blobstores
.get(&bid)
.expect("missing_blobstores contains only existing blobstores");
blobstore
.put(ctx.clone(), key.clone(), blob.clone())
.then(move |result| Ok((bid, result.is_ok())))
})
.collect();
join_all(heal_blobstores).and_then(move |heal_results| {
if heal_results.iter().all(|(_, result)| *result) {
cleanup_after_healing(ctx, sync_queue, entries).left_future()
} else {
let healed_blobstores =
heal_results
.into_iter()
.filter_map(|(id, result)| if result { Some(id) } else { None });
report_partial_heal(ctx, repo_id, sync_queue, key, healed_blobstores).right_future()
}
})
});
Some(heal_future.right_future())
}
fn fetch_blob(
ctx: CoreContext,
blobstores: Arc<HashMap<BlobstoreId, Arc<Blobstore>>>,
key: String,
seen_blobstores: HashSet<BlobstoreId>,
) -> impl Future<Item = BlobstoreBytes, Error = Error> {
let blobstores_to_fetch: Vec<_> = seen_blobstores.iter().cloned().collect();
let err_context = format!(
"While fetching blob '{}', seen in blobstores: {:?}",
key, seen_blobstores
);
loop_fn(blobstores_to_fetch, move |mut blobstores_to_fetch| {
let bid = match blobstores_to_fetch.pop() {
None => {
return Err(err_msg("None of the blobstores to fetch responded"))
.into_future()
.left_future();
}
Some(bid) => bid,
};
let blobstore = blobstores
.get(&bid)
.expect("blobstores_to_fetch contains only existing blobstores");
blobstore
.get(ctx.clone(), key.clone())
.then(move |result| match result {
Err(_) => return Ok(Loop::Continue(blobstores_to_fetch)),
Ok(None) => {
return Err(format_err!(
"Blobstore {:?} returned None even though it should contain data",
bid
));
}
Ok(Some(blob)) => Ok(Loop::Break(blob)),
})
.right_future()
})
.chain_err(err_context)
.from_err()
}
fn cleanup_after_healing(
ctx: CoreContext,
sync_queue: Arc<BlobstoreSyncQueue>,
entries: Vec<BlobstoreSyncQueueEntry>,
) -> impl Future<Item = (), Error = Error> {
sync_queue.del(ctx, entries)
}
fn report_partial_heal(
ctx: CoreContext,
repo_id: RepositoryId,
sync_queue: Arc<BlobstoreSyncQueue>,
blobstore_key: String,
healed_blobstores: impl IntoIterator<Item = BlobstoreId>,
) -> impl Future<Item = (), Error = Error> {
let timestamp = DateTime::now();
join_all(healed_blobstores.into_iter().map({
move |blobstore_id| {
cloned!(ctx, repo_id, blobstore_key, timestamp);
sync_queue.add(
ctx,
BlobstoreSyncQueueEntry {
repo_id,
blobstore_key,
blobstore_id,
timestamp,
id: None,
},
)
}
}))
.map(|_| ())
}
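heal_blob above resolves every blob into one of three outcomes: all blobstores already hold it (just clean the queue), its entries are too fresh (skip until a later run), or it must be replicated from a blobstore that has it into the ones that miss it. A condensed, dependency-free restatement of that branching (hypothetical types; oldest_is_old_enough stands in for the timestamp-versus-deadline check):

use std::collections::HashSet;

#[derive(Debug, PartialEq)]
enum Outcome {
    Cleanup,   // every blobstore has the blob: delete the queue entries
    Skip,      // entries too fresh: leave them for a later healer run
    Replicate, // fetch from a blobstore that has it, put into the missing ones
}

fn decide(seen: &HashSet<u32>, all: &HashSet<u32>, oldest_is_old_enough: bool) -> Outcome {
    if all.is_subset(seen) {
        Outcome::Cleanup
    } else if !oldest_is_old_enough {
        Outcome::Skip
    } else {
        Outcome::Replicate
    }
}

fn main() {
    let all: HashSet<u32> = vec![1, 2, 3].into_iter().collect();
    let seen: HashSet<u32> = vec![1, 2].into_iter().collect();
    assert_eq!(decide(&seen, &all, true), Outcome::Replicate);
    assert_eq!(decide(&seen, &all, false), Outcome::Skip);
    assert_eq!(decide(&all, &all, false), Outcome::Cleanup);
}

On partial success of the Replicate case, report_partial_heal re-enqueues fresh entries for just the blobstores that healed, leaving the original entries in place so a later run can finish the job.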


@@ -0,0 +1,259 @@
// Copyright (c) 2004-present, Facebook, Inc.
// All Rights Reserved.
//
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2 or any later version.
#![deny(warnings)]
#![feature(never_type)]
extern crate blobstore;
extern crate blobstore_sync_queue;
extern crate chrono;
extern crate clap;
#[macro_use]
extern crate cloned;
extern crate cmdlib;
extern crate context;
#[macro_use]
extern crate failure_ext as failure;
extern crate futures;
extern crate futures_ext;
extern crate glusterblob;
extern crate itertools;
#[macro_use]
extern crate lazy_static;
extern crate manifoldblob;
extern crate mercurial_types;
extern crate metaconfig;
extern crate mononoke_types;
#[macro_use]
extern crate slog;
extern crate sqlblob;
extern crate tokio;
extern crate tokio_timer;
mod dummy;
mod healer;
mod rate_limiter;
use blobstore::{Blobstore, PrefixBlobstore};
use blobstore_sync_queue::{BlobstoreSyncQueue, SqlBlobstoreSyncQueue, SqlConstructors};
use clap::{value_t, App};
use cmdlib::args;
use context::CoreContext;
use dummy::{DummyBlobstore, DummyBlobstoreSyncQueue};
use failure::{err_msg, prelude::*};
use futures::{
future::{join_all, loop_fn, ok, Loop},
prelude::*,
};
use futures_ext::{spawn_future, BoxFuture, FutureExt};
use glusterblob::Glusterblob;
use healer::RepoHealer;
use manifoldblob::ThriftManifoldBlob;
use metaconfig::{RemoteBlobstoreArgs, RepoConfig, RepoType};
use mononoke_types::RepositoryId;
use rate_limiter::RateLimiter;
use slog::Logger;
use sqlblob::Sqlblob;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio_timer::Delay;
const MIN_HEALER_ITERATION_DELAY: Duration = Duration::from_secs(60);
fn maybe_schedule_healer_for_repo(
dry_run: bool,
blobstore_sync_queue_limit: usize,
logger: Logger,
rate_limiter: RateLimiter,
config: RepoConfig,
myrouter_port: u16,
) -> Result<BoxFuture<(), Error>> {
ensure_msg!(config.enabled, "Repo is disabled");
let (db_address, blobstores_args) = match config.repotype {
RepoType::BlobRemote {
ref db_address,
blobstores_args: RemoteBlobstoreArgs::Multiplexed(ref blobstores),
..
} => (db_address.clone(), blobstores.clone()),
_ => bail_msg!("Repo doesn't use Multiplexed blobstore"),
};
let blobstores = {
let mut blobstores = HashMap::new();
for (id, args) in blobstores_args.into_iter() {
match args {
RemoteBlobstoreArgs::Manifold(args) => {
let blobstore = ThriftManifoldBlob::new(args.bucket)
.chain_err("While opening ThriftManifoldBlob")?;
let blobstore =
PrefixBlobstore::new(blobstore, format!("flat/{}", args.prefix));
let blobstore: Arc<Blobstore> = Arc::new(blobstore);
blobstores.insert(id, ok(blobstore).boxify());
}
RemoteBlobstoreArgs::Gluster(args) => {
let blobstore = Glusterblob::with_smc(args.tier, args.export, args.basepath)
.map(|blobstore| -> Arc<Blobstore> { Arc::new(blobstore) })
.boxify();
blobstores.insert(id, blobstore);
}
RemoteBlobstoreArgs::Mysql(args) => {
let blobstore: Arc<Blobstore> = Arc::new(Sqlblob::with_myrouter(
RepositoryId::new(config.repoid),
args.shardmap,
myrouter_port,
args.shard_num,
));
blobstores.insert(id, ok(blobstore).boxify());
}
RemoteBlobstoreArgs::Multiplexed(_) => {
bail_msg!("Unsupported nested Multiplexed blobstore")
}
}
}
if !dry_run {
blobstores
} else {
blobstores
.into_iter()
.map(|(id, blobstore)| {
let logger = logger.new(o!("blobstore" => format!("{:?}", id)));
let blobstore = blobstore
.map(move |blobstore| -> Arc<Blobstore> {
Arc::new(DummyBlobstore::new(blobstore, logger))
})
.boxify();
(id, blobstore)
})
.collect()
}
};
let blobstores = join_all(
blobstores
.into_iter()
.map(|(key, value)| value.map(move |value| (key, value))),
)
.map(|blobstores| blobstores.into_iter().collect::<HashMap<_, _>>());
let sync_queue: Arc<BlobstoreSyncQueue> = {
let sync_queue = SqlBlobstoreSyncQueue::with_myrouter(db_address, myrouter_port);
if !dry_run {
Arc::new(sync_queue)
} else {
let logger = logger.new(o!("sync_queue" => ""));
Arc::new(DummyBlobstoreSyncQueue::new(sync_queue, logger))
}
};
Ok(blobstores
.and_then(move |blobstores| {
let repo_healer = RepoHealer::new(
logger,
blobstore_sync_queue_limit,
RepositoryId::new(config.repoid),
rate_limiter,
sync_queue,
Arc::new(blobstores),
);
if dry_run {
// TODO(luk) use a proper context here and put the logger inside of it
let ctx = CoreContext::test_mock();
repo_healer.heal(ctx).boxify()
} else {
schedule_everlasting_healing(repo_healer)
}
})
.boxify())
}
fn schedule_everlasting_healing(repo_healer: RepoHealer) -> BoxFuture<(), Error> {
let fut = loop_fn((), move |()| {
let start = Instant::now();
// TODO(luk) use a proper context here and put the logger inside of it
let ctx = CoreContext::test_mock();
repo_healer.heal(ctx).and_then(move |()| {
let next_iter_deadline = start + MIN_HEALER_ITERATION_DELAY;
Delay::new(next_iter_deadline)
.map(|()| Loop::Continue(()))
.from_err()
})
});
spawn_future(fut).boxify()
}
fn setup_app<'a, 'b>() -> App<'a, 'b> {
let app = args::MononokeApp {
safe_writes: true,
hide_advanced_args: false,
local_instances: true,
default_glog: true,
};
app.build("blobstore healer job")
.version("0.0.0")
.about("Monitors blobstore_sync_queue to heal blobstores with missing data")
.args_from_usage(
r#"
--sync-queue-limit=[LIMIT] 'set limit for how many queue entries to process'
--dry-run 'performs a single healing pass and prints what it would do without doing it'
"#,
)
}
fn main() -> Result<()> {
let matches = setup_app().get_matches();
let logger = args::get_logger(&matches);
let myrouter_port =
args::parse_myrouter_port(&matches).ok_or(err_msg("Missing --myrouter-port"))?;
let rate_limiter = RateLimiter::new(100);
let repo_configs = args::read_configs(&matches)?;
let blobstore_sync_queue_limit = value_t!(matches, "sync-queue-limit", usize).unwrap_or(10000);
let dry_run = matches.is_present("dry-run");
let healers: Vec<_> = repo_configs
.repos
.into_iter()
.filter_map(move |(name, config)| {
let logger = logger.new(o!(
"repo" => format!("{} ({})", name, config.repoid),
));
let scheduled = maybe_schedule_healer_for_repo(
dry_run,
blobstore_sync_queue_limit,
logger.clone(),
rate_limiter.clone(),
config,
myrouter_port,
);
match scheduled {
Err(err) => {
error!(logger, "Did not schedule, because of: {:#?}", err);
None
}
Ok(scheduled) => {
info!(logger, "Successfully scheduled");
Some(scheduled)
}
}
})
.collect();
let mut runtime = tokio::runtime::Runtime::new()?;
let result = runtime.block_on(join_all(healers).map(|_| ()));
runtime.shutdown_on_idle();
result
}


@@ -0,0 +1,81 @@
// Copyright (c) 2004-present, Facebook, Inc.
// All Rights Reserved.
//
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2 or any later version.
use failure::Error;
use futures::{
future::{self, lazy, IntoFuture},
prelude::*,
sync::{mpsc, oneshot},
};
use futures_ext::{spawn_future, BoxFuture, FutureExt};
use tokio;
#[derive(Clone)]
pub struct RateLimiter {
ensure_worker_scheduled: future::Shared<BoxFuture<(), ()>>,
worker: mpsc::UnboundedSender<BoxFuture<(), Error>>,
}
impl RateLimiter {
pub fn new(max_num_of_concurrent_futures: usize) -> Self {
let (send, recv) = mpsc::unbounded();
let ensure_worker_scheduled = lazy(move || {
tokio::spawn(
recv.then(move |work| {
let work = match work {
Ok(work) => work,
Err(()) => Ok(()).into_future().boxify(),
};
Ok(spawn_future(work).then(|_| Ok(())))
})
.buffer_unordered(max_num_of_concurrent_futures)
.for_each(|()| Ok(()))
.then(|_: Result<(), !>| -> Result<(), ()> {
// The error type is !, which guarantees that the worker never stops
Ok(())
}),
);
Ok(())
})
.boxify()
.shared();
Self {
ensure_worker_scheduled,
worker: send,
}
}
pub fn execute<F, I, E>(&self, work: F) -> BoxFuture<I, E>
where
F: Future<Item = I, Error = E> + Send + 'static,
I: Send + 'static,
E: From<::futures::Canceled> + Send + 'static,
{
cloned!(self.worker);
self.ensure_worker_scheduled
.clone()
.then(move |scheduling_result| {
scheduling_result.expect("The scheduling cannot fail");
let (send, recv) = oneshot::channel();
worker
.unbounded_send(
work.then(move |result| {
let _ = send.send(result);
Ok(())
})
.boxify(),
)
.expect("This send should never fail since the receiver is always alive");
recv.from_err().and_then(|result| result)
})
.boxify()
}
}
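RateLimiter funnels all submitted futures through a single worker stream and relies on buffer_unordered to cap how many run at once; the mpsc channel lets callers keep submitting work after startup, and a oneshot per job carries its result back. A minimal sketch of the capping mechanism alone, in futures 0.1 (the version this code uses; hypothetical jobs, no worker indirection):

extern crate futures; // futures = "0.1"

use futures::{future, stream, Future, Stream};

fn main() {
    // Five jobs, but buffer_unordered(2) keeps at most two in flight at a
    // time -- the same mechanism RateLimiter applies to its worker stream.
    let results = stream::iter_ok::<_, ()>(0..5)
        .map(|i| future::lazy(move || Ok::<_, ()>(i * 10)))
        .buffer_unordered(2)
        .collect()
        .wait()
        .unwrap();
    println!("{:?}", results);
}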


@@ -37,7 +37,7 @@ pub mod repoconfig;
 pub use repoconfig::{
     BlobstoreId, CacheWarmupParams, LfsParams, ManifoldArgs, MysqlBlobstoreArgs, PushrebaseParams,
-    RemoteBlobstoreArgs, RepoConfigs, RepoReadOnly, RepoType,
+    RemoteBlobstoreArgs, RepoConfig, RepoConfigs, RepoReadOnly, RepoType,
 };
 
 pub use errors::{Error, ErrorKind};