Add a write-mostly blobstore mode for populating blobstores

Summary:
We're going to add an SQL blobstore to our existing multiplex, which won't have all the blobs initially.

To populate it safely, we want normal operations to fill it with the latest data while we backfill the rest from Manifold; once we're confident that all the data is in it, we can switch it to normal mode, without ever incurring an excessive number of reads for blobs that we know aren't in the new blobstore.
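
For illustration, a minimal sketch of how a component is marked write-mostly in storage config, using the same TOML syntax the new config-parsing test in this diff exercises (the blob_files stores are hypothetical stand-ins for the real Manifold and SQL blobstores):

[multiplex_store.blobstore.multiplexed]
multiplex_id = 1
components = [
    # No store_type: the component defaults to normal
    { blobstore_id = 1, blobstore = { blob_files = { path = "/tmp/existing" } } },
    # The store being populated: receives every put, but is only read as a last resort
    { blobstore_id = 2, store_type = { write_mostly = {} }, blobstore = { blob_files = { path = "/tmp/new" } } },
]
queue_db = { remote = { db_address = "queue_db_address" } }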

Reviewed By: krallin

Differential Revision: D22820501

fbshipit-source-id: 5f1c78ad94136b97ae3ac273a83792ab9ac591a9
This commit is contained in:
Simon Farnsworth 2020-08-03 04:34:30 -07:00 committed by Facebook GitHub Bot
parent 578207d0dc
commit a9b8793d2d
13 changed files with 495 additions and 39 deletions


@@ -1,4 +1,4 @@
-// @generated SignedSource<<2d5fffd0140b4cfc55b254c5e4df7f1b>>
+// @generated SignedSource<<b596766fc0a1f4dd8098ac13ed9bdfc3>>
 // DO NOT EDIT THIS FILE MANUALLY!
 // This file is a mechanical copy of the version in the configerator repo. To
 // modify it, edit the copy in the configerator repo instead and copy it over by
@@ -161,6 +161,9 @@ struct RawBlobstoreMultiplexed {
   3: optional i64 scuba_sample_rate,
   4: optional i32 multiplex_id,
   5: optional RawDbConfig queue_db,
+  // The number of components that must successfully `put` a blob before the
+  // multiplex as a whole claims that it successfully `put` the blob
+  6: optional i64 minimum_successful_writes,
 }
 struct RawBlobstoreManifoldWithTtl {
   1: string manifold_bucket,
@@ -195,9 +198,22 @@ union RawBlobstoreConfig {
   10: RawBlobstorePack pack,
 }
 
+// A write-mostly blobstore is one that is not read from in normal operation.
+// Mononoke will read from it in two cases:
+// 1. Verifying that data is present in all blobstores (scrub etc)
+// 2. Where all "normal" (not write-mostly) blobstores fail to return a blob (error or missing)
+union RawMultiplexedStoreType {
+  1: RawMultiplexedStoreNormal normal,
+  2: RawMultiplexedStoreWriteMostly write_mostly,
+}
+
+struct RawMultiplexedStoreNormal {}
+
+struct RawMultiplexedStoreWriteMostly {}
+
 struct RawBlobstoreIdConfig {
   1: i64 blobstore_id,
   2: RawBlobstoreConfig blobstore,
+  3: optional RawMultiplexedStoreType store_type,
 }
 struct RawDbLocal {


@@ -18,7 +18,7 @@ use futures::{
 };
 use logblob::LogBlob;
 use metaconfig_types::{
-    BlobConfig, BlobstoreId, DatabaseConfig, MultiplexId, ScrubAction,
+    BlobConfig, BlobstoreId, DatabaseConfig, MultiplexId, MultiplexedStoreType, ScrubAction,
     ShardableRemoteDatabaseConfig,
 };
 use multiplexedblob::{LoggingScrubHandler, MultiplexedBlobstore, ScrubBlobstore, ScrubHandler};
@@ -318,7 +318,7 @@ pub fn make_blobstore_multiplexed<'a>(
     queue_db: DatabaseConfig,
     scuba_table: Option<String>,
     scuba_sample_rate: NonZeroU64,
-    inner_config: Vec<(BlobstoreId, BlobConfig)>,
+    inner_config: Vec<(BlobstoreId, MultiplexedStoreType, BlobConfig)>,
     scrub_args: Option<(Arc<dyn ScrubHandler>, ScrubAction)>,
     mysql_options: MysqlOptions,
     readonly_storage: ReadOnlyStorage,
@@ -335,7 +335,7 @@ pub fn make_blobstore_multiplexed<'a>(
     let mut applied_chaos = false;
 
     let components = future::try_join_all(inner_config.into_iter().map({
-        move |(blobstoreid, config)| {
+        move |(blobstoreid, store_type, config)| {
             let mut blobstore_options = blobstore_options.clone();
 
             if blobstore_options.chaos_options.has_chaos() {
@@ -360,7 +360,7 @@ pub fn make_blobstore_multiplexed<'a>(
                 )
                 .await?;
 
-                Ok((blobstoreid, store))
+                Ok((blobstoreid, store_type, store))
             }
         }
     }));
@@ -374,10 +374,26 @@ pub fn make_blobstore_multiplexed<'a>(
 
     let (components, queue) = future::try_join(components, queue).await?;
 
+    // For now, `partition` could do this, but this will be easier to extend when we introduce more store types
+    let (normal_components, write_mostly_components) = {
+        let mut normal_components = vec![];
+        let mut write_mostly_components = vec![];
+        for (blobstore_id, store_type, store) in components.into_iter() {
+            match store_type {
+                MultiplexedStoreType::Normal => normal_components.push((blobstore_id, store)),
+                MultiplexedStoreType::WriteMostly => {
+                    write_mostly_components.push((blobstore_id, store))
+                }
+            }
+        }
+        (normal_components, write_mostly_components)
+    };
+
     let blobstore = match scrub_args {
         Some((scrub_handler, scrub_action)) => Arc::new(ScrubBlobstore::new(
             multiplex_id,
-            components,
+            normal_components,
+            write_mostly_components,
             Arc::new(queue),
             scuba_table.map_or(ScubaSampleBuilder::with_discard(), |table| {
                 ScubaSampleBuilder::new(fb, table)
@@ -388,7 +404,8 @@ pub fn make_blobstore_multiplexed<'a>(
         )) as Arc<dyn Blobstore>,
         None => Arc::new(MultiplexedBlobstore::new(
             multiplex_id,
-            components,
+            normal_components,
+            write_mostly_components,
             Arc::new(queue),
             scuba_table.map_or(ScubaSampleBuilder::with_discard(), |table| {
                 ScubaSampleBuilder::new(fb, table)


@@ -75,7 +75,19 @@ pub trait MultiplexedBlobstorePutHandler: Send + Sync {
 pub struct MultiplexedBlobstoreBase {
     multiplex_id: MultiplexId,
+    /// These are the "normal" blobstores, which are read from on `get`, and written to on `put`
+    /// as part of normal operation. No special treatment is applied.
     blobstores: Arc<[(BlobstoreId, Arc<dyn Blobstore>)]>,
+    /// Write-mostly blobstores are not normally read from on `get`, but take part in writes
+    /// like a normal blobstore.
+    ///
+    /// There are two circumstances in which a write-mostly blobstore will be read from on `get`:
+    /// 1. The normal blobstores (above) all return Ok(None) or Err for a blob.
+    ///    In this case, we read as it's our only chance of returning data that we previously accepted
+    ///    during a `put` operation.
+    /// 2. When we're recording blobstore stats to Scuba on a `get` - in this case, the read executes
+    ///    solely to gather statistics, and the result is discarded
+    write_mostly_blobstores: Arc<[(BlobstoreId, Arc<dyn Blobstore>)]>,
     handler: Arc<dyn MultiplexedBlobstorePutHandler>,
     scuba: ScubaSampleBuilder,
     scuba_sample_rate: NonZeroU64,
@@ -85,6 +97,7 @@ impl MultiplexedBlobstoreBase {
     pub fn new(
         multiplex_id: MultiplexId,
         blobstores: Vec<(BlobstoreId, Arc<dyn Blobstore>)>,
+        write_mostly_blobstores: Vec<(BlobstoreId, Arc<dyn Blobstore>)>,
         handler: Arc<dyn MultiplexedBlobstorePutHandler>,
         mut scuba: ScubaSampleBuilder,
         scuba_sample_rate: NonZeroU64,
@@ -94,6 +107,7 @@ impl MultiplexedBlobstoreBase {
         Self {
             multiplex_id,
             blobstores: blobstores.into(),
+            write_mostly_blobstores: write_mostly_blobstores.into(),
             handler,
             scuba,
             scuba_sample_rate,
@@ -108,13 +122,22 @@ impl MultiplexedBlobstoreBase {
         let mut scuba = self.scuba.clone();
         scuba.sampled(self.scuba_sample_rate);
 
-        let results = join_all(multiplexed_get(
-            ctx,
-            self.blobstores.as_ref(),
-            key,
-            OperationType::ScrubGet,
-            scuba,
-        ))
+        let results = join_all(
+            multiplexed_get(
+                ctx,
+                self.blobstores.as_ref(),
+                key,
+                OperationType::ScrubGet,
+                scuba.clone(),
+            )
+            .chain(multiplexed_get(
+                ctx,
+                self.write_mostly_blobstores.as_ref(),
+                key,
+                OperationType::ScrubGet,
+                scuba,
+            )),
+        )
         .await;
 
         let (successes, errors): (HashMap<_, _>, HashMap<_, _>) =
@@ -206,11 +229,12 @@ pub async fn inner_put(
 async fn blobstore_get(
     ctx: CoreContext,
     blobstores: Arc<[(BlobstoreId, Arc<dyn Blobstore>)]>,
+    write_mostly_blobstores: Arc<[(BlobstoreId, Arc<dyn Blobstore>)]>,
     key: String,
     scuba: ScubaSampleBuilder,
 ) -> Result<Option<BlobstoreGetData>, Error> {
     let is_logged = scuba.sampling().is_logged();
-    let blobstores_count = blobstores.len();
+    let blobstores_count = blobstores.len() + write_mostly_blobstores.len();
 
     let (stats, result) = {
         let ctx = &ctx;
@@ -219,20 +243,33 @@ async fn blobstore_get(
             ctx.perf_counters()
                 .increment_counter(PerfCounterType::BlobGets);
 
-            let mut requests: FuturesUnordered<_> = multiplexed_get(
+            let main_requests: FuturesUnordered<_> = multiplexed_get(
                 ctx.clone(),
                 blobstores.as_ref(),
                 &key,
                 OperationType::Get,
+                scuba.clone(),
+            )
+            .collect();
+            let write_mostly_requests: FuturesUnordered<_> = multiplexed_get(
+                ctx.clone(),
+                write_mostly_blobstores.as_ref(),
+                &key,
+                OperationType::Get,
                 scuba,
             )
             .collect();
 
+            // `chain` here guarantees that `main_requests` is empty before it starts
+            // polling anything in `write_mostly_requests`
+            let mut requests = main_requests.chain(write_mostly_requests);
             while let Some(result) = requests.next().await {
                 match result {
                     (_, Ok(Some(mut value))) => {
                         if is_logged {
                             // Allow the other requests to complete so that we can record some
-                            // metrics for the blobstore.
+                            // metrics for the blobstore. This will also log metrics for write-mostly
+                            // blobstores, which helps us decide whether they're good
                             tokio::spawn(requests.for_each(|_| async {}));
                         }
                         // Return the blob that won the race
@@ -306,9 +343,11 @@ impl Blobstore for MultiplexedBlobstoreBase {
     ) -> BoxFuture<'static, Result<Option<BlobstoreGetData>, Error>> {
         let mut scuba = self.scuba.clone();
         let blobstores = self.blobstores.clone();
+        let write_mostly_blobstores = self.write_mostly_blobstores.clone();
         scuba.sampled(self.scuba_sample_rate);
 
-        async move { blobstore_get(ctx, blobstores, key, scuba).await }.boxed()
+        async move { blobstore_get(ctx, blobstores, write_mostly_blobstores, key, scuba).await }
+            .boxed()
     }
 
     fn put(
@@ -323,6 +362,7 @@ impl Blobstore for MultiplexedBlobstoreBase {
         let mut puts: FuturesUnordered<_> = self
             .blobstores
             .iter()
+            .chain(self.write_mostly_blobstores.iter())
            .cloned()
             .map({
                 |(blobstore_id, blobstore)| {
@@ -409,9 +449,9 @@ impl Blobstore for MultiplexedBlobstoreBase {
     }
 
     fn is_present(&self, ctx: CoreContext, key: String) -> BoxFuture<'static, Result<bool, Error>> {
-        let blobstores_count = self.blobstores.len();
+        let blobstores_count = self.blobstores.len() + self.write_mostly_blobstores.len();
 
-        let mut requests: FuturesUnordered<_> = self
+        let main_requests: FuturesUnordered<_> = self
             .blobstores
             .iter()
             .cloned()
@@ -421,7 +461,20 @@ impl Blobstore for MultiplexedBlobstoreBase {
                 async move { (blobstore_id, blobstore.is_present(ctx, key).await) }
             })
             .collect();
+        let write_mostly_requests: FuturesUnordered<_> = self
+            .write_mostly_blobstores
+            .iter()
+            .cloned()
+            .map(|(blobstore_id, blobstore)| {
+                let ctx = ctx.clone();
+                let key = key.clone();
+                async move { (blobstore_id, blobstore.is_present(ctx, key).await) }
+            })
+            .collect();
 
+        // `chain` here guarantees that `main_requests` is empty before it starts
+        // polling anything in `write_mostly_requests`
+        let mut requests = main_requests.chain(write_mostly_requests);
         async move {
             let (stats, result) = {
                 let ctx = &ctx;
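
The fallback ordering in `get` and `is_present` above rests entirely on `StreamExt::chain`: the second stream is not polled at all until the first one is exhausted. A minimal, self-contained sketch of that guarantee (assuming the `futures` and `tokio` crates; the string streams are hypothetical stand-ins for the request streams above):

use futures::stream::{self, StreamExt};

#[tokio::main]
async fn main() {
    // Stand-ins for `main_requests` and `write_mostly_requests`.
    let main_requests = stream::iter(vec!["main-0", "main-1"]);
    let write_mostly_requests = stream::iter(vec!["write-mostly-0"]);

    // `chain` drains the first stream completely before polling the second,
    // so the write-mostly stores are only consulted once every normal store
    // has answered (or failed).
    let order: Vec<_> = main_requests.chain(write_mostly_requests).collect().await;
    assert_eq!(order, vec!["main-0", "main-1", "write-mostly-0"]);
}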


@@ -29,6 +29,7 @@ impl MultiplexedBlobstore {
     pub fn new(
         multiplex_id: MultiplexId,
         blobstores: Vec<(BlobstoreId, Arc<dyn Blobstore>)>,
+        write_mostly_blobstores: Vec<(BlobstoreId, Arc<dyn Blobstore>)>,
         queue: Arc<dyn BlobstoreSyncQueue>,
         scuba: ScubaSampleBuilder,
         scuba_sample_rate: NonZeroU64,
@@ -40,6 +41,7 @@ impl MultiplexedBlobstore {
             blobstore: Arc::new(MultiplexedBlobstoreBase::new(
                 multiplex_id,
                 blobstores,
+                write_mostly_blobstores,
                 put_handler,
                 scuba,
                 scuba_sample_rate,


@@ -89,6 +89,7 @@ impl ScrubBlobstore {
     pub fn new(
         multiplex_id: MultiplexId,
         blobstores: Vec<(BlobstoreId, Arc<dyn Blobstore>)>,
+        write_mostly_blobstores: Vec<(BlobstoreId, Arc<dyn Blobstore>)>,
         queue: Arc<dyn BlobstoreSyncQueue>,
         scuba: ScubaSampleBuilder,
         scuba_sample_rate: NonZeroU64,
@@ -98,6 +99,7 @@ impl ScrubBlobstore {
         let inner = MultiplexedBlobstore::new(
             multiplex_id,
             blobstores.clone(),
+            write_mostly_blobstores.clone(),
             queue.clone(),
             scuba.clone(),
             scuba_sample_rate,
@@ -110,6 +112,7 @@ impl ScrubBlobstore {
             scrub_stores: Arc::new(
                 blobstores
                     .into_iter()
+                    .chain(write_mostly_blobstores.into_iter())
                     .collect::<HashMap<BlobstoreId, Arc<dyn Blobstore>>>(),
             ),
             queue,


@@ -62,6 +62,12 @@ impl<T> Tickable<T> {
         }
     }
 
+    pub fn add_content(&self, key: String, value: T) {
+        self.storage.with(|s| {
+            s.insert(key, value);
+        })
+    }
+
     // Broadcast either success or error to a set of outstanding futures, advancing the
     // overall state by one tick.
     pub fn tick(&self, error: Option<&str>) {
@@ -211,6 +217,7 @@ async fn scrub_blobstore_fetch_none(fb: FacebookInit) -> Result<(), Error> {
     let bs = ScrubBlobstore::new(
         MultiplexId::new(1),
         vec![(bid0, bs0.clone()), (bid1, bs1.clone())],
+        vec![],
         queue.clone(),
         ScubaSampleBuilder::with_discard(),
         nonzero!(1u64),
@@ -252,6 +259,7 @@ async fn base(fb: FacebookInit) {
             (BlobstoreId::new(0), bs0.clone()),
            (BlobstoreId::new(1), bs1.clone()),
         ],
+        vec![],
         log.clone(),
         ScubaSampleBuilder::with_discard(),
         nonzero!(1u64),
@@ -395,6 +403,7 @@ async fn multiplexed(fb: FacebookInit) {
     let bs = MultiplexedBlobstore::new(
         MultiplexId::new(1),
         vec![(bid0, bs0.clone()), (bid1, bs1.clone())],
+        vec![],
         queue.clone(),
         ScubaSampleBuilder::with_discard(),
         nonzero!(1u64),
@@ -476,6 +485,7 @@ async fn multiplexed_operation_keys(fb: FacebookInit) -> Result<(), Error> {
             (bid1, bs1.clone()),
             (bid2, bs2.clone()),
         ],
+        vec![],
         queue.clone(),
         ScubaSampleBuilder::with_discard(),
         nonzero!(1u64),
@@ -519,6 +529,7 @@ async fn scrubbed(fb: FacebookInit) {
     let bs = ScrubBlobstore::new(
         MultiplexId::new(1),
         vec![(bid0, bs0.clone()), (bid1, bs1.clone())],
+        vec![],
         queue.clone(),
         ScubaSampleBuilder::with_discard(),
         nonzero!(1u64),
@@ -587,6 +598,7 @@ async fn scrubbed(fb: FacebookInit) {
     let bs = ScrubBlobstore::new(
         MultiplexId::new(1),
         vec![(bid0, bs0.clone()), (bid1, bs1.clone())],
+        vec![],
         queue.clone(),
         ScubaSampleBuilder::with_discard(),
         nonzero!(1u64),
@@ -670,6 +682,7 @@ async fn queue_waits(fb: FacebookInit) {
             (BlobstoreId::new(1), bs1.clone()),
             (BlobstoreId::new(2), bs2.clone()),
         ],
+        vec![],
         log.clone(),
         ScubaSampleBuilder::with_discard(),
         nonzero!(1u64),
@@ -762,3 +775,257 @@ async fn queue_waits(fb: FacebookInit) {
         clear();
     }
 }
+
+#[fbinit::test]
+async fn write_mostly_get(fb: FacebookInit) {
+    let both_key = "both".to_string();
+    let value = make_value("value");
+    let write_mostly_key = "write_mostly".to_string();
+    let main_bs = Arc::new(Tickable::new());
+    let write_mostly_bs = Arc::new(Tickable::new());
+
+    let log = Arc::new(LogHandler::new());
+    let bs = MultiplexedBlobstoreBase::new(
+        MultiplexId::new(1),
+        vec![(BlobstoreId::new(0), main_bs.clone())],
+        vec![(BlobstoreId::new(1), write_mostly_bs.clone())],
+        log.clone(),
+        ScubaSampleBuilder::with_discard(),
+        nonzero!(1u64),
+    );
+
+    let ctx = CoreContext::test_mock(fb);
+
+    // Put one blob into both blobstores
+    main_bs.add_content(both_key.clone(), value.clone());
+    write_mostly_bs.add_content(both_key.clone(), value.clone());
+    // Put a blob only into the write mostly blobstore
+    write_mostly_bs.add_content(write_mostly_key.clone(), value.clone());
+
+    // Fetch the blob that's in both blobstores, see that the write mostly blobstore isn't being
+    // read from by ticking it
+    {
+        let mut fut = bs.get(ctx.clone(), both_key.clone());
+        assert!(PollOnce::new(Pin::new(&mut fut)).await.is_pending());
+
+        // Ticking the write_mostly store does nothing.
+        for _ in 0..3usize {
+            write_mostly_bs.tick(None);
+            assert!(PollOnce::new(Pin::new(&mut fut)).await.is_pending());
+        }
+
+        // Tick the main store, and we're finished
+        main_bs.tick(None);
+        assert_eq!(fut.await.unwrap(), Some(value.clone().into()));
+        log.clear();
+    }
+
+    // Fetch the blob that's only in the write mostly blobstore, see it fetch correctly
+    {
+        let mut fut = bs.get(ctx.clone(), write_mostly_key);
+        assert!(PollOnce::new(Pin::new(&mut fut)).await.is_pending());
+
+        // Ticking the main store does nothing, as it lacks the blob
+        for _ in 0..3usize {
+            main_bs.tick(None);
+            assert!(PollOnce::new(Pin::new(&mut fut)).await.is_pending());
+        }
+
+        // Tick the write_mostly store, and we're finished
+        write_mostly_bs.tick(None);
+        assert_eq!(fut.await.unwrap(), Some(value.clone().into()));
+        log.clear();
+    }
+
+    // Fetch the blob that's in both blobstores, see that the write mostly blobstore
+    // is used when the main blobstore fails
+    {
+        let mut fut = bs.get(ctx.clone(), both_key);
+        assert!(PollOnce::new(Pin::new(&mut fut)).await.is_pending());
+
+        // Ticking the write_mostly store does nothing.
+        for _ in 0..3usize {
+            write_mostly_bs.tick(None);
+            assert!(PollOnce::new(Pin::new(&mut fut)).await.is_pending());
+        }
+
+        // Tick the main store, and we're still stuck
+        main_bs.tick(Some("Main blobstore failed - fallback to write_mostly"));
+        assert!(PollOnce::new(Pin::new(&mut fut)).await.is_pending());
+
+        // Finally, the write_mostly store returns our value
+        write_mostly_bs.tick(None);
+        assert_eq!(fut.await.unwrap(), Some(value.clone().into()));
+        log.clear();
+    }
+}
+
+#[fbinit::test]
+async fn write_mostly_put(fb: FacebookInit) {
+    let main_bs = Arc::new(Tickable::new());
+    let write_mostly_bs = Arc::new(Tickable::new());
+
+    let log = Arc::new(LogHandler::new());
+    let bs = MultiplexedBlobstoreBase::new(
+        MultiplexId::new(1),
+        vec![(BlobstoreId::new(0), main_bs.clone())],
+        vec![(BlobstoreId::new(1), write_mostly_bs.clone())],
+        log.clone(),
+        ScubaSampleBuilder::with_discard(),
+        nonzero!(1u64),
+    );
+
+    let ctx = CoreContext::test_mock(fb);
+
+    // succeed as soon as main succeeds. Fail write_mostly to confirm that we can still read.
+    {
+        let v0 = make_value("v0");
+        let k0 = String::from("k0");
+
+        let mut put_fut = bs
+            .put(ctx.clone(), k0.clone(), v0.clone())
+            .map_err(|_| ())
+            .boxed();
+        assert_eq!(PollOnce::new(Pin::new(&mut put_fut)).await, Poll::Pending);
+        main_bs.tick(None);
+        put_fut.await.unwrap();
+        assert_eq!(
+            main_bs.storage.with(|s| s.get(&k0).cloned()),
+            Some(v0.clone())
+        );
+        assert!(write_mostly_bs.storage.with(|s| s.is_empty()));
+        write_mostly_bs.tick(Some("write_mostly_bs failed"));
+        assert!(log
+            .log
+            .with(|log| log == &vec![(BlobstoreId::new(0), k0.clone())]));
+
+        // should succeed as it is stored in main_bs
+        let mut get_fut = bs.get(ctx.clone(), k0).map_err(|_| ()).boxed();
+        assert_eq!(PollOnce::new(Pin::new(&mut get_fut)).await, Poll::Pending);
+        main_bs.tick(None);
+        write_mostly_bs.tick(None);
+        assert_eq!(get_fut.await.unwrap(), Some(v0.into()));
+        assert!(write_mostly_bs.storage.with(|s| s.is_empty()));
+
+        main_bs.storage.with(|s| s.clear());
+        write_mostly_bs.storage.with(|s| s.clear());
+        log.clear();
+    }
+
+    // succeed as soon as write_mostly succeeds. Fail main to confirm we can still read
+    {
+        let v0 = make_value("v0");
+        let k0 = String::from("k0");
+
+        let mut put_fut = bs
+            .put(ctx.clone(), k0.clone(), v0.clone())
+            .map_err(|_| ())
+            .boxed();
+        assert_eq!(PollOnce::new(Pin::new(&mut put_fut)).await, Poll::Pending);
+        write_mostly_bs.tick(None);
+        put_fut.await.unwrap();
+        assert_eq!(
+            write_mostly_bs.storage.with(|s| s.get(&k0).cloned()),
+            Some(v0.clone())
+        );
+        assert!(main_bs.storage.with(|s| s.is_empty()));
+        main_bs.tick(Some("main_bs failed"));
+        assert!(log
+            .log
+            .with(|log| log == &vec![(BlobstoreId::new(1), k0.clone())]));
+
+        // should succeed as it is stored in write_mostly_bs, but main won't read
+        let mut get_fut = bs.get(ctx.clone(), k0).map_err(|_| ()).boxed();
+        assert_eq!(PollOnce::new(Pin::new(&mut get_fut)).await, Poll::Pending);
+        main_bs.tick(None);
+        assert_eq!(PollOnce::new(Pin::new(&mut get_fut)).await, Poll::Pending);
+        write_mostly_bs.tick(None);
+        assert_eq!(get_fut.await.unwrap(), Some(v0.into()));
+        assert!(main_bs.storage.with(|s| s.is_empty()));
+
+        main_bs.storage.with(|s| s.clear());
+        write_mostly_bs.storage.with(|s| s.clear());
+        log.clear();
+    }
+
+    // succeed if write_mostly succeeds and main fails
+    {
+        let v1 = make_value("v1");
+        let k1 = String::from("k1");
+
+        let mut put_fut = bs
+            .put(ctx.clone(), k1.clone(), v1.clone())
+            .map_err(|_| ())
+            .boxed();
+        assert_eq!(PollOnce::new(Pin::new(&mut put_fut)).await, Poll::Pending);
+        main_bs.tick(Some("case 2: main_bs failed"));
+        assert_eq!(PollOnce::new(Pin::new(&mut put_fut)).await, Poll::Pending);
+        write_mostly_bs.tick(None);
+        put_fut.await.unwrap();
+        assert!(main_bs.storage.with(|s| s.get(&k1).is_none()));
+        assert_eq!(
+            write_mostly_bs.storage.with(|s| s.get(&k1).cloned()),
+            Some(v1.clone())
+        );
+        assert!(log
+            .log
+            .with(|log| log == &vec![(BlobstoreId::new(1), k1.clone())]));
+
+        let mut get_fut = bs.get(ctx.clone(), k1.clone()).map_err(|_| ()).boxed();
+        assert_eq!(PollOnce::new(Pin::new(&mut get_fut)).await, Poll::Pending);
+        main_bs.tick(None);
+        assert_eq!(PollOnce::new(Pin::new(&mut get_fut)).await, Poll::Pending);
+        write_mostly_bs.tick(None);
+        assert_eq!(get_fut.await.unwrap(), Some(v1.into()));
+        assert!(main_bs.storage.with(|s| s.get(&k1).is_none()));
+
+        main_bs.storage.with(|s| s.clear());
+        write_mostly_bs.storage.with(|s| s.clear());
+        log.clear();
+    }
+
+    // both fail => whole put fail
+    {
+        let k2 = String::from("k2");
+        let v2 = make_value("v2");
+
+        let mut put_fut = bs
+            .put(ctx.clone(), k2.clone(), v2.clone())
+            .map_err(|_| ())
+            .boxed();
+        assert_eq!(PollOnce::new(Pin::new(&mut put_fut)).await, Poll::Pending);
+        main_bs.tick(Some("case 3: main_bs failed"));
+        assert_eq!(PollOnce::new(Pin::new(&mut put_fut)).await, Poll::Pending);
+        write_mostly_bs.tick(Some("case 3: write_mostly_bs failed"));
+        assert!(put_fut.await.is_err());
+    }
+
+    // both put succeed
+    {
+        let k4 = String::from("k4");
+        let v4 = make_value("v4");
+        main_bs.storage.with(|s| s.clear());
+        write_mostly_bs.storage.with(|s| s.clear());
+        log.clear();
+
+        let mut put_fut = bs
+            .put(ctx.clone(), k4.clone(), v4.clone())
+            .map_err(|_| ())
+            .boxed();
+        assert_eq!(PollOnce::new(Pin::new(&mut put_fut)).await, Poll::Pending);
+        main_bs.tick(None);
+        put_fut.await.unwrap();
+        assert_eq!(
+            main_bs.storage.with(|s| s.get(&k4).cloned()),
+            Some(v4.clone())
+        );
+        write_mostly_bs.tick(None);
+        while log.log.with(|log| log.len() != 2) {
+            tokio::task::yield_now().await;
+        }
+        assert_eq!(
+            write_mostly_bs.storage.with(|s| s.get(&k4).cloned()),
+            Some(v4.clone())
+        );
+    }
+}


@@ -102,7 +102,7 @@ fn get_blobconfig(
         let seeked_id = BlobstoreId::new(inner_blobstore_id);
         blobstores
             .into_iter()
-            .find_map(|(blobstore_id, blobstore)| {
+            .find_map(|(blobstore_id, _, blobstore)| {
                 if blobstore_id == seeked_id {
                     Some(blobstore)
                 } else {


@@ -104,7 +104,7 @@ async fn maybe_schedule_healer_for_storage(
     let blobstores = blobstore_configs
         .into_iter()
-        .map(|(id, blobconfig)| async move {
+        .map(|(id, _, blobconfig)| async move {
             let blobstore = make_blobstore(
                 fb,
                 blobconfig,


@@ -229,8 +229,8 @@ fn parse_args(fb: FacebookInit) -> Result<Config, Error> {
     };
     let manifold_args = blobstores
         .iter()
-        .filter(|(id, _)| src_blobstore_id == *id)
-        .map(|(_, args)| args)
+        .filter(|(id, ..)| src_blobstore_id == *id)
+        .map(|(.., args)| args)
         .next()
         .ok_or(format_err!(
             "failed to find source blobstore id: {:?}",


@@ -411,11 +411,11 @@ mod test {
         CommitSyncConfigVersion, CommitSyncDirection, DatabaseConfig,
         DefaultSmallToLargeCommitSyncPathAction, DerivedDataConfig, FilestoreParams, HookBypass,
         HookConfig, HookManagerParams, HookParams, InfinitepushNamespace, InfinitepushParams,
-        LfsParams, LocalDatabaseConfig, MetadataDatabaseConfig, MultiplexId, PushParams,
-        PushrebaseFlags, PushrebaseParams, RemoteDatabaseConfig, RemoteMetadataDatabaseConfig,
-        ShardableRemoteDatabaseConfig, ShardedRemoteDatabaseConfig, SmallRepoCommitSyncConfig,
-        SourceControlServiceMonitoring, SourceControlServiceParams, UnodeVersion,
-        WireprotoLoggingConfig,
+        LfsParams, LocalDatabaseConfig, MetadataDatabaseConfig, MultiplexId, MultiplexedStoreType,
+        PushParams, PushrebaseFlags, PushrebaseParams, RemoteDatabaseConfig,
+        RemoteMetadataDatabaseConfig, ShardableRemoteDatabaseConfig, ShardedRemoteDatabaseConfig,
+        SmallRepoCommitSyncConfig, SourceControlServiceMonitoring, SourceControlServiceParams,
+        UnodeVersion, WireprotoLoggingConfig,
     };
     use mononoke_types::MPath;
     use nonzero_ext::nonzero;
@@ -997,6 +997,7 @@ mod test {
                 blobstores: vec![
                     (
                         BlobstoreId::new(0),
+                        MultiplexedStoreType::Normal,
                         BlobConfig::Manifold {
                             bucket: "bucket".into(),
                             prefix: "".into(),
@@ -1004,6 +1005,7 @@ mod test {
                     ),
                     (
                         BlobstoreId::new(1),
+                        MultiplexedStoreType::Normal,
                         BlobConfig::Files {
                             path: "/tmp/foo".into(),
                         },
@@ -1436,7 +1438,7 @@ mod test {
                 scuba_table: None,
                 scuba_sample_rate: nonzero!(100u64),
                 blobstores: vec![
-                    (BlobstoreId::new(1), BlobConfig::Files {
+                    (BlobstoreId::new(1), MultiplexedStoreType::Normal, BlobConfig::Files {
                         path: "/tmp/foo".into()
                     })
                 ],
@@ -1577,4 +1579,71 @@ mod test {
         assert!(res.is_err());
         assert!(msg.contains("unknown keys in config parsing"));
     }
+
+    #[fbinit::test]
+    fn test_multiplexed_store_types(fb: FacebookInit) {
+        const STORAGE: &str = r#"
+        [multiplex_store.metadata.remote]
+        primary = { db_address = "some_db" }
+        filenodes = { sharded = { shard_map = "some-shards", shard_num = 123 } }
+
+        [multiplex_store.blobstore.multiplexed]
+        multiplex_id = 1
+        components = [
+            { blobstore_id = 1, blobstore = { blob_files = { path = "/tmp/foo1" } } },
+            { blobstore_id = 2, store_type = { normal = {}}, blobstore = { blob_files = { path = "/tmp/foo2" } } },
+            { blobstore_id = 3, store_type = { write_mostly = {}}, blobstore = { blob_files = { path = "/tmp/foo3" } } },
+        ]
+        queue_db = { remote = { db_address = "queue_db_address" } }
+        "#;
+
+        const REPO: &str = r#"
+        repoid = 123
+        storage_config = "multiplex_store"
+        "#;
+
+        let paths = btreemap! {
+            "common/storage.toml" => STORAGE,
+            "common/commitsyncmap.toml" => "",
+            "repos/test/server.toml" => REPO,
+        };
+
+        let tmp_dir = write_files(&paths);
+        let res = load_repo_configs(fb, tmp_dir.path()).expect("Read configs failed");
+
+        if let BlobConfig::Multiplexed { blobstores, .. } =
+            &res.repos["test"].storage_config.blobstore
+        {
+            let expected_blobstores = vec![
+                (
+                    BlobstoreId::new(1),
+                    MultiplexedStoreType::Normal,
+                    BlobConfig::Files {
+                        path: "/tmp/foo1".into(),
+                    },
+                ),
+                (
+                    BlobstoreId::new(2),
+                    MultiplexedStoreType::Normal,
+                    BlobConfig::Files {
+                        path: "/tmp/foo2".into(),
+                    },
+                ),
+                (
+                    BlobstoreId::new(3),
+                    MultiplexedStoreType::WriteMostly,
+                    BlobConfig::Files {
+                        path: "/tmp/foo3".into(),
+                    },
+                ),
+            ];
+
+            assert_eq!(
+                blobstores, &expected_blobstores,
+                "Blobstores parsed from config are wrong"
+            );
+        } else {
+            panic!("Multiplexed config is not a multiplexed blobstore");
+        }
+    }
 }


@@ -13,13 +13,15 @@ use std::time::Duration;
 use anyhow::{anyhow, Result};
 use metaconfig_types::{
     BlobConfig, BlobstoreId, DatabaseConfig, FilestoreParams, LocalDatabaseConfig,
-    MetadataDatabaseConfig, MultiplexId, RemoteDatabaseConfig, RemoteMetadataDatabaseConfig,
-    ShardableRemoteDatabaseConfig, ShardedRemoteDatabaseConfig, StorageConfig,
+    MetadataDatabaseConfig, MultiplexId, MultiplexedStoreType, RemoteDatabaseConfig,
+    RemoteMetadataDatabaseConfig, ShardableRemoteDatabaseConfig, ShardedRemoteDatabaseConfig,
+    StorageConfig,
 };
 use nonzero_ext::nonzero;
 use repos::{
     RawBlobstoreConfig, RawDbConfig, RawDbLocal, RawDbRemote, RawDbShardableRemote,
-    RawDbShardedRemote, RawFilestoreParams, RawMetadataConfig, RawStorageConfig,
+    RawDbShardedRemote, RawFilestoreParams, RawMetadataConfig, RawMultiplexedStoreType,
+    RawStorageConfig,
 };
 
 use crate::convert::Convert;
@@ -67,6 +69,9 @@ impl Convert for RawBlobstoreConfig {
                     .map(|comp| {
                         Ok((
                             BlobstoreId::new(comp.blobstore_id.try_into()?),
+                            comp.store_type
+                                .convert()?
+                                .unwrap_or(MultiplexedStoreType::Normal),
                             comp.blobstore.convert()?,
                         ))
                     })
@@ -209,3 +214,17 @@ impl Convert for RawFilestoreParams {
         })
     }
 }
+
+impl Convert for RawMultiplexedStoreType {
+    type Output = MultiplexedStoreType;
+
+    fn convert(self) -> Result<Self::Output> {
+        match self {
+            RawMultiplexedStoreType::normal(_) => Ok(MultiplexedStoreType::Normal),
+            RawMultiplexedStoreType::write_mostly(_) => Ok(MultiplexedStoreType::WriteMostly),
+            RawMultiplexedStoreType::UnknownField(field) => {
+                Err(anyhow!("unknown store type {}", field))
+            }
+        }
+    }
+}


@@ -643,6 +643,16 @@ impl FromStr for ScrubAction {
     }
 }
 
+/// Whether we should read from this blobstore normally in a Multiplex,
+/// or only read from it in Scrub or when it's our last chance to find the blob
+#[derive(Debug, Clone, Copy, Eq, PartialEq, Deserialize)]
+pub enum MultiplexedStoreType {
+    /// Normal operation, no special treatment
+    Normal,
+    /// Only read if Normal blobstores don't provide the blob. Writes go here as per normal
+    WriteMostly,
+}
+
 /// Configuration for a blobstore
 #[derive(Debug, Clone, Eq, PartialEq)]
 pub enum BlobConfig {
@@ -680,7 +690,7 @@ pub enum BlobConfig {
         /// A scuba table I guess
         scuba_table: Option<String>,
         /// Set of blobstores being multiplexed over
-        blobstores: Vec<(BlobstoreId, BlobConfig)>,
+        blobstores: Vec<(BlobstoreId, MultiplexedStoreType, BlobConfig)>,
         /// 1 in scuba_sample_rate samples will be logged.
         scuba_sample_rate: NonZeroU64,
         /// DB config to use for the sync queue
@@ -693,7 +703,7 @@ pub enum BlobConfig {
         /// A scuba table I guess
         scuba_table: Option<String>,
         /// Set of blobstores being multiplexed over
-        blobstores: Vec<(BlobstoreId, BlobConfig)>,
+        blobstores: Vec<(BlobstoreId, MultiplexedStoreType, BlobConfig)>,
         /// Whether to attempt repair
         scrub_action: ScrubAction,
         /// 1 in scuba_sample_rate samples will be logged.
@@ -737,7 +747,7 @@ impl BlobConfig {
             Manifold { .. } | Mysql { .. } | ManifoldWithTtl { .. } => false,
             Multiplexed { blobstores, .. } | Scrub { blobstores, .. } => blobstores
                 .iter()
-                .map(|(_, config)| config)
+                .map(|(_, _, config)| config)
                 .all(BlobConfig::is_local),
             Logging { blobconfig, .. } => blobconfig.is_local(),
             Pack { blobconfig, .. } => blobconfig.is_local(),
@@ -760,7 +770,7 @@ impl BlobConfig {
         {
             let scuba_table = mem::replace(scuba_table, None);
             let mut blobstores = mem::replace(blobstores, Vec::new());
-            for (_, store) in blobstores.iter_mut() {
+            for (_, _, store) in blobstores.iter_mut() {
                 store.set_scrubbed(scrub_action);
             }
             *self = Scrub {
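
Downstream code now threads the store type through every multiplexed config; a minimal sketch (reusing values from the config-parsing test earlier in this diff) of the three-element tuple that `BlobConfig::Multiplexed { blobstores, .. }` now carries:

use metaconfig_types::{BlobConfig, BlobstoreId, MultiplexedStoreType};

// The existing store stays Normal; the store being populated is WriteMostly.
let blobstores: Vec<(BlobstoreId, MultiplexedStoreType, BlobConfig)> = vec![
    (
        BlobstoreId::new(1),
        MultiplexedStoreType::Normal,
        BlobConfig::Files {
            path: "/tmp/foo1".into(),
        },
    ),
    (
        BlobstoreId::new(2),
        MultiplexedStoreType::WriteMostly,
        BlobConfig::Files {
            path: "/tmp/foo2".into(),
        },
    ),
];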


@@ -119,7 +119,7 @@ fn get_blobconfig(
         let seeked_id = BlobstoreId::new(inner_blobstore_id);
         blobstores
             .into_iter()
-            .find_map(|(blobstore_id, blobstore)| {
+            .find_map(|(blobstore_id, _, blobstore)| {
                 if blobstore_id == seeked_id {
                     Some(blobstore)
                 } else {
@@ -181,7 +181,7 @@ pub async fn open_blobstore(
     // Without this the new stats only show up when a repair is needed (i.e. as they get incremented),
     // which makes them harder to monitor on (no datapoints rather than a zero datapoint at start).
     for s in &[STATS::scrub_repaired, STATS::scrub_repair_required] {
-        for (id, _config) in &blobstores {
+        for (id, _ty, _config) in &blobstores {
             s.add_value(0, (walk_stats_key, id.to_string(), repo_stats_key.clone()));
         }
     }