Thread ConfigStore into blobstore creation

Summary: SQLBlob GC (next diff in stack) will need a ConfigStore in SQLBlob. Make one available to blobstore creation

Reviewed By: krallin

Differential Revision: D24460586

fbshipit-source-id: ea2d5149e0c548844f1fd2a0d241ed0647e137ae
This commit is contained in:
Simon Farnsworth 2020-10-27 04:12:57 -07:00 committed by Facebook GitHub Bot
parent 7b278b8bed
commit 4e59e26775
25 changed files with 145 additions and 84 deletions

View File

@ -147,6 +147,7 @@ throttledblob = { path = "blobstore/throttledblob" }
unbundle = { path = "repo_client/unbundle" }
unodes = { path = "derived_data/unodes" }
xdiff = { path = "../scm/lib/xdiff" }
cached_config = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
cachelib = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
cloned = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
failure_ext = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }

View File

@ -46,6 +46,7 @@ sql_ext = { path = "../../common/rust/sql_ext" }
type_map = { path = "../../common/type_map" }
unodes = { path = "../../derived_data/unodes" }
virtually_sharded_blobstore = { path = "../../blobstore/virtually_sharded_blobstore" }
cached_config = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
cachelib = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
fbinit = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
sql = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }

View File

@ -19,6 +19,7 @@ use cacheblob::{
new_cachelib_blobstore_no_lease, new_memcache_blobstore, CachelibBlobstoreOptions,
InProcessLease, LeaseOps, MemcacheOps,
};
use cached_config::ConfigStore;
use changeset_fetcher::{ChangesetFetcher, SimpleChangesetFetcher};
use changeset_info::ChangesetInfo;
use changesets::{CachingChangesets, Changesets, SqlChangesets};
@ -91,6 +92,7 @@ pub struct BlobrepoBuilder<'a> {
logger: &'a Logger,
derived_data_config: DerivedDataConfig,
segmented_changelog_config: SegmentedChangelogConfig,
config_store: &'a ConfigStore,
}
impl<'a> BlobrepoBuilder<'a> {
@ -104,6 +106,7 @@ impl<'a> BlobrepoBuilder<'a> {
readonly_storage: ReadOnlyStorage,
blobstore_options: BlobstoreOptions,
logger: &'a Logger,
config_store: &'a ConfigStore,
) -> Self {
Self {
fb,
@ -121,6 +124,7 @@ impl<'a> BlobrepoBuilder<'a> {
logger,
derived_data_config: config.derived_data_config.clone(),
segmented_changelog_config: config.segmented_changelog_config.clone(),
config_store,
}
}
@ -151,6 +155,7 @@ impl<'a> BlobrepoBuilder<'a> {
logger,
derived_data_config,
segmented_changelog_config,
config_store,
} = self;
let sql_factory = make_metadata_sql_factory(
@ -170,6 +175,7 @@ impl<'a> BlobrepoBuilder<'a> {
readonly_storage,
&blobstore_options,
&logger,
config_store,
);
let (sql_factory, blobstore) = future::try_join(sql_factory, blobstore).await?;

View File

@ -22,6 +22,7 @@ sql_construct = { path = "../../common/sql_construct" }
sql_ext = { path = "../../common/rust/sql_ext" }
sqlblob = { path = "../sqlblob" }
throttledblob = { path = "../throttledblob" }
cached_config = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
fbinit = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
futures_ext = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
scuba = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }

View File

@ -11,6 +11,7 @@ use blobstore::{
};
use blobstore_sync_queue::SqlBlobstoreSyncQueue;
use cacheblob::CachelibBlobstoreOptions;
use cached_config::ConfigStore;
use chaosblob::{ChaosBlobstore, ChaosOptions};
use fbinit::FacebookInit;
use fileblob::Fileblob;
@ -92,6 +93,7 @@ pub fn make_blobstore<'a>(
readonly_storage: ReadOnlyStorage,
blobstore_options: &'a BlobstoreOptions,
logger: &'a Logger,
config_store: &'a ConfigStore,
) -> BoxFuture<'a, Result<Arc<dyn Blobstore>, Error>> {
async move {
let store = make_blobstore_put_ops(
@ -101,6 +103,7 @@ pub fn make_blobstore<'a>(
readonly_storage,
blobstore_options,
logger,
config_store,
)
.await?;
// Workaround for trait A {} trait B:A {} but Arc<dyn B> is not a Arc<dyn A>
@ -119,6 +122,7 @@ pub fn make_blobstore_put_ops<'a>(
readonly_storage: ReadOnlyStorage,
blobstore_options: &'a BlobstoreOptions,
logger: &'a Logger,
config_store: &'a ConfigStore,
) -> BoxFuture<'a, Result<Arc<dyn BlobstorePutOps>, Error>> {
// NOTE: This needs to return a BoxFuture because it recurses.
async move {
@ -148,6 +152,7 @@ pub fn make_blobstore_put_ops<'a>(
readonly_storage,
blobstore_options,
logger,
config_store,
)
.await?
}
@ -177,6 +182,7 @@ pub fn make_blobstore_put_ops<'a>(
readonly_storage,
blobstore_options,
logger,
config_store,
)
.await?
}
@ -296,6 +302,7 @@ pub fn make_blobstore_put_ops<'a>(
readonly_storage,
&blobstore_options,
logger,
config_store,
)
.await?;
@ -353,6 +360,7 @@ pub fn make_blobstore_put_ops<'a>(
readonly_storage,
&blobstore_options,
logger,
config_store,
)
.await?;
@ -417,7 +425,7 @@ pub fn make_blobstore_put_ops<'a>(
.boxed()
}
pub fn make_blobstore_multiplexed<'a>(
pub async fn make_blobstore_multiplexed<'a>(
fb: FacebookInit,
multiplex_id: MultiplexId,
queue_db: DatabaseConfig,
@ -430,98 +438,98 @@ pub fn make_blobstore_multiplexed<'a>(
readonly_storage: ReadOnlyStorage,
blobstore_options: &'a BlobstoreOptions,
logger: &'a Logger,
) -> impl Future<Output = Result<Arc<dyn BlobstorePutOps>, Error>> + 'a {
async move {
let component_readonly = match &scrub_args {
// Need to write to components to repair them.
Some((_, ScrubAction::Repair)) => ReadOnlyStorage(false),
_ => readonly_storage,
};
config_store: &'a ConfigStore,
) -> Result<Arc<dyn BlobstorePutOps>, Error> {
let component_readonly = match &scrub_args {
// Need to write to components to repair them.
Some((_, ScrubAction::Repair)) => ReadOnlyStorage(false),
_ => readonly_storage,
};
let mut applied_chaos = false;
let mut applied_chaos = false;
let components = future::try_join_all(inner_config.into_iter().map({
move |(blobstoreid, store_type, config)| {
let mut blobstore_options = blobstore_options.clone();
let components = future::try_join_all(inner_config.into_iter().map({
move |(blobstoreid, store_type, config)| {
let mut blobstore_options = blobstore_options.clone();
if blobstore_options.chaos_options.has_chaos() {
if applied_chaos {
blobstore_options = BlobstoreOptions {
chaos_options: ChaosOptions::new(None, None),
..blobstore_options
};
} else {
applied_chaos = true;
}
}
async move {
let store = make_blobstore_put_ops(
fb,
config,
mysql_options,
component_readonly,
&blobstore_options,
logger,
)
.await?;
Ok((blobstoreid, store_type, store))
if blobstore_options.chaos_options.has_chaos() {
if applied_chaos {
blobstore_options = BlobstoreOptions {
chaos_options: ChaosOptions::new(None, None),
..blobstore_options
};
} else {
applied_chaos = true;
}
}
}));
let queue = SqlBlobstoreSyncQueue::with_database_config(
fb,
&queue_db,
mysql_options,
readonly_storage.0,
);
async move {
let store = make_blobstore_put_ops(
fb,
config,
mysql_options,
component_readonly,
&blobstore_options,
logger,
config_store,
)
.await?;
let (components, queue) = future::try_join(components, queue).await?;
Ok((blobstoreid, store_type, store))
}
}
}));
// For now, `partition` could do this, but this will be easier to extend when we introduce more store types
let (normal_components, write_mostly_components) = {
let mut normal_components = vec![];
let mut write_mostly_components = vec![];
for (blobstore_id, store_type, store) in components.into_iter() {
match store_type {
MultiplexedStoreType::Normal => normal_components.push((blobstore_id, store)),
MultiplexedStoreType::WriteMostly => {
write_mostly_components.push((blobstore_id, store))
}
let queue = SqlBlobstoreSyncQueue::with_database_config(
fb,
&queue_db,
mysql_options,
readonly_storage.0,
);
let (components, queue) = future::try_join(components, queue).await?;
// For now, `partition` could do this, but this will be easier to extend when we introduce more store types
let (normal_components, write_mostly_components) = {
let mut normal_components = vec![];
let mut write_mostly_components = vec![];
for (blobstore_id, store_type, store) in components.into_iter() {
match store_type {
MultiplexedStoreType::Normal => normal_components.push((blobstore_id, store)),
MultiplexedStoreType::WriteMostly => {
write_mostly_components.push((blobstore_id, store))
}
}
(normal_components, write_mostly_components)
};
}
(normal_components, write_mostly_components)
};
let blobstore = match scrub_args {
Some((scrub_handler, scrub_action)) => Arc::new(ScrubBlobstore::new(
multiplex_id,
normal_components,
write_mostly_components,
minimum_successful_writes,
Arc::new(queue),
scuba_table.map_or(ScubaSampleBuilder::with_discard(), |table| {
ScubaSampleBuilder::new(fb, table)
}),
scuba_sample_rate,
scrub_handler,
scrub_action,
)) as Arc<dyn BlobstorePutOps>,
None => Arc::new(MultiplexedBlobstore::new(
multiplex_id,
normal_components,
write_mostly_components,
minimum_successful_writes,
Arc::new(queue),
scuba_table.map_or(ScubaSampleBuilder::with_discard(), |table| {
ScubaSampleBuilder::new(fb, table)
}),
scuba_sample_rate,
)) as Arc<dyn BlobstorePutOps>,
};
let blobstore = match scrub_args {
Some((scrub_handler, scrub_action)) => Arc::new(ScrubBlobstore::new(
multiplex_id,
normal_components,
write_mostly_components,
minimum_successful_writes,
Arc::new(queue),
scuba_table.map_or(ScubaSampleBuilder::with_discard(), |table| {
ScubaSampleBuilder::new(fb, table)
}),
scuba_sample_rate,
scrub_handler,
scrub_action,
)) as Arc<dyn BlobstorePutOps>,
None => Arc::new(MultiplexedBlobstore::new(
multiplex_id,
normal_components,
write_mostly_components,
minimum_successful_writes,
Arc::new(queue),
scuba_table.map_or(ScubaSampleBuilder::with_discard(), |table| {
ScubaSampleBuilder::new(fb, table)
}),
scuba_sample_rate,
)) as Arc<dyn BlobstorePutOps>,
};
Ok(blobstore)
}
Ok(blobstore)
}

View File

@ -1025,6 +1025,7 @@ fn open_repo_internal_with_repo_id<'a>(
readonly_storage,
blobstore_options,
&logger,
config_store,
);
if let Some(redaction_override) = redaction_override {
builder.set_redaction(redaction_override);

View File

@ -19,6 +19,7 @@ use futures_old::prelude::*;
use blobstore::{Blobstore, BlobstoreGetData};
use blobstore_factory::{make_blobstore, BlobstoreOptions, ReadOnlyStorage};
use cacheblob::{new_memcache_blobstore, CacheBlobstoreExt};
use cached_config::ConfigStore;
use cmdlib::args;
use context::CoreContext;
use futures_old::future;
@ -142,6 +143,7 @@ async fn get_blobstore(
logger: Logger,
readonly_storage: ReadOnlyStorage,
blobstore_options: BlobstoreOptions,
config_store: &ConfigStore,
) -> Result<Arc<dyn Blobstore>, Error> {
let blobconfig = get_blobconfig(storage_config.blobstore, inner_blobstore_id, scrub_action)?;
@ -152,6 +154,7 @@ async fn get_blobstore(
readonly_storage,
&blobstore_options,
&logger,
config_store,
)
.await
}
@ -184,6 +187,7 @@ pub async fn subcommand_blobstore_fetch<'a>(
logger.clone(),
readonly_storage,
blobstore_options,
config_store,
);
let common_config = args::load_common_config(config_store, &matches)?;

View File

@ -117,6 +117,7 @@ fn main(fb: fbinit::FacebookInit) {
blobstore_factory::ReadOnlyStorage(false),
&blobstore_options,
&logger,
config_store,
)
.await
.expect("Could not make blobstore");

View File

@ -15,6 +15,7 @@ use anyhow::{bail, format_err, Context, Error, Result};
use blobstore::Blobstore;
use blobstore_factory::{make_blobstore, BlobstoreOptions, ReadOnlyStorage};
use blobstore_sync_queue::{BlobstoreSyncQueue, SqlBlobstoreSyncQueue};
use cached_config::ConfigStore;
use chrono::Duration as ChronoDuration;
use clap::{value_t, App, Arg};
use cmdlib::{
@ -65,6 +66,7 @@ async fn maybe_schedule_healer_for_storage(
blobstore_options: &BlobstoreOptions,
iter_limit: Option<u64>,
heal_min_age: ChronoDuration,
config_store: &ConfigStore,
) -> Result<(), Error> {
let (blobstore_configs, multiplex_id, queue_db) = match storage_config.blobstore {
BlobConfig::Multiplexed {
@ -110,6 +112,7 @@ async fn maybe_schedule_healer_for_storage(
readonly_storage,
blobstore_options,
ctx.logger(),
config_store,
)
.await?;
@ -311,6 +314,7 @@ fn main(fb: FacebookInit) -> Result<()> {
&blobstore_options,
iter_limit,
healing_min_age,
config_store,
);
block_execute(

View File

@ -8,6 +8,7 @@
use std::sync::Arc;
use anyhow::{Error, Result};
use cached_config::ConfigStore;
use fbinit::FacebookInit;
use slog::Logger;
@ -22,6 +23,7 @@ pub async fn open_blobstore(
mysql_options: MysqlOptions,
blobstore_options: &BlobstoreOptions,
logger: &Logger,
config_store: &ConfigStore,
) -> Result<Arc<dyn Blobstore>> {
storage_config.blobstore.set_scrubbed(ScrubAction::Repair);
@ -32,6 +34,7 @@ pub async fn open_blobstore(
blobstore_factory::ReadOnlyStorage(false),
blobstore_options,
logger,
config_store,
)
.await
.map_err(Error::from)

View File

@ -134,6 +134,7 @@ fn main(fb: fbinit::FacebookInit) -> Result<()> {
mysql_options,
&blobstore_options,
&logger,
config_store,
)
.await?;

View File

@ -11,6 +11,7 @@ use anyhow::{anyhow, Error};
use blobstore::Blobstore;
use blobstore_factory::{make_blobstore, BlobstoreOptions, ReadOnlyStorage};
use cacheblob::new_memcache_blobstore;
use cached_config::ConfigStore;
use clap::{Arg, ArgMatches};
use cmdlib::{args, helpers};
use context::CoreContext;
@ -118,6 +119,7 @@ async fn run<'a>(ctx: CoreContext, matches: &'a ArgMatches<'a>) -> Result<(), Er
config,
mysql_options,
blobstore_options.clone(),
config_store,
)
.await?;
warmers.push(warmer);
@ -152,6 +154,7 @@ impl StreamingCloneWarmup {
config: &RepoConfig,
mysql_options: MysqlOptions,
blobstore_options: BlobstoreOptions,
config_store: &ConfigStore,
) -> Result<Self, Error> {
// Create blobstore that contains streaming clone chunks, without cachelib
// layer (we want to hit memcache even if it is available in cachelib), and
@ -163,6 +166,7 @@ impl StreamingCloneWarmup {
ReadOnlyStorage(true),
&blobstore_options,
ctx.logger(),
config_store,
)
.await?;
let blobstore = new_memcache_blobstore(ctx.fb, blobstore, "multiplexed", "")?;

View File

@ -291,6 +291,7 @@ async fn bootstrap_repositories<'a>(
readonly_storage,
remote_args_blobstore_options,
&logger,
config_store,
)
});
@ -318,6 +319,7 @@ async fn bootstrap_repositories<'a>(
readonly_storage,
blobstore_options.clone(),
false, // Don't record infinitepush writes
config_store,
)
.await?
.finalize(noop_hook_manager.clone())

View File

@ -139,6 +139,7 @@ async fn run_hook_tailer<'a>(
readonly_storage,
cmdlib::args::parse_blobstore_options(&matches),
&logger,
config_store,
);
let blobrepo = builder.build().await?;

View File

@ -279,6 +279,7 @@ fn main(fb: FacebookInit) -> Result<(), Error> {
readonly_storage,
blobstore_options,
&logger,
config_store,
);
let hipster_acl = config.hipster_acl.as_ref();

View File

@ -82,6 +82,7 @@ async fn do_main<'a>(
let readonly_storage = cmdlib::args::parse_readonly_storage(&matches);
let blobstore_options = cmdlib::args::parse_blobstore_options(&matches);
let caching = cmdlib::args::init_cachelib(fb, &matches, None);
let config_store = cmdlib::args::init_config_store(fb, logger, matches)?;
let RepoConfigs { repos, common } = args::load_repo_configs(config_store, &matches)?;
let censored_scuba_params = common.censored_scuba_params;
@ -125,6 +126,7 @@ async fn do_main<'a>(
readonly_storage,
blobstore_options,
&logger,
config_store,
);
let repo = builder.build().await?;

View File

@ -186,6 +186,7 @@ impl Repo {
readonly_storage,
blobstore_options,
&logger,
config_store,
);
let blob_repo = builder.build().await?;

View File

@ -23,6 +23,7 @@ skiplist = { path = "../../reachabilityindex/skiplist" }
sql_construct = { path = "../../common/sql_construct" }
sql_ext = { path = "../../common/rust/sql_ext" }
streaming_clone = { path = "../streaming_clone" }
cached_config = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
cloned = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
fbinit = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
futures_ext = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }

View File

@ -8,6 +8,7 @@
use anyhow::{Context, Error};
use blobrepo::BlobRepo;
use blobrepo_factory::{BlobrepoBuilder, BlobstoreOptions, Caching, ReadOnlyStorage};
use cached_config::ConfigStore;
use context::CoreContext;
use futures::{compat::Future01CompatExt, future, FutureExt};
use hooks::HookManager;
@ -43,6 +44,7 @@ impl MononokeRepoBuilder {
readonly_storage: ReadOnlyStorage,
blobstore_options: BlobstoreOptions,
record_infinitepush_writes: bool,
config_store: &ConfigStore,
) -> Result<MononokeRepoBuilder, Error> {
let builder = BlobrepoBuilder::new(
ctx.fb,
@ -54,6 +56,7 @@ impl MononokeRepoBuilder {
readonly_storage,
blobstore_options.clone(),
ctx.logger(),
config_store,
);
let repo = builder.build().await?;

View File

@ -48,7 +48,7 @@ pub async fn create_repo_listeners(
tls_acceptor: SslAcceptor,
service: ReadyFlagService,
terminate_process: oneshot::Receiver<()>,
config_store: &ConfigStore,
config_store: &'static ConfigStore,
readonly_storage: ReadOnlyStorage,
blobstore_options: BlobstoreOptions,
scribe: Scribe,
@ -63,6 +63,7 @@ pub async fn create_repo_listeners(
readonly_storage,
blobstore_options,
&root_log,
config_store,
)
.compat()
.await?;

View File

@ -14,6 +14,7 @@ use blobrepo::BlobRepo;
use blobrepo_factory::{BlobstoreOptions, Caching, ReadOnlyStorage};
use blobstore_factory::make_blobstore;
use cache_warmup::cache_warmup;
use cached_config::ConfigStore;
use cloned::cloned;
use context::CoreContext;
use derived_data::BonsaiDerived;
@ -160,6 +161,7 @@ pub fn repo_handlers(
readonly_storage: ReadOnlyStorage,
blobstore_options: BlobstoreOptions,
root_log: &Logger,
config_store: &'static ConfigStore,
) -> BoxFuture<HashMap<String, RepoHandler>, Error> {
// compute eagerly to avoid lifetime issues
let repo_futs: Vec<BoxFuture<(String, IncompleteRepoHandler), Error>> = repos
@ -214,6 +216,7 @@ pub fn repo_handlers(
readonly_storage,
blobstore_options,
record_infinitepush_writes,
config_store,
)
.await?;
@ -264,6 +267,7 @@ pub fn repo_handlers(
readonly_storage,
wireproto_logging,
logger.clone(),
config_store,
)
.compat();
@ -415,6 +419,7 @@ fn create_wireproto_logging(
readonly_storage: ReadOnlyStorage,
wireproto_logging_config: WireprotoLoggingConfig,
logger: Logger,
config_store: &'static ConfigStore,
) -> impl Future<Item = WireprotoLogging, Error = Error> {
let WireprotoLoggingConfig {
storage_config_and_threshold,
@ -438,6 +443,7 @@ fn create_wireproto_logging(
readonly_storage,
&Default::default(),
&logger,
config_store,
)
.await?;

View File

@ -416,6 +416,7 @@ async fn do_main(
readonly_storage,
blobstore_options,
&logger,
config_store,
)
.build()
.await?;

View File

@ -33,6 +33,7 @@ samplingblob = { path = "../blobstore/samplingblob" }
scuba_ext = { path = "../common/scuba_ext" }
sql_ext = { path = "../common/rust/sql_ext" }
async_compression = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
cached_config = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
cloned = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
fbinit = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
futures_ext = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }

View File

@ -12,6 +12,7 @@ use blobstore::{Blobstore, BlobstoreMetadata};
use blobstore_factory::{
make_blobstore_multiplexed, make_blobstore_put_ops, BlobstoreOptions, ReadOnlyStorage,
};
use cached_config::ConfigStore;
use context::CoreContext;
use fbinit::FacebookInit;
use inlinable_string::InlinableString;
@ -153,6 +154,7 @@ pub async fn open_blobstore(
repo_stats_key: String,
blobstore_options: BlobstoreOptions,
logger: Logger,
config_store: &ConfigStore,
) -> Result<Arc<dyn Blobstore>, Error> {
let mut blobconfig = get_blobconfig(blob_config, inner_blobstore_id)?;
let scrub_handler = scrub_action.map(|scrub_action| {
@ -200,6 +202,7 @@ pub async fn open_blobstore(
readonly_storage,
&blobstore_options,
&logger,
config_store,
)
.await?
}
@ -227,6 +230,7 @@ pub async fn open_blobstore(
readonly_storage,
&blobstore_options,
&logger,
config_store,
)
.await?
}
@ -238,6 +242,7 @@ pub async fn open_blobstore(
readonly_storage,
&blobstore_options,
&logger,
config_store,
)
.await?
}

View File

@ -813,6 +813,7 @@ pub fn setup_common<'a>(
repo_name.clone(),
blobstore_options.clone(),
logger.clone(),
config_store,
);
let sql_factory = make_metadata_sql_factory(