mononoke/multiplexedblob: make sampling rate configurable

Summary: This will let us lower Scuba utilization from Fastreplay.

Reviewed By: HarveyHunt

Differential Revision: D19766018

fbshipit-source-id: 4eac19b929914db910ed13096b2a5910c134ed3a
This commit is contained in:
Thomas Orozco 2020-02-06 12:06:26 -08:00 committed by Facebook Github Bot
parent c992310114
commit 8d2b2f7af2
8 changed files with 121 additions and 107 deletions

View File

@ -6,6 +6,7 @@
* directory of this source tree.
*/
use std::num::NonZeroU64;
use std::{path::PathBuf, sync::Arc};
use anyhow::{format_err, Error};
@ -396,12 +397,14 @@ fn make_blobstore_impl(
.boxify(),
Multiplexed {
scuba_table,
scuba_sample_rate,
blobstores,
} => {
has_components = true;
make_blobstore_multiplexed(
fb,
scuba_table,
*scuba_sample_rate,
blobstores,
sql_factory,
mysql_options,
@ -412,6 +415,7 @@ fn make_blobstore_impl(
}
Scrub {
scuba_table,
scuba_sample_rate,
blobstores,
scrub_action,
} => {
@ -419,6 +423,7 @@ fn make_blobstore_impl(
make_blobstore_multiplexed(
fb,
scuba_table,
*scuba_sample_rate,
blobstores,
sql_factory,
mysql_options,
@ -493,6 +498,7 @@ fn make_blobstore_impl(
pub fn make_blobstore_multiplexed(
fb: FacebookInit,
scuba_table: &Option<String>,
scuba_sample_rate: NonZeroU64,
inner_config: &[(BlobstoreId, BlobConfig)],
sql_factory: Option<&SqlFactory>,
mysql_options: MysqlOptions,
@ -558,6 +564,7 @@ pub fn make_blobstore_multiplexed(
scuba_table.map_or(ScubaSampleBuilder::with_discard(), |table| {
ScubaSampleBuilder::new(fb, table)
}),
scuba_sample_rate,
scrub_handler,
scrub_action,
))
@ -568,6 +575,7 @@ pub fn make_blobstore_multiplexed(
scuba_table.map_or(ScubaSampleBuilder::with_discard(), |table| {
ScubaSampleBuilder::new(fb, table)
}),
scuba_sample_rate,
)) as Arc<dyn Blobstore>,
}
})

View File

@ -16,10 +16,10 @@ use futures_stats::Timed;
use itertools::{Either, Itertools};
use metaconfig_types::BlobstoreId;
use mononoke_types::BlobstoreBytes;
use rand::{thread_rng, Rng};
use scuba::ScubaSampleBuilder;
use std::collections::{HashMap, HashSet};
use std::fmt;
use std::num::NonZeroU64;
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
@ -38,34 +38,6 @@ type BlobstoresWithEntry = HashSet<BlobstoreId>;
type BlobstoresReturnedNone = HashSet<BlobstoreId>;
type BlobstoresReturnedError = HashMap<BlobstoreId, Error>;
/// Outcome of a sampling roll for one blobstore request: either log to Scuba
/// (carrying the sample rate so it can be recorded in the sample), or skip
/// logging entirely.
#[derive(Copy, Clone)]
enum Sampling {
    Log(i32),
    DoNotLog,
}

impl Sampling {
    /// Whether this roll decided the request should be logged.
    fn is_logged(&self) -> bool {
        match self {
            Self::DoNotLog => false,
            Self::Log(_) => true,
        }
    }

    /// Roll the dice: log roughly 1 in SAMPLE_RATE requests.
    fn roll() -> Self {
        const SAMPLE_RATE: i32 = 100;
        const SAMPLING_THRESHOLD: f32 = 1.0 - (1.0 / (SAMPLE_RATE as f32));
        if thread_rng().gen::<f32>() > SAMPLING_THRESHOLD {
            Self::Log(SAMPLE_RATE)
        } else {
            Self::DoNotLog
        }
    }
}
#[derive(Error, Debug, Clone)]
pub enum ErrorKind {
#[error("Some blobstores failed, and other returned None: {0:?}")]
@ -98,6 +70,7 @@ pub struct MultiplexedBlobstoreBase {
blobstores: Arc<[(BlobstoreId, Arc<dyn Blobstore>)]>,
handler: Arc<dyn MultiplexedBlobstorePutHandler>,
scuba: ScubaSampleBuilder,
scuba_sample_rate: NonZeroU64,
}
impl MultiplexedBlobstoreBase {
@ -105,6 +78,7 @@ impl MultiplexedBlobstoreBase {
blobstores: Vec<(BlobstoreId, Arc<dyn Blobstore>)>,
handler: Arc<dyn MultiplexedBlobstorePutHandler>,
mut scuba: ScubaSampleBuilder,
scuba_sample_rate: NonZeroU64,
) -> Self {
scuba.add_common_server_data();
@ -112,88 +86,19 @@ impl MultiplexedBlobstoreBase {
blobstores: blobstores.into(),
handler,
scuba,
scuba_sample_rate,
}
}
fn get_from_all(
&self,
ctx: &CoreContext,
key: &String,
operation: &'static str,
sampling: Sampling,
) -> Vec<BoxFuture<(BlobstoreId, Option<BlobstoreBytes>), (BlobstoreId, Error)>> {
let scuba = match sampling {
Sampling::Log(sample_rate) => {
let mut scuba = self.scuba.clone();
scuba.add("sample_rate", sample_rate);
Some(scuba)
}
Sampling::DoNotLog => None,
};
self.blobstores
.iter()
.map(|&(blobstore_id, ref blobstore)| {
blobstore
.get(ctx.clone(), key.clone())
.map({
cloned!(blobstore_id);
move |val| (blobstore_id, val)
})
.timeout(REQUEST_TIMEOUT)
.map_err({
cloned!(blobstore_id);
move |error| (blobstore_id, remap_timeout_error(error))
})
.timed({
cloned!(key, scuba);
let session = ctx.session_id().clone();
move |stats, result| {
let mut scuba = match scuba {
Some(scuba) => scuba,
None => {
return future::ok(());
}
};
scuba
.add("key", key.clone())
.add("operation", operation)
.add("blobstore_id", blobstore_id)
.add(
"completion_time",
stats.completion_time.as_micros_unchecked(),
);
// log session id only for slow requests
if stats.completion_time >= SLOW_REQUEST_THRESHOLD {
scuba.add("session", session.to_string());
}
match result {
Ok((_, Some(data))) => {
scuba.add("size", data.len());
}
Err((_, error)) => {
scuba.add("error", error.to_string());
}
Ok((_, None)) => {}
}
scuba.log();
future::ok(())
}
})
})
.collect()
}
pub fn scrub_get(
&self,
ctx: CoreContext,
key: String,
) -> BoxFuture<Option<BlobstoreBytes>, ErrorKind> {
let requests = self
.get_from_all(&ctx, &key, "scrub_get", Sampling::roll())
let mut scuba = self.scuba.clone();
scuba.sampled(self.scuba_sample_rate);
let requests = multiplexed_get(&ctx, self.blobstores.as_ref(), &key, "scrub_get", scuba)
.into_iter()
.map(|f| f.then(|r| Ok(r)));
@ -302,9 +207,12 @@ impl Blobstore for MultiplexedBlobstoreBase {
ctx.perf_counters()
.increment_counter(PerfCounterType::BlobGets);
let sampling = Sampling::roll();
let mut scuba = self.scuba.clone();
scuba.sampled(self.scuba_sample_rate);
let requests = self.get_from_all(&ctx, &key, "get", sampling);
let is_logged = scuba.sampling().is_logged();
let requests = multiplexed_get(&ctx, self.blobstores.as_ref(), &key, "get", scuba);
let state = (
requests, // pending requests
HashMap::<BlobstoreId, Error>::new(), // previous errors
@ -315,7 +223,7 @@ impl Blobstore for MultiplexedBlobstoreBase {
move |result| {
let requests = match result {
Ok(((_, value @ Some(_)), _, requests)) => {
if sampling.is_logged() {
if is_logged {
// Allow the other requests to complete so that we can record some
// metrics for the blobstore.
let requests_fut = future::join_all(
@ -457,6 +365,64 @@ impl fmt::Debug for MultiplexedBlobstoreBase {
}
}
/// Issue a `get` for `key` against every blobstore in the multiplex, returning
/// one future per underlying store. Each future resolves (or fails) tagged
/// with the originating `BlobstoreId` so callers can attribute results.
/// Per-request timing and outcome are logged to `scuba` (which the caller may
/// have already put into sampled mode); errors are always logged unsampled.
fn multiplexed_get(
    ctx: &CoreContext,
    blobstores: &[(BlobstoreId, Arc<dyn Blobstore>)],
    key: &String,
    operation: &'static str,
    scuba: ScubaSampleBuilder,
) -> Vec<BoxFuture<(BlobstoreId, Option<BlobstoreBytes>), (BlobstoreId, Error)>> {
    blobstores
        .iter()
        .map(|&(blobstore_id, ref blobstore)| {
            blobstore
                .get(ctx.clone(), key.clone())
                // Tag the success value with the blobstore it came from.
                .map({
                    cloned!(blobstore_id);
                    move |val| (blobstore_id, val)
                })
                .timeout(REQUEST_TIMEOUT)
                // Tag errors (including timeouts) with the blobstore id too.
                .map_err({
                    cloned!(blobstore_id);
                    move |error| (blobstore_id, remap_timeout_error(error))
                })
                // Record timing + outcome for this individual request.
                .timed({
                    cloned!(key, mut scuba);
                    let session = ctx.session_id().clone();
                    move |stats, result| {
                        scuba
                            .add("key", key.clone())
                            .add("operation", operation)
                            .add("blobstore_id", blobstore_id)
                            .add(
                                "completion_time",
                                stats.completion_time.as_micros_unchecked(),
                            );
                        // log session id only for slow requests
                        if stats.completion_time >= SLOW_REQUEST_THRESHOLD {
                            scuba.add("session", session.to_string());
                        }
                        match result {
                            Ok((_, Some(data))) => {
                                scuba.add("size", data.len());
                            }
                            Err((_, error)) => {
                                // Always log errors
                                scuba.unsampled();
                                scuba.add("error", error.to_string());
                            }
                            Ok((_, None)) => {}
                        }
                        scuba.log();
                        future::ok(())
                    }
                })
        })
        .collect()
}
fn multiplexed_put<F: Future<Item = BlobstoreId, Error = Error> + Send + 'static>(
ctx: CoreContext,
handler: Arc<dyn MultiplexedBlobstorePutHandler>,

View File

@ -18,6 +18,7 @@ use metaconfig_types::BlobstoreId;
use mononoke_types::{BlobstoreBytes, DateTime};
use scuba::ScubaSampleBuilder;
use std::fmt;
use std::num::NonZeroU64;
use std::sync::Arc;
#[derive(Clone)]
@ -31,6 +32,7 @@ impl MultiplexedBlobstore {
blobstores: Vec<(BlobstoreId, Arc<dyn Blobstore>)>,
queue: Arc<dyn BlobstoreSyncQueue>,
scuba: ScubaSampleBuilder,
scuba_sample_rate: NonZeroU64,
) -> Self {
let put_handler = Arc::new(QueueBlobstorePutHandler {
queue: queue.clone(),
@ -40,6 +42,7 @@ impl MultiplexedBlobstore {
blobstores,
put_handler,
scuba,
scuba_sample_rate,
)),
queue,
}

View File

@ -22,6 +22,7 @@ use scuba::ScubaSampleBuilder;
use slog::{info, warn};
use std::collections::HashMap;
use std::fmt;
use std::num::NonZeroU64;
use std::sync::{atomic::AtomicUsize, Arc};
pub trait ScrubHandler: Send + Sync {
@ -78,10 +79,16 @@ impl ScrubBlobstore {
blobstores: Vec<(BlobstoreId, Arc<dyn Blobstore>)>,
queue: Arc<dyn BlobstoreSyncQueue>,
scuba: ScubaSampleBuilder,
scuba_sample_rate: NonZeroU64,
scrub_handler: Arc<dyn ScrubHandler>,
scrub_action: ScrubAction,
) -> Self {
let inner = MultiplexedBlobstore::new(blobstores.clone(), queue.clone(), scuba.clone());
let inner = MultiplexedBlobstore::new(
blobstores.clone(),
queue.clone(),
scuba.clone(),
scuba_sample_rate,
);
Self {
inner,
scrub_handler,

View File

@ -26,6 +26,7 @@ use futures_ext::{BoxFuture, FutureExt};
use lock_ext::LockExt;
use metaconfig_types::{BlobstoreId, ScrubAction};
use mononoke_types::BlobstoreBytes;
use nonzero_ext::nonzero;
use scuba::ScubaSampleBuilder;
pub struct Tickable<T> {
@ -154,6 +155,7 @@ fn base(fb: FacebookInit) {
],
log.clone(),
ScubaSampleBuilder::with_discard(),
nonzero!(1u64),
);
let ctx = CoreContext::test_mock(fb);
@ -283,6 +285,7 @@ fn multiplexed(fb: FacebookInit) {
vec![(bid0, bs0.clone()), (bid1, bs1.clone())],
queue.clone(),
ScubaSampleBuilder::with_discard(),
nonzero!(1u64),
);
// non-existing key when one blobstore failing
@ -354,6 +357,7 @@ fn scrubbed(fb: FacebookInit) {
vec![(bid0, bs0.clone()), (bid1, bs1.clone())],
queue.clone(),
ScubaSampleBuilder::with_discard(),
nonzero!(1u64),
scrub_handler.clone(),
ScrubAction::ReportOnly,
);
@ -422,6 +426,7 @@ fn scrubbed(fb: FacebookInit) {
vec![(bid0, bs0.clone()), (bid1, bs1.clone())],
queue.clone(),
ScubaSampleBuilder::with_discard(),
nonzero!(1u64),
scrub_handler,
ScrubAction::Repair,
);
@ -510,6 +515,7 @@ fn queue_waits(fb: FacebookInit) {
],
log.clone(),
ScubaSampleBuilder::with_discard(),
nonzero!(1u64),
);
let ctx = CoreContext::test_mock(fb);

View File

@ -950,6 +950,7 @@ mod test {
BlobConfig, BlobstoreId, FilestoreParams, MetadataDBConfig, ShardedFilenodesParams,
SourceControlServiceMonitoring,
};
use nonzero_ext::nonzero;
use pretty_assertions::assert_eq;
use std::fs::{create_dir_all, write};
use std::num::NonZeroUsize;
@ -1459,6 +1460,7 @@ mod test {
let multiplex = BlobConfig::Multiplexed {
scuba_table: Some("blobstore_scuba_table".to_string()),
scuba_sample_rate: nonzero!(100u64),
blobstores: vec![
(
BlobstoreId::new(0),
@ -1887,6 +1889,7 @@ mod test {
storage_config: StorageConfig {
blobstore: BlobConfig::Multiplexed {
scuba_table: None,
scuba_sample_rate: nonzero!(100u64),
blobstores: vec![
(BlobstoreId::new(1), BlobConfig::Files {
path: "/tmp/foo".into()

View File

@ -17,6 +17,7 @@ use std::{
collections::HashMap,
convert::{TryFrom, TryInto},
fmt, mem,
num::NonZeroU64,
num::NonZeroUsize,
path::PathBuf,
str,
@ -28,6 +29,7 @@ use std::{
use ascii::AsciiString;
use bookmarks_types::BookmarkName;
use mononoke_types::{MPath, RepositoryId};
use nonzero_ext::nonzero;
use regex::Regex;
use repos::{
RawBlobstoreConfig, RawDbConfig, RawFilestoreParams, RawShardedFilenodesParams,
@ -625,6 +627,8 @@ pub enum BlobConfig {
scuba_table: Option<String>,
/// Set of blobstores being multiplexed over
blobstores: Vec<(BlobstoreId, BlobConfig)>,
/// 1 in scuba_sample_rate samples will be logged.
scuba_sample_rate: NonZeroU64,
},
/// Multiplex across multiple blobstores scrubbing for errors
Scrub {
@ -634,6 +638,8 @@ pub enum BlobConfig {
blobstores: Vec<(BlobstoreId, BlobConfig)>,
/// Whether to attempt repair
scrub_action: ScrubAction,
/// 1 in scuba_sample_rate samples will be logged.
scuba_sample_rate: NonZeroU64,
},
/// Store in a manifold bucket, but every object will have an expiration
ManifoldWithTtl {
@ -670,6 +676,7 @@ impl BlobConfig {
if let Multiplexed {
scuba_table,
scuba_sample_rate,
blobstores,
} = self
{
@ -680,6 +687,7 @@ impl BlobConfig {
}
*self = Scrub {
scuba_table,
scuba_sample_rate: *scuba_sample_rate,
blobstores,
scrub_action,
};
@ -720,6 +728,15 @@ impl TryFrom<&'_ RawBlobstoreConfig> for BlobConfig {
},
RawBlobstoreConfig::multiplexed(def) => BlobConfig::Multiplexed {
scuba_table: def.scuba_table.clone(),
scuba_sample_rate: def
.scuba_sample_rate
.map(|rate| {
NonZeroU64::new(rate.try_into()?).ok_or(anyhow!(
"scuba_sample_rate must be an integer larger than zero"
))
})
.transpose()?
.unwrap_or(nonzero!(100_u64)),
blobstores: def
.components
.iter()

View File

@ -176,6 +176,7 @@ pub async fn open_blobstore(
Some(scrub_handler),
BlobConfig::Scrub {
scuba_table,
scuba_sample_rate,
blobstores,
scrub_action,
},
@ -192,6 +193,7 @@ pub async fn open_blobstore(
make_blobstore_multiplexed(
fb,
&scuba_table,
scuba_sample_rate,
&blobstores,
Some(&sql_factory),
mysql_options,
@ -204,11 +206,13 @@ pub async fn open_blobstore(
None,
BlobConfig::Multiplexed {
scuba_table,
scuba_sample_rate,
blobstores,
},
) => make_blobstore_multiplexed(
fb,
&scuba_table,
scuba_sample_rate,
&blobstores,
Some(&sql_factory),
mysql_options,