wal: Add sharded db config to queue

Summary:
**Context**: We want to roll out the new WAL multiplexed blobstore and need final tweaks before doing that. See https://fb.quip.com/vyTWAgGZSA2Z

This syncs configerator changes from D40677044 and updates our code so that the WAL config now takes a sharded DB instead of an unsharded one. The code for dealing with sharded connections was already added in D40673033; this change is just the config to enable it.

For now this is not enabled anywhere. The plan is to:
- Test it locally with locally built binaries
- Move a small-ish repo
- Move everything

This rollout will only begin once we have full feature parity, which is not yet complete (things like Scuba logging and scrub blobstores are still missing).

Differential Revision: D40675512

fbshipit-source-id: 70f74a53dbf10eadc73ce9e210ec7719f866e30a
This commit is contained in:
Yan Soares Couto 2022-11-03 15:12:55 -07:00 committed by Facebook GitHub Bot
parent 6b5693800f
commit d12ebd7417
17 changed files with 144 additions and 48 deletions

View File

@ -1,4 +1,4 @@
// @generated SignedSource<<42c584de36ad80df10f6f80a5754d573>>
// @generated SignedSource<<7f3a2784e21d4beee543abfca1df86c0>>
// DO NOT EDIT THIS FILE MANUALLY!
// This file is a mechanical copy of the version in the configerator repo. To
// modify it, edit the copy in the configerator repo instead and copy it over by
@ -301,11 +301,12 @@ struct RawBlobstoreMultiplexed {
// The number of reads needed to decided a blob is not present
8: optional i64 not_present_read_quorum;
} (rust.exhaustive)
// See docs in fbcode/eden/mononoke/metaconfig/types/src/lib.rs:BlobConfig::MultiplexedWal
struct RawBlobstoreMultiplexedWal {
1: i32 multiplex_id;
2: list<RawBlobstoreIdConfig> components;
3: i64 write_quorum;
4: RawDbConfig queue_db;
4: RawShardedDbConfig queue_db;
// The scuba table to log stats per underlying blobstore
5: optional string scuba_table;
6: optional i64 scuba_sample_rate;
@ -410,6 +411,11 @@ union RawDbConfig {
2: RawDbRemote remote;
}
union RawShardedDbConfig {
1: RawDbLocal local;
2: RawDbShardedRemote remote;
}
struct RawRemoteMetadataConfig {
1: RawDbRemote primary;
2: RawDbShardableRemote filenodes;
@ -875,8 +881,7 @@ struct RawSparseProfilesConfig {
3: optional list<string> monitored_profiles;
}
struct RawLoggingDestinationLogger {
} (rust.exhaustive)
struct RawLoggingDestinationLogger {} (rust.exhaustive)
struct RawLoggingDestinationScribe {
// Scribe category to log to
@ -885,9 +890,9 @@ struct RawLoggingDestinationScribe {
union RawLoggingDestination {
// Log to Logger
1: RawLoggingDestinationLogger logger,
1: RawLoggingDestinationLogger logger;
// Log to a scribe category
2: RawLoggingDestinationScribe scribe,
2: RawLoggingDestinationScribe scribe;
}
struct RawUpdateLoggingConfig {

View File

@ -43,6 +43,7 @@ use metaconfig_types::MultiplexId;
use metaconfig_types::MultiplexedStoreType;
use metaconfig_types::PackConfig;
use metaconfig_types::ShardableRemoteDatabaseConfig;
use metaconfig_types::ShardedDatabaseConfig;
use multiplexedblob::MultiplexedBlobstore;
use multiplexedblob::ScrubAction;
use multiplexedblob::ScrubBlobstore;
@ -59,6 +60,7 @@ use samplingblob::SamplingBlobstorePutOps;
use scuba_ext::MononokeScubaSampleBuilder;
use slog::Logger;
use sql_construct::SqlConstructFromDatabaseConfig;
use sql_construct::SqlConstructFromShardedDatabaseConfig;
use sql_ext::facebook::MysqlOptions;
use sqlblob::CountedSqlblob;
use sqlblob::Sqlblob;
@ -574,7 +576,7 @@ fn make_blobstore_put_ops<'a>(
.watched(logger)
.await?
}
MultiplexedWAL {
MultiplexedWal {
multiplex_id,
blobstores,
write_quorum,
@ -781,7 +783,7 @@ async fn make_blobstore_multiplexed<'a>(
async fn make_multiplexed_wal<'a>(
fb: FacebookInit,
multiplex_id: MultiplexId,
queue_db: DatabaseConfig,
queue_db: ShardedDatabaseConfig,
scuba_table: Option<String>,
scuba_sample_rate: NonZeroU64,
inner_config: Vec<(BlobstoreId, MultiplexedStoreType, BlobConfig)>,
@ -806,7 +808,7 @@ async fn make_multiplexed_wal<'a>(
)
.await?;
let wal_queue = Arc::new(SqlBlobstoreWal::with_database_config(
let wal_queue = Arc::new(SqlBlobstoreWal::with_sharded_database_config(
fb,
&queue_db,
mysql_options,

View File

@ -623,7 +623,7 @@ impl MononokeApp {
None => storage_config.blobstore,
Some(id) => match storage_config.blobstore {
BlobConfig::Multiplexed { blobstores, .. }
| BlobConfig::MultiplexedWAL { blobstores, .. } => {
| BlobConfig::MultiplexedWal { blobstores, .. } => {
let sought_id = BlobstoreId::new(id);
blobstores
.into_iter()

View File

@ -110,7 +110,7 @@ fn get_blobconfig(blob_config: BlobConfig, inner_blobstore_id: Option<u64>) -> R
None => Ok(blob_config),
Some(inner_blobstore_id) => match blob_config {
BlobConfig::Multiplexed { blobstores, .. }
| BlobConfig::MultiplexedWAL { blobstores, .. } => {
| BlobConfig::MultiplexedWal { blobstores, .. } => {
let seeked_id = BlobstoreId::new(inner_blobstore_id);
blobstores
.into_iter()

View File

@ -59,7 +59,7 @@ fn get_blobconfig(blob_config: BlobConfig, inner_blobstore_id: Option<u64>) -> R
None => Ok(blob_config),
Some(inner_blobstore_id) => match blob_config {
BlobConfig::Multiplexed { blobstores, .. }
| BlobConfig::MultiplexedWAL { blobstores, .. } => {
| BlobConfig::MultiplexedWal { blobstores, .. } => {
let seeked_id = BlobstoreId::new(inner_blobstore_id);
blobstores
.into_iter()

View File

@ -51,6 +51,7 @@ use metaconfig_types::BlobConfig;
use metaconfig_types::BlobstoreId;
use metaconfig_types::DatabaseConfig;
use metaconfig_types::MultiplexedStoreType;
use metaconfig_types::ShardedDatabaseConfig;
use metaconfig_types::StorageConfig;
use mononoke_app::fb303::Fb303AppExtension;
use mononoke_app::MononokeApp;
@ -58,6 +59,7 @@ use mononoke_app::MononokeAppBuilder;
use slog::info;
use slog::o;
use sql_construct::SqlConstructFromDatabaseConfig;
use sql_construct::SqlConstructFromShardedDatabaseConfig;
use sql_ext::facebook::MysqlOptions;
use sync_healer::SyncHealer;
use wait_for_replication::WaitForReplication;
@ -196,7 +198,7 @@ async fn maybe_schedule_healer_for_storage(
));
Result::<_, Error>::Ok(healer)
}
BlobConfig::MultiplexedWAL {
BlobConfig::MultiplexedWal {
blobstores,
multiplex_id,
queue_db,
@ -281,14 +283,18 @@ fn setup_sync_queue(
fn setup_wal(
fb: FacebookInit,
mysql_options: &MysqlOptions,
queue_db: DatabaseConfig,
queue_db: ShardedDatabaseConfig,
readonly_storage: ReadOnlyStorage,
dry_run: bool,
shard_range: ShardRange,
) -> Result<Arc<dyn BlobstoreWal>> {
let wal =
SqlBlobstoreWal::with_database_config(fb, &queue_db, mysql_options, readonly_storage.0)
.context("While opening WAL")?;
let wal = SqlBlobstoreWal::with_sharded_database_config(
fb,
&queue_db,
mysql_options,
readonly_storage.0,
)
.context("While opening WAL")?;
let wal: Arc<dyn BlobstoreWal> = if dry_run {
Arc::new(DummyBlobstoreWal::new(wal))

View File

@ -262,7 +262,7 @@ async fn open_repo<'a>(
fn override_blobconfig(blob_config: &mut BlobConfig, inner_blobstore_id: u64) -> Result<(), Error> {
match blob_config {
BlobConfig::Multiplexed { ref blobstores, .. }
| BlobConfig::MultiplexedWAL { ref blobstores, .. } => {
| BlobConfig::MultiplexedWal { ref blobstores, .. } => {
let sought_id = BlobstoreId::new(inner_blobstore_id);
let inner_blob_config = blobstores
.iter()

View File

@ -68,7 +68,7 @@ fn get_blobconfig(
if let Some(inner_blobstore_id) = inner_blobstore_id {
blob_config = match blob_config {
BlobConfig::Multiplexed { blobstores, .. }
| BlobConfig::MultiplexedWAL { blobstores, .. } => {
| BlobConfig::MultiplexedWal { blobstores, .. } => {
let required_id = BlobstoreId::new(inner_blobstore_id);
blobstores
.into_iter()

View File

@ -227,7 +227,7 @@ fn parse_args(fb: FacebookInit) -> Result<Config, Error> {
multiplex_id,
..
}
| BlobConfig::MultiplexedWAL {
| BlobConfig::MultiplexedWal {
blobstores,
multiplex_id,
..

View File

@ -47,7 +47,7 @@ fn get_blobconfig(
if let Some(inner_blobstore_id) = inner_blobstore_id {
blob_config = match blob_config {
BlobConfig::Multiplexed { blobstores, .. }
| BlobConfig::MultiplexedWAL { blobstores, .. } => {
| BlobConfig::MultiplexedWal { blobstores, .. } => {
let required_id = BlobstoreId::new(inner_blobstore_id);
blobstores
.into_iter()

View File

@ -15,6 +15,7 @@ use metaconfig_types::MetadataDatabaseConfig;
use metaconfig_types::RemoteDatabaseConfig;
use metaconfig_types::RemoteMetadataDatabaseConfig;
use metaconfig_types::ShardableRemoteDatabaseConfig;
use metaconfig_types::ShardedDatabaseConfig;
use sql_ext::facebook::MysqlOptions;
use crate::construct::SqlConstruct;
@ -48,6 +49,37 @@ pub trait SqlConstructFromDatabaseConfig: FbSqlConstruct + SqlConstruct {
impl<T: SqlConstruct + FbSqlConstruct> SqlConstructFromDatabaseConfig for T {}
/// Trait that allows construction from sharded database config.
pub trait SqlConstructFromShardedDatabaseConfig: FbSqlShardedConstruct {
fn with_sharded_database_config(
fb: FacebookInit,
database_config: &ShardedDatabaseConfig,
mysql_options: &MysqlOptions,
readonly: bool,
) -> Result<Self> {
match database_config {
ShardedDatabaseConfig::Local(LocalDatabaseConfig { path }) => {
Self::with_sqlite_path(path.join("sqlite_dbs"), readonly)
}
ShardedDatabaseConfig::Remote(config) => Self::with_sharded_mysql(
fb,
config.shard_map.clone(),
config.shard_num.get(),
mysql_options,
readonly,
),
}
.with_context(|| {
format!(
"While connecting to {:?} (with options {:?})",
database_config, mysql_options
)
})
}
}
impl<T: FbSqlShardedConstruct> SqlConstructFromShardedDatabaseConfig for T {}
/// Trait that allows construction from the metadata database config.
pub trait SqlConstructFromMetadataDatabaseConfig: FbSqlConstruct + SqlConstruct {
fn with_metadata_database_config(

View File

@ -34,6 +34,7 @@ mod oss;
pub use config::SqlConstructFromDatabaseConfig;
pub use config::SqlConstructFromMetadataDatabaseConfig;
pub use config::SqlConstructFromShardedDatabaseConfig;
pub use config::SqlShardableConstructFromMetadataDatabaseConfig;
pub use construct::SqlConstruct;
pub use construct::SqlShardedConstruct;

View File

@ -16,9 +16,12 @@ use cached_config::ConfigStore;
use fbinit::FacebookInit;
use futures::try_join;
use metaconfig_types::BlobConfig;
use metaconfig_types::BlobstoreId;
use metaconfig_types::DatabaseConfig;
use metaconfig_types::MultiplexedStoreType;
#[cfg(fbcode_build)]
use metaconfig_types::ShardableRemoteDatabaseConfig;
use metaconfig_types::ShardedDatabaseConfig;
use metaconfig_types::StorageConfig;
use replication_lag_config::ReplicationLagBlobstoreConfig;
use replication_lag_config::ReplicationLagTableConfig;
@ -47,6 +50,28 @@ pub struct WaitForReplication {
const CONFIGS_PATH: &str = "scm/mononoke/mysql/replication_lag/config";
#[cfg(fbcode_build)]
fn blobstore_monitor(
my_admin: &MyAdmin,
blobstores: Vec<(BlobstoreId, MultiplexedStoreType, BlobConfig)>,
) -> Arc<dyn ReplicaLagMonitor> {
blobstores
.into_iter()
.find_map(|(_, _, config)| match config {
BlobConfig::Mysql {
remote: ShardableRemoteDatabaseConfig::Unsharded(remote),
} => Some(
Arc::new(my_admin.single_shard_lag_monitor(remote.db_address))
as Arc<dyn ReplicaLagMonitor>,
),
BlobConfig::Mysql {
remote: ShardableRemoteDatabaseConfig::Sharded(remote),
} => Some(Arc::new(my_admin.shardmap_lag_monitor(remote.shard_map))),
_ => None,
})
.unwrap_or_else(|| Arc::new(NoReplicaLagMonitor()))
}
impl WaitForReplication {
pub fn new(
fb: FacebookInit,
@ -61,32 +86,32 @@ impl WaitForReplication {
blobstores,
queue_db: DatabaseConfig::Remote(remote),
..
}
| BlobConfig::MultiplexedWAL {
blobstores,
queue_db: DatabaseConfig::Remote(remote),
..
} => {
#[cfg(fbcode_build)]
{
let my_admin = MyAdmin::new(fb)?;
let sync_queue = Arc::new(my_admin.single_shard_lag_monitor(remote.db_address))
as Arc<dyn ReplicaLagMonitor>;
let xdb_blobstore = blobstores
.into_iter()
.find_map(|(_, _, config)| match config {
BlobConfig::Mysql {
remote: ShardableRemoteDatabaseConfig::Unsharded(remote),
} => Some(
Arc::new(my_admin.single_shard_lag_monitor(remote.db_address))
as Arc<dyn ReplicaLagMonitor>,
),
BlobConfig::Mysql {
remote: ShardableRemoteDatabaseConfig::Sharded(remote),
} => Some(Arc::new(my_admin.shardmap_lag_monitor(remote.shard_map))),
_ => None,
})
.unwrap_or_else(|| Arc::new(NoReplicaLagMonitor()));
let xdb_blobstore = blobstore_monitor(&my_admin, blobstores);
(sync_queue, xdb_blobstore)
}
#[cfg(not(fbcode_build))]
{
let _ = (fb, remote, blobstores);
unimplemented!()
}
}
BlobConfig::MultiplexedWal {
blobstores,
queue_db: ShardedDatabaseConfig::Remote(remote),
..
} => {
#[cfg(fbcode_build)]
{
let my_admin = MyAdmin::new(fb)?;
let sync_queue = Arc::new(my_admin.shardmap_lag_monitor(remote.shard_map))
as Arc<dyn ReplicaLagMonitor>;
let xdb_blobstore = blobstore_monitor(&my_admin, blobstores);
(sync_queue, xdb_blobstore)
}
#[cfg(not(fbcode_build))]

View File

@ -29,6 +29,7 @@ use metaconfig_types::PackFormat;
use metaconfig_types::RemoteDatabaseConfig;
use metaconfig_types::RemoteMetadataDatabaseConfig;
use metaconfig_types::ShardableRemoteDatabaseConfig;
use metaconfig_types::ShardedDatabaseConfig;
use metaconfig_types::ShardedRemoteDatabaseConfig;
use metaconfig_types::StorageConfig;
use nonzero_ext::nonzero;
@ -45,6 +46,7 @@ use repos::RawEphemeralBlobstoreConfig;
use repos::RawFilestoreParams;
use repos::RawMetadataConfig;
use repos::RawMultiplexedStoreType;
use repos::RawShardedDbConfig;
use repos::RawStorageConfig;
use crate::convert::Convert;
@ -183,7 +185,7 @@ impl Convert for RawBlobstoreConfig {
));
}
BlobConfig::MultiplexedWAL {
BlobConfig::MultiplexedWal {
multiplex_id: MultiplexId::new(raw.multiplex_id),
blobstores: raw
.components
@ -343,6 +345,20 @@ impl Convert for RawDbConfig {
}
}
impl Convert for RawShardedDbConfig {
type Output = ShardedDatabaseConfig;
fn convert(self) -> Result<Self::Output> {
match self {
RawShardedDbConfig::local(raw) => Ok(ShardedDatabaseConfig::Local(raw.convert()?)),
RawShardedDbConfig::remote(raw) => Ok(ShardedDatabaseConfig::Remote(raw.convert()?)),
RawShardedDbConfig::UnknownField(f) => {
Err(anyhow!("unsupported database configuration ({})", f))
}
}
}
}
impl Convert for RawMetadataConfig {
type Output = MetadataDatabaseConfig;

View File

@ -861,7 +861,7 @@ pub enum BlobConfig {
queue_db: DatabaseConfig,
},
/// Multiplex across multiple blobstores for redundancy based on a WAL approach
MultiplexedWAL {
MultiplexedWal {
/// A unique ID that identifies this multiplex configuration
multiplex_id: MultiplexId,
/// Set of blobstores being multiplexed over
@ -869,7 +869,7 @@ pub enum BlobConfig {
/// The number of writes that must succeed for the multiplex `put` to succeed
write_quorum: usize,
/// DB config to use for the WAL
queue_db: DatabaseConfig,
queue_db: ShardedDatabaseConfig,
/// A scuba table to log stats per inner blobstore
scuba_table: Option<String>,
/// 1 in scuba_sample_rate samples will be logged for both
@ -927,7 +927,7 @@ impl BlobConfig {
match self {
Disabled | Files { .. } | Sqlite { .. } => true,
Manifold { .. } | Mysql { .. } | ManifoldWithTtl { .. } | S3 { .. } => false,
Multiplexed { blobstores, .. } | MultiplexedWAL { blobstores, .. } => blobstores
Multiplexed { blobstores, .. } | MultiplexedWal { blobstores, .. } => blobstores
.iter()
.map(|(_, _, config)| config)
.all(BlobConfig::is_local),
@ -943,7 +943,7 @@ impl BlobConfig {
ref mut scuba_sample_rate,
..
}
| Self::MultiplexedWAL {
| Self::MultiplexedWal {
ref mut scuba_sample_rate,
..
}
@ -1017,6 +1017,15 @@ impl DatabaseConfig {
}
}
/// Configuration for a sharded database
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub enum ShardedDatabaseConfig {
/// Local SQLite database
Local(LocalDatabaseConfig),
/// Remote MySQL sharded database
Remote(ShardedRemoteDatabaseConfig),
}
/// Configuration for the Metadata database when it is remote.
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub struct RemoteMetadataDatabaseConfig {

View File

@ -54,7 +54,7 @@ fn get_blobconfig(blob_config: BlobConfig, inner_blobstore_id: Option<u64>) -> R
None => Ok(blob_config),
Some(inner_blobstore_id) => match blob_config {
BlobConfig::Multiplexed { blobstores, .. }
| BlobConfig::MultiplexedWAL { blobstores, .. } => {
| BlobConfig::MultiplexedWal { blobstores, .. } => {
let seeked_id = BlobstoreId::new(inner_blobstore_id);
blobstores
.into_iter()

View File

@ -150,7 +150,7 @@ pub fn replace_blobconfig(
) -> Result<(), Error> {
match blob_config {
BlobConfig::Multiplexed { ref blobstores, .. }
| BlobConfig::MultiplexedWAL { ref blobstores, .. } => {
| BlobConfig::MultiplexedWal { ref blobstores, .. } => {
if is_scrubbing {
// Make sure the repair stats are set to zero for each store.
// Without this the new stats only show up when a repair is