diff --git a/configerator/structs/scm/mononoke/repos/repos.thrift b/configerator/structs/scm/mononoke/repos/repos.thrift index 7f9a01f5a6..98509aa0dd 100644 --- a/configerator/structs/scm/mononoke/repos/repos.thrift +++ b/configerator/structs/scm/mononoke/repos/repos.thrift @@ -1,4 +1,4 @@ -// @generated SignedSource<<2d5fffd0140b4cfc55b254c5e4df7f1b>> +// @generated SignedSource<> // DO NOT EDIT THIS FILE MANUALLY! // This file is a mechanical copy of the version in the configerator repo. To // modify it, edit the copy in the configerator repo instead and copy it over by @@ -161,6 +161,9 @@ struct RawBlobstoreMultiplexed { 3: optional i64 scuba_sample_rate, 4: optional i32 multiplex_id, 5: optional RawDbConfig queue_db, + // The number of components that must successfully `put` a blob before the + // multiplex as a whole claims that it successfully `put` the blob + 6: optional i64 minimum_successful_writes, } struct RawBlobstoreManifoldWithTtl { 1: string manifold_bucket, @@ -195,9 +198,22 @@ union RawBlobstoreConfig { 10: RawBlobstorePack pack, } +// A write-mostly blobstore is one that is not read from in normal operation. +// Mononoke will read from it in two cases: +// 1. Verifying that data is present in all blobstores (scrub etc) +// 2. Where all "normal" (not write-mostly) blobstores fail to return a blob (error or missing) +union RawMultiplexedStoreType { + 1: RawMultiplexedStoreNormal normal, + 2: RawMultiplexedStoreWriteMostly write_mostly, +} + +struct RawMultiplexedStoreNormal {} +struct RawMultiplexedStoreWriteMostly {} + struct RawBlobstoreIdConfig { 1: i64 blobstore_id, 2: RawBlobstoreConfig blobstore, + 3: optional RawMultiplexedStoreType store_type, } struct RawDbLocal { diff --git a/eden/mononoke/blobstore/factory/src/blobstore.rs b/eden/mononoke/blobstore/factory/src/blobstore.rs index e49623304e..1ecee74eed 100644 --- a/eden/mononoke/blobstore/factory/src/blobstore.rs +++ b/eden/mononoke/blobstore/factory/src/blobstore.rs @@ -18,7 +18,7 @@ use futures::{ }; use logblob::LogBlob; use metaconfig_types::{ - BlobConfig, BlobstoreId, DatabaseConfig, MultiplexId, ScrubAction, + BlobConfig, BlobstoreId, DatabaseConfig, MultiplexId, MultiplexedStoreType, ScrubAction, ShardableRemoteDatabaseConfig, }; use multiplexedblob::{LoggingScrubHandler, MultiplexedBlobstore, ScrubBlobstore, ScrubHandler}; @@ -318,7 +318,7 @@ pub fn make_blobstore_multiplexed<'a>( queue_db: DatabaseConfig, scuba_table: Option, scuba_sample_rate: NonZeroU64, - inner_config: Vec<(BlobstoreId, BlobConfig)>, + inner_config: Vec<(BlobstoreId, MultiplexedStoreType, BlobConfig)>, scrub_args: Option<(Arc, ScrubAction)>, mysql_options: MysqlOptions, readonly_storage: ReadOnlyStorage, @@ -335,7 +335,7 @@ pub fn make_blobstore_multiplexed<'a>( let mut applied_chaos = false; let components = future::try_join_all(inner_config.into_iter().map({ - move |(blobstoreid, config)| { + move |(blobstoreid, store_type, config)| { let mut blobstore_options = blobstore_options.clone(); if blobstore_options.chaos_options.has_chaos() { @@ -360,7 +360,7 @@ pub fn make_blobstore_multiplexed<'a>( ) .await?; - Ok((blobstoreid, store)) + Ok((blobstoreid, store_type, store)) } } })); @@ -374,10 +374,26 @@ pub fn make_blobstore_multiplexed<'a>( let (components, queue) = future::try_join(components, queue).await?; + // For now, `partition` could do this, but this will be easier to extend when we introduce more store types + let (normal_components, write_mostly_components) = { + let mut normal_components = vec![]; + let mut write_mostly_components = vec![]; + for (blobstore_id, store_type, store) in components.into_iter() { + match store_type { + MultiplexedStoreType::Normal => normal_components.push((blobstore_id, store)), + MultiplexedStoreType::WriteMostly => { + write_mostly_components.push((blobstore_id, store)) + } + } + } + (normal_components, write_mostly_components) + }; + let blobstore = match scrub_args { Some((scrub_handler, scrub_action)) => Arc::new(ScrubBlobstore::new( multiplex_id, - components, + normal_components, + write_mostly_components, Arc::new(queue), scuba_table.map_or(ScubaSampleBuilder::with_discard(), |table| { ScubaSampleBuilder::new(fb, table) @@ -388,7 +404,8 @@ pub fn make_blobstore_multiplexed<'a>( )) as Arc, None => Arc::new(MultiplexedBlobstore::new( multiplex_id, - components, + normal_components, + write_mostly_components, Arc::new(queue), scuba_table.map_or(ScubaSampleBuilder::with_discard(), |table| { ScubaSampleBuilder::new(fb, table) diff --git a/eden/mononoke/blobstore/multiplexedblob/src/base.rs b/eden/mononoke/blobstore/multiplexedblob/src/base.rs index cfdf4c7593..8dcb80d4cd 100644 --- a/eden/mononoke/blobstore/multiplexedblob/src/base.rs +++ b/eden/mononoke/blobstore/multiplexedblob/src/base.rs @@ -75,7 +75,19 @@ pub trait MultiplexedBlobstorePutHandler: Send + Sync { pub struct MultiplexedBlobstoreBase { multiplex_id: MultiplexId, + /// These are the "normal" blobstores, which are read from on `get`, and written to on `put` + /// as part of normal operation. No special treatment is applied. blobstores: Arc<[(BlobstoreId, Arc)]>, + /// Write-mostly blobstores are not normally read from on `get`, but take part in writes + /// like a normal blobstore. + /// + /// There are two circumstances in which a write-mostly blobstore will be read from on `get`: + /// 1. The normal blobstores (above) all return Ok(None) or Err for a blob. + /// In this case, we read as it's our only chance of returning data that we previously accepted + /// during a `put` operation. + /// 2. When we're recording blobstore stats to Scuba on a `get` - in this case, the read executes + /// solely to gather statistics, and the result is discarded + write_mostly_blobstores: Arc<[(BlobstoreId, Arc)]>, handler: Arc, scuba: ScubaSampleBuilder, scuba_sample_rate: NonZeroU64, @@ -85,6 +97,7 @@ impl MultiplexedBlobstoreBase { pub fn new( multiplex_id: MultiplexId, blobstores: Vec<(BlobstoreId, Arc)>, + write_mostly_blobstores: Vec<(BlobstoreId, Arc)>, handler: Arc, mut scuba: ScubaSampleBuilder, scuba_sample_rate: NonZeroU64, @@ -94,6 +107,7 @@ impl MultiplexedBlobstoreBase { Self { multiplex_id, blobstores: blobstores.into(), + write_mostly_blobstores: write_mostly_blobstores.into(), handler, scuba, scuba_sample_rate, @@ -108,13 +122,22 @@ impl MultiplexedBlobstoreBase { let mut scuba = self.scuba.clone(); scuba.sampled(self.scuba_sample_rate); - let results = join_all(multiplexed_get( - ctx, - self.blobstores.as_ref(), - key, - OperationType::ScrubGet, - scuba, - )) + let results = join_all( + multiplexed_get( + ctx, + self.blobstores.as_ref(), + key, + OperationType::ScrubGet, + scuba.clone(), + ) + .chain(multiplexed_get( + ctx, + self.write_mostly_blobstores.as_ref(), + key, + OperationType::ScrubGet, + scuba, + )), + ) .await; let (successes, errors): (HashMap<_, _>, HashMap<_, _>) = @@ -206,11 +229,12 @@ pub async fn inner_put( async fn blobstore_get( ctx: CoreContext, blobstores: Arc<[(BlobstoreId, Arc)]>, + write_mostly_blobstores: Arc<[(BlobstoreId, Arc)]>, key: String, scuba: ScubaSampleBuilder, ) -> Result, Error> { let is_logged = scuba.sampling().is_logged(); - let blobstores_count = blobstores.len(); + let blobstores_count = blobstores.len() + write_mostly_blobstores.len(); let (stats, result) = { let ctx = &ctx; @@ -219,20 +243,33 @@ async fn blobstore_get( ctx.perf_counters() .increment_counter(PerfCounterType::BlobGets); - let mut requests: FuturesUnordered<_> = multiplexed_get( + let main_requests: FuturesUnordered<_> = multiplexed_get( ctx.clone(), blobstores.as_ref(), &key, OperationType::Get, + scuba.clone(), + ) + .collect(); + let write_mostly_requests: FuturesUnordered<_> = multiplexed_get( + ctx.clone(), + write_mostly_blobstores.as_ref(), + &key, + OperationType::Get, scuba, ) .collect(); + + // `chain` here guarantees that `main_requests` is empty before it starts + // polling anything in `write_mostly_requests` + let mut requests = main_requests.chain(write_mostly_requests); while let Some(result) = requests.next().await { match result { (_, Ok(Some(mut value))) => { if is_logged { // Allow the other requests to complete so that we can record some - // metrics for the blobstore. + // metrics for the blobstore. This will also log metrics for write-mostly + // blobstores, which helps us decide whether they're good tokio::spawn(requests.for_each(|_| async {})); } // Return the blob that won the race @@ -306,9 +343,11 @@ impl Blobstore for MultiplexedBlobstoreBase { ) -> BoxFuture<'static, Result, Error>> { let mut scuba = self.scuba.clone(); let blobstores = self.blobstores.clone(); + let write_mostly_blobstores = self.write_mostly_blobstores.clone(); scuba.sampled(self.scuba_sample_rate); - async move { blobstore_get(ctx, blobstores, key, scuba).await }.boxed() + async move { blobstore_get(ctx, blobstores, write_mostly_blobstores, key, scuba).await } + .boxed() } fn put( @@ -323,6 +362,7 @@ impl Blobstore for MultiplexedBlobstoreBase { let mut puts: FuturesUnordered<_> = self .blobstores .iter() + .chain(self.write_mostly_blobstores.iter()) .cloned() .map({ |(blobstore_id, blobstore)| { @@ -409,9 +449,9 @@ impl Blobstore for MultiplexedBlobstoreBase { } fn is_present(&self, ctx: CoreContext, key: String) -> BoxFuture<'static, Result> { - let blobstores_count = self.blobstores.len(); + let blobstores_count = self.blobstores.len() + self.write_mostly_blobstores.len(); - let mut requests: FuturesUnordered<_> = self + let main_requests: FuturesUnordered<_> = self .blobstores .iter() .cloned() @@ -421,7 +461,20 @@ impl Blobstore for MultiplexedBlobstoreBase { async move { (blobstore_id, blobstore.is_present(ctx, key).await) } }) .collect(); + let write_mostly_requests: FuturesUnordered<_> = self + .write_mostly_blobstores + .iter() + .cloned() + .map(|(blobstore_id, blobstore)| { + let ctx = ctx.clone(); + let key = key.clone(); + async move { (blobstore_id, blobstore.is_present(ctx, key).await) } + }) + .collect(); + // `chain` here guarantees that `main_requests` is empty before it starts + // polling anything in `write_mostly_requests` + let mut requests = main_requests.chain(write_mostly_requests); async move { let (stats, result) = { let ctx = &ctx; diff --git a/eden/mononoke/blobstore/multiplexedblob/src/queue.rs b/eden/mononoke/blobstore/multiplexedblob/src/queue.rs index 1e78654d67..9d23f44af6 100644 --- a/eden/mononoke/blobstore/multiplexedblob/src/queue.rs +++ b/eden/mononoke/blobstore/multiplexedblob/src/queue.rs @@ -29,6 +29,7 @@ impl MultiplexedBlobstore { pub fn new( multiplex_id: MultiplexId, blobstores: Vec<(BlobstoreId, Arc)>, + write_mostly_blobstores: Vec<(BlobstoreId, Arc)>, queue: Arc, scuba: ScubaSampleBuilder, scuba_sample_rate: NonZeroU64, @@ -40,6 +41,7 @@ impl MultiplexedBlobstore { blobstore: Arc::new(MultiplexedBlobstoreBase::new( multiplex_id, blobstores, + write_mostly_blobstores, put_handler, scuba, scuba_sample_rate, diff --git a/eden/mononoke/blobstore/multiplexedblob/src/scrub.rs b/eden/mononoke/blobstore/multiplexedblob/src/scrub.rs index 600096fafd..a85f7c30d5 100644 --- a/eden/mononoke/blobstore/multiplexedblob/src/scrub.rs +++ b/eden/mononoke/blobstore/multiplexedblob/src/scrub.rs @@ -89,6 +89,7 @@ impl ScrubBlobstore { pub fn new( multiplex_id: MultiplexId, blobstores: Vec<(BlobstoreId, Arc)>, + write_mostly_blobstores: Vec<(BlobstoreId, Arc)>, queue: Arc, scuba: ScubaSampleBuilder, scuba_sample_rate: NonZeroU64, @@ -98,6 +99,7 @@ impl ScrubBlobstore { let inner = MultiplexedBlobstore::new( multiplex_id, blobstores.clone(), + write_mostly_blobstores.clone(), queue.clone(), scuba.clone(), scuba_sample_rate, @@ -110,6 +112,7 @@ impl ScrubBlobstore { scrub_stores: Arc::new( blobstores .into_iter() + .chain(write_mostly_blobstores.into_iter()) .collect::>>(), ), queue, diff --git a/eden/mononoke/blobstore/multiplexedblob/src/test.rs b/eden/mononoke/blobstore/multiplexedblob/src/test.rs index 2e93a80690..8e36ebab8e 100644 --- a/eden/mononoke/blobstore/multiplexedblob/src/test.rs +++ b/eden/mononoke/blobstore/multiplexedblob/src/test.rs @@ -62,6 +62,12 @@ impl Tickable { } } + pub fn add_content(&self, key: String, value: T) { + self.storage.with(|s| { + s.insert(key, value); + }) + } + // Broadcast either success or error to a set of outstanding futures, advancing the // overall state by one tick. pub fn tick(&self, error: Option<&str>) { @@ -211,6 +217,7 @@ async fn scrub_blobstore_fetch_none(fb: FacebookInit) -> Result<(), Error> { let bs = ScrubBlobstore::new( MultiplexId::new(1), vec![(bid0, bs0.clone()), (bid1, bs1.clone())], + vec![], queue.clone(), ScubaSampleBuilder::with_discard(), nonzero!(1u64), @@ -252,6 +259,7 @@ async fn base(fb: FacebookInit) { (BlobstoreId::new(0), bs0.clone()), (BlobstoreId::new(1), bs1.clone()), ], + vec![], log.clone(), ScubaSampleBuilder::with_discard(), nonzero!(1u64), @@ -395,6 +403,7 @@ async fn multiplexed(fb: FacebookInit) { let bs = MultiplexedBlobstore::new( MultiplexId::new(1), vec![(bid0, bs0.clone()), (bid1, bs1.clone())], + vec![], queue.clone(), ScubaSampleBuilder::with_discard(), nonzero!(1u64), @@ -476,6 +485,7 @@ async fn multiplexed_operation_keys(fb: FacebookInit) -> Result<(), Error> { (bid1, bs1.clone()), (bid2, bs2.clone()), ], + vec![], queue.clone(), ScubaSampleBuilder::with_discard(), nonzero!(1u64), @@ -519,6 +529,7 @@ async fn scrubbed(fb: FacebookInit) { let bs = ScrubBlobstore::new( MultiplexId::new(1), vec![(bid0, bs0.clone()), (bid1, bs1.clone())], + vec![], queue.clone(), ScubaSampleBuilder::with_discard(), nonzero!(1u64), @@ -587,6 +598,7 @@ async fn scrubbed(fb: FacebookInit) { let bs = ScrubBlobstore::new( MultiplexId::new(1), vec![(bid0, bs0.clone()), (bid1, bs1.clone())], + vec![], queue.clone(), ScubaSampleBuilder::with_discard(), nonzero!(1u64), @@ -670,6 +682,7 @@ async fn queue_waits(fb: FacebookInit) { (BlobstoreId::new(1), bs1.clone()), (BlobstoreId::new(2), bs2.clone()), ], + vec![], log.clone(), ScubaSampleBuilder::with_discard(), nonzero!(1u64), @@ -762,3 +775,257 @@ async fn queue_waits(fb: FacebookInit) { clear(); } } + +#[fbinit::test] +async fn write_mostly_get(fb: FacebookInit) { + let both_key = "both".to_string(); + let value = make_value("value"); + let write_mostly_key = "write_mostly".to_string(); + let main_bs = Arc::new(Tickable::new()); + let write_mostly_bs = Arc::new(Tickable::new()); + + let log = Arc::new(LogHandler::new()); + let bs = MultiplexedBlobstoreBase::new( + MultiplexId::new(1), + vec![(BlobstoreId::new(0), main_bs.clone())], + vec![(BlobstoreId::new(1), write_mostly_bs.clone())], + log.clone(), + ScubaSampleBuilder::with_discard(), + nonzero!(1u64), + ); + + let ctx = CoreContext::test_mock(fb); + + // Put one blob into both blobstores + main_bs.add_content(both_key.clone(), value.clone()); + write_mostly_bs.add_content(both_key.clone(), value.clone()); + // Put a blob only into the write mostly blobstore + write_mostly_bs.add_content(write_mostly_key.clone(), value.clone()); + + // Fetch the blob that's in both blobstores, see that the write mostly blobstore isn't being + // read from by ticking it + { + let mut fut = bs.get(ctx.clone(), both_key.clone()); + assert!(PollOnce::new(Pin::new(&mut fut)).await.is_pending()); + + // Ticking the write_mostly store does nothing. + for _ in 0..3usize { + write_mostly_bs.tick(None); + assert!(PollOnce::new(Pin::new(&mut fut)).await.is_pending()); + } + + // Tick the main store, and we're finished + main_bs.tick(None); + assert_eq!(fut.await.unwrap(), Some(value.clone().into())); + log.clear(); + } + + // Fetch the blob that's only in the write mostly blobstore, see it fetch correctly + { + let mut fut = bs.get(ctx.clone(), write_mostly_key); + assert!(PollOnce::new(Pin::new(&mut fut)).await.is_pending()); + + // Ticking the main store does nothing, as it lacks the blob + for _ in 0..3usize { + main_bs.tick(None); + assert!(PollOnce::new(Pin::new(&mut fut)).await.is_pending()); + } + + // Tick the write_mostly store, and we're finished + write_mostly_bs.tick(None); + assert_eq!(fut.await.unwrap(), Some(value.clone().into())); + log.clear(); + } + + // Fetch the blob that's in both blobstores, see that the write mostly blobstore + // is used when the main blobstore fails + { + let mut fut = bs.get(ctx.clone(), both_key); + assert!(PollOnce::new(Pin::new(&mut fut)).await.is_pending()); + + // Ticking the write_mostly store does nothing. + for _ in 0..3usize { + write_mostly_bs.tick(None); + assert!(PollOnce::new(Pin::new(&mut fut)).await.is_pending()); + } + + // Tick the main store, and we're still stuck + main_bs.tick(Some("Main blobstore failed - fallback to write_mostly")); + assert!(PollOnce::new(Pin::new(&mut fut)).await.is_pending()); + + // Finally, the write_mostly store returns our value + write_mostly_bs.tick(None); + assert_eq!(fut.await.unwrap(), Some(value.clone().into())); + log.clear(); + } +} + +#[fbinit::test] +async fn write_mostly_put(fb: FacebookInit) { + let main_bs = Arc::new(Tickable::new()); + let write_mostly_bs = Arc::new(Tickable::new()); + + let log = Arc::new(LogHandler::new()); + let bs = MultiplexedBlobstoreBase::new( + MultiplexId::new(1), + vec![(BlobstoreId::new(0), main_bs.clone())], + vec![(BlobstoreId::new(1), write_mostly_bs.clone())], + log.clone(), + ScubaSampleBuilder::with_discard(), + nonzero!(1u64), + ); + + let ctx = CoreContext::test_mock(fb); + + // succeed as soon as main succeeds. Fail write_mostly to confirm that we can still read. + { + let v0 = make_value("v0"); + let k0 = String::from("k0"); + + let mut put_fut = bs + .put(ctx.clone(), k0.clone(), v0.clone()) + .map_err(|_| ()) + .boxed(); + assert_eq!(PollOnce::new(Pin::new(&mut put_fut)).await, Poll::Pending); + main_bs.tick(None); + put_fut.await.unwrap(); + assert_eq!( + main_bs.storage.with(|s| s.get(&k0).cloned()), + Some(v0.clone()) + ); + assert!(write_mostly_bs.storage.with(|s| s.is_empty())); + write_mostly_bs.tick(Some("write_mostly_bs failed")); + assert!(log + .log + .with(|log| log == &vec![(BlobstoreId::new(0), k0.clone())])); + + // should succeed as it is stored in main_bs + let mut get_fut = bs.get(ctx.clone(), k0).map_err(|_| ()).boxed(); + assert_eq!(PollOnce::new(Pin::new(&mut get_fut)).await, Poll::Pending); + main_bs.tick(None); + write_mostly_bs.tick(None); + assert_eq!(get_fut.await.unwrap(), Some(v0.into())); + assert!(write_mostly_bs.storage.with(|s| s.is_empty())); + + main_bs.storage.with(|s| s.clear()); + write_mostly_bs.storage.with(|s| s.clear()); + log.clear(); + } + + // succeed as soon as write_mostly succeeds. Fail main to confirm we can still read + { + let v0 = make_value("v0"); + let k0 = String::from("k0"); + + let mut put_fut = bs + .put(ctx.clone(), k0.clone(), v0.clone()) + .map_err(|_| ()) + .boxed(); + assert_eq!(PollOnce::new(Pin::new(&mut put_fut)).await, Poll::Pending); + write_mostly_bs.tick(None); + put_fut.await.unwrap(); + assert_eq!( + write_mostly_bs.storage.with(|s| s.get(&k0).cloned()), + Some(v0.clone()) + ); + assert!(main_bs.storage.with(|s| s.is_empty())); + main_bs.tick(Some("main_bs failed")); + assert!(log + .log + .with(|log| log == &vec![(BlobstoreId::new(1), k0.clone())])); + + // should succeed as it is stored in write_mostly_bs, but main won't read + let mut get_fut = bs.get(ctx.clone(), k0).map_err(|_| ()).boxed(); + assert_eq!(PollOnce::new(Pin::new(&mut get_fut)).await, Poll::Pending); + main_bs.tick(None); + assert_eq!(PollOnce::new(Pin::new(&mut get_fut)).await, Poll::Pending); + write_mostly_bs.tick(None); + assert_eq!(get_fut.await.unwrap(), Some(v0.into())); + assert!(main_bs.storage.with(|s| s.is_empty())); + + main_bs.storage.with(|s| s.clear()); + write_mostly_bs.storage.with(|s| s.clear()); + log.clear(); + } + + // succeed if write_mostly succeeds and main fails + { + let v1 = make_value("v1"); + let k1 = String::from("k1"); + + let mut put_fut = bs + .put(ctx.clone(), k1.clone(), v1.clone()) + .map_err(|_| ()) + .boxed(); + assert_eq!(PollOnce::new(Pin::new(&mut put_fut)).await, Poll::Pending); + main_bs.tick(Some("case 2: main_bs failed")); + assert_eq!(PollOnce::new(Pin::new(&mut put_fut)).await, Poll::Pending); + write_mostly_bs.tick(None); + put_fut.await.unwrap(); + assert!(main_bs.storage.with(|s| s.get(&k1).is_none())); + assert_eq!( + write_mostly_bs.storage.with(|s| s.get(&k1).cloned()), + Some(v1.clone()) + ); + assert!(log + .log + .with(|log| log == &vec![(BlobstoreId::new(1), k1.clone())])); + + let mut get_fut = bs.get(ctx.clone(), k1.clone()).map_err(|_| ()).boxed(); + assert_eq!(PollOnce::new(Pin::new(&mut get_fut)).await, Poll::Pending); + main_bs.tick(None); + assert_eq!(PollOnce::new(Pin::new(&mut get_fut)).await, Poll::Pending); + write_mostly_bs.tick(None); + assert_eq!(get_fut.await.unwrap(), Some(v1.into())); + assert!(main_bs.storage.with(|s| s.get(&k1).is_none())); + + main_bs.storage.with(|s| s.clear()); + write_mostly_bs.storage.with(|s| s.clear()); + log.clear(); + } + + // both fail => whole put fail + { + let k2 = String::from("k2"); + let v2 = make_value("v2"); + + let mut put_fut = bs + .put(ctx.clone(), k2.clone(), v2.clone()) + .map_err(|_| ()) + .boxed(); + assert_eq!(PollOnce::new(Pin::new(&mut put_fut)).await, Poll::Pending); + main_bs.tick(Some("case 3: main_bs failed")); + assert_eq!(PollOnce::new(Pin::new(&mut put_fut)).await, Poll::Pending); + write_mostly_bs.tick(Some("case 3: write_mostly_bs failed")); + assert!(put_fut.await.is_err()); + } + + // both put succeed + { + let k4 = String::from("k4"); + let v4 = make_value("v4"); + main_bs.storage.with(|s| s.clear()); + write_mostly_bs.storage.with(|s| s.clear()); + log.clear(); + + let mut put_fut = bs + .put(ctx.clone(), k4.clone(), v4.clone()) + .map_err(|_| ()) + .boxed(); + assert_eq!(PollOnce::new(Pin::new(&mut put_fut)).await, Poll::Pending); + main_bs.tick(None); + put_fut.await.unwrap(); + assert_eq!( + main_bs.storage.with(|s| s.get(&k4).cloned()), + Some(v4.clone()) + ); + write_mostly_bs.tick(None); + while log.log.with(|log| log.len() != 2) { + tokio::task::yield_now().await; + } + assert_eq!( + write_mostly_bs.storage.with(|s| s.get(&k4).cloned()), + Some(v4.clone()) + ); + } +} diff --git a/eden/mononoke/cmds/admin/blobstore_fetch.rs b/eden/mononoke/cmds/admin/blobstore_fetch.rs index 640488399d..a093cbc535 100644 --- a/eden/mononoke/cmds/admin/blobstore_fetch.rs +++ b/eden/mononoke/cmds/admin/blobstore_fetch.rs @@ -102,7 +102,7 @@ fn get_blobconfig( let seeked_id = BlobstoreId::new(inner_blobstore_id); blobstores .into_iter() - .find_map(|(blobstore_id, blobstore)| { + .find_map(|(blobstore_id, _, blobstore)| { if blobstore_id == seeked_id { Some(blobstore) } else { diff --git a/eden/mononoke/cmds/blobstore_healer/main.rs b/eden/mononoke/cmds/blobstore_healer/main.rs index 6b98961939..33de28fee1 100644 --- a/eden/mononoke/cmds/blobstore_healer/main.rs +++ b/eden/mononoke/cmds/blobstore_healer/main.rs @@ -104,7 +104,7 @@ async fn maybe_schedule_healer_for_storage( let blobstores = blobstore_configs .into_iter() - .map(|(id, blobconfig)| async move { + .map(|(id, _, blobconfig)| async move { let blobstore = make_blobstore( fb, blobconfig, diff --git a/eden/mononoke/cmds/populate_healer.rs b/eden/mononoke/cmds/populate_healer.rs index e35214c0e4..1fed2d772c 100644 --- a/eden/mononoke/cmds/populate_healer.rs +++ b/eden/mononoke/cmds/populate_healer.rs @@ -229,8 +229,8 @@ fn parse_args(fb: FacebookInit) -> Result { }; let manifold_args = blobstores .iter() - .filter(|(id, _)| src_blobstore_id == *id) - .map(|(_, args)| args) + .filter(|(id, ..)| src_blobstore_id == *id) + .map(|(.., args)| args) .next() .ok_or(format_err!( "failed to find source blobstore id: {:?}", diff --git a/eden/mononoke/metaconfig/parser/src/config.rs b/eden/mononoke/metaconfig/parser/src/config.rs index 4d7ca52e85..46975af965 100644 --- a/eden/mononoke/metaconfig/parser/src/config.rs +++ b/eden/mononoke/metaconfig/parser/src/config.rs @@ -411,11 +411,11 @@ mod test { CommitSyncConfigVersion, CommitSyncDirection, DatabaseConfig, DefaultSmallToLargeCommitSyncPathAction, DerivedDataConfig, FilestoreParams, HookBypass, HookConfig, HookManagerParams, HookParams, InfinitepushNamespace, InfinitepushParams, - LfsParams, LocalDatabaseConfig, MetadataDatabaseConfig, MultiplexId, PushParams, - PushrebaseFlags, PushrebaseParams, RemoteDatabaseConfig, RemoteMetadataDatabaseConfig, - ShardableRemoteDatabaseConfig, ShardedRemoteDatabaseConfig, SmallRepoCommitSyncConfig, - SourceControlServiceMonitoring, SourceControlServiceParams, UnodeVersion, - WireprotoLoggingConfig, + LfsParams, LocalDatabaseConfig, MetadataDatabaseConfig, MultiplexId, MultiplexedStoreType, + PushParams, PushrebaseFlags, PushrebaseParams, RemoteDatabaseConfig, + RemoteMetadataDatabaseConfig, ShardableRemoteDatabaseConfig, ShardedRemoteDatabaseConfig, + SmallRepoCommitSyncConfig, SourceControlServiceMonitoring, SourceControlServiceParams, + UnodeVersion, WireprotoLoggingConfig, }; use mononoke_types::MPath; use nonzero_ext::nonzero; @@ -997,6 +997,7 @@ mod test { blobstores: vec![ ( BlobstoreId::new(0), + MultiplexedStoreType::Normal, BlobConfig::Manifold { bucket: "bucket".into(), prefix: "".into(), @@ -1004,6 +1005,7 @@ mod test { ), ( BlobstoreId::new(1), + MultiplexedStoreType::Normal, BlobConfig::Files { path: "/tmp/foo".into(), }, @@ -1436,7 +1438,7 @@ mod test { scuba_table: None, scuba_sample_rate: nonzero!(100u64), blobstores: vec![ - (BlobstoreId::new(1), BlobConfig::Files { + (BlobstoreId::new(1), MultiplexedStoreType::Normal, BlobConfig::Files { path: "/tmp/foo".into() }) ], @@ -1577,4 +1579,71 @@ mod test { assert!(res.is_err()); assert!(msg.contains("unknown keys in config parsing")); } + + #[fbinit::test] + fn test_multiplexed_store_types(fb: FacebookInit) { + const STORAGE: &str = r#" + [multiplex_store.metadata.remote] + primary = { db_address = "some_db" } + filenodes = { sharded = { shard_map = "some-shards", shard_num = 123 } } + + [multiplex_store.blobstore.multiplexed] + multiplex_id = 1 + components = [ + { blobstore_id = 1, blobstore = { blob_files = { path = "/tmp/foo1" } } }, + { blobstore_id = 2, store_type = { normal = {}}, blobstore = { blob_files = { path = "/tmp/foo2" } } }, + { blobstore_id = 3, store_type = { write_mostly = {}}, blobstore = { blob_files = { path = "/tmp/foo3" } } }, + ] + queue_db = { remote = { db_address = "queue_db_address" } } + "#; + + const REPO: &str = r#" + repoid = 123 + storage_config = "multiplex_store" + "#; + + let paths = btreemap! { + "common/storage.toml" => STORAGE, + "common/commitsyncmap.toml" => "", + "repos/test/server.toml" => REPO, + }; + + let tmp_dir = write_files(&paths); + let res = load_repo_configs(fb, tmp_dir.path()).expect("Read configs failed"); + + if let BlobConfig::Multiplexed { blobstores, .. } = + &res.repos["test"].storage_config.blobstore + { + let expected_blobstores = vec![ + ( + BlobstoreId::new(1), + MultiplexedStoreType::Normal, + BlobConfig::Files { + path: "/tmp/foo1".into(), + }, + ), + ( + BlobstoreId::new(2), + MultiplexedStoreType::Normal, + BlobConfig::Files { + path: "/tmp/foo2".into(), + }, + ), + ( + BlobstoreId::new(3), + MultiplexedStoreType::WriteMostly, + BlobConfig::Files { + path: "/tmp/foo3".into(), + }, + ), + ]; + + assert_eq!( + blobstores, &expected_blobstores, + "Blobstores parsed from config are wrong" + ); + } else { + panic!("Multiplexed config is not a multiplexed blobstore"); + } + } } diff --git a/eden/mononoke/metaconfig/parser/src/convert/storage.rs b/eden/mononoke/metaconfig/parser/src/convert/storage.rs index 304b86d4ee..038258f885 100644 --- a/eden/mononoke/metaconfig/parser/src/convert/storage.rs +++ b/eden/mononoke/metaconfig/parser/src/convert/storage.rs @@ -13,13 +13,15 @@ use std::time::Duration; use anyhow::{anyhow, Result}; use metaconfig_types::{ BlobConfig, BlobstoreId, DatabaseConfig, FilestoreParams, LocalDatabaseConfig, - MetadataDatabaseConfig, MultiplexId, RemoteDatabaseConfig, RemoteMetadataDatabaseConfig, - ShardableRemoteDatabaseConfig, ShardedRemoteDatabaseConfig, StorageConfig, + MetadataDatabaseConfig, MultiplexId, MultiplexedStoreType, RemoteDatabaseConfig, + RemoteMetadataDatabaseConfig, ShardableRemoteDatabaseConfig, ShardedRemoteDatabaseConfig, + StorageConfig, }; use nonzero_ext::nonzero; use repos::{ RawBlobstoreConfig, RawDbConfig, RawDbLocal, RawDbRemote, RawDbShardableRemote, - RawDbShardedRemote, RawFilestoreParams, RawMetadataConfig, RawStorageConfig, + RawDbShardedRemote, RawFilestoreParams, RawMetadataConfig, RawMultiplexedStoreType, + RawStorageConfig, }; use crate::convert::Convert; @@ -67,6 +69,9 @@ impl Convert for RawBlobstoreConfig { .map(|comp| { Ok(( BlobstoreId::new(comp.blobstore_id.try_into()?), + comp.store_type + .convert()? + .unwrap_or(MultiplexedStoreType::Normal), comp.blobstore.convert()?, )) }) @@ -209,3 +214,17 @@ impl Convert for RawFilestoreParams { }) } } + +impl Convert for RawMultiplexedStoreType { + type Output = MultiplexedStoreType; + + fn convert(self) -> Result { + match self { + RawMultiplexedStoreType::normal(_) => Ok(MultiplexedStoreType::Normal), + RawMultiplexedStoreType::write_mostly(_) => Ok(MultiplexedStoreType::WriteMostly), + RawMultiplexedStoreType::UnknownField(field) => { + Err(anyhow!("unknown store type {}", field)) + } + } + } +} diff --git a/eden/mononoke/metaconfig/types/src/lib.rs b/eden/mononoke/metaconfig/types/src/lib.rs index dd8874beff..1fdb030e4f 100644 --- a/eden/mononoke/metaconfig/types/src/lib.rs +++ b/eden/mononoke/metaconfig/types/src/lib.rs @@ -643,6 +643,16 @@ impl FromStr for ScrubAction { } } +/// Whether we should read from this blobstore normally in a Multiplex, +/// or only read from it in Scrub or when it's our last chance to find the blob +#[derive(Debug, Clone, Copy, Eq, PartialEq, Deserialize)] +pub enum MultiplexedStoreType { + /// Normal operation, no special treatment + Normal, + /// Only read if Normal blobstores don't provide the blob. Writes go here as per normal + WriteMostly, +} + /// Configuration for a blobstore #[derive(Debug, Clone, Eq, PartialEq)] pub enum BlobConfig { @@ -680,7 +690,7 @@ pub enum BlobConfig { /// A scuba table I guess scuba_table: Option, /// Set of blobstores being multiplexed over - blobstores: Vec<(BlobstoreId, BlobConfig)>, + blobstores: Vec<(BlobstoreId, MultiplexedStoreType, BlobConfig)>, /// 1 in scuba_sample_rate samples will be logged. scuba_sample_rate: NonZeroU64, /// DB config to use for the sync queue @@ -693,7 +703,7 @@ pub enum BlobConfig { /// A scuba table I guess scuba_table: Option, /// Set of blobstores being multiplexed over - blobstores: Vec<(BlobstoreId, BlobConfig)>, + blobstores: Vec<(BlobstoreId, MultiplexedStoreType, BlobConfig)>, /// Whether to attempt repair scrub_action: ScrubAction, /// 1 in scuba_sample_rate samples will be logged. @@ -737,7 +747,7 @@ impl BlobConfig { Manifold { .. } | Mysql { .. } | ManifoldWithTtl { .. } => false, Multiplexed { blobstores, .. } | Scrub { blobstores, .. } => blobstores .iter() - .map(|(_, config)| config) + .map(|(_, _, config)| config) .all(BlobConfig::is_local), Logging { blobconfig, .. } => blobconfig.is_local(), Pack { blobconfig, .. } => blobconfig.is_local(), @@ -760,7 +770,7 @@ impl BlobConfig { { let scuba_table = mem::replace(scuba_table, None); let mut blobstores = mem::replace(blobstores, Vec::new()); - for (_, store) in blobstores.iter_mut() { + for (_, _, store) in blobstores.iter_mut() { store.set_scrubbed(scrub_action); } *self = Scrub { diff --git a/eden/mononoke/walker/src/blobstore.rs b/eden/mononoke/walker/src/blobstore.rs index 0eac27904b..31bbce953d 100644 --- a/eden/mononoke/walker/src/blobstore.rs +++ b/eden/mononoke/walker/src/blobstore.rs @@ -119,7 +119,7 @@ fn get_blobconfig( let seeked_id = BlobstoreId::new(inner_blobstore_id); blobstores .into_iter() - .find_map(|(blobstore_id, blobstore)| { + .find_map(|(blobstore_id, _, blobstore)| { if blobstore_id == seeked_id { Some(blobstore) } else { @@ -181,7 +181,7 @@ pub async fn open_blobstore( // Without this the new stats only show up when a repair is needed (i.e. as they get incremented), // which makes them harder to monitor on (no datapoints rather than a zero datapoint at start). for s in &[STATS::scrub_repaired, STATS::scrub_repair_required] { - for (id, _config) in &blobstores { + for (id, _ty, _config) in &blobstores { s.add_value(0, (walk_stats_key, id.to_string(), repo_stats_key.clone())); } }