mononoke: Sample multiplexed blobstore get() and put() logging to scuba

Summary:
We don't want to log too much; we're only interested in a general
trend (i.e. whether this blobstore is returning loads of errors) rather than in
specific blobstore operations.

Only log 1/100 requests, although we can tweak this in the future if required.

Also, fix up a typo in an error message.

Reviewed By: StanislavGlebik

Differential Revision: D14022740

fbshipit-source-id: e62e97fb4e4dc551a3249318092ee55937e7b4e2
This commit is contained in:
Harvey Hunt 2019-02-12 04:15:04 -08:00 committed by Facebook Github Bot
parent d8dcf75343
commit 591824ea9f

View File

@ -4,6 +4,7 @@
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2 or any later version.
use rand::{thread_rng, Rng};
use std::collections::HashMap;
use std::env;
use std::fmt;
@ -29,6 +30,7 @@ use metaconfig_types::BlobstoreId;
use mononoke_types::BlobstoreBytes;
const SLOW_REQUEST_THRESHOLD: Duration = Duration::from_secs(5);
const SAMPLING_THRESHOLD: f32 = 1.0 - (1.0 / 100.0);
lazy_static! {
static ref TW_STATS: Vec<(&'static str, String)> = {
@ -54,7 +56,7 @@ lazy_static! {
pub enum ErrorKind {
#[fail(display = "Some blobstores failed, and other returned None: {:?}", _0)]
SomeFailedOthersNone(Arc<HashMap<BlobstoreId, Error>>),
#[fail(display = "All blobstores faield: {:?}", _0)]
#[fail(display = "All blobstores failed: {:?}", _0)]
AllFailed(Arc<HashMap<BlobstoreId, Error>>),
}
@ -92,6 +94,7 @@ impl MultiplexedBlobstoreBase {
impl Blobstore for MultiplexedBlobstoreBase {
fn get(&self, ctx: CoreContext, key: String) -> BoxFuture<Option<BlobstoreBytes>, Error> {
let should_log = thread_rng().gen::<f32>() > SAMPLING_THRESHOLD;
let requests = self
.blobstores
.iter()
@ -108,27 +111,31 @@ impl Blobstore for MultiplexedBlobstoreBase {
move |stats, result| {
// It won't log slow failed blobstores and because of that the
// results might be skewed.
if let Ok(Some(data)) = result {
if let Some(scuba_logger) = scuba_logger {
let mut sample = ScubaSample::new();
sample
.add("operation", "get")
.add("blobstore_id", blobstore_id)
.add("size", data.len())
.add(
"completion_time",
stats.completion_time.as_micros_unchecked(),
);
for (key, value) in TW_STATS.iter() {
sample.add(*key, value.clone());
}
// logging session uuid only for slow requests
if stats.completion_time >= SLOW_REQUEST_THRESHOLD {
sample.add("session", session.to_string());
}
scuba_logger.log(&sample);
}
if !should_log {
return future::ok(());
}
if let (Ok(Some(data)), Some(ref scuba_logger)) = (result, scuba_logger)
{
let mut sample = ScubaSample::new();
sample
.add("operation", "get")
.add("blobstore_id", blobstore_id)
.add("size", data.len())
.add(
"completion_time",
stats.completion_time.as_micros_unchecked(),
);
for (key, value) in TW_STATS.iter() {
sample.add(*key, value.clone());
}
// logging session uuid only for slow requests
if stats.completion_time >= SLOW_REQUEST_THRESHOLD {
sample.add("session", session.to_string());
}
scuba_logger.log(&sample);
}
future::ok(())
}
})
@ -173,6 +180,8 @@ impl Blobstore for MultiplexedBlobstoreBase {
fn put(&self, ctx: CoreContext, key: String, value: BlobstoreBytes) -> BoxFuture<(), Error> {
let size = value.len();
let write_order = Arc::new(AtomicUsize::new(0));
let should_log = thread_rng().gen::<f32>() > SAMPLING_THRESHOLD;
let requests = self.blobstores.iter().map(|(blobstore_id, blobstore)| {
blobstore
.put(ctx.clone(), key.clone(), value.clone())
@ -184,29 +193,33 @@ impl Blobstore for MultiplexedBlobstoreBase {
let session = ctx.session().clone();
cloned!(blobstore_id, write_order, size, self.scuba_logger);
move |stats, result| {
if let Some(scuba_logger) = scuba_logger {
let mut sample = ScubaSample::new();
sample
.add("operation", "put")
.add("blobstore_id", blobstore_id)
.add("size", size)
.add(
"completion_time",
stats.completion_time.as_micros_unchecked(),
);
match result {
Ok(_) => sample
.add("write_order", write_order.fetch_add(1, Ordering::SeqCst)),
Err(error) => sample.add("error", error.to_string()),
};
for (key, value) in TW_STATS.iter() {
sample.add(*key, value.clone());
if should_log {
if let Some(scuba_logger) = scuba_logger {
let mut sample = ScubaSample::new();
sample
.add("operation", "put")
.add("blobstore_id", blobstore_id)
.add("size", size)
.add(
"completion_time",
stats.completion_time.as_micros_unchecked(),
);
match result {
Ok(_) => sample.add(
"write_order",
write_order.fetch_add(1, Ordering::SeqCst),
),
Err(error) => sample.add("error", error.to_string()),
};
for (key, value) in TW_STATS.iter() {
sample.add(*key, value.clone());
}
// logging session uuid only for slow requests
if stats.completion_time >= SLOW_REQUEST_THRESHOLD {
sample.add("session", session.to_string());
}
scuba_logger.log(&sample);
}
// logging session uuid only for slow requests
if stats.completion_time >= SLOW_REQUEST_THRESHOLD {
sample.add("session", session.to_string());
}
scuba_logger.log(&sample);
}
future::ok(())
}