wal multiplex: define new config

Summary:
This diff declares a new config for the WAL multiplexed blobstore. It differs from the current multiplex config in how write/read quorums are defined: the existing multiplex configures read-side quorums such as not_present_read_quorum explicitly, while the new WAL config takes only a write_quorum.

The main reason for a separate config is the ability to iteratively switch repos, and the various Mononoke tools, from the old multiplex to the new one.
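
For illustration, a minimal sketch of constructing the Rust config variant this diff introduces (BlobConfig::MultiplexedWAL, defined below in metaconfig). All values are hypothetical, the inner blobstores are stubbed with Disabled, and the single-field shape of RemoteDatabaseConfig is an assumption based on how it is pattern-matched elsewhere in this diff:

    use metaconfig_types::{
        BlobConfig, BlobstoreId, DatabaseConfig, MultiplexId, MultiplexedStoreType,
        RemoteDatabaseConfig,
    };
    use nonzero_ext::nonzero;

    fn example_wal_config() -> BlobConfig {
        BlobConfig::MultiplexedWAL {
            multiplex_id: MultiplexId::new(1),
            // Three components; `Disabled` stands in for real inner configs.
            blobstores: vec![
                (BlobstoreId::new(1), MultiplexedStoreType::Normal, BlobConfig::Disabled),
                (BlobstoreId::new(2), MultiplexedStoreType::Normal, BlobConfig::Disabled),
                (BlobstoreId::new(3), MultiplexedStoreType::WriteMostly, BlobConfig::Disabled),
            ],
            // A put succeeds once this many component writes succeed; the WAL
            // queue heals the rest. Unlike the old Multiplexed config, there
            // is no separate read-quorum field.
            write_quorum: 2,
            // Assumed shape (a single db_address); the address is made up.
            queue_db: DatabaseConfig::Remote(RemoteDatabaseConfig {
                db_address: "xdb.mononoke_wal_queue".to_string(),
            }),
            scuba_table: None,
            scuba_sample_rate: nonzero!(100u64),
        }
    }

Compared with RawBlobstoreMultiplexed below, note the absence of a not_present_read_quorum field: only the write quorum is configured per repo.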

Reviewed By: yancouto

Differential Revision: D40108825

fbshipit-source-id: 9dae447557702474b6427ad247164a879da90861
Author: Aida Getoeva, 2022-10-21 12:24:33 -07:00 (committed by Facebook GitHub Bot)
parent 95d2a8d786
commit 375e37a307
18 changed files with 290 additions and 86 deletions


@ -1,4 +1,4 @@
// @generated SignedSource<<3772e34f69e6141b1817cd398101609b>>
// @generated SignedSource<<c0bb5ff6cebc1cec59e4fbdc82f92cb8>>
// DO NOT EDIT THIS FILE MANUALLY!
// This file is a mechanical copy of the version in the configerator repo. To
// modify it, edit the copy in the configerator repo instead and copy it over by
@ -88,7 +88,7 @@ struct RawRepoDefinition {
// Key into RawRepoConfigs.acl_region_configs for the definition of
// ACL regions for this repo.
11: optional string acl_region_config;
// Default hashing scheme used for revisions given by clients
// when they interact with the repo without specifying this explicitly.
12: optional RawCommitIdentityScheme default_commit_identity_scheme;
@ -100,10 +100,10 @@ enum RawCommitIdentityScheme {
/// Commits are identified by the 32-byte hash of Mononoke's bonsai
/// changeset.
BONSAI = 1,
/// Commits are identified by the 20-byte hash of the Mercurial commit.
HG = 2,
/// Commits are identified by the 20-byte hash of the Git commit.
GIT = 3,
}
@ -301,6 +301,15 @@ struct RawBlobstoreMultiplexed {
// The number of reads needed to decide a blob is not present
8: optional i64 not_present_read_quorum;
} (rust.exhaustive)
struct RawBlobstoreMultiplexedWal {
1: i32 multiplex_id;
2: list<RawBlobstoreIdConfig> components;
3: i64 write_quorum;
4: RawDbConfig queue_db;
// The scuba table to log stats per underlying blobstore
5: optional string scuba_table;
6: optional i64 scuba_sample_rate;
} (rust.exhaustive)
struct RawBlobstoreManifoldWithTtl {
1: string manifold_bucket;
2: string manifold_prefix;
@ -356,6 +365,7 @@ union RawBlobstoreConfig {
9: RawBlobstoreLogging logging;
10: RawBlobstorePack pack;
11: RawBlobstoreS3 s3;
12: RawBlobstoreMultiplexedWal multiplexed_wal;
}
// A write-mostly blobstore is one that is not read from in normal operation.


@ -26,6 +26,7 @@ futures_watchdog = { version = "0.1.0", path = "../../common/futures_watchdog" }
logblob = { version = "0.1.0", path = "../logblob" }
metaconfig_types = { version = "0.1.0", path = "../../metaconfig/types" }
multiplexedblob = { version = "0.1.0", path = "../multiplexedblob" }
multiplexedblob_wal = { version = "0.1.0", path = "../multiplexedblob_wal" }
packblob = { version = "0.1.0", path = "../packblob" }
prefixblob = { version = "0.1.0", path = "../prefixblob" }
rand_distr = "0.4"


@ -22,6 +22,7 @@ use blobstore::ErrorKind;
use blobstore::PutBehaviour;
use blobstore::DEFAULT_PUT_BEHAVIOUR;
use blobstore_sync_queue::SqlBlobstoreSyncQueue;
use blobstore_sync_queue::SqlBlobstoreWal;
use cacheblob::CachelibBlobstoreOptions;
use cached_config::ConfigStore;
use chaosblob::ChaosBlobstore;
@ -48,6 +49,8 @@ use multiplexedblob::ScrubBlobstore;
use multiplexedblob::ScrubHandler;
use multiplexedblob::ScrubOptions;
use multiplexedblob::ScrubWriteMostly;
use multiplexedblob_wal::Scuba as WalScuba;
use multiplexedblob_wal::WalMultiplexedBlobstore;
use packblob::PackBlob;
use packblob::PackOptions;
use readonlyblob::ReadOnlyBlobstore;
@ -571,6 +574,34 @@ fn make_blobstore_put_ops<'a>(
.watched(logger)
.await?
}
MultiplexedWAL {
multiplex_id,
blobstores,
write_quorum,
queue_db,
scuba_table,
scuba_sample_rate,
} => {
needs_wrappers = false;
make_multiplexed_wal(
fb,
multiplex_id,
queue_db,
scuba_table,
scuba_sample_rate,
blobstores,
write_quorum,
mysql_options,
readonly_storage,
blobstore_options,
logger,
config_store,
scrub_handler,
component_sampler,
)
.watched(logger)
.await?
}
Logging {
blobconfig,
scuba_table,
@ -688,50 +719,16 @@ async fn make_blobstore_multiplexed<'a>(
scrub_handler: &'a Arc<dyn ScrubHandler>,
component_sampler: Option<&'a Arc<dyn ComponentSamplingHandler>>,
) -> Result<Arc<dyn BlobstorePutOps>, Error> {
let component_readonly = blobstore_options
.scrub_options
.as_ref()
.map_or(ReadOnlyStorage(false), |v| {
ReadOnlyStorage(v.scrub_action != ScrubAction::Repair)
});
let mut applied_chaos = false;
let components = future::try_join_all(inner_config.into_iter().map({
move |(blobstoreid, store_type, config)| {
let mut blobstore_options = blobstore_options.clone();
if blobstore_options.chaos_options.has_chaos() {
if applied_chaos {
blobstore_options = BlobstoreOptions {
chaos_options: ChaosOptions::new(None, None),
..blobstore_options
};
} else {
applied_chaos = true;
}
}
async move {
let store = make_blobstore_put_ops(
fb,
config,
mysql_options,
component_readonly,
&blobstore_options,
logger,
config_store,
scrub_handler,
component_sampler,
Some(blobstoreid),
)
.watched(logger)
.await?;
Result::<_, Error>::Ok((blobstoreid, store_type, store))
}
}
}))
let (normal_components, write_mostly_components) = setup_inner_blobstores(
fb,
inner_config,
mysql_options,
blobstore_options,
logger,
config_store,
scrub_handler,
component_sampler,
)
.await?;
let queue = SqlBlobstoreSyncQueue::with_database_config(
@ -741,21 +738,6 @@ async fn make_blobstore_multiplexed<'a>(
readonly_storage.0,
)?;
// For now, `partition` could do this, but this will be easier to extend when we introduce more store types
let (normal_components, write_mostly_components) = {
let mut normal_components = vec![];
let mut write_mostly_components = vec![];
for (blobstore_id, store_type, store) in components.into_iter() {
match store_type {
MultiplexedStoreType::Normal => normal_components.push((blobstore_id, store)),
MultiplexedStoreType::WriteMostly => {
write_mostly_components.push((blobstore_id, store))
}
}
}
(normal_components, write_mostly_components)
};
let blobstore = match &blobstore_options.scrub_options {
Some(scrub_options) => Arc::new(ScrubBlobstore::new(
multiplex_id,
@ -795,3 +777,129 @@ async fn make_blobstore_multiplexed<'a>(
Ok(blobstore)
}
async fn make_multiplexed_wal<'a>(
fb: FacebookInit,
multiplex_id: MultiplexId,
queue_db: DatabaseConfig,
scuba_table: Option<String>,
scuba_sample_rate: NonZeroU64,
inner_config: Vec<(BlobstoreId, MultiplexedStoreType, BlobConfig)>,
write_quorum: usize,
mysql_options: &'a MysqlOptions,
readonly_storage: ReadOnlyStorage,
blobstore_options: &'a BlobstoreOptions,
logger: &'a Logger,
config_store: &'a ConfigStore,
scrub_handler: &'a Arc<dyn ScrubHandler>,
component_sampler: Option<&'a Arc<dyn ComponentSamplingHandler>>,
) -> Result<Arc<dyn BlobstorePutOps>, Error> {
let (normal_components, write_mostly_components) = setup_inner_blobstores(
fb,
inner_config,
mysql_options,
blobstore_options,
logger,
config_store,
scrub_handler,
component_sampler,
)
.await?;
let wal_queue = Arc::new(SqlBlobstoreWal::with_database_config(
fb,
&queue_db,
mysql_options,
readonly_storage.0,
)?);
let scuba = WalScuba::new_from_raw(fb, scuba_table, scuba_sample_rate)?;
let blobstore = match &blobstore_options.scrub_options {
Some(_scrub_options) => {
// TODO(aida): Support Scrubbing multiplex
bail!("Scrub blobstore is not supported for the WAL multiplexed storage");
}
None => Arc::new(WalMultiplexedBlobstore::new(
multiplex_id,
wal_queue,
normal_components,
write_mostly_components,
write_quorum,
None, /* use default timeouts */
scuba,
)?) as Arc<dyn BlobstorePutOps>,
};
Ok(blobstore)
}
type InnerBlobstore = (BlobstoreId, Arc<dyn BlobstorePutOps>);
async fn setup_inner_blobstores<'a>(
fb: FacebookInit,
inner_config: Vec<(BlobstoreId, MultiplexedStoreType, BlobConfig)>,
mysql_options: &'a MysqlOptions,
blobstore_options: &'a BlobstoreOptions,
logger: &'a Logger,
config_store: &'a ConfigStore,
scrub_handler: &'a Arc<dyn ScrubHandler>,
component_sampler: Option<&'a Arc<dyn ComponentSamplingHandler>>,
) -> Result<(Vec<InnerBlobstore>, Vec<InnerBlobstore>), Error> {
let component_readonly = blobstore_options
.scrub_options
.as_ref()
.map_or(ReadOnlyStorage(false), |v| {
ReadOnlyStorage(v.scrub_action != ScrubAction::Repair)
});
let mut applied_chaos = false;
let components = future::try_join_all(inner_config.into_iter().map({
move |(blobstoreid, store_type, config)| {
let mut blobstore_options = blobstore_options.clone();
if blobstore_options.chaos_options.has_chaos() {
if applied_chaos {
blobstore_options = BlobstoreOptions {
chaos_options: ChaosOptions::new(None, None),
..blobstore_options
};
} else {
applied_chaos = true;
}
}
async move {
let store = make_blobstore_put_ops(
fb,
config,
mysql_options,
component_readonly,
&blobstore_options,
logger,
config_store,
scrub_handler,
component_sampler,
Some(blobstoreid),
)
.watched(logger)
.await?;
Result::<_, Error>::Ok((blobstoreid, store_type, store))
}
}
}))
.await?;
// For now, `partition` could do this, but this will be easier to extend when we introduce more store types
let mut normal_components = vec![];
let mut write_mostly_components = vec![];
for (blobstore_id, store_type, store) in components.into_iter() {
match store_type {
MultiplexedStoreType::Normal => normal_components.push((blobstore_id, store)),
MultiplexedStoreType::WriteMostly => {
write_mostly_components.push((blobstore_id, store))
}
}
}
Ok((normal_components, write_mostly_components))
}
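
The old multiplex config carried an explicit not-present read quorum; the WAL config drops it. A minimal sketch of the quorum-intersection arithmetic that makes a lone write quorum sufficient (an illustration only; whether the WAL multiplex derives its read-side threshold exactly this way is an assumption):

    // Illustrative, not code from this diff: a put that reached write quorum
    // `w` exists on at least `w` of the `n` components, so `n - w + 1`
    // distinct "not found" answers intersect every possible write-quorum set
    // and safely prove the blob absent.
    fn not_present_threshold(n: usize, w: usize) -> usize {
        assert!(0 < w && w <= n, "write quorum must be in 1..=n");
        n - w + 1
    }

    #[test]
    fn not_present_thresholds() {
        // 3 components with write_quorum = 2: two misses prove absence.
        assert_eq!(not_present_threshold(3, 2), 2);
        // Unanimous writes: a single miss is conclusive.
        assert_eq!(not_present_threshold(3, 3), 1);
    }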


@ -15,6 +15,7 @@ blobstore_stats = { version = "0.1.0", path = "../blobstore_stats" }
blobstore_sync_queue = { version = "0.1.0", path = "../../blobstore_sync_queue" }
cloned = { version = "0.1.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "main" }
context = { version = "0.1.0", path = "../../server/context" }
fbinit = { version = "0.1.2", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "main" }
futures = { version = "0.3.22", features = ["async-await", "compat"] }
futures_stats = { version = "0.1.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "main" }
metaconfig_types = { version = "0.1.0", path = "../../metaconfig/types" }
@ -27,6 +28,6 @@ tokio = { version = "1.21.2", features = ["full", "test-util", "tracing"] }
[dev-dependencies]
blobstore_test_utils = { version = "0.1.0", path = "../test_utils" }
bytes = { version = "1.1", features = ["serde"] }
fbinit = { version = "0.1.2", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "main" }
fbinit-tokio = { version = "0.1.2", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "main" }
nonzero_ext = "0.2"
sql_construct = { version = "0.1.0", path = "../../common/sql_construct" }


@ -30,6 +30,7 @@ use cloned::cloned;
use context::CoreContext;
use context::PerfCounterType;
use context::SessionClass;
use fbinit::FacebookInit;
use futures::stream::FuturesUnordered;
use futures::Future;
use futures::StreamExt;
@ -90,11 +91,21 @@ pub struct Scuba {
}
impl Scuba {
pub fn new(mut scuba: MononokeScubaSampleBuilder, sample_rate: u64) -> Result<Self> {
scuba.add_common_server_data();
pub fn new_from_raw(
fb: FacebookInit,
scuba_table: Option<String>,
sample_rate: NonZeroU64,
) -> Result<Self> {
let scuba = scuba_table
.map_or(Ok(MononokeScubaSampleBuilder::with_discard()), |table| {
MononokeScubaSampleBuilder::new(fb, &table)
})?;
let sample_rate =
NonZeroU64::new(sample_rate).ok_or_else(|| anyhow!("Scuba sample rate cannot be 0"))?;
Self::new(scuba, sample_rate)
}
pub fn new(mut scuba: MononokeScubaSampleBuilder, sample_rate: NonZeroU64) -> Result<Self> {
scuba.add_common_server_data();
Ok(Self { scuba, sample_rate })
}
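
A brief usage sketch of the new constructor (hypothetical caller and rate, mirroring the factory call above):

    use anyhow::Result;
    use fbinit::FacebookInit;
    use multiplexedblob_wal::Scuba as WalScuba;
    use nonzero_ext::nonzero;

    // `None` discards samples (the `map_or` above); `Some(table)` logs
    // roughly 1 in `sample_rate` samples to that table.
    fn build_wal_scuba(fb: FacebookInit, table: Option<String>) -> Result<WalScuba> {
        WalScuba::new_from_raw(fb, table, nonzero!(100u64))
    }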


@ -34,6 +34,7 @@ use futures::task::Poll;
use metaconfig_types::BlobstoreId;
use metaconfig_types::MultiplexId;
use mononoke_types::BlobstoreBytes;
use nonzero_ext::nonzero;
use scuba_ext::MononokeScubaSampleBuilder;
use sql_construct::SqlConstruct;
@ -43,7 +44,7 @@ use crate::WalMultiplexedBlobstore;
#[fbinit::test]
async fn test_quorum_is_valid(_fb: FacebookInit) -> Result<()> {
let scuba = Scuba::new(MononokeScubaSampleBuilder::with_discard(), 1u64)?;
let scuba = Scuba::new(MononokeScubaSampleBuilder::with_discard(), nonzero!(1u64))?;
let wal = Arc::new(SqlBlobstoreWal::with_sqlite_in_memory()?);
// Check the quorum cannot be zero
@ -945,7 +946,7 @@ fn setup_multiplex(
)> {
let (tickable_queue, wal_queue) = setup_queue();
let (tickable_blobstores, blobstores) = setup_blobstores(num);
let scuba = Scuba::new(MononokeScubaSampleBuilder::with_discard(), 1u64)?;
let scuba = Scuba::new(MononokeScubaSampleBuilder::with_discard(), nonzero!(1u64))?;
let multiplex = WalMultiplexedBlobstore::new(
MultiplexId::new(1),
wal_queue,


@ -622,7 +622,8 @@ impl MononokeApp {
let blob_config = match repo_blobstore_args.inner_blobstore_id {
None => storage_config.blobstore,
Some(id) => match storage_config.blobstore {
BlobConfig::Multiplexed { blobstores, .. } => {
BlobConfig::Multiplexed { blobstores, .. }
| BlobConfig::MultiplexedWAL { blobstores, .. } => {
let sought_id = BlobstoreId::new(id);
blobstores
.into_iter()


@ -109,7 +109,8 @@ fn get_blobconfig(blob_config: BlobConfig, inner_blobstore_id: Option<u64>) -> R
match inner_blobstore_id {
None => Ok(blob_config),
Some(inner_blobstore_id) => match blob_config {
BlobConfig::Multiplexed { blobstores, .. } => {
BlobConfig::Multiplexed { blobstores, .. }
| BlobConfig::MultiplexedWAL { blobstores, .. } => {
let seeked_id = BlobstoreId::new(inner_blobstore_id);
blobstores
.into_iter()


@ -58,7 +58,8 @@ fn get_blobconfig(blob_config: BlobConfig, inner_blobstore_id: Option<u64>) -> R
match inner_blobstore_id {
None => Ok(blob_config),
Some(inner_blobstore_id) => match blob_config {
BlobConfig::Multiplexed { blobstores, .. } => {
BlobConfig::Multiplexed { blobstores, .. }
| BlobConfig::MultiplexedWAL { blobstores, .. } => {
let seeked_id = BlobstoreId::new(inner_blobstore_id);
blobstores
.into_iter()


@ -261,7 +261,8 @@ async fn open_repo<'a>(
fn override_blobconfig(blob_config: &mut BlobConfig, inner_blobstore_id: u64) -> Result<(), Error> {
match blob_config {
BlobConfig::Multiplexed { ref blobstores, .. } => {
BlobConfig::Multiplexed { ref blobstores, .. }
| BlobConfig::MultiplexedWAL { ref blobstores, .. } => {
let sought_id = BlobstoreId::new(inner_blobstore_id);
let inner_blob_config = blobstores
.iter()


@ -76,7 +76,8 @@ fn get_blobconfig(
// If the outer store is a mux, find the requested inner store
if let Some(inner_blobstore_id) = inner_blobstore_id {
blob_config = match blob_config {
BlobConfig::Multiplexed { blobstores, .. } => {
BlobConfig::Multiplexed { blobstores, .. }
| BlobConfig::MultiplexedWAL { blobstores, .. } => {
let required_id = BlobstoreId::new(inner_blobstore_id);
blobstores
.into_iter()


@ -219,14 +219,21 @@ fn parse_args(fb: FacebookInit) -> Result<Config, Error> {
primary: RemoteDatabaseConfig { db_address },
..
}),
blobstore:
BlobConfig::Multiplexed {
blobstores,
multiplex_id,
..
},
blobstore,
..
} => (blobstores, multiplex_id, db_address),
} => match blobstore {
BlobConfig::Multiplexed {
blobstores,
multiplex_id,
..
}
| BlobConfig::MultiplexedWAL {
blobstores,
multiplex_id,
..
} => (blobstores, multiplex_id, db_address),
storage => return Err(format_err!("unsupported storage: {:?}", storage)),
},
storage => return Err(format_err!("unsupported storage: {:?}", storage)),
};
let blobstore_args = blobstores


@ -46,7 +46,8 @@ fn get_blobconfig(
// If the outer store is a mux, find the requested inner store
if let Some(inner_blobstore_id) = inner_blobstore_id {
blob_config = match blob_config {
BlobConfig::Multiplexed { blobstores, .. } => {
BlobConfig::Multiplexed { blobstores, .. }
| BlobConfig::MultiplexedWAL { blobstores, .. } => {
let required_id = BlobstoreId::new(inner_blobstore_id);
blobstores
.into_iter()


@ -61,6 +61,11 @@ impl WaitForReplication {
blobstores,
queue_db: DatabaseConfig::Remote(remote),
..
}
| BlobConfig::MultiplexedWAL {
blobstores,
queue_db: DatabaseConfig::Remote(remote),
..
} => {
#[cfg(fbcode_build)]
{


@ -173,6 +173,38 @@ impl Convert for RawBlobstoreConfig {
.convert()?,
}
}
RawBlobstoreConfig::multiplexed_wal(raw) => {
let write_quorum: usize = raw.write_quorum.try_into()?;
if write_quorum > raw.components.len() {
return Err(anyhow!(
"Not enough blobstores for {} write quorum (have {})",
write_quorum,
raw.components.len()
));
}
BlobConfig::MultiplexedWAL {
multiplex_id: MultiplexId::new(raw.multiplex_id),
blobstores: raw
.components
.into_iter()
.map(|comp| {
Ok((
BlobstoreId::new(comp.blobstore_id.try_into()?),
comp.store_type
.convert()?
.unwrap_or(MultiplexedStoreType::Normal),
comp.blobstore.convert()?,
))
})
.collect::<Result<Vec<_>>>()?,
write_quorum,
queue_db: raw.queue_db.convert()?,
scuba_table: raw.scuba_table,
scuba_sample_rate: parse_scuba_sample_rate(raw.scuba_sample_rate)?,
}
}
RawBlobstoreConfig::manifold_with_ttl(raw) => {
let ttl = Duration::from_secs(raw.ttl_secs.try_into()?);
BlobConfig::ManifoldWithTtl {


@ -860,6 +860,22 @@ pub enum BlobConfig {
/// DB config to use for the sync queue
queue_db: DatabaseConfig,
},
/// Multiplex across multiple blobstores for redundancy based on a WAL approach
MultiplexedWAL {
/// A unique ID that identifies this multiplex configuration
multiplex_id: MultiplexId,
/// Set of blobstores being multiplexed over
blobstores: Vec<(BlobstoreId, MultiplexedStoreType, BlobConfig)>,
/// The number of writes that must succeed for the multiplex `put` to succeed
write_quorum: usize,
/// DB config to use for the WAL
queue_db: DatabaseConfig,
/// A scuba table to log stats per inner blobstore
scuba_table: Option<String>,
/// 1 in scuba_sample_rate samples will be logged for both
/// multiplex and per blobstore scuba tables
scuba_sample_rate: NonZeroU64,
},
/// Store in a manifold bucket, but every object will have an expiration
ManifoldWithTtl {
/// Bucket of the backing Manifold blobstore to connect to
@ -911,7 +927,7 @@ impl BlobConfig {
match self {
Disabled | Files { .. } | Sqlite { .. } => true,
Manifold { .. } | Mysql { .. } | ManifoldWithTtl { .. } | S3 { .. } => false,
Multiplexed { blobstores, .. } => blobstores
Multiplexed { blobstores, .. } | MultiplexedWAL { blobstores, .. } => blobstores
.iter()
.map(|(_, _, config)| config)
.all(BlobConfig::is_local),
@ -927,6 +943,10 @@ impl BlobConfig {
ref mut scuba_sample_rate,
..
}
| Self::MultiplexedWAL {
ref mut scuba_sample_rate,
..
}
| Self::Logging {
ref mut scuba_sample_rate,
..


@ -53,7 +53,8 @@ fn get_blobconfig(blob_config: BlobConfig, inner_blobstore_id: Option<u64>) -> R
match inner_blobstore_id {
None => Ok(blob_config),
Some(inner_blobstore_id) => match blob_config {
BlobConfig::Multiplexed { blobstores, .. } => {
BlobConfig::Multiplexed { blobstores, .. }
| BlobConfig::MultiplexedWAL { blobstores, .. } => {
let seeked_id = BlobstoreId::new(inner_blobstore_id);
blobstores
.into_iter()


@ -149,7 +149,8 @@ pub fn replace_blobconfig(
is_scrubbing: bool,
) -> Result<(), Error> {
match blob_config {
BlobConfig::Multiplexed { ref blobstores, .. } => {
BlobConfig::Multiplexed { ref blobstores, .. }
| BlobConfig::MultiplexedWAL { ref blobstores, .. } => {
if is_scrubbing {
// Make sure the repair stats are set to zero for each store.
// Without this the new stats only show up when a repair is