mononoke: Add a multiplex ID to blobstore configuration

Summary:
In order to uniquely identify a blobstore multiplexer configuration,
add an ID.

Reviewed By: krallin

Differential Revision: D19770058

fbshipit-source-id: 8e09d5531d1d27b337cf62a6126f88ce15de341b
This commit is contained in:
Harvey Hunt 2020-02-07 07:42:16 -08:00 committed by Facebook Github Bot
parent 0a2fd56bba
commit 6a0522aefa
10 changed files with 91 additions and 8 deletions

View File

@ -27,7 +27,8 @@ use fileblob::Fileblob;
use itertools::Either;
use manifoldblob::ThriftManifoldBlob;
use metaconfig_types::{
self, BlobConfig, BlobstoreId, MetadataDBConfig, ScrubAction, ShardedFilenodesParams,
self, BlobConfig, BlobstoreId, MetadataDBConfig, MultiplexId, ScrubAction,
ShardedFilenodesParams,
};
use multiplexedblob::{LoggingScrubHandler, MultiplexedBlobstore, ScrubBlobstore, ScrubHandler};
use prefixblob::PrefixBlobstore;
@ -396,6 +397,7 @@ fn make_blobstore_impl(
.into_future()
.boxify(),
Multiplexed {
multiplex_id,
scuba_table,
scuba_sample_rate,
blobstores,
@ -403,6 +405,7 @@ fn make_blobstore_impl(
has_components = true;
make_blobstore_multiplexed(
fb,
*multiplex_id,
scuba_table,
*scuba_sample_rate,
blobstores,
@ -414,6 +417,7 @@ fn make_blobstore_impl(
)
}
Scrub {
multiplex_id,
scuba_table,
scuba_sample_rate,
blobstores,
@ -422,6 +426,7 @@ fn make_blobstore_impl(
has_components = true;
make_blobstore_multiplexed(
fb,
*multiplex_id,
scuba_table,
*scuba_sample_rate,
blobstores,
@ -497,6 +502,7 @@ fn make_blobstore_impl(
pub fn make_blobstore_multiplexed(
fb: FacebookInit,
multiplex_id: MultiplexId,
scuba_table: &Option<String>,
scuba_sample_rate: NonZeroU64,
inner_config: &[(BlobstoreId, BlobConfig)],
@ -559,6 +565,7 @@ pub fn make_blobstore_multiplexed(
future::join_all(components).map({
move |components| match scrub_args {
Some((scrub_handler, scrub_action)) => Arc::new(ScrubBlobstore::new(
multiplex_id,
components,
queue,
scuba_table.map_or(ScubaSampleBuilder::with_discard(), |table| {
@ -570,6 +577,7 @@ pub fn make_blobstore_multiplexed(
))
as Arc<dyn Blobstore>,
None => Arc::new(MultiplexedBlobstore::new(
multiplex_id,
components,
queue,
scuba_table.map_or(ScubaSampleBuilder::with_discard(), |table| {

View File

@ -14,7 +14,7 @@ use futures::future::{self, Future, Loop};
use futures_ext::{BoxFuture, FutureExt};
use futures_stats::Timed;
use itertools::{Either, Itertools};
use metaconfig_types::BlobstoreId;
use metaconfig_types::{BlobstoreId, MultiplexId};
use mononoke_types::BlobstoreBytes;
use scuba::ScubaSampleBuilder;
use std::collections::{HashMap, HashSet};
@ -67,6 +67,7 @@ pub trait MultiplexedBlobstorePutHandler: Send + Sync {
}
pub struct MultiplexedBlobstoreBase {
multiplex_id: MultiplexId,
blobstores: Arc<[(BlobstoreId, Arc<dyn Blobstore>)]>,
handler: Arc<dyn MultiplexedBlobstorePutHandler>,
scuba: ScubaSampleBuilder,
@ -75,6 +76,7 @@ pub struct MultiplexedBlobstoreBase {
impl MultiplexedBlobstoreBase {
pub fn new(
multiplex_id: MultiplexId,
blobstores: Vec<(BlobstoreId, Arc<dyn Blobstore>)>,
handler: Arc<dyn MultiplexedBlobstorePutHandler>,
mut scuba: ScubaSampleBuilder,
@ -83,6 +85,7 @@ impl MultiplexedBlobstoreBase {
scuba.add_common_server_data();
Self {
multiplex_id,
blobstores: blobstores.into(),
handler,
scuba,
@ -358,7 +361,11 @@ impl Blobstore for MultiplexedBlobstoreBase {
impl fmt::Debug for MultiplexedBlobstoreBase {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "MultiplexedBlobstoreBase")?;
write!(
f,
"MultiplexedBlobstoreBase: multiplex_id: {}",
&self.multiplex_id
)?;
f.debug_map()
.entries(self.blobstores.iter().map(|(ref k, ref v)| (k, v)))
.finish()

View File

@ -14,7 +14,7 @@ use cloned::cloned;
use context::CoreContext;
use futures::future::{self, Future};
use futures_ext::{BoxFuture, FutureExt};
use metaconfig_types::BlobstoreId;
use metaconfig_types::{BlobstoreId, MultiplexId};
use mononoke_types::{BlobstoreBytes, DateTime};
use scuba::ScubaSampleBuilder;
use std::fmt;
@ -29,6 +29,7 @@ pub struct MultiplexedBlobstore {
impl MultiplexedBlobstore {
pub fn new(
multiplex_id: MultiplexId,
blobstores: Vec<(BlobstoreId, Arc<dyn Blobstore>)>,
queue: Arc<dyn BlobstoreSyncQueue>,
scuba: ScubaSampleBuilder,
@ -39,6 +40,7 @@ impl MultiplexedBlobstore {
});
Self {
blobstore: Arc::new(MultiplexedBlobstoreBase::new(
multiplex_id,
blobstores,
put_handler,
scuba,

View File

@ -16,7 +16,7 @@ use cloned::cloned;
use context::CoreContext;
use futures::future::{self, Future};
use futures_ext::{BoxFuture, FutureExt};
use metaconfig_types::{BlobstoreId, ScrubAction};
use metaconfig_types::{BlobstoreId, MultiplexId, ScrubAction};
use mononoke_types::BlobstoreBytes;
use scuba::ScubaSampleBuilder;
use slog::{info, warn};
@ -76,6 +76,7 @@ pub struct ScrubBlobstore {
impl ScrubBlobstore {
pub fn new(
multiplex_id: MultiplexId,
blobstores: Vec<(BlobstoreId, Arc<dyn Blobstore>)>,
queue: Arc<dyn BlobstoreSyncQueue>,
scuba: ScubaSampleBuilder,
@ -84,6 +85,7 @@ impl ScrubBlobstore {
scrub_action: ScrubAction,
) -> Self {
let inner = MultiplexedBlobstore::new(
multiplex_id,
blobstores.clone(),
queue.clone(),
scuba.clone(),

View File

@ -24,7 +24,7 @@ use futures::sync::oneshot;
use futures::Async;
use futures_ext::{BoxFuture, FutureExt};
use lock_ext::LockExt;
use metaconfig_types::{BlobstoreId, ScrubAction};
use metaconfig_types::{BlobstoreId, MultiplexId, ScrubAction};
use mononoke_types::BlobstoreBytes;
use nonzero_ext::nonzero;
use scuba::ScubaSampleBuilder;
@ -149,6 +149,7 @@ fn base(fb: FacebookInit) {
let bs1 = Arc::new(Tickable::new());
let log = Arc::new(LogHandler::new());
let bs = MultiplexedBlobstoreBase::new(
MultiplexId::new(1),
vec![
(BlobstoreId::new(0), bs0.clone()),
(BlobstoreId::new(1), bs1.clone()),
@ -282,6 +283,7 @@ fn multiplexed(fb: FacebookInit) {
let bid1 = BlobstoreId::new(1);
let bs1 = Arc::new(Tickable::new());
let bs = MultiplexedBlobstore::new(
MultiplexId::new(1),
vec![(bid0, bs0.clone()), (bid1, bs1.clone())],
queue.clone(),
ScubaSampleBuilder::with_discard(),
@ -354,6 +356,7 @@ fn scrubbed(fb: FacebookInit) {
let bid1 = BlobstoreId::new(1);
let bs1 = Arc::new(Tickable::new());
let bs = ScrubBlobstore::new(
MultiplexId::new(1),
vec![(bid0, bs0.clone()), (bid1, bs1.clone())],
queue.clone(),
ScubaSampleBuilder::with_discard(),
@ -423,6 +426,7 @@ fn scrubbed(fb: FacebookInit) {
let bid1 = BlobstoreId::new(1);
let bs1 = Arc::new(Tickable::new());
let bs = ScrubBlobstore::new(
MultiplexId::new(1),
vec![(bid0, bs0.clone()), (bid1, bs1.clone())],
queue.clone(),
ScubaSampleBuilder::with_discard(),
@ -508,6 +512,7 @@ fn queue_waits(fb: FacebookInit) {
let bs2 = Arc::new(Tickable::new());
let log = Arc::new(Tickable::new());
let bs = MultiplexedBlobstoreBase::new(
MultiplexId::new(1),
vec![
(BlobstoreId::new(0), bs0.clone()),
(BlobstoreId::new(1), bs1.clone()),

View File

@ -5,6 +5,7 @@ storage_config = "multiplex"
db_address="xdb.mononoke_production"
[storage.multiplex.blobstore.multiplexed]
multiplex_id = 1
components = [
{ blobstore_id = 1, blobstore = { manifold = { manifold_bucket = "mononoke_prod" } } },
{ blobstore_id = 2, blobstore = { manifold = { manifold_bucket = "mononoke_prod_replica" } } },

View File

@ -947,8 +947,8 @@ mod test {
use super::*;
use maplit::{btreemap, hashmap};
use metaconfig_types::{
BlobConfig, BlobstoreId, FilestoreParams, MetadataDBConfig, ShardedFilenodesParams,
SourceControlServiceMonitoring,
BlobConfig, BlobstoreId, FilestoreParams, MetadataDBConfig, MultiplexId,
ShardedFilenodesParams, SourceControlServiceMonitoring,
};
use nonzero_ext::nonzero;
use pretty_assertions::assert_eq;
@ -1353,6 +1353,7 @@ mod test {
sharded_filenodes = { shard_map = "db_address_shards", shard_num = 123 }
[storage.main.blobstore.multiplexed]
multiplex_id = 1
scuba_table = "blobstore_scuba_table"
components = [
{ blobstore_id = 0, blobstore = { manifold = { manifold_bucket = "bucket" } } },
@ -1459,6 +1460,7 @@ mod test {
RepoConfigs::read_configs(fb, tmp_dir.path()).expect("failed to read configs");
let multiplex = BlobConfig::Multiplexed {
multiplex_id: MultiplexId::new(1),
scuba_table: Some("blobstore_scuba_table".to_string()),
scuba_sample_rate: nonzero!(100u64),
blobstores: vec![
@ -1855,6 +1857,7 @@ mod test {
sharded_filenodes = { shard_map="some-shards", shard_num=123 }
[multiplex_store.blobstore.multiplexed]
multiplex_id = 1
components = [
{ blobstore_id = 1, blobstore = { blob_files = { path = "/tmp/foo" } } },
]
@ -1888,6 +1891,7 @@ mod test {
enabled: true,
storage_config: StorageConfig {
blobstore: BlobConfig::Multiplexed {
multiplex_id: MultiplexId::new(1),
scuba_table: None,
scuba_sample_rate: nonzero!(100u64),
blobstores: vec![

View File

@ -509,6 +509,45 @@ impl From<BlobstoreId> for ScubaValue {
}
}
/// Id used to identify storage configuration for a multiplexed blobstore.
///
/// A thin newtype over `i32`; `Copy`, ordered, and hashable so it can be used
/// as a map key and compared cheaply.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Ord, PartialOrd)]
pub struct MultiplexId(i32);

impl MultiplexId {
    /// Construct a MultiplexId from an i32.
    pub fn new(id: i32) -> Self {
        Self(id)
    }
}

/// Renders the raw inner i32 (used e.g. in `Debug` output of blobstores).
impl fmt::Display for MultiplexId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.0)
    }
}

/// Conversion into a SQL `Value` so the id can be bound as a query parameter.
// NOTE(review): `Value`, `ConvIr`, `FromValueError`, `from_value_opt` and
// `FromValue` appear to come from the mysql crate's value-conversion
// protocol — confirm against the imports at the top of this file (not
// visible in this chunk).
impl From<MultiplexId> for Value {
    fn from(id: MultiplexId) -> Self {
        // Widen i32 -> Value::Int's integer type; lossless via `into()`.
        Value::Int(id.0.into())
    }
}

impl ConvIr<MultiplexId> for MultiplexId {
    fn new(v: Value) -> Result<Self, FromValueError> {
        // Parse the raw column value as i32; a mismatched column type
        // surfaces as `FromValueError` via `?`.
        Ok(MultiplexId(from_value_opt(v)?))
    }

    fn commit(self) -> Self {
        self
    }

    fn rollback(self) -> Value {
        // Hand the value back unchanged on conversion rollback.
        self.into()
    }
}

/// Lets `MultiplexId` be read directly from a SQL row; the intermediate
/// representation is the id itself (no staging type needed).
impl FromValue for MultiplexId {
    type Intermediate = MultiplexId;
}
/// Define storage needed for repo.
/// Storage consists of a blobstore and some kind of SQL DB for metadata. The configurations
/// can be broadly classified as "local" and "remote". "Local" is primarily for testing, and is
@ -623,6 +662,8 @@ pub enum BlobConfig {
},
/// Multiplex across multiple blobstores for redundancy
Multiplexed {
/// A unique ID that identifies this multiplex configuration
multiplex_id: MultiplexId,
/// A scuba table I guess
scuba_table: Option<String>,
/// Set of blobstores being multiplexed over
@ -632,6 +673,8 @@ pub enum BlobConfig {
},
/// Multiplex across multiple blobstores scrubbing for errors
Scrub {
/// A unique ID that identifies this multiplex configuration
multiplex_id: MultiplexId,
/// A scuba table I guess
scuba_table: Option<String>,
/// Set of blobstores being multiplexed over
@ -675,6 +718,7 @@ impl BlobConfig {
use BlobConfig::{Multiplexed, Scrub};
if let Multiplexed {
multiplex_id,
scuba_table,
scuba_sample_rate,
blobstores,
@ -686,6 +730,7 @@ impl BlobConfig {
store.set_scrubbed(scrub_action);
}
*self = Scrub {
multiplex_id: *multiplex_id,
scuba_table,
scuba_sample_rate: *scuba_sample_rate,
blobstores,
@ -727,6 +772,10 @@ impl TryFrom<&'_ RawBlobstoreConfig> for BlobConfig {
))?,
},
RawBlobstoreConfig::multiplexed(def) => BlobConfig::Multiplexed {
multiplex_id: def
.multiplex_id
.map(|id| MultiplexId::new(id))
.ok_or_else(|| anyhow!("missing multiplex_id from configuration"))?,
scuba_table: def.scuba_table.clone(),
scuba_sample_rate: def
.scuba_sample_rate

View File

@ -489,6 +489,7 @@ function setup_mononoke_storage_config {
$(db_config "$blobstorename")
[$blobstorename.blobstore.multiplexed]
multiplex_id = 1
components = [
CONFIG
local i

View File

@ -175,6 +175,7 @@ pub async fn open_blobstore(
(
Some(scrub_handler),
BlobConfig::Scrub {
multiplex_id,
scuba_table,
scuba_sample_rate,
blobstores,
@ -192,6 +193,7 @@ pub async fn open_blobstore(
make_blobstore_multiplexed(
fb,
multiplex_id,
&scuba_table,
scuba_sample_rate,
&blobstores,
@ -205,12 +207,14 @@ pub async fn open_blobstore(
(
None,
BlobConfig::Multiplexed {
multiplex_id,
scuba_table,
scuba_sample_rate,
blobstores,
},
) => make_blobstore_multiplexed(
fb,
multiplex_id,
&scuba_table,
scuba_sample_rate,
&blobstores,