mirror of
https://github.com/facebook/sapling.git
synced 2025-01-06 21:48:36 +03:00
(1/2) Rename write mostly to write only
Summary: **Context**: The write-mostly name is confusing and out of date. We never read from write-mostly blobstores (only on scrub). Let's rename it to write only to make it less confusing. See D41520772 for migration plan. This diff is the fbsource part of step (1): **Makes our code accept both configs as meaning write only** and does all the renames. The most important change is file `eden/mononoke/metaconfig/parser/src/convert/storage.rs`, which starts accepting `write-only` config as well as `write-mostly`. Reviewed By: mitrandir77 Differential Revision: D41520648 fbshipit-source-id: b0274c937ca7db10debb8c952a9a1867c0b431c1
This commit is contained in:
parent
762d4ea48d
commit
c8cbb1d847
@ -1,4 +1,4 @@
|
||||
// @generated SignedSource<<857212e4da3c7986c26f610bc0136f2d>>
|
||||
// @generated SignedSource<<613d90f8c0d525b247d0445f8b64608e>>
|
||||
// DO NOT EDIT THIS FILE MANUALLY!
|
||||
// This file is a mechanical copy of the version in the configerator repo. To
|
||||
// modify it, edit the copy in the configerator repo instead and copy it over by
|
||||
@ -372,17 +372,19 @@ union RawBlobstoreConfig {
|
||||
12: RawBlobstoreMultiplexedWal multiplexed_wal;
|
||||
}
|
||||
|
||||
// A write-mostly blobstore is one that is not read from in normal operation.
|
||||
// Mononoke will read from it in two cases:
|
||||
// A write-only blobstore is one that is not read from in normal operation.
|
||||
// Mononoke will read from it in these cases:
|
||||
// 1. Verifying that data is present in all blobstores (scrub etc)
|
||||
// 2. Where all "normal" (not write-mostly) blobstores fail to return a blob (error or missing)
|
||||
union RawMultiplexedStoreType {
|
||||
1: RawMultiplexedStoreNormal normal;
|
||||
// TODO: delete
|
||||
2: RawMultiplexedStoreWriteMostly write_mostly;
|
||||
3: RawMultiplexedStoreWriteOnly write_only;
|
||||
}
|
||||
|
||||
struct RawMultiplexedStoreNormal {}
|
||||
struct RawMultiplexedStoreNormal {} (rust.exhaustive)
|
||||
struct RawMultiplexedStoreWriteMostly {} (rust.exhaustive)
|
||||
struct RawMultiplexedStoreWriteOnly {} (rust.exhaustive)
|
||||
|
||||
struct RawBlobstoreIdConfig {
|
||||
1: i64 blobstore_id;
|
||||
|
@ -49,7 +49,7 @@ use multiplexedblob::ScrubAction;
|
||||
use multiplexedblob::ScrubBlobstore;
|
||||
use multiplexedblob::ScrubHandler;
|
||||
use multiplexedblob::ScrubOptions;
|
||||
use multiplexedblob::ScrubWriteMostly;
|
||||
use multiplexedblob::SrubWriteOnly;
|
||||
use multiplexedblob_wal::scrub::WalScrubBlobstore;
|
||||
use multiplexedblob_wal::Scuba as WalScuba;
|
||||
use multiplexedblob_wal::WalMultiplexedBlobstore;
|
||||
@ -140,12 +140,9 @@ impl BlobstoreOptions {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn with_scrub_action_on_missing_write_mostly(
|
||||
self,
|
||||
scrub_missing: ScrubWriteMostly,
|
||||
) -> Self {
|
||||
pub fn with_scrub_action_on_missing_write_only(self, scrub_missing: SrubWriteOnly) -> Self {
|
||||
if let Some(mut scrub_options) = self.scrub_options {
|
||||
scrub_options.scrub_action_on_missing_write_mostly = scrub_missing;
|
||||
scrub_options.scrub_action_on_missing_write_only = scrub_missing;
|
||||
Self {
|
||||
scrub_options: Some(scrub_options),
|
||||
..self
|
||||
@ -724,7 +721,7 @@ async fn make_blobstore_multiplexed<'a>(
|
||||
scrub_handler: &'a Arc<dyn ScrubHandler>,
|
||||
component_sampler: Option<&'a Arc<dyn ComponentSamplingHandler>>,
|
||||
) -> Result<Arc<dyn BlobstorePutOps>, Error> {
|
||||
let (normal_components, write_mostly_components) = setup_inner_blobstores(
|
||||
let (normal_components, write_only_components) = setup_inner_blobstores(
|
||||
fb,
|
||||
inner_config,
|
||||
mysql_options,
|
||||
@ -747,7 +744,7 @@ async fn make_blobstore_multiplexed<'a>(
|
||||
Some(scrub_options) => Arc::new(ScrubBlobstore::new(
|
||||
multiplex_id,
|
||||
normal_components,
|
||||
write_mostly_components,
|
||||
write_only_components,
|
||||
minimum_successful_writes,
|
||||
not_present_read_quorum,
|
||||
Arc::new(queue),
|
||||
@ -765,7 +762,7 @@ async fn make_blobstore_multiplexed<'a>(
|
||||
None => Arc::new(MultiplexedBlobstore::new(
|
||||
multiplex_id,
|
||||
normal_components,
|
||||
write_mostly_components,
|
||||
write_only_components,
|
||||
minimum_successful_writes,
|
||||
not_present_read_quorum,
|
||||
Arc::new(queue),
|
||||
@ -800,7 +797,7 @@ async fn make_multiplexed_wal<'a>(
|
||||
scrub_handler: &'a Arc<dyn ScrubHandler>,
|
||||
component_sampler: Option<&'a Arc<dyn ComponentSamplingHandler>>,
|
||||
) -> Result<Arc<dyn BlobstorePutOps>, Error> {
|
||||
let (normal_components, write_mostly_components) = setup_inner_blobstores(
|
||||
let (normal_components, write_only_components) = setup_inner_blobstores(
|
||||
fb,
|
||||
inner_config,
|
||||
mysql_options,
|
||||
@ -831,7 +828,7 @@ async fn make_multiplexed_wal<'a>(
|
||||
multiplex_id,
|
||||
wal_queue,
|
||||
normal_components,
|
||||
write_mostly_components,
|
||||
write_only_components,
|
||||
write_quorum,
|
||||
None, // use default timeouts
|
||||
scuba,
|
||||
@ -843,7 +840,7 @@ async fn make_multiplexed_wal<'a>(
|
||||
multiplex_id,
|
||||
wal_queue,
|
||||
normal_components,
|
||||
write_mostly_components,
|
||||
write_only_components,
|
||||
write_quorum,
|
||||
None, // use default timeouts
|
||||
scuba,
|
||||
@ -912,14 +909,12 @@ async fn setup_inner_blobstores<'a>(
|
||||
|
||||
// For now, `partition` could do this, but this will be easier to extend when we introduce more store types
|
||||
let mut normal_components = vec![];
|
||||
let mut write_mostly_components = vec![];
|
||||
let mut write_only_components = vec![];
|
||||
for (blobstore_id, store_type, store) in components.into_iter() {
|
||||
match store_type {
|
||||
MultiplexedStoreType::Normal => normal_components.push((blobstore_id, store)),
|
||||
MultiplexedStoreType::WriteMostly => {
|
||||
write_mostly_components.push((blobstore_id, store))
|
||||
}
|
||||
MultiplexedStoreType::WriteOnly => write_only_components.push((blobstore_id, store)),
|
||||
}
|
||||
}
|
||||
Ok((normal_components, write_mostly_components))
|
||||
Ok((normal_components, write_only_components))
|
||||
}
|
||||
|
@ -28,7 +28,7 @@ pub use facebook::ManifoldArgs;
|
||||
pub use facebook::ManifoldOptions;
|
||||
pub use multiplexedblob::scrub::default_scrub_handler;
|
||||
pub use multiplexedblob::scrub::ScrubOptions;
|
||||
pub use multiplexedblob::scrub::ScrubWriteMostly;
|
||||
pub use multiplexedblob::scrub::SrubWriteOnly;
|
||||
pub use multiplexedblob::ScrubAction;
|
||||
pub use multiplexedblob::ScrubHandler;
|
||||
pub use packblob::PackOptions;
|
||||
|
@ -57,7 +57,7 @@ use tokio::time::timeout;
|
||||
use tunables::tunables;
|
||||
use twox_hash::XxHash;
|
||||
|
||||
use crate::scrub::ScrubWriteMostly;
|
||||
use crate::scrub::SrubWriteOnly;
|
||||
|
||||
const REQUEST_TIMEOUT: Duration = Duration::from_secs(600);
|
||||
const DEFAULT_IS_PRESENT_TIMEOUT_MS: i64 = 10000;
|
||||
@ -71,12 +71,12 @@ pub enum ErrorKind {
|
||||
#[error("Some blobstores failed, and other returned None: {main_errors:?}")]
|
||||
SomeFailedOthersNone {
|
||||
main_errors: Arc<BlobstoresReturnedError>,
|
||||
write_mostly_errors: Arc<BlobstoresReturnedError>,
|
||||
write_only_errors: Arc<BlobstoresReturnedError>,
|
||||
},
|
||||
#[error("All blobstores failed: {main_errors:?}")]
|
||||
AllFailed {
|
||||
main_errors: Arc<BlobstoresReturnedError>,
|
||||
write_mostly_errors: Arc<BlobstoresReturnedError>,
|
||||
write_only_errors: Arc<BlobstoresReturnedError>,
|
||||
},
|
||||
// Errors below this point are from ScrubBlobstore only. If they include an
|
||||
// Option<BlobstoreBytes>, this implies that this error is recoverable
|
||||
@ -87,7 +87,7 @@ pub enum ErrorKind {
|
||||
#[error("Some blobstores missing this item: {missing_main:?}")]
|
||||
SomeMissingItem {
|
||||
missing_main: Arc<BlobstoresReturnedNone>,
|
||||
missing_write_mostly: Arc<BlobstoresReturnedNone>,
|
||||
missing_write_only: Arc<BlobstoresReturnedNone>,
|
||||
value: BlobstoreGetData,
|
||||
},
|
||||
#[error("Multiple failures on put: {0:?}")]
|
||||
@ -120,13 +120,13 @@ pub struct MultiplexedBlobstoreBase {
|
||||
/// Write-mostly blobstores are not normally read from on `get`, but take part in writes
|
||||
/// like a normal blobstore.
|
||||
///
|
||||
/// There are two circumstances in which a write-mostly blobstore will be read from on `get`:
|
||||
/// There are two circumstances in which a write-only blobstore will be read from on `get`:
|
||||
/// 1. The normal blobstores (above) all return Ok(None) or Err for a blob.
|
||||
/// In this case, we read as it's our only chance of returning data that we previously accepted
|
||||
/// during a `put` operation.
|
||||
/// 2. When we're recording blobstore stats to Scuba on a `get` - in this case, the read executes
|
||||
/// solely to gather statistics, and the result is discarded
|
||||
write_mostly_blobstores: Arc<[(BlobstoreId, Arc<dyn BlobstorePutOps>)]>,
|
||||
write_only_blobstores: Arc<[(BlobstoreId, Arc<dyn BlobstorePutOps>)]>,
|
||||
/// `put` is considered successful if either this many `put` and `on_put` pairs succeeded or all puts were
|
||||
/// successful (regardless of whether `on_put`s were successful).
|
||||
/// This is meant to ensure that `put` fails if the data could end up lost (e.g. if a buggy experimental
|
||||
@ -148,15 +148,15 @@ impl std::fmt::Display for MultiplexedBlobstoreBase {
|
||||
.iter()
|
||||
.map(|(id, store)| (*id, store.to_string()))
|
||||
.collect();
|
||||
let write_mostly_blobstores: Vec<_> = self
|
||||
.write_mostly_blobstores
|
||||
let write_only_blobstores: Vec<_> = self
|
||||
.write_only_blobstores
|
||||
.iter()
|
||||
.map(|(id, store)| (*id, store.to_string()))
|
||||
.collect();
|
||||
write!(
|
||||
f,
|
||||
"Normal {:?}, write mostly {:?}",
|
||||
blobstores, write_mostly_blobstores
|
||||
"Normal {:?}, write only {:?}",
|
||||
blobstores, write_only_blobstores
|
||||
)
|
||||
}
|
||||
}
|
||||
@ -164,42 +164,42 @@ impl std::fmt::Display for MultiplexedBlobstoreBase {
|
||||
fn blobstores_failed_error(
|
||||
main_blobstore_ids: impl Iterator<Item = BlobstoreId>,
|
||||
main_errors: HashMap<BlobstoreId, Error>,
|
||||
write_mostly_errors: HashMap<BlobstoreId, Error>,
|
||||
write_only_errors: HashMap<BlobstoreId, Error>,
|
||||
) -> ErrorKind {
|
||||
let main_errored_ids: HashSet<BlobstoreId> = main_errors.keys().copied().collect();
|
||||
let all_main_ids: HashSet<BlobstoreId> = main_blobstore_ids.collect();
|
||||
if main_errored_ids == all_main_ids {
|
||||
// The write mostly stores that returned None might not have been fully populated
|
||||
// The write only stores that returned None might not have been fully populated
|
||||
ErrorKind::AllFailed {
|
||||
main_errors: Arc::new(main_errors),
|
||||
write_mostly_errors: Arc::new(write_mostly_errors),
|
||||
write_only_errors: Arc::new(write_only_errors),
|
||||
}
|
||||
} else {
|
||||
ErrorKind::SomeFailedOthersNone {
|
||||
main_errors: Arc::new(main_errors),
|
||||
write_mostly_errors: Arc::new(write_mostly_errors),
|
||||
write_only_errors: Arc::new(write_only_errors),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type GetResult = (BlobstoreId, Result<Option<BlobstoreGetData>, Error>);
|
||||
|
||||
/// Get normal and write mostly results based on ScrubWriteMostly
|
||||
/// Get normal and write only results based on SrubWriteOnly
|
||||
/// mode of getting results, which might optimise for less access
|
||||
/// to main blobstores or less access to write mostly blobstores
|
||||
/// to main blobstores or less access to write only blobstores
|
||||
pub async fn scrub_get_results<MF, WF>(
|
||||
get_main_results: impl FnOnce() -> MF,
|
||||
mut get_write_mostly_results: impl FnMut() -> WF,
|
||||
write_mostly_blobstores: impl Iterator<Item = BlobstoreId>,
|
||||
write_mostly: ScrubWriteMostly,
|
||||
mut get_write_only_results: impl FnMut() -> WF,
|
||||
write_only_blobstores: impl Iterator<Item = BlobstoreId>,
|
||||
write_only: SrubWriteOnly,
|
||||
) -> impl Iterator<Item = (bool, GetResult)>
|
||||
where
|
||||
MF: Future<Output = Vec<GetResult>>,
|
||||
WF: Future<Output = Vec<GetResult>>,
|
||||
{
|
||||
// Exit early if all mostly-write are ok, and don't check main blobstores
|
||||
if write_mostly == ScrubWriteMostly::ScrubIfAbsent {
|
||||
let mut results = get_write_mostly_results().await.into_iter();
|
||||
if write_only == SrubWriteOnly::ScrubIfAbsent {
|
||||
let mut results = get_write_only_results().await.into_iter();
|
||||
if let Some((bs_id, Ok(Some(data)))) = results.next() {
|
||||
if results.all(|(_, r)| match r {
|
||||
Ok(Some(other_data)) => other_data == data,
|
||||
@ -210,24 +210,22 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
let write_mostly_results = async {
|
||||
match write_mostly {
|
||||
ScrubWriteMostly::Scrub | ScrubWriteMostly::SkipMissing => {
|
||||
get_write_mostly_results().await
|
||||
}
|
||||
ScrubWriteMostly::PopulateIfAbsent | ScrubWriteMostly::ScrubIfAbsent => {
|
||||
write_mostly_blobstores.map(|id| (id, Ok(None))).collect()
|
||||
let write_only_results = async {
|
||||
match write_only {
|
||||
SrubWriteOnly::Scrub | SrubWriteOnly::SkipMissing => get_write_only_results().await,
|
||||
SrubWriteOnly::PopulateIfAbsent | SrubWriteOnly::ScrubIfAbsent => {
|
||||
write_only_blobstores.map(|id| (id, Ok(None))).collect()
|
||||
}
|
||||
}
|
||||
};
|
||||
let (normal_results, write_mostly_results) =
|
||||
future::join(get_main_results(), write_mostly_results).await;
|
||||
let (normal_results, write_only_results) =
|
||||
future::join(get_main_results(), write_only_results).await;
|
||||
|
||||
Either::Right(
|
||||
normal_results
|
||||
.into_iter()
|
||||
.map(|r| (false, r))
|
||||
.chain(write_mostly_results.into_iter().map(|r| (true, r))),
|
||||
.chain(write_only_results.into_iter().map(|r| (true, r))),
|
||||
)
|
||||
}
|
||||
|
||||
@ -236,16 +234,16 @@ pub fn scrub_parse_results(
|
||||
all_main: impl Iterator<Item = BlobstoreId>,
|
||||
) -> Result<Option<BlobstoreGetData>, ErrorKind> {
|
||||
let mut missing_main = HashSet::new();
|
||||
let mut missing_write_mostly = HashSet::new();
|
||||
let mut missing_write_only = HashSet::new();
|
||||
let mut get_data = None;
|
||||
let mut main_errors = HashMap::new();
|
||||
let mut write_mostly_errors = HashMap::new();
|
||||
let mut write_only_errors = HashMap::new();
|
||||
|
||||
for (is_write_mostly, (blobstore_id, result)) in results {
|
||||
for (is_write_only, (blobstore_id, result)) in results {
|
||||
match result {
|
||||
Ok(None) => {
|
||||
if is_write_mostly {
|
||||
missing_write_mostly.insert(blobstore_id);
|
||||
if is_write_only {
|
||||
missing_write_only.insert(blobstore_id);
|
||||
} else {
|
||||
missing_main.insert(blobstore_id);
|
||||
}
|
||||
@ -261,8 +259,8 @@ pub fn scrub_parse_results(
|
||||
.insert(blobstore_id);
|
||||
}
|
||||
Err(err) => {
|
||||
if is_write_mostly {
|
||||
write_mostly_errors.insert(blobstore_id, err);
|
||||
if is_write_only {
|
||||
write_only_errors.insert(blobstore_id, err);
|
||||
} else {
|
||||
main_errors.insert(blobstore_id, err);
|
||||
}
|
||||
@ -271,24 +269,24 @@ pub fn scrub_parse_results(
|
||||
}
|
||||
match get_data {
|
||||
None => {
|
||||
if main_errors.is_empty() && write_mostly_errors.is_empty() {
|
||||
if main_errors.is_empty() && write_only_errors.is_empty() {
|
||||
Ok(None)
|
||||
} else {
|
||||
Err(blobstores_failed_error(
|
||||
all_main,
|
||||
main_errors,
|
||||
write_mostly_errors,
|
||||
write_only_errors,
|
||||
))
|
||||
}
|
||||
}
|
||||
Some((all_values, value)) if all_values.len() == 1 => {
|
||||
if missing_main.is_empty() && missing_write_mostly.is_empty() {
|
||||
if missing_main.is_empty() && missing_write_only.is_empty() {
|
||||
Ok(Some(value))
|
||||
} else {
|
||||
// This silently ignores failed blobstores if at least one has a value
|
||||
Err(ErrorKind::SomeMissingItem {
|
||||
missing_main: Arc::new(missing_main),
|
||||
missing_write_mostly: Arc::new(missing_write_mostly),
|
||||
missing_write_only: Arc::new(missing_write_only),
|
||||
value,
|
||||
})
|
||||
}
|
||||
@ -297,7 +295,7 @@ pub fn scrub_parse_results(
|
||||
let answered = all_values.into_iter().map(|(_, stores)| stores).collect();
|
||||
let mut all_missing = HashSet::new();
|
||||
all_missing.extend(missing_main.into_iter());
|
||||
all_missing.extend(missing_write_mostly.into_iter());
|
||||
all_missing.extend(missing_write_only.into_iter());
|
||||
Err(ErrorKind::ValueMismatch(
|
||||
Arc::new(answered),
|
||||
Arc::new(all_missing),
|
||||
@ -310,7 +308,7 @@ impl MultiplexedBlobstoreBase {
|
||||
pub fn new(
|
||||
multiplex_id: MultiplexId,
|
||||
blobstores: Vec<(BlobstoreId, Arc<dyn BlobstorePutOps>)>,
|
||||
write_mostly_blobstores: Vec<(BlobstoreId, Arc<dyn BlobstorePutOps>)>,
|
||||
write_only_blobstores: Vec<(BlobstoreId, Arc<dyn BlobstorePutOps>)>,
|
||||
minimum_successful_writes: NonZeroUsize,
|
||||
not_present_read_quorum: NonZeroUsize,
|
||||
handler: Arc<dyn MultiplexedBlobstorePutHandler>,
|
||||
@ -322,7 +320,7 @@ impl MultiplexedBlobstoreBase {
|
||||
Self {
|
||||
multiplex_id,
|
||||
blobstores: blobstores.into(),
|
||||
write_mostly_blobstores: write_mostly_blobstores.into(),
|
||||
write_only_blobstores: write_only_blobstores.into(),
|
||||
minimum_successful_writes,
|
||||
not_present_read_quorum,
|
||||
handler,
|
||||
@ -339,7 +337,7 @@ impl MultiplexedBlobstoreBase {
|
||||
&self,
|
||||
ctx: &CoreContext,
|
||||
key: &str,
|
||||
write_mostly: ScrubWriteMostly,
|
||||
write_only: SrubWriteOnly,
|
||||
) -> Result<Option<BlobstoreGetData>, ErrorKind> {
|
||||
let mut scuba = self.scuba.clone();
|
||||
scuba.sampled(self.scuba_sample_rate);
|
||||
@ -357,14 +355,14 @@ impl MultiplexedBlobstoreBase {
|
||||
|| {
|
||||
join_all(multiplexed_get(
|
||||
ctx,
|
||||
self.write_mostly_blobstores.as_ref(),
|
||||
self.write_only_blobstores.as_ref(),
|
||||
key,
|
||||
OperationType::ScrubGet,
|
||||
scuba.clone(),
|
||||
))
|
||||
},
|
||||
self.write_mostly_blobstores.iter().map(|(id, _)| *id),
|
||||
write_mostly,
|
||||
self.write_only_blobstores.iter().map(|(id, _)| *id),
|
||||
write_only,
|
||||
)
|
||||
.await;
|
||||
|
||||
@ -423,13 +421,13 @@ pub async fn inner_put(
|
||||
async fn blobstore_get<'a>(
|
||||
ctx: &'a CoreContext,
|
||||
blobstores: Arc<[(BlobstoreId, Arc<dyn BlobstorePutOps>)]>,
|
||||
write_mostly_blobstores: Arc<[(BlobstoreId, Arc<dyn BlobstorePutOps>)]>,
|
||||
write_only_blobstores: Arc<[(BlobstoreId, Arc<dyn BlobstorePutOps>)]>,
|
||||
not_present_read_quorum: NonZeroUsize,
|
||||
key: &'a str,
|
||||
scuba: MononokeScubaSampleBuilder,
|
||||
) -> Result<Option<BlobstoreGetData>, Error> {
|
||||
let is_logged = scuba.sampling().is_logged();
|
||||
let blobstores_count = blobstores.len() + write_mostly_blobstores.len();
|
||||
let blobstores_count = blobstores.len() + write_only_blobstores.len();
|
||||
let mut needed_not_present: usize = not_present_read_quorum.get();
|
||||
|
||||
if needed_not_present > blobstores_count {
|
||||
@ -443,7 +441,7 @@ async fn blobstore_get<'a>(
|
||||
let (stats, result) = {
|
||||
async move {
|
||||
let mut main_errors = HashMap::new();
|
||||
let mut write_mostly_errors = HashMap::new();
|
||||
let mut write_only_errors = HashMap::new();
|
||||
ctx.perf_counters()
|
||||
.increment_counter(PerfCounterType::BlobGets);
|
||||
|
||||
@ -455,9 +453,9 @@ async fn blobstore_get<'a>(
|
||||
scuba.clone(),
|
||||
)
|
||||
.collect();
|
||||
let write_mostly_requests: FuturesUnordered<_> = multiplexed_get(
|
||||
let write_only_requests: FuturesUnordered<_> = multiplexed_get(
|
||||
ctx.clone(),
|
||||
write_mostly_blobstores.as_ref(),
|
||||
write_only_blobstores.as_ref(),
|
||||
key.to_owned(),
|
||||
OperationType::Get,
|
||||
scuba,
|
||||
@ -465,16 +463,16 @@ async fn blobstore_get<'a>(
|
||||
.collect();
|
||||
|
||||
// `chain` here guarantees that `main_requests` is empty before it starts
|
||||
// polling anything in `write_mostly_requests`
|
||||
// polling anything in `write_only_requests`
|
||||
let mut requests = main_requests
|
||||
.map(|r| (false, r))
|
||||
.chain(write_mostly_requests.map(|r| (true, r)));
|
||||
while let Some((is_write_mostly, result)) = requests.next().await {
|
||||
.chain(write_only_requests.map(|r| (true, r)));
|
||||
while let Some((is_write_only, result)) = requests.next().await {
|
||||
match result {
|
||||
(_, Ok(Some(mut value))) => {
|
||||
if is_logged {
|
||||
// Allow the other requests to complete so that we can record some
|
||||
// metrics for the blobstore. This will also log metrics for write-mostly
|
||||
// metrics for the blobstore. This will also log metrics for write-only
|
||||
// blobstores, which helps us decide whether they're good
|
||||
spawn_stream_completion(requests);
|
||||
}
|
||||
@ -483,8 +481,8 @@ async fn blobstore_get<'a>(
|
||||
return Ok(Some(value));
|
||||
}
|
||||
(blobstore_id, Err(error)) => {
|
||||
if is_write_mostly {
|
||||
write_mostly_errors.insert(blobstore_id, error);
|
||||
if is_write_only {
|
||||
write_only_errors.insert(blobstore_id, error);
|
||||
} else {
|
||||
main_errors.insert(blobstore_id, error);
|
||||
}
|
||||
@ -501,20 +499,20 @@ async fn blobstore_get<'a>(
|
||||
}
|
||||
}
|
||||
|
||||
let error_count = main_errors.len() + write_mostly_errors.len();
|
||||
let error_count = main_errors.len() + write_only_errors.len();
|
||||
if error_count == 0 {
|
||||
// All blobstores must have returned None, as Some would have triggered a return,
|
||||
Ok(None)
|
||||
} else if error_count == blobstores_count {
|
||||
Err(ErrorKind::AllFailed {
|
||||
main_errors: Arc::new(main_errors),
|
||||
write_mostly_errors: Arc::new(write_mostly_errors),
|
||||
write_only_errors: Arc::new(write_only_errors),
|
||||
})
|
||||
} else {
|
||||
Err(blobstores_failed_error(
|
||||
blobstores.iter().map(|(id, _)| *id),
|
||||
main_errors,
|
||||
write_mostly_errors,
|
||||
write_only_errors,
|
||||
))
|
||||
}
|
||||
}
|
||||
@ -618,14 +616,14 @@ impl Blobstore for MultiplexedBlobstoreBase {
|
||||
) -> Result<Option<BlobstoreGetData>> {
|
||||
let mut scuba = self.scuba.clone();
|
||||
let blobstores = self.blobstores.clone();
|
||||
let write_mostly_blobstores = self.write_mostly_blobstores.clone();
|
||||
let write_only_blobstores = self.write_only_blobstores.clone();
|
||||
let not_present_read_quorum = self.not_present_read_quorum;
|
||||
scuba.sampled(self.scuba_sample_rate);
|
||||
|
||||
blobstore_get(
|
||||
ctx,
|
||||
blobstores,
|
||||
write_mostly_blobstores,
|
||||
write_only_blobstores,
|
||||
not_present_read_quorum,
|
||||
key,
|
||||
scuba,
|
||||
@ -641,7 +639,7 @@ impl Blobstore for MultiplexedBlobstoreBase {
|
||||
let mut scuba = self.scuba.clone();
|
||||
scuba.sampled(self.scuba_sample_rate);
|
||||
let is_logged = scuba.sampling().is_logged();
|
||||
let blobstores_count = self.blobstores.len() + self.write_mostly_blobstores.len();
|
||||
let blobstores_count = self.blobstores.len() + self.write_only_blobstores.len();
|
||||
let mut needed_not_present: usize = self.not_present_read_quorum.get();
|
||||
let comprehensive_lookup = matches!(
|
||||
ctx.session().session_class(),
|
||||
@ -656,9 +654,9 @@ impl Blobstore for MultiplexedBlobstoreBase {
|
||||
)
|
||||
.collect();
|
||||
|
||||
let write_mostly_requests: FuturesUnordered<_> = multiplexed_is_present(
|
||||
let write_only_requests: FuturesUnordered<_> = multiplexed_is_present(
|
||||
ctx.clone(),
|
||||
&self.write_mostly_blobstores.clone(),
|
||||
&self.write_only_blobstores.clone(),
|
||||
key.to_owned(),
|
||||
scuba,
|
||||
)
|
||||
@ -668,28 +666,28 @@ impl Blobstore for MultiplexedBlobstoreBase {
|
||||
// "comprehensive" and "regular"
|
||||
//
|
||||
// Comprehensive lookup requires presence in all the blobstores.
|
||||
// Regular lookup requires presence in at least one main or write mostly blobstore.
|
||||
// Regular lookup requires presence in at least one main or write only blobstore.
|
||||
|
||||
// `chain` here guarantees that `main_requests` is empty before it starts
|
||||
// polling anything in `write_mostly_requests`
|
||||
// polling anything in `write_only_requests`
|
||||
let mut requests = main_requests
|
||||
.map(|r| (false, r))
|
||||
.chain(write_mostly_requests.map(|r| (true, r)));
|
||||
.chain(write_only_requests.map(|r| (true, r)));
|
||||
let (stats, result) = {
|
||||
let blobstores = &self.blobstores;
|
||||
async move {
|
||||
let mut main_errors = HashMap::new();
|
||||
let mut write_mostly_errors = HashMap::new();
|
||||
let mut write_only_errors = HashMap::new();
|
||||
let mut present_counter = 0;
|
||||
ctx.perf_counters()
|
||||
.increment_counter(PerfCounterType::BlobPresenceChecks);
|
||||
while let Some((is_write_mostly, result)) = requests.next().await {
|
||||
while let Some((is_write_only, result)) = requests.next().await {
|
||||
match result {
|
||||
(_, Ok(BlobstoreIsPresent::Present)) => {
|
||||
if !comprehensive_lookup {
|
||||
if is_logged {
|
||||
// Allow the other requests to complete so that we can record some
|
||||
// metrics for the blobstore. This will also log metrics for write-mostly
|
||||
// metrics for the blobstore. This will also log metrics for write-only
|
||||
// blobstores, which helps us decide whether they're good
|
||||
spawn_stream_completion(requests);
|
||||
}
|
||||
@ -708,8 +706,8 @@ impl Blobstore for MultiplexedBlobstoreBase {
|
||||
}
|
||||
// is_present failed for the underlying blobstore
|
||||
(blobstore_id, Err(error)) => {
|
||||
if is_write_mostly {
|
||||
write_mostly_errors.insert(blobstore_id, error);
|
||||
if is_write_only {
|
||||
write_only_errors.insert(blobstore_id, error);
|
||||
} else {
|
||||
main_errors.insert(blobstore_id, error);
|
||||
}
|
||||
@ -719,8 +717,8 @@ impl Blobstore for MultiplexedBlobstoreBase {
|
||||
"Received 'ProbablyNotPresent' from the underlying blobstore"
|
||||
.to_string(),
|
||||
);
|
||||
if is_write_mostly {
|
||||
write_mostly_errors.insert(blobstore_id, err);
|
||||
if is_write_only {
|
||||
write_only_errors.insert(blobstore_id, err);
|
||||
} else {
|
||||
main_errors.insert(blobstore_id, err);
|
||||
}
|
||||
@ -730,14 +728,14 @@ impl Blobstore for MultiplexedBlobstoreBase {
|
||||
|
||||
if comprehensive_lookup {
|
||||
// all blobstores reported the blob is present
|
||||
if main_errors.is_empty() && write_mostly_errors.is_empty() {
|
||||
if main_errors.is_empty() && write_only_errors.is_empty() {
|
||||
Ok(BlobstoreIsPresent::Present)
|
||||
}
|
||||
// some blobstores reported the blob is present, others failed
|
||||
else if present_counter > 0 {
|
||||
let err = Error::from(ErrorKind::SomeFailedOthersNone {
|
||||
main_errors: Arc::new(main_errors),
|
||||
write_mostly_errors: Arc::new(write_mostly_errors),
|
||||
write_only_errors: Arc::new(write_only_errors),
|
||||
});
|
||||
Ok(BlobstoreIsPresent::ProbablyNotPresent(err))
|
||||
}
|
||||
@ -745,33 +743,33 @@ impl Blobstore for MultiplexedBlobstoreBase {
|
||||
else {
|
||||
Err(ErrorKind::AllFailed {
|
||||
main_errors: Arc::new(main_errors),
|
||||
write_mostly_errors: Arc::new(write_mostly_errors),
|
||||
write_only_errors: Arc::new(write_only_errors),
|
||||
})
|
||||
}
|
||||
} else {
|
||||
// all blobstores reported the blob is missing
|
||||
if main_errors.is_empty() && write_mostly_errors.is_empty() {
|
||||
if main_errors.is_empty() && write_only_errors.is_empty() {
|
||||
Ok(BlobstoreIsPresent::Absent)
|
||||
}
|
||||
// all blobstores failed
|
||||
else if main_errors.len() + write_mostly_errors.len() == blobstores_count {
|
||||
else if main_errors.len() + write_only_errors.len() == blobstores_count {
|
||||
Err(ErrorKind::AllFailed {
|
||||
main_errors: Arc::new(main_errors),
|
||||
write_mostly_errors: Arc::new(write_mostly_errors),
|
||||
write_only_errors: Arc::new(write_only_errors),
|
||||
})
|
||||
}
|
||||
// some blobstores reported the blob is missing, others failed
|
||||
else {
|
||||
let write_mostly_err = blobstores_failed_error(
|
||||
let write_only_err = blobstores_failed_error(
|
||||
blobstores.iter().map(|(id, _)| *id),
|
||||
main_errors,
|
||||
write_mostly_errors,
|
||||
write_only_errors,
|
||||
);
|
||||
if matches!(write_mostly_err, ErrorKind::SomeFailedOthersNone { .. }) {
|
||||
let err = Error::from(write_mostly_err);
|
||||
if matches!(write_only_err, ErrorKind::SomeFailedOthersNone { .. }) {
|
||||
let err = Error::from(write_only_err);
|
||||
Ok(BlobstoreIsPresent::ProbablyNotPresent(err))
|
||||
} else {
|
||||
Err(write_mostly_err)
|
||||
Err(write_only_err)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -820,7 +818,7 @@ impl MultiplexedBlobstoreBase {
|
||||
let mut puts: FuturesUnordered<_> = self
|
||||
.blobstores
|
||||
.iter()
|
||||
.chain(self.write_mostly_blobstores.iter())
|
||||
.chain(self.write_only_blobstores.iter())
|
||||
.cloned()
|
||||
.map({
|
||||
|(blobstore_id, blobstore)| {
|
||||
|
@ -16,7 +16,7 @@ pub use crate::scrub::ScrubAction;
|
||||
pub use crate::scrub::ScrubBlobstore;
|
||||
pub use crate::scrub::ScrubHandler;
|
||||
pub use crate::scrub::ScrubOptions;
|
||||
pub use crate::scrub::ScrubWriteMostly;
|
||||
pub use crate::scrub::SrubWriteOnly;
|
||||
|
||||
#[cfg(test)]
|
||||
mod test;
|
||||
|
@ -54,7 +54,7 @@ impl MultiplexedBlobstore {
|
||||
pub fn new(
|
||||
multiplex_id: MultiplexId,
|
||||
blobstores: Vec<(BlobstoreId, Arc<dyn BlobstorePutOps>)>,
|
||||
write_mostly_blobstores: Vec<(BlobstoreId, Arc<dyn BlobstorePutOps>)>,
|
||||
write_only_blobstores: Vec<(BlobstoreId, Arc<dyn BlobstorePutOps>)>,
|
||||
minimum_successful_writes: NonZeroUsize,
|
||||
not_present_read_quorum: NonZeroUsize,
|
||||
queue: Arc<dyn BlobstoreSyncQueue>,
|
||||
@ -70,7 +70,7 @@ impl MultiplexedBlobstore {
|
||||
blobstore: Arc::new(MultiplexedBlobstoreBase::new(
|
||||
multiplex_id,
|
||||
blobstores,
|
||||
write_mostly_blobstores,
|
||||
write_only_blobstores,
|
||||
minimum_successful_writes,
|
||||
not_present_read_quorum,
|
||||
put_handler,
|
||||
|
@ -71,7 +71,7 @@ pub enum ScrubAction {
|
||||
Repair,
|
||||
}
|
||||
|
||||
// How to treat write mostly stores during the scrub
|
||||
// How to treat write only stores during the scrub
|
||||
#[derive(
|
||||
Debug,
|
||||
Clone,
|
||||
@ -84,16 +84,16 @@ pub enum ScrubAction {
|
||||
EnumVariantNames,
|
||||
IntoStaticStr
|
||||
)]
|
||||
pub enum ScrubWriteMostly {
|
||||
/// don't take action on scrub missing keys from write mostly stores
|
||||
pub enum SrubWriteOnly {
|
||||
/// don't take action on scrub missing keys from write only stores
|
||||
SkipMissing,
|
||||
/// take the normal scrub action for write mostly stores
|
||||
/// take the normal scrub action for write only stores
|
||||
Scrub,
|
||||
/// Mode for populating empty stores. Assumes its already missing. Don't attempt to read. Write with IfAbsent so won't overwrite if run incorrectluy.
|
||||
/// More efficient than the above if thes store is totally empty.
|
||||
PopulateIfAbsent,
|
||||
/// Mode for rescrubbing write-mostly stores before enabling them. Assumes that the data in them is correct,
|
||||
/// and won't read from the main stores unless the write-mostly stores have missing data or read failures
|
||||
/// Mode for rescrubbing write-only stores before enabling them. Assumes that the data in them is correct,
|
||||
/// and won't read from the main stores unless the write-only stores have missing data or read failures
|
||||
/// This ensures that load on the main stores is kept to a minimum
|
||||
ScrubIfAbsent,
|
||||
}
|
||||
@ -102,7 +102,7 @@ pub enum ScrubWriteMostly {
|
||||
pub struct ScrubOptions {
|
||||
pub scrub_action: ScrubAction,
|
||||
pub scrub_grace: Option<Duration>,
|
||||
pub scrub_action_on_missing_write_mostly: ScrubWriteMostly,
|
||||
pub scrub_action_on_missing_write_only: SrubWriteOnly,
|
||||
pub queue_peek_bound: Duration,
|
||||
}
|
||||
|
||||
@ -111,7 +111,7 @@ impl Default for ScrubOptions {
|
||||
Self {
|
||||
scrub_action: ScrubAction::ReportOnly,
|
||||
scrub_grace: None,
|
||||
scrub_action_on_missing_write_mostly: ScrubWriteMostly::Scrub,
|
||||
scrub_action_on_missing_write_only: SrubWriteOnly::Scrub,
|
||||
queue_peek_bound: *HEAL_MAX_BACKLOG,
|
||||
}
|
||||
}
|
||||
@ -189,7 +189,7 @@ impl ScrubBlobstore {
|
||||
pub fn new(
|
||||
multiplex_id: MultiplexId,
|
||||
blobstores: Vec<(BlobstoreId, Arc<dyn BlobstorePutOps>)>,
|
||||
write_mostly_blobstores: Vec<(BlobstoreId, Arc<dyn BlobstorePutOps>)>,
|
||||
write_only_blobstores: Vec<(BlobstoreId, Arc<dyn BlobstorePutOps>)>,
|
||||
minimum_successful_writes: NonZeroUsize,
|
||||
not_present_read_quorum: NonZeroUsize,
|
||||
queue: Arc<dyn BlobstoreSyncQueue>,
|
||||
@ -203,7 +203,7 @@ impl ScrubBlobstore {
|
||||
let inner = MultiplexedBlobstore::new(
|
||||
multiplex_id,
|
||||
blobstores.clone(),
|
||||
write_mostly_blobstores.clone(),
|
||||
write_only_blobstores.clone(),
|
||||
minimum_successful_writes,
|
||||
not_present_read_quorum,
|
||||
queue.clone(),
|
||||
@ -218,7 +218,7 @@ impl ScrubBlobstore {
|
||||
scrub_stores: Arc::new(
|
||||
blobstores
|
||||
.into_iter()
|
||||
.chain(write_mostly_blobstores.into_iter())
|
||||
.chain(write_only_blobstores.into_iter())
|
||||
.collect(),
|
||||
),
|
||||
queue,
|
||||
@ -267,7 +267,7 @@ pub async fn maybe_repair<F: Future<Output = Result<bool>>>(
|
||||
key: &str,
|
||||
value: BlobstoreGetData,
|
||||
missing_main: Arc<HashSet<BlobstoreId>>,
|
||||
missing_write_mostly: Arc<HashSet<BlobstoreId>>,
|
||||
missing_write_only: Arc<HashSet<BlobstoreId>>,
|
||||
scrub_stores: &HashMap<BlobstoreId, Arc<dyn BlobstorePutOps>>,
|
||||
scrub_handler: &dyn ScrubHandler,
|
||||
scrub_options: &ScrubOptions,
|
||||
@ -290,9 +290,9 @@ pub async fn maybe_repair<F: Future<Output = Result<bool>>>(
|
||||
let mut needs_repair: HashMap<BlobstoreId, (PutBehaviour, &dyn BlobstorePutOps)> =
|
||||
HashMap::new();
|
||||
|
||||
// For write mostly stores we can chose not to do the scrub action
|
||||
// For write only stores we can chose not to do the scrub action
|
||||
// e.g. if store is still being populated, a checking scrub wouldn't want to raise alarm on the store
|
||||
if scrub_options.scrub_action_on_missing_write_mostly != ScrubWriteMostly::SkipMissing
|
||||
if scrub_options.scrub_action_on_missing_write_only != SrubWriteOnly::SkipMissing
|
||||
|| !missing_main.is_empty()
|
||||
{
|
||||
// Only peek the queue if needed
|
||||
@ -313,12 +313,12 @@ pub async fn maybe_repair<F: Future<Output = Result<bool>>>(
|
||||
needs_repair.insert(*k, (PutBehaviour::Overwrite, s.as_ref()));
|
||||
}
|
||||
}
|
||||
for k in missing_write_mostly.iter() {
|
||||
for k in missing_write_only.iter() {
|
||||
if let Some(s) = scrub_stores.get(k) {
|
||||
let put_behaviour = match scrub_options.scrub_action_on_missing_write_mostly {
|
||||
ScrubWriteMostly::SkipMissing => None,
|
||||
ScrubWriteMostly::Scrub => Some(PutBehaviour::Overwrite),
|
||||
ScrubWriteMostly::PopulateIfAbsent | ScrubWriteMostly::ScrubIfAbsent => {
|
||||
let put_behaviour = match scrub_options.scrub_action_on_missing_write_only {
|
||||
SrubWriteOnly::SkipMissing => None,
|
||||
SrubWriteOnly::Scrub => Some(PutBehaviour::Overwrite),
|
||||
SrubWriteOnly::PopulateIfAbsent | SrubWriteOnly::ScrubIfAbsent => {
|
||||
Some(PutBehaviour::IfAbsent)
|
||||
}
|
||||
};
|
||||
@ -371,7 +371,7 @@ async fn blobstore_get(
|
||||
scuba: &MononokeScubaSampleBuilder,
|
||||
) -> Result<Option<BlobstoreGetData>> {
|
||||
match inner_blobstore
|
||||
.scrub_get(ctx, key, scrub_options.scrub_action_on_missing_write_mostly)
|
||||
.scrub_get(ctx, key, scrub_options.scrub_action_on_missing_write_only)
|
||||
.await
|
||||
{
|
||||
Ok(value) => Ok(value),
|
||||
@ -390,7 +390,7 @@ async fn blobstore_get(
|
||||
}
|
||||
ErrorKind::SomeMissingItem {
|
||||
missing_main,
|
||||
missing_write_mostly,
|
||||
missing_write_only,
|
||||
value,
|
||||
} => {
|
||||
maybe_repair(
|
||||
@ -398,7 +398,7 @@ async fn blobstore_get(
|
||||
key,
|
||||
value,
|
||||
missing_main,
|
||||
missing_write_mostly,
|
||||
missing_write_only,
|
||||
scrub_stores,
|
||||
scrub_handler,
|
||||
scrub_options,
|
||||
|
@ -62,7 +62,7 @@ use crate::scrub::ScrubAction;
|
||||
use crate::scrub::ScrubBlobstore;
|
||||
use crate::scrub::ScrubHandler;
|
||||
use crate::scrub::ScrubOptions;
|
||||
use crate::scrub::ScrubWriteMostly;
|
||||
use crate::scrub::SrubWriteOnly;
|
||||
|
||||
#[async_trait]
|
||||
impl MultiplexedBlobstorePutHandler for Tickable<BlobstoreId> {
|
||||
@ -166,7 +166,7 @@ impl<'a, F: Future + Unpin> Future for PollOnce<'a, F> {
|
||||
|
||||
async fn scrub_none(
|
||||
fb: FacebookInit,
|
||||
scrub_action_on_missing_write_mostly: ScrubWriteMostly,
|
||||
scrub_action_on_missing_write_only: SrubWriteOnly,
|
||||
) -> Result<()> {
|
||||
let bid0 = BlobstoreId::new(0);
|
||||
let bs0 = Arc::new(Tickable::new());
|
||||
@ -190,7 +190,7 @@ async fn scrub_none(
|
||||
MononokeScubaSampleBuilder::with_discard(),
|
||||
nonzero!(1u64),
|
||||
ScrubOptions {
|
||||
scrub_action_on_missing_write_mostly,
|
||||
scrub_action_on_missing_write_only,
|
||||
..ScrubOptions::default()
|
||||
},
|
||||
Arc::new(LoggingScrubHandler::new(false)) as Arc<dyn ScrubHandler>,
|
||||
@ -224,9 +224,9 @@ async fn scrub_none(
|
||||
|
||||
#[fbinit::test]
|
||||
async fn scrub_blobstore_fetch_none(fb: FacebookInit) -> Result<()> {
|
||||
scrub_none(fb, ScrubWriteMostly::Scrub).await?;
|
||||
scrub_none(fb, ScrubWriteMostly::SkipMissing).await?;
|
||||
scrub_none(fb, ScrubWriteMostly::PopulateIfAbsent).await
|
||||
scrub_none(fb, SrubWriteOnly::Scrub).await?;
|
||||
scrub_none(fb, SrubWriteOnly::SkipMissing).await?;
|
||||
scrub_none(fb, SrubWriteOnly::PopulateIfAbsent).await
|
||||
}
|
||||
|
||||
#[fbinit::test]
|
||||
@ -860,7 +860,7 @@ async fn multiplexed_blob_size(fb: FacebookInit) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn scrub_scenarios(fb: FacebookInit, scrub_action_on_missing_write_mostly: ScrubWriteMostly) {
|
||||
async fn scrub_scenarios(fb: FacebookInit, scrub_action_on_missing_write_only: SrubWriteOnly) {
|
||||
let ctx = CoreContext::test_mock(fb);
|
||||
borrowed!(ctx);
|
||||
let queue = Arc::new(SqlBlobstoreSyncQueue::with_sqlite_in_memory().unwrap());
|
||||
@ -883,7 +883,7 @@ async fn scrub_scenarios(fb: FacebookInit, scrub_action_on_missing_write_mostly:
|
||||
nonzero!(1u64),
|
||||
ScrubOptions {
|
||||
scrub_action: ScrubAction::ReportOnly,
|
||||
scrub_action_on_missing_write_mostly,
|
||||
scrub_action_on_missing_write_only,
|
||||
..ScrubOptions::default()
|
||||
},
|
||||
scrub_handler.clone(),
|
||||
@ -905,7 +905,7 @@ async fn scrub_scenarios(fb: FacebookInit, scrub_action_on_missing_write_mostly:
|
||||
assert_eq!(get_fut.await.unwrap(), None, "SomeNone + Err expected None");
|
||||
}
|
||||
|
||||
// non-existing key when one write mostly blobstore failing
|
||||
// non-existing key when one write only blobstore failing
|
||||
{
|
||||
let k0 = "k0";
|
||||
|
||||
@ -985,7 +985,7 @@ async fn scrub_scenarios(fb: FacebookInit, scrub_action_on_missing_write_mostly:
|
||||
nonzero!(1u64),
|
||||
ScrubOptions {
|
||||
scrub_action: ScrubAction::Repair,
|
||||
scrub_action_on_missing_write_mostly,
|
||||
scrub_action_on_missing_write_only,
|
||||
..ScrubOptions::default()
|
||||
},
|
||||
scrub_handler.clone(),
|
||||
@ -1034,7 +1034,7 @@ async fn scrub_scenarios(fb: FacebookInit, scrub_action_on_missing_write_mostly:
|
||||
nonzero!(1u64),
|
||||
ScrubOptions {
|
||||
scrub_action: ScrubAction::Repair,
|
||||
scrub_action_on_missing_write_mostly,
|
||||
scrub_action_on_missing_write_only,
|
||||
queue_peek_bound: Duration::from_secs(7200),
|
||||
..ScrubOptions::default()
|
||||
},
|
||||
@ -1059,7 +1059,7 @@ async fn scrub_scenarios(fb: FacebookInit, scrub_action_on_missing_write_mostly:
|
||||
bs0.tick(None);
|
||||
assert_eq!(PollOnce::new(Pin::new(&mut get_fut)).await, Poll::Pending);
|
||||
bs1.tick(None);
|
||||
if scrub_action_on_missing_write_mostly != ScrubWriteMostly::PopulateIfAbsent {
|
||||
if scrub_action_on_missing_write_only != SrubWriteOnly::PopulateIfAbsent {
|
||||
// this read doesn't happen in this mode
|
||||
bs2.tick(None);
|
||||
}
|
||||
@ -1101,7 +1101,7 @@ async fn scrub_scenarios(fb: FacebookInit, scrub_action_on_missing_write_mostly:
|
||||
bs0.tick(None);
|
||||
assert_eq!(PollOnce::new(Pin::new(&mut get_fut)).await, Poll::Pending);
|
||||
bs1.tick(None);
|
||||
if scrub_action_on_missing_write_mostly != ScrubWriteMostly::PopulateIfAbsent {
|
||||
if scrub_action_on_missing_write_only != SrubWriteOnly::PopulateIfAbsent {
|
||||
// this read doesn't happen in this mode
|
||||
bs2.tick(None);
|
||||
}
|
||||
@ -1115,13 +1115,13 @@ async fn scrub_scenarios(fb: FacebookInit, scrub_action_on_missing_write_mostly:
|
||||
// Now all populated.
|
||||
assert_eq!(bs0.get_bytes(k1), Some(v1.clone()));
|
||||
assert_eq!(bs1.get_bytes(k1), Some(v1.clone()));
|
||||
match scrub_action_on_missing_write_mostly {
|
||||
ScrubWriteMostly::Scrub
|
||||
| ScrubWriteMostly::PopulateIfAbsent
|
||||
| ScrubWriteMostly::ScrubIfAbsent => {
|
||||
match scrub_action_on_missing_write_only {
|
||||
SrubWriteOnly::Scrub
|
||||
| SrubWriteOnly::PopulateIfAbsent
|
||||
| SrubWriteOnly::ScrubIfAbsent => {
|
||||
assert_eq!(bs2.get_bytes(k1), Some(v1.clone()))
|
||||
}
|
||||
ScrubWriteMostly::SkipMissing => {
|
||||
SrubWriteOnly::SkipMissing => {
|
||||
assert_eq!(bs2.get_bytes(k1), None)
|
||||
}
|
||||
}
|
||||
@ -1130,9 +1130,9 @@ async fn scrub_scenarios(fb: FacebookInit, scrub_action_on_missing_write_mostly:
|
||||
|
||||
#[fbinit::test]
|
||||
async fn scrubbed(fb: FacebookInit) {
|
||||
scrub_scenarios(fb, ScrubWriteMostly::Scrub).await;
|
||||
scrub_scenarios(fb, ScrubWriteMostly::SkipMissing).await;
|
||||
scrub_scenarios(fb, ScrubWriteMostly::PopulateIfAbsent).await;
|
||||
scrub_scenarios(fb, SrubWriteOnly::Scrub).await;
|
||||
scrub_scenarios(fb, SrubWriteOnly::SkipMissing).await;
|
||||
scrub_scenarios(fb, SrubWriteOnly::PopulateIfAbsent).await;
|
||||
}
|
||||
|
||||
#[fbinit::test]
|
||||
@ -1237,19 +1237,19 @@ async fn queue_waits(fb: FacebookInit) {
|
||||
}
|
||||
|
||||
#[fbinit::test]
|
||||
async fn write_mostly_get(fb: FacebookInit) {
|
||||
async fn write_only_get(fb: FacebookInit) {
|
||||
let both_key = "both";
|
||||
let value = make_value("value");
|
||||
let write_mostly_key = "write_mostly";
|
||||
let write_only_key = "write_only";
|
||||
let main_only_key = "main_only";
|
||||
let main_bs = Arc::new(Tickable::new());
|
||||
let write_mostly_bs = Arc::new(Tickable::new());
|
||||
let write_only_bs = Arc::new(Tickable::new());
|
||||
|
||||
let log = Arc::new(LogHandler::new());
|
||||
let bs = MultiplexedBlobstoreBase::new(
|
||||
MultiplexId::new(1),
|
||||
vec![(BlobstoreId::new(0), main_bs.clone())],
|
||||
vec![(BlobstoreId::new(1), write_mostly_bs.clone())],
|
||||
vec![(BlobstoreId::new(1), write_only_bs.clone())],
|
||||
nonzero!(1usize),
|
||||
nonzero!(2usize),
|
||||
log.clone(),
|
||||
@ -1263,19 +1263,19 @@ async fn write_mostly_get(fb: FacebookInit) {
|
||||
// Put one blob into both blobstores
|
||||
main_bs.add_bytes(both_key.to_owned(), value.clone());
|
||||
main_bs.add_bytes(main_only_key.to_owned(), value.clone());
|
||||
write_mostly_bs.add_bytes(both_key.to_owned(), value.clone());
|
||||
// Put a blob only into the write mostly blobstore
|
||||
write_mostly_bs.add_bytes(write_mostly_key.to_owned(), value.clone());
|
||||
write_only_bs.add_bytes(both_key.to_owned(), value.clone());
|
||||
// Put a blob only into the write only blobstore
|
||||
write_only_bs.add_bytes(write_only_key.to_owned(), value.clone());
|
||||
|
||||
// Fetch the blob that's in both blobstores, see that the write mostly blobstore isn't being
|
||||
// Fetch the blob that's in both blobstores, see that the write only blobstore isn't being
|
||||
// read from by ticking it
|
||||
{
|
||||
let mut fut = bs.get(ctx, both_key);
|
||||
assert!(PollOnce::new(Pin::new(&mut fut)).await.is_pending());
|
||||
|
||||
// Ticking the write_mostly store does nothing.
|
||||
// Ticking the write_only store does nothing.
|
||||
for _ in 0..3usize {
|
||||
write_mostly_bs.tick(None);
|
||||
write_only_bs.tick(None);
|
||||
assert!(PollOnce::new(Pin::new(&mut fut)).await.is_pending());
|
||||
}
|
||||
|
||||
@ -1285,9 +1285,9 @@ async fn write_mostly_get(fb: FacebookInit) {
|
||||
log.clear();
|
||||
}
|
||||
|
||||
// Fetch the blob that's only in the write mostly blobstore, see it fetch correctly
|
||||
// Fetch the blob that's only in the write only blobstore, see it fetch correctly
|
||||
{
|
||||
let mut fut = bs.get(ctx, write_mostly_key);
|
||||
let mut fut = bs.get(ctx, write_only_key);
|
||||
assert!(PollOnce::new(Pin::new(&mut fut)).await.is_pending());
|
||||
|
||||
// Ticking the main store does nothing, as it lacks the blob
|
||||
@ -1296,71 +1296,71 @@ async fn write_mostly_get(fb: FacebookInit) {
|
||||
assert!(PollOnce::new(Pin::new(&mut fut)).await.is_pending());
|
||||
}
|
||||
|
||||
// Tick the write_mostly store, and we're finished
|
||||
write_mostly_bs.tick(None);
|
||||
// Tick the write_only store, and we're finished
|
||||
write_only_bs.tick(None);
|
||||
assert_eq!(fut.await.unwrap(), Some(value.clone().into()));
|
||||
log.clear();
|
||||
}
|
||||
|
||||
// Fetch the blob that's in both blobstores, see that the write mostly blobstore
|
||||
// Fetch the blob that's in both blobstores, see that the write only blobstore
|
||||
// is used when the main blobstore fails
|
||||
{
|
||||
let mut fut = bs.get(ctx, both_key);
|
||||
assert!(PollOnce::new(Pin::new(&mut fut)).await.is_pending());
|
||||
|
||||
// Ticking the write_mostly store does nothing.
|
||||
// Ticking the write_only store does nothing.
|
||||
for _ in 0..3usize {
|
||||
write_mostly_bs.tick(None);
|
||||
write_only_bs.tick(None);
|
||||
assert!(PollOnce::new(Pin::new(&mut fut)).await.is_pending());
|
||||
}
|
||||
|
||||
// Tick the main store, and we're still stuck
|
||||
main_bs.tick(Some("Main blobstore failed - fallback to write_mostly"));
|
||||
main_bs.tick(Some("Main blobstore failed - fallback to write_only"));
|
||||
assert!(PollOnce::new(Pin::new(&mut fut)).await.is_pending());
|
||||
|
||||
// Finally, the write_mostly store returns our value
|
||||
write_mostly_bs.tick(None);
|
||||
// Finally, the write_only store returns our value
|
||||
write_only_bs.tick(None);
|
||||
assert_eq!(fut.await.unwrap(), Some(value.clone().into()));
|
||||
log.clear();
|
||||
}
|
||||
|
||||
// Fetch the blob that's in main blobstores, see that the write mostly blobstore
|
||||
// Fetch the blob that's in main blobstores, see that the write only blobstore
|
||||
// None value is not used when the main blobstore fails
|
||||
{
|
||||
let mut fut = bs.get(ctx, main_only_key);
|
||||
assert!(PollOnce::new(Pin::new(&mut fut)).await.is_pending());
|
||||
|
||||
// Ticking the write_mostly store does nothing.
|
||||
// Ticking the write_only store does nothing.
|
||||
for _ in 0..3usize {
|
||||
write_mostly_bs.tick(None);
|
||||
write_only_bs.tick(None);
|
||||
assert!(PollOnce::new(Pin::new(&mut fut)).await.is_pending());
|
||||
}
|
||||
|
||||
// Tick the main store, and we're still stuck
|
||||
main_bs.tick(Some("Main blobstore failed - fallback to write_mostly"));
|
||||
main_bs.tick(Some("Main blobstore failed - fallback to write_only"));
|
||||
assert!(PollOnce::new(Pin::new(&mut fut)).await.is_pending());
|
||||
|
||||
// Finally, should get an error as None from a write mostly is indeterminate
|
||||
// Finally, should get an error as None from a write only is indeterminate
|
||||
// as it might not have been fully populated yet
|
||||
write_mostly_bs.tick(None);
|
||||
write_only_bs.tick(None);
|
||||
assert_eq!(
|
||||
fut.await.err().unwrap().to_string().as_str(),
|
||||
"All blobstores failed: {BlobstoreId(0): Main blobstore failed - fallback to write_mostly}"
|
||||
"All blobstores failed: {BlobstoreId(0): Main blobstore failed - fallback to write_only}"
|
||||
);
|
||||
log.clear();
|
||||
}
|
||||
}
|
||||
|
||||
#[fbinit::test]
|
||||
async fn write_mostly_put(fb: FacebookInit) {
|
||||
async fn write_only_put(fb: FacebookInit) {
|
||||
let main_bs = Arc::new(Tickable::new());
|
||||
let write_mostly_bs = Arc::new(Tickable::new());
|
||||
let write_only_bs = Arc::new(Tickable::new());
|
||||
|
||||
let log = Arc::new(LogHandler::new());
|
||||
let bs = MultiplexedBlobstoreBase::new(
|
||||
MultiplexId::new(1),
|
||||
vec![(BlobstoreId::new(0), main_bs.clone())],
|
||||
vec![(BlobstoreId::new(1), write_mostly_bs.clone())],
|
||||
vec![(BlobstoreId::new(1), write_only_bs.clone())],
|
||||
nonzero!(1usize),
|
||||
nonzero!(2usize),
|
||||
log.clone(),
|
||||
@ -1371,7 +1371,7 @@ async fn write_mostly_put(fb: FacebookInit) {
|
||||
let ctx = CoreContext::test_mock(fb);
|
||||
borrowed!(ctx);
|
||||
|
||||
// succeed as soon as main succeeds. Fail write_mostly to confirm that we can still read.
|
||||
// succeed as soon as main succeeds. Fail write_only to confirm that we can still read.
|
||||
{
|
||||
let v0 = make_value("v0");
|
||||
let k0 = "k0";
|
||||
@ -1384,8 +1384,8 @@ async fn write_mostly_put(fb: FacebookInit) {
|
||||
main_bs.tick(None);
|
||||
put_fut.await.unwrap();
|
||||
assert_eq!(main_bs.get_bytes(k0), Some(v0.clone()));
|
||||
assert!(write_mostly_bs.storage.with(|s| s.is_empty()));
|
||||
write_mostly_bs.tick(Some("write_mostly_bs failed"));
|
||||
assert!(write_only_bs.storage.with(|s| s.is_empty()));
|
||||
write_only_bs.tick(Some("write_only_bs failed"));
|
||||
assert!(
|
||||
log.log
|
||||
.with(|log| log == &vec![(BlobstoreId::new(0), k0.to_owned())])
|
||||
@ -1395,16 +1395,16 @@ async fn write_mostly_put(fb: FacebookInit) {
|
||||
let mut get_fut = bs.get(ctx, k0).map_err(|_| ()).boxed();
|
||||
assert_eq!(PollOnce::new(Pin::new(&mut get_fut)).await, Poll::Pending);
|
||||
main_bs.tick(None);
|
||||
write_mostly_bs.tick(None);
|
||||
write_only_bs.tick(None);
|
||||
assert_eq!(get_fut.await.unwrap(), Some(v0.into()));
|
||||
assert!(write_mostly_bs.storage.with(|s| s.is_empty()));
|
||||
assert!(write_only_bs.storage.with(|s| s.is_empty()));
|
||||
|
||||
main_bs.storage.with(|s| s.clear());
|
||||
write_mostly_bs.storage.with(|s| s.clear());
|
||||
write_only_bs.storage.with(|s| s.clear());
|
||||
log.clear();
|
||||
}
|
||||
|
||||
// succeed as soon as write_mostly succeeds. Fail main to confirm we can still read
|
||||
// succeed as soon as write_only succeeds. Fail main to confirm we can still read
|
||||
{
|
||||
let v0 = make_value("v0");
|
||||
let k0 = "k0";
|
||||
@ -1414,9 +1414,9 @@ async fn write_mostly_put(fb: FacebookInit) {
|
||||
.map_err(|_| ())
|
||||
.boxed();
|
||||
assert_eq!(PollOnce::new(Pin::new(&mut put_fut)).await, Poll::Pending);
|
||||
write_mostly_bs.tick(None);
|
||||
write_only_bs.tick(None);
|
||||
put_fut.await.unwrap();
|
||||
assert_eq!(write_mostly_bs.get_bytes(k0), Some(v0.clone()));
|
||||
assert_eq!(write_only_bs.get_bytes(k0), Some(v0.clone()));
|
||||
assert!(main_bs.storage.with(|s| s.is_empty()));
|
||||
main_bs.tick(Some("main_bs failed"));
|
||||
assert!(
|
||||
@ -1424,21 +1424,21 @@ async fn write_mostly_put(fb: FacebookInit) {
|
||||
.with(|log| log == &vec![(BlobstoreId::new(1), k0.to_owned())])
|
||||
);
|
||||
|
||||
// should succeed as it is stored in write_mostly_bs, but main won't read
|
||||
// should succeed as it is stored in write_only_bs, but main won't read
|
||||
let mut get_fut = bs.get(ctx, k0).map_err(|_| ()).boxed();
|
||||
assert_eq!(PollOnce::new(Pin::new(&mut get_fut)).await, Poll::Pending);
|
||||
main_bs.tick(None);
|
||||
assert_eq!(PollOnce::new(Pin::new(&mut get_fut)).await, Poll::Pending);
|
||||
write_mostly_bs.tick(None);
|
||||
write_only_bs.tick(None);
|
||||
assert_eq!(get_fut.await.unwrap(), Some(v0.into()));
|
||||
assert!(main_bs.storage.with(|s| s.is_empty()));
|
||||
|
||||
main_bs.storage.with(|s| s.clear());
|
||||
write_mostly_bs.storage.with(|s| s.clear());
|
||||
write_only_bs.storage.with(|s| s.clear());
|
||||
log.clear();
|
||||
}
|
||||
|
||||
// succeed if write_mostly succeeds and main fails
|
||||
// succeed if write_only succeeds and main fails
|
||||
{
|
||||
let v1 = make_value("v1");
|
||||
let k1 = "k1";
|
||||
@ -1450,10 +1450,10 @@ async fn write_mostly_put(fb: FacebookInit) {
|
||||
assert_eq!(PollOnce::new(Pin::new(&mut put_fut)).await, Poll::Pending);
|
||||
main_bs.tick(Some("case 2: main_bs failed"));
|
||||
assert_eq!(PollOnce::new(Pin::new(&mut put_fut)).await, Poll::Pending);
|
||||
write_mostly_bs.tick(None);
|
||||
write_only_bs.tick(None);
|
||||
put_fut.await.unwrap();
|
||||
assert!(main_bs.storage.with(|s| s.get(k1).is_none()));
|
||||
assert_eq!(write_mostly_bs.get_bytes(k1), Some(v1.clone()));
|
||||
assert_eq!(write_only_bs.get_bytes(k1), Some(v1.clone()));
|
||||
assert!(
|
||||
log.log
|
||||
.with(|log| log == &vec![(BlobstoreId::new(1), k1.to_owned())])
|
||||
@ -1463,12 +1463,12 @@ async fn write_mostly_put(fb: FacebookInit) {
|
||||
assert_eq!(PollOnce::new(Pin::new(&mut get_fut)).await, Poll::Pending);
|
||||
main_bs.tick(None);
|
||||
assert_eq!(PollOnce::new(Pin::new(&mut get_fut)).await, Poll::Pending);
|
||||
write_mostly_bs.tick(None);
|
||||
write_only_bs.tick(None);
|
||||
assert_eq!(get_fut.await.unwrap(), Some(v1.into()));
|
||||
assert!(main_bs.storage.with(|s| s.get(k1).is_none()));
|
||||
|
||||
main_bs.storage.with(|s| s.clear());
|
||||
write_mostly_bs.storage.with(|s| s.clear());
|
||||
write_only_bs.storage.with(|s| s.clear());
|
||||
log.clear();
|
||||
}
|
||||
|
||||
@ -1484,7 +1484,7 @@ async fn write_mostly_put(fb: FacebookInit) {
|
||||
assert_eq!(PollOnce::new(Pin::new(&mut put_fut)).await, Poll::Pending);
|
||||
main_bs.tick(Some("case 3: main_bs failed"));
|
||||
assert_eq!(PollOnce::new(Pin::new(&mut put_fut)).await, Poll::Pending);
|
||||
write_mostly_bs.tick(Some("case 3: write_mostly_bs failed"));
|
||||
write_only_bs.tick(Some("case 3: write_only_bs failed"));
|
||||
assert!(put_fut.await.is_err());
|
||||
}
|
||||
|
||||
@ -1493,7 +1493,7 @@ async fn write_mostly_put(fb: FacebookInit) {
|
||||
let v4 = make_value("v4");
|
||||
let k4 = "k4";
|
||||
main_bs.storage.with(|s| s.clear());
|
||||
write_mostly_bs.storage.with(|s| s.clear());
|
||||
write_only_bs.storage.with(|s| s.clear());
|
||||
log.clear();
|
||||
|
||||
let mut put_fut = bs
|
||||
@ -1504,11 +1504,11 @@ async fn write_mostly_put(fb: FacebookInit) {
|
||||
main_bs.tick(None);
|
||||
put_fut.await.unwrap();
|
||||
assert_eq!(main_bs.get_bytes(k4), Some(v4.clone()));
|
||||
write_mostly_bs.tick(None);
|
||||
write_only_bs.tick(None);
|
||||
while log.log.with(|log| log.len() != 2) {
|
||||
tokio::task::yield_now().await;
|
||||
}
|
||||
assert_eq!(write_mostly_bs.get_bytes(k4), Some(v4.clone()));
|
||||
assert_eq!(write_only_bs.get_bytes(k4), Some(v4.clone()));
|
||||
}
|
||||
}
|
||||
|
||||
@ -1516,7 +1516,7 @@ async fn write_mostly_put(fb: FacebookInit) {
|
||||
async fn needed_writes(fb: FacebookInit) {
|
||||
let main_bs0 = Arc::new(Tickable::new());
|
||||
let main_bs2 = Arc::new(Tickable::new());
|
||||
let write_mostly_bs = Arc::new(Tickable::new());
|
||||
let write_only_bs = Arc::new(Tickable::new());
|
||||
|
||||
let log = Arc::new(LogHandler::new());
|
||||
let bs = MultiplexedBlobstoreBase::new(
|
||||
@ -1525,7 +1525,7 @@ async fn needed_writes(fb: FacebookInit) {
|
||||
(BlobstoreId::new(0), main_bs0.clone()),
|
||||
(BlobstoreId::new(2), main_bs2.clone()),
|
||||
],
|
||||
vec![(BlobstoreId::new(1), write_mostly_bs.clone())],
|
||||
vec![(BlobstoreId::new(1), write_only_bs.clone())],
|
||||
nonzero!(2usize),
|
||||
nonzero!(2usize),
|
||||
log.clone(),
|
||||
@ -1577,12 +1577,12 @@ async fn needed_writes(fb: FacebookInit) {
|
||||
|
||||
assert_eq!(main_bs0.get_bytes(k0), Some(v0.clone()));
|
||||
assert_eq!(main_bs2.get_bytes(k0), Some(v0.clone()));
|
||||
assert_eq!(write_mostly_bs.get_bytes(k0), None);
|
||||
write_mostly_bs.tick(Some("Error"));
|
||||
assert_eq!(write_only_bs.get_bytes(k0), None);
|
||||
write_only_bs.tick(Some("Error"));
|
||||
log.clear();
|
||||
}
|
||||
|
||||
// A write-mostly counts as a success.
|
||||
// A write-only counts as a success.
|
||||
{
|
||||
let v1 = make_value("v1");
|
||||
let k1 = "k1";
|
||||
@ -1604,7 +1604,7 @@ async fn needed_writes(fb: FacebookInit) {
|
||||
)
|
||||
});
|
||||
|
||||
write_mostly_bs.tick(None);
|
||||
write_only_bs.tick(None);
|
||||
assert!(
|
||||
put_fut.await.is_ok(),
|
||||
"Put failed with two succcessful writes"
|
||||
@ -1622,7 +1622,7 @@ async fn needed_writes(fb: FacebookInit) {
|
||||
});
|
||||
|
||||
assert_eq!(main_bs0.get_bytes(k1), Some(v1.clone()));
|
||||
assert_eq!(write_mostly_bs.get_bytes(k1), Some(v1.clone()));
|
||||
assert_eq!(write_only_bs.get_bytes(k1), Some(v1.clone()));
|
||||
assert_eq!(main_bs2.get_bytes(k1), None);
|
||||
main_bs2.tick(Some("Error"));
|
||||
log.clear();
|
||||
@ -1633,7 +1633,7 @@ async fn needed_writes(fb: FacebookInit) {
|
||||
async fn needed_writes_bad_config(fb: FacebookInit) {
|
||||
let main_bs0 = Arc::new(Tickable::new());
|
||||
let main_bs2 = Arc::new(Tickable::new());
|
||||
let write_mostly_bs = Arc::new(Tickable::new());
|
||||
let write_only_bs = Arc::new(Tickable::new());
|
||||
|
||||
let log = Arc::new(LogHandler::new());
|
||||
let bs = MultiplexedBlobstoreBase::new(
|
||||
@ -1642,7 +1642,7 @@ async fn needed_writes_bad_config(fb: FacebookInit) {
|
||||
(BlobstoreId::new(0), main_bs0.clone()),
|
||||
(BlobstoreId::new(2), main_bs2.clone()),
|
||||
],
|
||||
vec![(BlobstoreId::new(1), write_mostly_bs.clone())],
|
||||
vec![(BlobstoreId::new(1), write_only_bs.clone())],
|
||||
nonzero!(5usize),
|
||||
nonzero!(5usize),
|
||||
log.clone(),
|
||||
@ -1663,7 +1663,7 @@ async fn needed_writes_bad_config(fb: FacebookInit) {
|
||||
|
||||
main_bs0.tick(None);
|
||||
main_bs2.tick(None);
|
||||
write_mostly_bs.tick(None);
|
||||
write_only_bs.tick(None);
|
||||
|
||||
assert!(
|
||||
put_fut.await.is_err(),
|
||||
|
@ -146,7 +146,7 @@ pub struct WalMultiplexedBlobstore {
|
||||
pub(crate) blobstores: Arc<[TimedStore]>,
|
||||
/// Write-mostly blobstores are not normally read from on `get`, but take part in writes
|
||||
/// like a normal blobstore.
|
||||
pub(crate) write_mostly_blobstores: Arc<[TimedStore]>,
|
||||
pub(crate) write_only_blobstores: Arc<[TimedStore]>,
|
||||
|
||||
/// Scuba table to log status of the underlying single blobstore queries.
|
||||
pub(crate) scuba: Scuba,
|
||||
@ -156,8 +156,8 @@ impl std::fmt::Display for WalMultiplexedBlobstore {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
write!(
|
||||
f,
|
||||
"WAL MultiplexedBlobstore[normal {:?}, write mostly {:?}]",
|
||||
self.blobstores, self.write_mostly_blobstores
|
||||
"WAL MultiplexedBlobstore[normal {:?}, write only {:?}]",
|
||||
self.blobstores, self.write_only_blobstores
|
||||
)
|
||||
}
|
||||
}
|
||||
@ -180,7 +180,7 @@ impl WalMultiplexedBlobstore {
|
||||
multiplex_id: MultiplexId,
|
||||
wal_queue: Arc<dyn BlobstoreWal>,
|
||||
blobstores: Vec<(BlobstoreId, Arc<dyn BlobstorePutOps>)>,
|
||||
write_mostly_blobstores: Vec<(BlobstoreId, Arc<dyn BlobstorePutOps>)>,
|
||||
write_only_blobstores: Vec<(BlobstoreId, Arc<dyn BlobstorePutOps>)>,
|
||||
write_quorum: usize,
|
||||
timeout: Option<MultiplexTimeout>,
|
||||
scuba: Scuba,
|
||||
@ -189,13 +189,13 @@ impl WalMultiplexedBlobstore {
|
||||
|
||||
let to = timeout.unwrap_or_default();
|
||||
let blobstores = with_timed_stores(blobstores, to.clone()).into();
|
||||
let write_mostly_blobstores = with_timed_stores(write_mostly_blobstores, to).into();
|
||||
let write_only_blobstores = with_timed_stores(write_only_blobstores, to).into();
|
||||
|
||||
Ok(Self {
|
||||
multiplex_id,
|
||||
wal_queue,
|
||||
blobstores,
|
||||
write_mostly_blobstores,
|
||||
write_only_blobstores,
|
||||
quorum,
|
||||
scuba,
|
||||
})
|
||||
@ -260,25 +260,24 @@ impl WalMultiplexedBlobstore {
|
||||
let main_puts =
|
||||
spawn_stream_completion(put_futs.map_err(|(_id, err)| err));
|
||||
|
||||
// Spawn the write-mostly blobstore writes, we don't want to wait for them
|
||||
let write_mostly_puts = inner_multi_put(
|
||||
// Spawn the write-only blobstore writes, we don't want to wait for them
|
||||
let write_only_puts = inner_multi_put(
|
||||
ctx,
|
||||
self.write_mostly_blobstores.clone(),
|
||||
self.write_only_blobstores.clone(),
|
||||
&key,
|
||||
&value,
|
||||
put_behaviour,
|
||||
scuba,
|
||||
);
|
||||
let write_mostly_puts = spawn_stream_completion(
|
||||
write_mostly_puts.map_err(|(_id, err)| err),
|
||||
);
|
||||
let write_only_puts =
|
||||
spawn_stream_completion(write_only_puts.map_err(|(_id, err)| err));
|
||||
|
||||
cloned!(ctx, self.wal_queue);
|
||||
if put_errors.is_empty() {
|
||||
// Optimisation: It put fully succeeded on all blobstores, we can remove
|
||||
// it from queue and healer doesn't need to deal with it.
|
||||
tokio::spawn(async move {
|
||||
let (r1, r2) = futures::join!(main_puts, write_mostly_puts);
|
||||
let (r1, r2) = futures::join!(main_puts, write_only_puts);
|
||||
r1??;
|
||||
r2??;
|
||||
// TODO(yancouto): Batch deletes together.
|
||||
|
@ -28,7 +28,7 @@ use metaconfig_types::MultiplexId;
|
||||
use multiplexedblob::base::ErrorKind;
|
||||
use multiplexedblob::ScrubHandler;
|
||||
use multiplexedblob::ScrubOptions;
|
||||
use multiplexedblob::ScrubWriteMostly;
|
||||
use multiplexedblob::SrubWriteOnly;
|
||||
|
||||
use crate::multiplex;
|
||||
use crate::MultiplexTimeout;
|
||||
@ -40,7 +40,7 @@ impl WalMultiplexedBlobstore {
|
||||
&self,
|
||||
ctx: &CoreContext,
|
||||
key: &str,
|
||||
write_mostly: ScrubWriteMostly,
|
||||
write_only: SrubWriteOnly,
|
||||
) -> Result<Option<BlobstoreGetData>, ErrorKind> {
|
||||
let mut scuba = self.scuba.clone();
|
||||
scuba.sampled();
|
||||
@ -59,15 +59,15 @@ impl WalMultiplexedBlobstore {
|
||||
|| {
|
||||
multiplex::inner_multi_get(
|
||||
ctx,
|
||||
self.write_mostly_blobstores.clone(),
|
||||
self.write_only_blobstores.clone(),
|
||||
key,
|
||||
OperationType::ScrubGet,
|
||||
&scuba,
|
||||
)
|
||||
.collect::<Vec<_>>()
|
||||
},
|
||||
self.write_mostly_blobstores.iter().map(|b| *b.id()),
|
||||
write_mostly,
|
||||
self.write_only_blobstores.iter().map(|b| *b.id()),
|
||||
write_only,
|
||||
)
|
||||
.await;
|
||||
|
||||
@ -94,7 +94,7 @@ impl WalScrubBlobstore {
|
||||
multiplex_id: MultiplexId,
|
||||
wal_queue: Arc<dyn BlobstoreWal>,
|
||||
blobstores: Vec<(BlobstoreId, Arc<dyn BlobstorePutOps>)>,
|
||||
write_mostly_blobstores: Vec<(BlobstoreId, Arc<dyn BlobstorePutOps>)>,
|
||||
write_only_blobstores: Vec<(BlobstoreId, Arc<dyn BlobstorePutOps>)>,
|
||||
write_quorum: usize,
|
||||
timeout: Option<MultiplexTimeout>,
|
||||
scuba: Scuba,
|
||||
@ -105,14 +105,14 @@ impl WalScrubBlobstore {
|
||||
blobstores
|
||||
.iter()
|
||||
.cloned()
|
||||
.chain(write_mostly_blobstores.iter().cloned())
|
||||
.chain(write_only_blobstores.iter().cloned())
|
||||
.collect(),
|
||||
);
|
||||
let inner = WalMultiplexedBlobstore::new(
|
||||
multiplex_id,
|
||||
wal_queue,
|
||||
blobstores,
|
||||
write_mostly_blobstores,
|
||||
write_only_blobstores,
|
||||
write_quorum,
|
||||
timeout,
|
||||
scuba,
|
||||
@ -133,17 +133,17 @@ impl Blobstore for WalScrubBlobstore {
|
||||
ctx: &'a CoreContext,
|
||||
key: &'a str,
|
||||
) -> Result<Option<BlobstoreGetData>> {
|
||||
let write_mostly = self.scrub_options.scrub_action_on_missing_write_mostly;
|
||||
match self.inner.scrub_get(ctx, key, write_mostly).await {
|
||||
let write_only = self.scrub_options.scrub_action_on_missing_write_only;
|
||||
match self.inner.scrub_get(ctx, key, write_only).await {
|
||||
Ok(value) => Ok(value),
|
||||
Err(ErrorKind::SomeFailedOthersNone {
|
||||
main_errors,
|
||||
write_mostly_errors,
|
||||
write_only_errors,
|
||||
}) => {
|
||||
if self.inner.blobstores.len() - main_errors.len() < self.inner.quorum.read.get() {
|
||||
Err(ErrorKind::SomeFailedOthersNone {
|
||||
main_errors,
|
||||
write_mostly_errors,
|
||||
write_only_errors,
|
||||
})
|
||||
.context("Can't tell if blob exists or not due to failing blobstores")
|
||||
} else {
|
||||
@ -153,7 +153,7 @@ impl Blobstore for WalScrubBlobstore {
|
||||
}
|
||||
Err(ErrorKind::SomeMissingItem {
|
||||
missing_main,
|
||||
missing_write_mostly,
|
||||
missing_write_only,
|
||||
value,
|
||||
}) => {
|
||||
multiplexedblob::scrub::maybe_repair(
|
||||
@ -161,7 +161,7 @@ impl Blobstore for WalScrubBlobstore {
|
||||
key,
|
||||
value,
|
||||
missing_main,
|
||||
missing_write_mostly,
|
||||
missing_write_only,
|
||||
self.all_blobstores.as_ref(),
|
||||
self.scrub_handler.as_ref(),
|
||||
&self.scrub_options,
|
||||
|
@ -38,7 +38,7 @@ use multiplexedblob::LoggingScrubHandler;
|
||||
use multiplexedblob::ScrubAction;
|
||||
use multiplexedblob::ScrubHandler;
|
||||
use multiplexedblob::ScrubOptions;
|
||||
use multiplexedblob::ScrubWriteMostly;
|
||||
use multiplexedblob::SrubWriteOnly;
|
||||
use nonzero_ext::nonzero;
|
||||
use scuba_ext::MononokeScubaSampleBuilder;
|
||||
use sql_construct::SqlConstruct;
|
||||
@ -59,7 +59,7 @@ async fn test_quorum_is_valid(_fb: FacebookInit) -> Result<()> {
|
||||
|
||||
// Check the quorum cannot be zero
|
||||
{
|
||||
// no main-stores, no write-mostly
|
||||
// no main-stores, no write-only
|
||||
assert!(setup_multiplex(0, 0, None).is_err());
|
||||
}
|
||||
|
||||
@ -73,8 +73,8 @@ async fn test_quorum_is_valid(_fb: FacebookInit) -> Result<()> {
|
||||
)
|
||||
})
|
||||
.collect();
|
||||
// write-mostly don't count into the quorum
|
||||
let write_mostly = (2..4)
|
||||
// write-only don't count into the quorum
|
||||
let write_only = (2..4)
|
||||
.map(|id| {
|
||||
(
|
||||
BlobstoreId::new(id),
|
||||
@ -87,7 +87,7 @@ async fn test_quorum_is_valid(_fb: FacebookInit) -> Result<()> {
|
||||
MultiplexId::new(0),
|
||||
wal,
|
||||
stores,
|
||||
write_mostly,
|
||||
write_only,
|
||||
quorum,
|
||||
None,
|
||||
scuba,
|
||||
@ -1012,7 +1012,7 @@ fn assert_is_present_ok(result: Result<BlobstoreIsPresent>, expected: BlobstoreI
|
||||
|
||||
async fn scrub_none(
|
||||
fb: FacebookInit,
|
||||
scrub_action_on_missing_write_mostly: ScrubWriteMostly,
|
||||
scrub_action_on_missing_write_only: SrubWriteOnly,
|
||||
) -> Result<()> {
|
||||
let bid0 = BlobstoreId::new(0);
|
||||
let bs0 = Arc::new(Tickable::new());
|
||||
@ -1033,7 +1033,7 @@ async fn scrub_none(
|
||||
None,
|
||||
Scuba::new_from_raw(fb, None, None, nonzero!(1u64))?,
|
||||
ScrubOptions {
|
||||
scrub_action_on_missing_write_mostly,
|
||||
scrub_action_on_missing_write_only,
|
||||
..ScrubOptions::default()
|
||||
},
|
||||
Arc::new(LoggingScrubHandler::new(false)) as Arc<dyn ScrubHandler>,
|
||||
@ -1055,16 +1055,16 @@ async fn scrub_none(
|
||||
|
||||
#[fbinit::test]
|
||||
async fn scrub_blobstore_fetch_none(fb: FacebookInit) -> Result<()> {
|
||||
scrub_none(fb, ScrubWriteMostly::Scrub).await?;
|
||||
scrub_none(fb, ScrubWriteMostly::SkipMissing).await?;
|
||||
scrub_none(fb, ScrubWriteMostly::PopulateIfAbsent).await
|
||||
scrub_none(fb, SrubWriteOnly::Scrub).await?;
|
||||
scrub_none(fb, SrubWriteOnly::SkipMissing).await?;
|
||||
scrub_none(fb, SrubWriteOnly::PopulateIfAbsent).await
|
||||
}
|
||||
async fn scrub_scenarios(
|
||||
fb: FacebookInit,
|
||||
scrub_action_on_missing_write_mostly: ScrubWriteMostly,
|
||||
scrub_action_on_missing_write_only: SrubWriteOnly,
|
||||
) -> Result<()> {
|
||||
use ScrubWriteMostly::*;
|
||||
println!("{:?}", scrub_action_on_missing_write_mostly);
|
||||
use SrubWriteOnly::*;
|
||||
println!("{:?}", scrub_action_on_missing_write_only);
|
||||
let ctx = CoreContext::test_mock(fb);
|
||||
borrowed!(ctx);
|
||||
let (tick_queue, queue) = setup_queue();
|
||||
@ -1086,7 +1086,7 @@ async fn scrub_scenarios(
|
||||
scuba.clone(),
|
||||
ScrubOptions {
|
||||
scrub_action: ScrubAction::ReportOnly,
|
||||
scrub_action_on_missing_write_mostly,
|
||||
scrub_action_on_missing_write_only,
|
||||
..ScrubOptions::default()
|
||||
},
|
||||
scrub_handler.clone(),
|
||||
@ -1108,7 +1108,7 @@ async fn scrub_scenarios(
|
||||
assert!(get_fut.await.is_err(), "SomeNone + Err expected Err");
|
||||
}
|
||||
|
||||
// non-existing key when one write mostly blobstore failing
|
||||
// non-existing key when one write only blobstore failing
|
||||
{
|
||||
let k0 = "k0";
|
||||
|
||||
@ -1120,9 +1120,9 @@ async fn scrub_scenarios(
|
||||
|
||||
bs1.tick(None);
|
||||
|
||||
match scrub_action_on_missing_write_mostly {
|
||||
match scrub_action_on_missing_write_only {
|
||||
PopulateIfAbsent | ScrubIfAbsent => {
|
||||
// bs2 is ignored because it's write_mostly and the result of the normal
|
||||
// bs2 is ignored because it's write_only and the result of the normal
|
||||
// blobstores is failing
|
||||
assert_eq!(get_fut.await.unwrap(), None, "SomeNone + Err expected None");
|
||||
}
|
||||
@ -1197,7 +1197,7 @@ async fn scrub_scenarios(
|
||||
scuba.clone(),
|
||||
ScrubOptions {
|
||||
scrub_action: ScrubAction::Repair,
|
||||
scrub_action_on_missing_write_mostly,
|
||||
scrub_action_on_missing_write_only,
|
||||
..ScrubOptions::default()
|
||||
},
|
||||
scrub_handler.clone(),
|
||||
@ -1263,7 +1263,7 @@ async fn scrub_scenarios(
|
||||
bs0.tick(None);
|
||||
assert_pending(&mut get_fut).await;
|
||||
bs1.tick(None);
|
||||
if scrub_action_on_missing_write_mostly != ScrubWriteMostly::PopulateIfAbsent {
|
||||
if scrub_action_on_missing_write_only != SrubWriteOnly::PopulateIfAbsent {
|
||||
// this read doesn't happen in this mode
|
||||
bs2.tick(None);
|
||||
}
|
||||
@ -1277,13 +1277,13 @@ async fn scrub_scenarios(
|
||||
// Now all populated.
|
||||
assert_eq!(bs0.get_bytes(k1), Some(v1.clone()));
|
||||
assert_eq!(bs1.get_bytes(k1), Some(v1.clone()));
|
||||
match scrub_action_on_missing_write_mostly {
|
||||
ScrubWriteMostly::Scrub
|
||||
| ScrubWriteMostly::PopulateIfAbsent
|
||||
| ScrubWriteMostly::ScrubIfAbsent => {
|
||||
match scrub_action_on_missing_write_only {
|
||||
SrubWriteOnly::Scrub
|
||||
| SrubWriteOnly::PopulateIfAbsent
|
||||
| SrubWriteOnly::ScrubIfAbsent => {
|
||||
assert_eq!(bs2.get_bytes(k1), Some(v1.clone()))
|
||||
}
|
||||
ScrubWriteMostly::SkipMissing => {
|
||||
SrubWriteOnly::SkipMissing => {
|
||||
assert_eq!(bs2.get_bytes(k1), None)
|
||||
}
|
||||
}
|
||||
@ -1305,11 +1305,11 @@ async fn scrub_scenarios(
|
||||
// gets
|
||||
bs0.tick(None);
|
||||
bs1.tick(Some("bs1 get failed"));
|
||||
if scrub_action_on_missing_write_mostly != ScrubWriteMostly::PopulateIfAbsent {
|
||||
if scrub_action_on_missing_write_only != SrubWriteOnly::PopulateIfAbsent {
|
||||
// this read doesn't happen in this mode
|
||||
bs2.tick(None);
|
||||
}
|
||||
if scrub_action_on_missing_write_mostly != ScrubWriteMostly::SkipMissing {
|
||||
if scrub_action_on_missing_write_only != SrubWriteOnly::SkipMissing {
|
||||
assert_pending(&mut get_fut).await;
|
||||
// repair bs2
|
||||
bs2.tick(None);
|
||||
@ -1318,7 +1318,7 @@ async fn scrub_scenarios(
|
||||
// bs1 still doesn't have the value. This is expected because we don't
|
||||
// want a single failing blobstore blocking the scrub.
|
||||
assert_eq!(bs1.get_bytes(k2), None);
|
||||
if scrub_action_on_missing_write_mostly != ScrubWriteMostly::SkipMissing {
|
||||
if scrub_action_on_missing_write_only != SrubWriteOnly::SkipMissing {
|
||||
// bs2 got repaired successfully
|
||||
assert_eq!(bs2.get_bytes(k2), Some(v2.clone()));
|
||||
} else {
|
||||
@ -1330,11 +1330,11 @@ async fn scrub_scenarios(
|
||||
|
||||
#[fbinit::test]
|
||||
async fn scrubbed(fb: FacebookInit) {
|
||||
scrub_scenarios(fb, ScrubWriteMostly::Scrub).await.unwrap();
|
||||
scrub_scenarios(fb, ScrubWriteMostly::SkipMissing)
|
||||
scrub_scenarios(fb, SrubWriteOnly::Scrub).await.unwrap();
|
||||
scrub_scenarios(fb, SrubWriteOnly::SkipMissing)
|
||||
.await
|
||||
.unwrap();
|
||||
scrub_scenarios(fb, ScrubWriteMostly::PopulateIfAbsent)
|
||||
scrub_scenarios(fb, SrubWriteOnly::PopulateIfAbsent)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
@ -10,7 +10,7 @@ use std::time::Duration;
|
||||
use anyhow::Result;
|
||||
use blobstore_factory::ScrubAction;
|
||||
use blobstore_factory::ScrubOptions;
|
||||
use blobstore_factory::ScrubWriteMostly;
|
||||
use blobstore_factory::SrubWriteOnly;
|
||||
use clap::Args;
|
||||
use environment::MononokeEnvironment;
|
||||
use mononoke_app::AppExtension;
|
||||
@ -42,14 +42,14 @@ pub struct ScrubArgs {
|
||||
)]
|
||||
pub blobstore_scrub_queue_peek_bound: Option<u64>,
|
||||
|
||||
/// Whether to allow missing values from write-mostly stores when
|
||||
/// Whether to allow missing values from write-only stores when
|
||||
/// scrubbing
|
||||
#[clap(
|
||||
long,
|
||||
help_heading = "BLOBSTORE OPTIONS",
|
||||
requires = "blobstore-scrub-action"
|
||||
)]
|
||||
pub blobstore_scrub_write_mostly_missing: Option<ScrubWriteMostly>,
|
||||
pub blobstore_scrub_write_only_missing: Option<SrubWriteOnly>,
|
||||
}
|
||||
|
||||
#[derive(Default, Debug)]
|
||||
@ -57,7 +57,7 @@ pub struct ScrubAppExtension {
|
||||
pub action: Option<ScrubAction>,
|
||||
pub grace: Option<Duration>,
|
||||
pub queue_peek_bound: Option<Duration>,
|
||||
pub write_mostly_missing: Option<ScrubWriteMostly>,
|
||||
pub write_only_missing: Option<SrubWriteOnly>,
|
||||
}
|
||||
|
||||
impl ScrubAppExtension {
|
||||
@ -86,10 +86,10 @@ impl AppExtension for ScrubAppExtension {
|
||||
queue_peek_bound.as_secs().to_string(),
|
||||
));
|
||||
}
|
||||
if let Some(write_mostly_missing) = self.write_mostly_missing {
|
||||
if let Some(write_only_missing) = self.write_only_missing {
|
||||
defaults.push((
|
||||
"blobstore-scrub-write-mostly-missing",
|
||||
<&'static str>::from(write_mostly_missing).to_string(),
|
||||
"blobstore-scrub-write-only-missing",
|
||||
<&'static str>::from(write_only_missing).to_string(),
|
||||
));
|
||||
}
|
||||
defaults
|
||||
@ -105,8 +105,8 @@ impl AppExtension for ScrubAppExtension {
|
||||
if let Some(scrub_grace) = args.blobstore_scrub_grace {
|
||||
scrub_options.scrub_grace = Some(Duration::from_secs(scrub_grace));
|
||||
}
|
||||
if let Some(action_on_missing) = args.blobstore_scrub_write_mostly_missing {
|
||||
scrub_options.scrub_action_on_missing_write_mostly = action_on_missing;
|
||||
if let Some(action_on_missing) = args.blobstore_scrub_write_only_missing {
|
||||
scrub_options.scrub_action_on_missing_write_only = action_on_missing;
|
||||
}
|
||||
if let Some(queue_peek_bound) = args.blobstore_scrub_queue_peek_bound {
|
||||
scrub_options.queue_peek_bound = Duration::from_secs(queue_peek_bound);
|
||||
|
@ -13,7 +13,7 @@ use anyhow::Error;
|
||||
use anyhow::Result;
|
||||
use blobstore_factory::PutBehaviour;
|
||||
use blobstore_factory::ScrubAction;
|
||||
use blobstore_factory::ScrubWriteMostly;
|
||||
use blobstore_factory::SrubWriteOnly;
|
||||
use blobstore_factory::DEFAULT_PUT_BEHAVIOUR;
|
||||
use clap_old::App;
|
||||
use clap_old::Arg;
|
||||
@ -75,7 +75,7 @@ pub const CACHELIB_ATTEMPT_ZSTD_ARG: &str = "blobstore-cachelib-attempt-zstd";
|
||||
pub const BLOBSTORE_PUT_BEHAVIOUR_ARG: &str = "blobstore-put-behaviour";
|
||||
pub const BLOBSTORE_SCRUB_ACTION_ARG: &str = "blobstore-scrub-action";
|
||||
pub const BLOBSTORE_SCRUB_GRACE_ARG: &str = "blobstore-scrub-grace";
|
||||
pub const BLOBSTORE_SCRUB_WRITE_MOSTLY_MISSING_ARG: &str = "blobstore-scrub-write-mostly-missing";
|
||||
pub const BLOBSTORE_SCRUB_WRITE_ONLY_MISSING_ARG: &str = "blobstore-scrub-write-only-missing";
|
||||
pub const BLOBSTORE_SCRUB_QUEUE_PEEK_BOUND_ARG: &str = "blobstore-scrub-queue-peek";
|
||||
pub const PUT_MEAN_DELAY_SECS_ARG: &str = "blobstore-put-mean-delay-secs";
|
||||
pub const PUT_STDDEV_DELAY_SECS_ARG: &str = "blobstore-put-stddev-delay-secs";
|
||||
@ -218,8 +218,8 @@ pub struct MononokeAppBuilder {
|
||||
// Whether to allow a grace period before reporting a key missing in a store for recent keys
|
||||
scrub_grace_secs_default: Option<u64>,
|
||||
|
||||
// Whether to report missing keys in write mostly blobstores as a scrub action when scrubbing
|
||||
scrub_action_on_missing_write_mostly_default: Option<ScrubWriteMostly>,
|
||||
// Whether to report missing keys in write only blobstores as a scrub action when scrubbing
|
||||
scrub_action_on_missing_write_only_default: Option<SrubWriteOnly>,
|
||||
|
||||
// Whether to set a default for how long to peek back at the multiplex queue when scrubbing
|
||||
scrub_queue_peek_bound_secs_default: Option<u64>,
|
||||
@ -319,7 +319,7 @@ impl MononokeAppBuilder {
|
||||
default_scuba_dataset: None,
|
||||
scrub_action_default: None,
|
||||
scrub_grace_secs_default: None,
|
||||
scrub_action_on_missing_write_mostly_default: None,
|
||||
scrub_action_on_missing_write_only_default: None,
|
||||
scrub_queue_peek_bound_secs_default: None,
|
||||
slog_filter_fn: None,
|
||||
dynamic_repos: false,
|
||||
@ -475,12 +475,12 @@ impl MononokeAppBuilder {
|
||||
self
|
||||
}
|
||||
|
||||
/// This command has a special handling of write mostly stores when scrubbing
|
||||
pub fn with_scrub_action_on_missing_write_mostly_default(
|
||||
/// This command has a special handling of write only stores when scrubbing
|
||||
pub fn with_scrub_action_on_missing_write_only_default(
|
||||
mut self,
|
||||
d: Option<ScrubWriteMostly>,
|
||||
d: Option<SrubWriteOnly>,
|
||||
) -> Self {
|
||||
self.scrub_action_on_missing_write_mostly_default = d;
|
||||
self.scrub_action_on_missing_write_only_default = d;
|
||||
self
|
||||
}
|
||||
|
||||
@ -869,22 +869,20 @@ impl MononokeAppBuilder {
|
||||
scrub_queue_peek_bound_arg = scrub_queue_peek_bound_arg
|
||||
.default_value(FORMATTED.get_or_init(|| format!("{}", default)));
|
||||
};
|
||||
let mut scrub_action_on_missing_write_mostly_arg =
|
||||
Arg::with_name(BLOBSTORE_SCRUB_WRITE_MOSTLY_MISSING_ARG)
|
||||
.long(BLOBSTORE_SCRUB_WRITE_MOSTLY_MISSING_ARG)
|
||||
let mut scrub_action_on_missing_write_only_arg =
|
||||
Arg::with_name(BLOBSTORE_SCRUB_WRITE_ONLY_MISSING_ARG)
|
||||
.long(BLOBSTORE_SCRUB_WRITE_ONLY_MISSING_ARG)
|
||||
.takes_value(true)
|
||||
.required(false)
|
||||
.possible_values(ScrubWriteMostly::VARIANTS)
|
||||
.help(
|
||||
"Whether to allow missing values from write mostly stores when scrubbing",
|
||||
);
|
||||
if let Some(default) = self.scrub_action_on_missing_write_mostly_default {
|
||||
scrub_action_on_missing_write_mostly_arg =
|
||||
scrub_action_on_missing_write_mostly_arg.default_value(default.into());
|
||||
.possible_values(SrubWriteOnly::VARIANTS)
|
||||
.help("Whether to allow missing values from write only stores when scrubbing");
|
||||
if let Some(default) = self.scrub_action_on_missing_write_only_default {
|
||||
scrub_action_on_missing_write_only_arg =
|
||||
scrub_action_on_missing_write_only_arg.default_value(default.into());
|
||||
}
|
||||
app.arg(scrub_action_arg)
|
||||
.arg(scrub_grace_arg)
|
||||
.arg(scrub_action_on_missing_write_mostly_arg)
|
||||
.arg(scrub_action_on_missing_write_only_arg)
|
||||
.arg(scrub_queue_peek_bound_arg)
|
||||
} else {
|
||||
app
|
||||
|
@ -33,7 +33,7 @@ use blobstore_factory::DelayOptions;
|
||||
use blobstore_factory::PackOptions;
|
||||
use blobstore_factory::PutBehaviour;
|
||||
use blobstore_factory::ScrubAction;
|
||||
use blobstore_factory::ScrubWriteMostly;
|
||||
use blobstore_factory::SrubWriteOnly;
|
||||
use blobstore_factory::ThrottleOptions;
|
||||
use cached_config::ConfigHandle;
|
||||
use cached_config::ConfigStore;
|
||||
@ -86,7 +86,7 @@ use super::app::BLOBSTORE_PUT_BEHAVIOUR_ARG;
|
||||
use super::app::BLOBSTORE_SCRUB_ACTION_ARG;
|
||||
use super::app::BLOBSTORE_SCRUB_GRACE_ARG;
|
||||
use super::app::BLOBSTORE_SCRUB_QUEUE_PEEK_BOUND_ARG;
|
||||
use super::app::BLOBSTORE_SCRUB_WRITE_MOSTLY_MISSING_ARG;
|
||||
use super::app::BLOBSTORE_SCRUB_WRITE_ONLY_MISSING_ARG;
|
||||
use super::app::CACHELIB_ATTEMPT_ZSTD_ARG;
|
||||
use super::app::CRYPTO_PATH_REGEX_ARG;
|
||||
use super::app::DERIVE_REMOTELY;
|
||||
@ -831,15 +831,15 @@ fn parse_blobstore_options(
|
||||
.map(u64::from_str)
|
||||
.transpose()?;
|
||||
|
||||
let scrub_action_on_missing_write_mostly = matches
|
||||
.value_of(BLOBSTORE_SCRUB_WRITE_MOSTLY_MISSING_ARG)
|
||||
.map(ScrubWriteMostly::from_str)
|
||||
let scrub_action_on_missing_write_only = matches
|
||||
.value_of(BLOBSTORE_SCRUB_WRITE_ONLY_MISSING_ARG)
|
||||
.map(SrubWriteOnly::from_str)
|
||||
.transpose()?;
|
||||
let mut blobstore_options = blobstore_options
|
||||
.with_scrub_action(scrub_action)
|
||||
.with_scrub_grace(scrub_grace);
|
||||
if let Some(v) = scrub_action_on_missing_write_mostly {
|
||||
blobstore_options = blobstore_options.with_scrub_action_on_missing_write_mostly(v)
|
||||
if let Some(v) = scrub_action_on_missing_write_only {
|
||||
blobstore_options = blobstore_options.with_scrub_action_on_missing_write_only(v)
|
||||
}
|
||||
let scrub_queue_peek_bound = matches
|
||||
.value_of(BLOBSTORE_SCRUB_QUEUE_PEEK_BOUND_ARG)
|
||||
|
@ -14,7 +14,7 @@ use async_compression::tokio::write::ZstdEncoder;
|
||||
use async_compression::Level;
|
||||
use blobstore_factory::make_blobstore;
|
||||
use blobstore_factory::ScrubAction;
|
||||
use blobstore_factory::ScrubWriteMostly;
|
||||
use blobstore_factory::SrubWriteOnly;
|
||||
use clap_old::Arg;
|
||||
use cmdlib::args;
|
||||
use cmdlib::args::ArgType;
|
||||
@ -96,7 +96,7 @@ fn main(fb: fbinit::FacebookInit) -> Result<()> {
|
||||
.with_all_repos()
|
||||
.with_arg_types(vec![ArgType::Scrub])
|
||||
.with_scrub_action_default(Some(ScrubAction::Repair))
|
||||
.with_scrub_action_on_missing_write_mostly_default(Some(ScrubWriteMostly::Scrub))
|
||||
.with_scrub_action_on_missing_write_only_default(Some(SrubWriteOnly::Scrub))
|
||||
.build()
|
||||
.arg(
|
||||
Arg::with_name(ARG_STORAGE_CONFIG_NAME)
|
||||
|
@ -1742,7 +1742,7 @@ mod test {
|
||||
components = [
|
||||
{ blobstore_id = 1, blobstore = { blob_files = { path = "/tmp/foo1" } } },
|
||||
{ blobstore_id = 2, store_type = { normal = {}}, blobstore = { blob_files = { path = "/tmp/foo2" } } },
|
||||
{ blobstore_id = 3, store_type = { write_mostly = {}}, blobstore = { blob_files = { path = "/tmp/foo3" } } },
|
||||
{ blobstore_id = 3, store_type = { write_only = {}}, blobstore = { blob_files = { path = "/tmp/foo3" } } },
|
||||
]
|
||||
queue_db = { remote = { db_address = "queue_db_address" } }
|
||||
"#;
|
||||
@ -1799,7 +1799,7 @@ mod test {
|
||||
),
|
||||
(
|
||||
BlobstoreId::new(3),
|
||||
MultiplexedStoreType::WriteMostly,
|
||||
MultiplexedStoreType::WriteOnly,
|
||||
BlobConfig::Files {
|
||||
path: "/tmp/foo3".into(),
|
||||
},
|
||||
|
@ -46,7 +46,10 @@ use repos::RawDbShardedRemote;
|
||||
use repos::RawEphemeralBlobstoreConfig;
|
||||
use repos::RawFilestoreParams;
|
||||
use repos::RawMetadataConfig;
|
||||
use repos::RawMultiplexedStoreNormal;
|
||||
use repos::RawMultiplexedStoreType;
|
||||
use repos::RawMultiplexedStoreWriteMostly;
|
||||
use repos::RawMultiplexedStoreWriteOnly;
|
||||
use repos::RawShardedDbConfig;
|
||||
use repos::RawStorageConfig;
|
||||
|
||||
@ -406,8 +409,15 @@ impl Convert for RawMultiplexedStoreType {
|
||||
|
||||
fn convert(self) -> Result<Self::Output> {
|
||||
match self {
|
||||
RawMultiplexedStoreType::normal(_) => Ok(MultiplexedStoreType::Normal),
|
||||
RawMultiplexedStoreType::write_mostly(_) => Ok(MultiplexedStoreType::WriteMostly),
|
||||
RawMultiplexedStoreType::normal(RawMultiplexedStoreNormal {}) => {
|
||||
Ok(MultiplexedStoreType::Normal)
|
||||
}
|
||||
RawMultiplexedStoreType::write_only(RawMultiplexedStoreWriteOnly {}) => {
|
||||
Ok(MultiplexedStoreType::WriteOnly)
|
||||
}
|
||||
RawMultiplexedStoreType::write_mostly(RawMultiplexedStoreWriteMostly {}) => {
|
||||
Ok(MultiplexedStoreType::WriteOnly)
|
||||
}
|
||||
RawMultiplexedStoreType::UnknownField(field) => {
|
||||
Err(anyhow!("unknown store type {}", field))
|
||||
}
|
||||
|
@ -780,8 +780,8 @@ pub struct StorageConfig {
|
||||
pub enum MultiplexedStoreType {
|
||||
/// Normal operation, no special treatment
|
||||
Normal,
|
||||
/// Only read if Normal blobstores don't provide the blob. Writes go here as per normal
|
||||
WriteMostly,
|
||||
/// Writes go here as per normal, but it doesn't count towards quota, and is never read from.
|
||||
WriteOnly,
|
||||
}
|
||||
|
||||
/// What format should data be in either Raw or a compressed form with compression options like level
|
||||
|
@ -23,7 +23,7 @@ Run a heal
|
||||
|
||||
Failure time - this key will not exist
|
||||
$ echo fake-key | manual_scrub --storage-config-name blobstore --checkpoint-key-file=checkpoint.txt --error-keys-output errors --missing-keys-output missing --success-keys-output success 2>&1 | strip_glog
|
||||
Scrubbing blobstore: ScrubBlobstore[Normal [(BlobstoreId(0), "Fileblob"), (BlobstoreId(1), "Fileblob"), (BlobstoreId(2), "Fileblob")], write mostly []]
|
||||
Scrubbing blobstore: ScrubBlobstore[Normal [(BlobstoreId(0), "Fileblob"), (BlobstoreId(1), "Fileblob"), (BlobstoreId(2), "Fileblob")], write only []]
|
||||
period, rate/s, seconds, success, missing, error, total, skipped, bytes, bytes/s
|
||||
run, *, *, 0, 1, 0, 1, 0, * (glob)
|
||||
delta, *, *, 0, 1, 0, 1, 0, * (glob)
|
||||
@ -59,7 +59,7 @@ Continue from checkpoint
|
||||
> repo0000.hgchangeset.sha1.26805aba1e600a82e93661149f2313866a221a7b
|
||||
> repo0000.hgfilenode.sha1.35e7525ce3a48913275d7061dd9a867ffef1e34d
|
||||
> EOF
|
||||
Scrubbing blobstore: ScrubBlobstore[Normal [(BlobstoreId(0), "Fileblob"), (BlobstoreId(1), "Fileblob"), (BlobstoreId(2), "Fileblob")], write mostly []]
|
||||
Scrubbing blobstore: ScrubBlobstore[Normal [(BlobstoreId(0), "Fileblob"), (BlobstoreId(1), "Fileblob"), (BlobstoreId(2), "Fileblob")], write only []]
|
||||
period, rate/s, seconds, success, missing, error, total, skipped, bytes, bytes/s
|
||||
run, *, *, 1, 0, 0, 1, 2, * (glob)
|
||||
delta, *, *, 1, 0, 0, 1, 2, * (glob)
|
||||
|
@ -28,7 +28,7 @@ use mononoke_app::fb303::Fb303AppExtension;
|
||||
use mononoke_app::fb303::ReadyFlagService;
|
||||
use mononoke_app::MononokeApp;
|
||||
use mononoke_app::MononokeAppBuilder;
|
||||
use multiplexedblob::ScrubWriteMostly;
|
||||
use multiplexedblob::SrubWriteOnly;
|
||||
|
||||
#[derive(Parser)]
|
||||
#[clap(group(
|
||||
@ -74,7 +74,7 @@ fn main(fb: FacebookInit) -> Result<(), Error> {
|
||||
};
|
||||
|
||||
let scrub_extension = ScrubAppExtension {
|
||||
write_mostly_missing: Some(ScrubWriteMostly::SkipMissing),
|
||||
write_only_missing: Some(SrubWriteOnly::SkipMissing),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user