mononoke/blobstore: new is_present semantics via enum

Summary:
This diff introduces new `is_present` semantics that will allow to move the decision logic on the complex multiplex `is_present` result from the multiplex to the callers.

In the current API `is_present` call returns `Result<bool>` and in case it couldn't determine whether the key exists or not, it checks the sync-queue in case the key was written recently and then might check the multiplex stores again, then fails if still unsure. This brings unnecessary complications and makes the multiplex blobstore less reliable.
More details in: https://fb.quip.com/wOCeAhGx6Oa1

This change allows us to get rid of the queue and second store lookups and move the decision-making to the callers.

*This diff shouldn't change the behaviour, but to replace bool value with an enum and add the conversions where needed.*

Reviewed By: StanislavGlebik

Differential Revision: D29377462

fbshipit-source-id: 4b70f772d2ed70d9fffda015ba06c3f16bf1475d
This commit is contained in:
Aida Getoeva 2021-07-05 09:16:32 -07:00 committed by Facebook GitHub Bot
parent fb5c2ad210
commit 8cf1889499
31 changed files with 410 additions and 122 deletions

View File

@ -255,7 +255,11 @@ impl UploadEntries {
}
let key = mfid.blobstore_key();
if !blobstore.is_present(ctx, &key).await? {
if !blobstore
.is_present(ctx, &key)
.await?
.assume_not_found_if_unsure()
{
return Err(BlobstoreError::NotFound(key).into());
}
}
@ -267,7 +271,7 @@ impl UploadEntries {
let envelope = fnid.load(ctx, &blobstore).await?;
let key = envelope.content_id().blobstore_key();
if !blobstore.is_present(ctx, &key).await? {
if !blobstore.is_present(ctx, &key).await?.fail_if_unsure()? {
return Err(BlobstoreError::NotFound(key).into());
}
}

View File

@ -7,7 +7,7 @@
use anyhow::Result;
use async_trait::async_trait;
use blobstore::{Blobstore, BlobstoreGetData};
use blobstore::{Blobstore, BlobstoreGetData, BlobstoreIsPresent};
use context::CoreContext;
use mononoke_types::{BlobstoreBytes, RepositoryId};
use prefixblob::PrefixBlobstore;
@ -76,7 +76,11 @@ impl Blobstore for RepoBlobstore {
) -> Result<()> {
self.0.0.put(ctx, key, value).await
}
async fn is_present<'a>(&'a self, ctx: &'a CoreContext, key: &'a str) -> Result<bool> {
async fn is_present<'a>(
&'a self,
ctx: &'a CoreContext,
key: &'a str,
) -> Result<BlobstoreIsPresent> {
self.0.0.is_present(ctx, key).await
}
}

View File

@ -7,7 +7,7 @@
use anyhow::Result;
use async_trait::async_trait;
use blobstore::{Blobstore, BlobstoreGetData};
use blobstore::{Blobstore, BlobstoreGetData, BlobstoreIsPresent};
use context::CoreContext;
use mononoke_types::BlobstoreBytes;
use std::sync::{Arc, Mutex};
@ -61,7 +61,11 @@ impl<T: Blobstore> Blobstore for TracingBlobstore<T> {
self.inner.put(ctx, key, value).await
}
async fn is_present<'a>(&'a self, ctx: &'a CoreContext, key: &'a str) -> Result<bool> {
async fn is_present<'a>(
&'a self,
ctx: &'a CoreContext,
key: &'a str,
) -> Result<BlobstoreIsPresent> {
self.inner.is_present(ctx, key).await
}
}

View File

@ -8,7 +8,7 @@
use anyhow::Result;
use async_trait::async_trait;
use auto_impl::auto_impl;
use blobstore::{Blobstore, BlobstoreGetData, CountedBlobstore};
use blobstore::{Blobstore, BlobstoreGetData, BlobstoreIsPresent, CountedBlobstore};
use cloned::cloned;
use context::{CoreContext, PerfCounterType};
use futures::future::{BoxFuture, FutureExt};
@ -233,11 +233,15 @@ where
Ok(())
}
async fn is_present<'a>(&'a self, ctx: &'a CoreContext, key: &'a str) -> Result<bool> {
async fn is_present<'a>(
&'a self,
ctx: &'a CoreContext,
key: &'a str,
) -> Result<BlobstoreIsPresent> {
let present = self.cache.check_present(key).await;
if present {
STATS::presence_hit.add_value(1, (C::CACHE_NAME,));
Ok(true)
Ok(BlobstoreIsPresent::Present)
} else {
STATS::presence_miss.add_value(1, (C::CACHE_NAME,));
self.blobstore.is_present(ctx, key).await

View File

@ -116,6 +116,8 @@ mod test {
.is_present(ctx, foo_key)
.await
.expect("is_present to inner should work")
.fail_if_unsure()
.expect("is_present to inner should work")
);
assert_eq!(
@ -150,6 +152,8 @@ mod test {
!inner
.is_present(ctx, foo_key)
.await
.expect("is_present on inner should work")
.fail_if_unsure()
.expect("is_present on inner should work"),
"foo should not be present in inner",
);
@ -158,7 +162,9 @@ mod test {
outer
.is_present(ctx, foo_key)
.await
.expect("is_present on outer should work"),
.expect("is_present on outer should work")
.fail_if_unsure()
.expect("is_present on inner should work"),
"foo should be present in outer",
);

View File

@ -7,7 +7,9 @@
use anyhow::Result;
use async_trait::async_trait;
use blobstore::{Blobstore, BlobstoreGetData, BlobstorePutOps, OverwriteStatus, PutBehaviour};
use blobstore::{
Blobstore, BlobstoreGetData, BlobstoreIsPresent, BlobstorePutOps, OverwriteStatus, PutBehaviour,
};
use context::CoreContext;
use mononoke_types::BlobstoreBytes;
use rand::{thread_rng, Rng};
@ -113,7 +115,11 @@ impl<T: Blobstore + BlobstorePutOps> Blobstore for ChaosBlobstore<T> {
}
#[inline]
async fn is_present<'a>(&'a self, ctx: &'a CoreContext, key: &'a str) -> Result<bool> {
async fn is_present<'a>(
&'a self,
ctx: &'a CoreContext,
key: &'a str,
) -> Result<BlobstoreIsPresent> {
let should_error = thread_rng().gen::<f32>() > self.sample_threshold_read;
let is_present = self.blobstore.is_present(ctx, key);
if should_error {
@ -198,7 +204,12 @@ mod test {
)
.await;
assert!(!r.is_ok());
let base_present = base.is_present(ctx, key).await.unwrap();
let base_present = base
.is_present(ctx, key)
.await
.unwrap()
.fail_if_unsure()
.unwrap();
assert!(!base_present);
}
@ -219,7 +230,12 @@ mod test {
)
.await;
assert!(!r.is_ok());
let base_present = base.is_present(ctx, key).await.unwrap();
let base_present = base
.is_present(ctx, key)
.await
.unwrap()
.fail_if_unsure()
.unwrap();
assert!(!base_present);
}
@ -240,7 +256,12 @@ mod test {
)
.await;
assert!(r.is_ok());
let base_present = base.is_present(ctx, key).await.unwrap();
let base_present = base
.is_present(ctx, key)
.await
.unwrap()
.fail_if_unsure()
.unwrap();
assert!(base_present);
let r = wrapper.get(ctx, key).await;
assert!(!r.is_ok());

View File

@ -14,7 +14,7 @@ use async_trait::async_trait;
use rand::Rng;
use rand_distr::Distribution;
use blobstore::{Blobstore, BlobstoreGetData};
use blobstore::{Blobstore, BlobstoreGetData, BlobstoreIsPresent};
use context::CoreContext;
use mononoke_types::BlobstoreBytes;
@ -64,7 +64,11 @@ impl<B: Blobstore> Blobstore for DelayedBlobstore<B> {
self.inner.put(ctx, key, value).await
}
async fn is_present<'a>(&'a self, ctx: &'a CoreContext, key: &'a str) -> Result<bool> {
async fn is_present<'a>(
&'a self,
ctx: &'a CoreContext,
key: &'a str,
) -> Result<BlobstoreIsPresent> {
delay(self.get_dist).await;
self.inner.is_present(ctx, key).await
}

View File

@ -19,8 +19,9 @@ use async_trait::async_trait;
use percent_encoding::{percent_encode, AsciiSet, CONTROLS};
use blobstore::{
Blobstore, BlobstoreEnumerationData, BlobstoreGetData, BlobstoreKeyParam, BlobstoreKeySource,
BlobstoreMetadata, BlobstorePutOps, BlobstoreWithLink, OverwriteStatus, PutBehaviour,
Blobstore, BlobstoreEnumerationData, BlobstoreGetData, BlobstoreIsPresent, BlobstoreKeyParam,
BlobstoreKeySource, BlobstoreMetadata, BlobstorePutOps, BlobstoreWithLink, OverwriteStatus,
PutBehaviour,
};
use context::CoreContext;
use mononoke_types::BlobstoreBytes;
@ -165,15 +166,23 @@ impl Blobstore for Fileblob {
Ok(ret)
}
async fn is_present<'a>(&'a self, _ctx: &'a CoreContext, key: &'a str) -> Result<bool> {
async fn is_present<'a>(
&'a self,
_ctx: &'a CoreContext,
key: &'a str,
) -> Result<BlobstoreIsPresent> {
let p = self.path(key);
let ret = match File::open(&p).await {
let present = match File::open(&p).await {
Err(ref e) if e.kind() == io::ErrorKind::NotFound => false,
Err(e) => return Err(e.into()),
Ok(_) => true,
};
Ok(ret)
Ok(if present {
BlobstoreIsPresent::Present
} else {
BlobstoreIsPresent::Absent
})
}
async fn put<'a>(

View File

@ -13,7 +13,9 @@ use async_trait::async_trait;
use futures_stats::TimedFutureExt;
use scuba_ext::MononokeScubaSampleBuilder;
use blobstore::{Blobstore, BlobstoreGetData, BlobstorePutOps, OverwriteStatus, PutBehaviour};
use blobstore::{
Blobstore, BlobstoreGetData, BlobstoreIsPresent, BlobstorePutOps, OverwriteStatus, PutBehaviour,
};
use blobstore_stats::{record_get_stats, record_put_stats, OperationType};
use context::{CoreContext, PerfCounterType};
use mononoke_types::BlobstoreBytes;
@ -78,7 +80,11 @@ impl<B: Blobstore + BlobstorePutOps> Blobstore for LogBlob<B> {
result
}
async fn is_present<'a>(&'a self, ctx: &'a CoreContext, key: &'a str) -> Result<bool> {
async fn is_present<'a>(
&'a self,
ctx: &'a CoreContext,
key: &'a str,
) -> Result<BlobstoreIsPresent> {
ctx.perf_counters()
.increment_counter(PerfCounterType::BlobPresenceChecks);
self.inner.is_present(ctx, key).await

View File

@ -7,7 +7,9 @@
use anyhow::{anyhow, Error, Result};
use async_trait::async_trait;
use blobstore::{Blobstore, BlobstoreGetData, BlobstorePutOps, OverwriteStatus, PutBehaviour};
use blobstore::{
Blobstore, BlobstoreGetData, BlobstoreIsPresent, BlobstorePutOps, OverwriteStatus, PutBehaviour,
};
use blobstore_stats::{record_get_stats, record_put_stats, OperationType};
use blobstore_sync_queue::OperationKey;
use cloned::cloned;
@ -484,7 +486,11 @@ impl Blobstore for MultiplexedBlobstoreBase {
blobstore_get(ctx, blobstores, write_mostly_blobstores, key, scuba).await
}
async fn is_present<'a>(&'a self, ctx: &'a CoreContext, key: &'a str) -> Result<bool> {
async fn is_present<'a>(
&'a self,
ctx: &'a CoreContext,
key: &'a str,
) -> Result<BlobstoreIsPresent> {
let blobstores_count = self.blobstores.len() + self.write_mostly_blobstores.len();
let main_requests: FuturesUnordered<_> = self
@ -515,21 +521,35 @@ impl Blobstore for MultiplexedBlobstoreBase {
.increment_counter(PerfCounterType::BlobPresenceChecks);
while let Some(result) = requests.next().await {
match result {
(_, Ok(true)) => {
return Ok(true);
(_, Ok(BlobstoreIsPresent::Present)) => {
return Ok(BlobstoreIsPresent::Present);
}
(_, Ok(BlobstoreIsPresent::Absent)) => {}
// is_present failed for the underlying blobstore
(blobstore_id, Err(error)) => {
errors.insert(blobstore_id, error);
}
(_, Ok(false)) => {}
(blobstore_id, Ok(BlobstoreIsPresent::ProbablyNotPresent(err))) => {
let err = err.context(format!(
"Received 'ProbablyNotPresent' from the underlying blobstore"
));
errors.insert(blobstore_id, err);
}
}
}
if errors.is_empty() {
Ok(false)
Ok(BlobstoreIsPresent::Absent)
} else if errors.len() == blobstores_count {
Err(ErrorKind::AllFailed(Arc::new(errors)))
} else {
Err(write_mostly_error(&blobstores, errors))
let write_mostly_err = write_mostly_error(&blobstores, errors);
if let ErrorKind::SomeFailedOthersNone(errors) = write_mostly_err {
let err = Error::from(ErrorKind::SomeFailedOthersNone(errors));
Ok(BlobstoreIsPresent::ProbablyNotPresent(err))
} else {
Err(write_mostly_err)
}
}
}
.timed()

View File

@ -8,7 +8,9 @@
use crate::base::{ErrorKind, MultiplexedBlobstoreBase, MultiplexedBlobstorePutHandler};
use anyhow::Result;
use async_trait::async_trait;
use blobstore::{Blobstore, BlobstoreGetData, BlobstorePutOps, OverwriteStatus, PutBehaviour};
use blobstore::{
Blobstore, BlobstoreGetData, BlobstoreIsPresent, BlobstorePutOps, OverwriteStatus, PutBehaviour,
};
use blobstore_stats::{record_queue_stats, OperationType};
use blobstore_sync_queue::{BlobstoreSyncQueue, BlobstoreSyncQueueEntry, OperationKey};
use context::CoreContext;
@ -162,14 +164,15 @@ impl Blobstore for MultiplexedBlobstore {
self.blobstore.put(ctx, key, value).await
}
async fn is_present<'a>(&'a self, ctx: &'a CoreContext, key: &'a str) -> Result<bool> {
let result = self.blobstore.is_present(ctx, key).await;
match result {
Ok(value) => Ok(value),
Err(error) => {
if let Some(ErrorKind::AllFailed(_)) = error.downcast_ref() {
return Err(error);
}
async fn is_present<'a>(
&'a self,
ctx: &'a CoreContext,
key: &'a str,
) -> Result<BlobstoreIsPresent> {
let result = self.blobstore.is_present(ctx, key).await?;
match &result {
BlobstoreIsPresent::Present | BlobstoreIsPresent::Absent => Ok(result),
BlobstoreIsPresent::ProbablyNotPresent(_) => {
// If a subset of blobstores failed, then we go to the queue. This is a way to
// "break the tie" if we had at least one blobstore that said the content didn't
// exist but the others failed to give a response: if any of those failing
@ -177,7 +180,7 @@ impl Blobstore for MultiplexedBlobstore {
// pruned yet because if it was, then it would be in the blobstore that succeeded).
let entries = self.queue.get(&ctx, &key).await?;
if entries.is_empty() {
Ok(false)
Ok(BlobstoreIsPresent::Absent)
} else {
// Oh boy. If we found this on the queue but we didn't find it in the
// blobstores, it's possible that the content got written to the blobstore in

View File

@ -13,7 +13,8 @@ use crate::{
use anyhow::Result;
use async_trait::async_trait;
use blobstore::{
Blobstore, BlobstoreGetData, BlobstoreMetadata, BlobstorePutOps, OverwriteStatus, PutBehaviour,
Blobstore, BlobstoreGetData, BlobstoreIsPresent, BlobstoreMetadata, BlobstorePutOps,
OverwriteStatus, PutBehaviour,
};
use blobstore_sync_queue::BlobstoreSyncQueue;
use chrono::Duration as ChronoDuration;
@ -387,7 +388,11 @@ impl Blobstore for ScrubBlobstore {
.await
}
async fn is_present<'a>(&'a self, ctx: &'a CoreContext, key: &'a str) -> Result<bool> {
async fn is_present<'a>(
&'a self,
ctx: &'a CoreContext,
key: &'a str,
) -> Result<BlobstoreIsPresent> {
self.inner.is_present(ctx, key).await
}

View File

@ -22,7 +22,8 @@ use crate::scrub::{
use anyhow::{bail, Result};
use async_trait::async_trait;
use blobstore::{
Blobstore, BlobstoreGetData, BlobstoreMetadata, BlobstorePutOps, OverwriteStatus, PutBehaviour,
Blobstore, BlobstoreGetData, BlobstoreIsPresent, BlobstoreMetadata, BlobstorePutOps,
OverwriteStatus, PutBehaviour,
};
use blobstore_sync_queue::{
BlobstoreSyncQueue, BlobstoreSyncQueueEntry, OperationKey, SqlBlobstoreSyncQueue,
@ -513,7 +514,12 @@ async fn multiplexed(fb: FacebookInit) {
assert!(PollOnce::new(Pin::new(&mut present_fut)).await.is_pending());
bs1.tick(Some("case 1: bs1 failed"));
assert!(!present_fut.await.unwrap());
match present_fut.await.unwrap() {
BlobstoreIsPresent::Absent => {}
_ => {
panic!("case 1: the key should be absent");
}
}
}
// only replica containing key failed
@ -560,7 +566,18 @@ async fn multiplexed(fb: FacebookInit) {
assert!(PollOnce::new(Pin::new(&mut present_fut)).await.is_pending());
bs0.tick(Some("case 2: bs0 failed"));
bs1.tick(None);
assert!(present_fut.await.is_err());
let expected =
"Some blobstores failed, and other returned None: {BlobstoreId(0): case 2: bs0 failed}"
.to_owned();
match present_fut.await.unwrap() {
BlobstoreIsPresent::ProbablyNotPresent(er) => {
assert_eq!(er.to_string(), expected);
}
_ => {
panic!("case 1: the key should be absent");
}
}
}
// both replicas fail

View File

@ -11,8 +11,9 @@ use crate::pack;
use anyhow::{Context, Result};
use async_trait::async_trait;
use blobstore::{
Blobstore, BlobstoreEnumerationData, BlobstoreGetData, BlobstoreKeyParam, BlobstoreKeySource,
BlobstoreMetadata, BlobstorePutOps, BlobstoreWithLink, OverwriteStatus, PutBehaviour,
Blobstore, BlobstoreEnumerationData, BlobstoreGetData, BlobstoreIsPresent, BlobstoreKeyParam,
BlobstoreKeySource, BlobstoreMetadata, BlobstorePutOps, BlobstoreWithLink, OverwriteStatus,
PutBehaviour,
};
use context::CoreContext;
use futures::stream::{FuturesUnordered, TryStreamExt};
@ -83,7 +84,11 @@ impl<T: Blobstore + BlobstorePutOps> Blobstore for PackBlob<T> {
Ok(Some(BlobstoreGetData::new(meta, decoded)))
}
async fn is_present<'a>(&'a self, ctx: &'a CoreContext, key: &'a str) -> Result<bool> {
async fn is_present<'a>(
&'a self,
ctx: &'a CoreContext,
key: &'a str,
) -> Result<BlobstoreIsPresent> {
self.inner
.is_present(ctx, &[key, ENVELOPE_SUFFIX].concat())
.await
@ -322,11 +327,17 @@ mod tests {
assert_ne!(value, fetched_value.into_bytes());
// Check is_present matches
let is_present = inner_blobstore.is_present(ctx, inner_key).await?;
let is_present = inner_blobstore
.is_present(ctx, inner_key)
.await?
.fail_if_unsure()?;
assert!(is_present);
// Check the key without suffix is not there
let is_not_present = !inner_blobstore.is_present(ctx, outer_key).await?;
let is_not_present = !inner_blobstore
.is_present(ctx, outer_key)
.await?
.fail_if_unsure()?;
assert!(is_not_present);
Ok(inner_key.to_owned())
@ -369,7 +380,10 @@ mod tests {
.await?;
// Check the inner key is not visible, the pack operation unlinks it
let is_present = inner_blobstore.is_present(ctx, &inner_key).await?;
let is_present = inner_blobstore
.is_present(ctx, &inner_key)
.await?
.fail_if_unsure()?;
assert!(!is_present);
for (expected, i) in input_values.into_iter().zip(0..3usize) {

View File

@ -14,7 +14,8 @@ use inlinable_string::InlinableString;
use context::CoreContext;
use blobstore::{
Blobstore, BlobstoreGetData, BlobstorePutOps, BlobstoreWithLink, OverwriteStatus, PutBehaviour,
Blobstore, BlobstoreGetData, BlobstoreIsPresent, BlobstorePutOps, BlobstoreWithLink,
OverwriteStatus, PutBehaviour,
};
use mononoke_types::BlobstoreBytes;
@ -76,7 +77,11 @@ impl<T: Blobstore> Blobstore for PrefixBlobstore<T> {
}
#[inline]
async fn is_present<'a>(&'a self, ctx: &'a CoreContext, key: &'a str) -> Result<bool> {
async fn is_present<'a>(
&'a self,
ctx: &'a CoreContext,
key: &'a str,
) -> Result<BlobstoreIsPresent> {
self.blobstore.is_present(ctx, &self.prepend(key)).await
}
}
@ -178,11 +183,15 @@ mod test {
.is_present(ctx, &unprefixed_key)
.await
.expect("is_present should succeed")
.fail_if_unsure()
.expect("is_present should succeed")
);
assert!(
base.is_present(ctx, &prefixed_key)
.await
.expect("is_present should succeed")
.fail_if_unsure()
.expect("is_present should succeed")
);
}
}

View File

@ -7,7 +7,9 @@
use anyhow::Result;
use async_trait::async_trait;
use blobstore::{Blobstore, BlobstoreGetData, BlobstorePutOps, OverwriteStatus, PutBehaviour};
use blobstore::{
Blobstore, BlobstoreGetData, BlobstoreIsPresent, BlobstorePutOps, OverwriteStatus, PutBehaviour,
};
use context::CoreContext;
use mononoke_types::BlobstoreBytes;
mod errors;
@ -53,7 +55,11 @@ impl<T: Blobstore> Blobstore for ReadOnlyBlobstore<T> {
}
#[inline]
async fn is_present<'a>(&'a self, ctx: &'a CoreContext, key: &'a str) -> Result<bool> {
async fn is_present<'a>(
&'a self,
ctx: &'a CoreContext,
key: &'a str,
) -> Result<BlobstoreIsPresent> {
self.blobstore.is_present(ctx, key).await
}
}
@ -104,7 +110,12 @@ mod test {
)
.await;
assert!(!r.is_ok());
let base_present = base.is_present(ctx, key).await.unwrap();
let base_present = base
.is_present(ctx, key)
.await
.unwrap()
.fail_if_unsure()
.unwrap();
assert!(!base_present);
}
@ -124,7 +135,12 @@ mod test {
)
.await;
assert!(!r.is_ok());
let base_present = base.is_present(ctx, key).await.unwrap();
let base_present = base
.is_present(ctx, key)
.await
.unwrap()
.fail_if_unsure()
.unwrap();
assert!(!base_present);
}
}

View File

@ -13,7 +13,7 @@ pub mod store;
use anyhow::{Error, Result};
use async_trait::async_trait;
use blobstore::{Blobstore, BlobstoreGetData};
use blobstore::{Blobstore, BlobstoreGetData, BlobstoreIsPresent};
use context::CoreContext;
use mononoke_types::BlobstoreBytes;
use scuba_ext::MononokeScubaSampleBuilder;
@ -210,7 +210,11 @@ impl<T: Blobstore> Blobstore for RedactedBlobstoreInner<T> {
blobstore.put(ctx, key, value).await
}
async fn is_present<'a>(&'a self, ctx: &'a CoreContext, key: &'a str) -> Result<bool> {
async fn is_present<'a>(
&'a self,
ctx: &'a CoreContext,
key: &'a str,
) -> Result<BlobstoreIsPresent> {
self.blobstore.is_present(ctx, key).await
}
}
@ -232,7 +236,11 @@ impl<B: Blobstore> Blobstore for RedactedBlobstore<B> {
) -> Result<()> {
self.inner.put(ctx, key, value).await
}
async fn is_present<'a>(&'a self, ctx: &'a CoreContext, key: &'a str) -> Result<bool> {
async fn is_present<'a>(
&'a self,
ctx: &'a CoreContext,
key: &'a str,
) -> Result<BlobstoreIsPresent> {
self.inner.is_present(ctx, key).await
}
}

View File

@ -8,7 +8,7 @@
use anyhow::Result;
use async_trait::async_trait;
use blobstore::{Blobstore, BlobstoreGetData};
use blobstore::{Blobstore, BlobstoreGetData, BlobstoreIsPresent};
use context::CoreContext;
use mononoke_types::BlobstoreBytes;
use prefixblob::PrefixBlobstore;
@ -49,7 +49,11 @@ impl Blobstore for RedactionConfigBlobstore {
) -> Result<()> {
self.0.put(ctx, key, value).await
}
async fn is_present<'a>(&'a self, ctx: &'a CoreContext, key: &'a str) -> Result<bool> {
async fn is_present<'a>(
&'a self,
ctx: &'a CoreContext,
key: &'a str,
) -> Result<BlobstoreIsPresent> {
self.0.is_present(ctx, key).await
}
}

View File

@ -9,7 +9,9 @@
use anyhow::Result;
use async_trait::async_trait;
use blobstore::{Blobstore, BlobstoreGetData, BlobstorePutOps, OverwriteStatus, PutBehaviour};
use blobstore::{
Blobstore, BlobstoreGetData, BlobstoreIsPresent, BlobstorePutOps, OverwriteStatus, PutBehaviour,
};
use context::CoreContext;
use metaconfig_types::BlobstoreId;
use mononoke_types::BlobstoreBytes;
@ -27,7 +29,12 @@ pub trait SamplingHandler: std::fmt::Debug + Send + Sync {
Ok(())
}
fn sample_is_present(&self, _ctx: &CoreContext, _key: &str, _value: bool) -> Result<()> {
fn sample_is_present(
&self,
_ctx: &CoreContext,
_key: &str,
_value: &BlobstoreIsPresent,
) -> Result<()> {
Ok(())
}
}
@ -78,10 +85,15 @@ impl<T: Blobstore> Blobstore for SamplingBlobstore<T> {
}
#[inline]
async fn is_present<'a>(&'a self, ctx: &'a CoreContext, key: &'a str) -> Result<bool> {
let is_present = self.inner.is_present(ctx, key).await?;
self.handler.sample_is_present(ctx, key, is_present)?;
Ok(is_present)
async fn is_present<'a>(
&'a self,
ctx: &'a CoreContext,
key: &'a str,
) -> Result<BlobstoreIsPresent> {
let result = self.inner.is_present(ctx, key).await?;
self.handler.sample_is_present(ctx, key, &result)?;
Ok(result)
}
}
@ -109,7 +121,7 @@ pub trait ComponentSamplingHandler: std::fmt::Debug + Send + Sync {
&self,
_ctx: &CoreContext,
_key: &str,
_value: bool,
_value: &BlobstoreIsPresent,
_inner_id: Option<BlobstoreId>,
) -> Result<()> {
Ok(())
@ -175,11 +187,16 @@ impl<T: Blobstore + BlobstorePutOps> Blobstore for SamplingBlobstorePutOps<T> {
}
#[inline]
async fn is_present<'a>(&'a self, ctx: &'a CoreContext, key: &'a str) -> Result<bool> {
let is_present = self.inner.is_present(ctx, key).await?;
async fn is_present<'a>(
&'a self,
ctx: &'a CoreContext,
key: &'a str,
) -> Result<BlobstoreIsPresent> {
let result = self.inner.is_present(ctx, key).await?;
self.handler
.sample_is_present(ctx, key, is_present, self.inner_id)?;
Ok(is_present)
.sample_is_present(ctx, key, &result, self.inner_id)?;
Ok(result)
}
}
@ -248,7 +265,12 @@ mod test {
fn sample_put(&self, ctx: &CoreContext, _key: &str, _value: &BlobstoreBytes) -> Result<()> {
self.check_sample(ctx)
}
fn sample_is_present(&self, ctx: &CoreContext, _key: &str, _value: bool) -> Result<()> {
fn sample_is_present(
&self,
ctx: &CoreContext,
_key: &str,
_value: &BlobstoreIsPresent,
) -> Result<()> {
self.check_sample(ctx)
}
}
@ -279,11 +301,21 @@ mod test {
assert!(!was_sampled);
let ctx = ctx.clone_and_sample(sample_this);
borrowed!(ctx);
let base_present = base.is_present(ctx, key).await.unwrap();
let base_present = base
.is_present(ctx, key)
.await
.unwrap()
.fail_if_unsure()
.unwrap();
assert!(base_present);
let was_sampled = handler.sampled.load(Ordering::Relaxed);
assert!(!was_sampled);
let wrapper_present = wrapper.is_present(ctx, key).await.unwrap();
let wrapper_present = wrapper
.is_present(ctx, key)
.await
.unwrap()
.fail_if_unsure()
.unwrap();
assert!(wrapper_present);
let was_sampled = handler.sampled.load(Ordering::Relaxed);
assert!(was_sampled);
@ -312,7 +344,7 @@ mod test {
&self,
ctx: &CoreContext,
_key: &str,
_value: bool,
_value: &BlobstoreIsPresent,
_inner_id: Option<BlobstoreId>,
) -> Result<()> {
self.check_sample(ctx)
@ -348,11 +380,21 @@ mod test {
assert!(!was_sampled);
let ctx = ctx.clone_and_sample(sample_this);
borrowed!(ctx);
let base_present = base.is_present(ctx, key).await.unwrap();
let base_present = base
.is_present(ctx, key)
.await
.unwrap()
.fail_if_unsure()
.unwrap();
assert!(base_present);
let was_sampled = handler.sampled.load(Ordering::Relaxed);
assert!(!was_sampled);
let wrapper_present = wrapper.is_present(ctx, key).await.unwrap();
let wrapper_present = wrapper
.is_present(ctx, key)
.await
.unwrap()
.fail_if_unsure()
.unwrap();
assert!(wrapper_present);
let was_sampled = handler.sampled.load(Ordering::Relaxed);
assert!(was_sampled);

View File

@ -25,8 +25,8 @@ use crate::store::{ChunkSqlStore, ChunkingMethod, DataSqlStore};
use anyhow::{bail, format_err, Error, Result};
use async_trait::async_trait;
use blobstore::{
Blobstore, BlobstoreGetData, BlobstoreMetadata, BlobstorePutOps, BlobstoreWithLink,
CountedBlobstore, OverwriteStatus, PutBehaviour,
Blobstore, BlobstoreGetData, BlobstoreIsPresent, BlobstoreMetadata, BlobstorePutOps,
BlobstoreWithLink, CountedBlobstore, OverwriteStatus, PutBehaviour,
};
use bytes::{Bytes, BytesMut};
use cached_config::{ConfigHandle, ConfigStore, TestSource};
@ -449,8 +449,17 @@ impl Blobstore for Sqlblob {
}
}
async fn is_present<'a>(&'a self, _ctx: &'a CoreContext, key: &'a str) -> Result<bool> {
self.data_store.is_present(&key).await
async fn is_present<'a>(
&'a self,
_ctx: &'a CoreContext,
key: &'a str,
) -> Result<BlobstoreIsPresent> {
let present = self.data_store.is_present(&key).await?;
Ok(if present {
BlobstoreIsPresent::Present
} else {
BlobstoreIsPresent::Absent
})
}
async fn put<'a>(

View File

@ -55,7 +55,7 @@ async fn read_write_size(
let blobstore_bytes = BlobstoreBytes::from_bytes(Bytes::copy_from_slice(&bytes_in));
assert!(
!bs.is_present(ctx, &key).await?,
!bs.is_present(ctx, &key).await?.fail_if_unsure()?,
"Blob should not exist yet"
);
@ -65,7 +65,10 @@ async fn read_write_size(
let bytes_out = bs.get(ctx, &key).await?;
assert_eq!(&bytes_in.to_vec(), bytes_out.unwrap().as_raw_bytes());
assert!(bs.is_present(ctx, &key).await?, "Blob should exist now");
assert!(
bs.is_present(ctx, &key).await?.fail_if_unsure()?,
"Blob should exist now"
);
Ok(())
})
.await
@ -98,7 +101,7 @@ async fn double_put(fb: FacebookInit) -> Result<(), Error> {
let blobstore_bytes = BlobstoreBytes::from_bytes(Bytes::copy_from_slice(&bytes_in));
assert!(
!bs.is_present(ctx, &key).await?,
!bs.is_present(ctx, &key).await?.fail_if_unsure()?,
"Blob should not exist yet"
);
@ -107,7 +110,10 @@ async fn double_put(fb: FacebookInit) -> Result<(), Error> {
// Write it again
bs.put(ctx, key.clone(), blobstore_bytes.clone()).await?;
assert!(bs.is_present(ctx, &key).await?, "Blob should exist now");
assert!(
bs.is_present(ctx, &key).await?.fail_if_unsure()?,
"Blob should exist now"
);
Ok(())
})
.await
@ -159,12 +165,12 @@ async fn dedup(fb: FacebookInit) -> Result<(), Error> {
let blobstore_bytes = BlobstoreBytes::from_bytes(Bytes::copy_from_slice(&bytes_in));
assert!(
!bs.is_present(ctx, &key1).await?,
!bs.is_present(ctx, &key1).await?.fail_if_unsure()?,
"Blob should not exist yet"
);
assert!(
!bs.is_present(ctx, &key2).await?,
!bs.is_present(ctx, &key2).await?.fail_if_unsure()?,
"Blob should not exist yet"
);
@ -204,12 +210,12 @@ async fn link(fb: FacebookInit) -> Result<(), Error> {
let blobstore_bytes = BlobstoreBytes::from_bytes(Bytes::copy_from_slice(&bytes_in));
assert!(
!bs.is_present(ctx, &key1).await?,
!bs.is_present(ctx, &key1).await?.fail_if_unsure()?,
"Blob should not exist yet"
);
assert!(
!bs.is_present(ctx, &key2).await?,
!bs.is_present(ctx, &key2).await?.fail_if_unsure()?,
"Blob should not exist yet"
);

View File

@ -14,8 +14,8 @@ use stats::prelude::*;
use context::CoreContext;
use crate::{
Blobstore, BlobstoreBytes, BlobstoreGetData, BlobstorePutOps, BlobstoreWithLink,
OverwriteStatus, PutBehaviour,
Blobstore, BlobstoreBytes, BlobstoreGetData, BlobstoreIsPresent, BlobstorePutOps,
BlobstoreWithLink, OverwriteStatus, PutBehaviour,
};
define_stats_struct! {
@ -101,7 +101,11 @@ impl<T: Blobstore> Blobstore for CountedBlobstore<T> {
res
}
async fn is_present<'a>(&'a self, ctx: &'a CoreContext, key: &'a str) -> Result<bool> {
async fn is_present<'a>(
&'a self,
ctx: &'a CoreContext,
key: &'a str,
) -> Result<BlobstoreIsPresent> {
self.stats.is_present.add_value(1);
let res = self.blobstore.is_present(ctx, key).await;
match res {

View File

@ -267,6 +267,35 @@ impl From<BlobstoreBytesSerialisable> for BlobstoreBytes {
}
}
#[derive(Debug)]
pub enum BlobstoreIsPresent {
// The blob is definitely present in the blobstore
Present,
/// The blob is definitely not present
Absent,
/// The blobstore has no evidence that the blob is present,
/// however some of the operations resulted in errors.
ProbablyNotPresent(Error),
}
impl BlobstoreIsPresent {
pub fn assume_not_found_if_unsure(self) -> bool {
match self {
BlobstoreIsPresent::Present => true,
BlobstoreIsPresent::Absent => false,
BlobstoreIsPresent::ProbablyNotPresent(_) => false,
}
}
pub fn fail_if_unsure(self) -> Result<bool, Error> {
match self {
BlobstoreIsPresent::Present => Ok(true),
BlobstoreIsPresent::Absent => Ok(false),
BlobstoreIsPresent::ProbablyNotPresent(err) => Err(err),
}
}
}
/// The blobstore interface, shared across all blobstores.
/// A blobstore must provide the following guarantees:
/// 1. `get` and `put` are atomic with respect to each other; a put will either put the entire
@ -312,9 +341,20 @@ pub trait Blobstore: fmt::Display + fmt::Debug + Send + Sync {
/// Check that `get` will return a value for a given `key`, and not None. The provided
/// implentation just calls `get`, and discards the return value; this can be overridden to
/// avoid transferring data. In the absence of concurrent `put` calls, this must return
/// `false` if `get` would return `None`, and `true` if `get` would return `Some(_)`.
async fn is_present<'a>(&'a self, ctx: &'a CoreContext, key: &'a str) -> Result<bool> {
Ok(self.get(ctx, key).await?.is_some())
/// `BlobstoreIsPresent::Absent` if `get` would return `None`, and `BlobstoreIsPresent::Present`
/// if `get` would return `Some(_)`.
/// In some cases, when it couldn't determine whether the key exists or not, it would
/// return `BlobstoreIsPresent::ProbablyNotPresent`.
async fn is_present<'a>(
&'a self,
ctx: &'a CoreContext,
key: &'a str,
) -> Result<BlobstoreIsPresent> {
Ok(if self.get(ctx, key).await?.is_some() {
BlobstoreIsPresent::Present
} else {
BlobstoreIsPresent::Absent
})
}
}

View File

@ -129,13 +129,13 @@ async fn roundtrip_and_link<B: BlobstoreWithLink>(
assert_eq!(orig_ctime, new_ctime);
assert_eq!(value, newvalue.into_bytes());
let newkey_is_present = blobstore.is_present(ctx, newkey).await?;
let newkey_is_present = blobstore.is_present(ctx, newkey).await?.fail_if_unsure()?;
assert!(newkey_is_present);
// Try unlink
blobstore.unlink(ctx, newkey).await?;
let newkey_is_present2 = blobstore.is_present(ctx, newkey).await?;
let newkey_is_present2 = blobstore.is_present(ctx, newkey).await?.fail_if_unsure()?;
assert!(!newkey_is_present2);
// Check we get error when unlinking an unknown key

View File

@ -22,7 +22,9 @@ use std::{
time::Duration,
};
use blobstore::{Blobstore, BlobstoreGetData, BlobstorePutOps, OverwriteStatus, PutBehaviour};
use blobstore::{
Blobstore, BlobstoreGetData, BlobstoreIsPresent, BlobstorePutOps, OverwriteStatus, PutBehaviour,
};
use context::CoreContext;
use mononoke_types::BlobstoreBytes;
@ -174,7 +176,11 @@ impl<T: Blobstore> Blobstore for ThrottledBlob<T> {
self.blobstore.put(ctx, key, value).await
}
async fn is_present<'a>(&'a self, ctx: &'a CoreContext, key: &'a str) -> Result<bool> {
async fn is_present<'a>(
&'a self,
ctx: &'a CoreContext,
key: &'a str,
) -> Result<BlobstoreIsPresent> {
if let Some(limiter) = self.read_qps_limiter.as_ref() {
limiter.until_ready_with_jitter(jitter()).await;
}

View File

@ -10,7 +10,7 @@ mod shard;
use anyhow::{anyhow, Context, Result};
use async_trait::async_trait;
use blobstore::{Blobstore, BlobstoreGetData, BlobstoreMetadata};
use blobstore::{Blobstore, BlobstoreGetData, BlobstoreIsPresent, BlobstoreMetadata};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use cacheblob::CachelibBlobstoreOptions;
use cachelib::VolatileLruCachePool;
@ -484,25 +484,28 @@ impl<T: Blobstore + 'static> Blobstore for VirtuallyShardedBlobstore<T> {
tokio::spawn(fut).await?
}
async fn is_present<'a>(&'a self, ctx: &'a CoreContext, key: &'a str) -> Result<bool> {
async fn is_present<'a>(
&'a self,
ctx: &'a CoreContext,
key: &'a str,
) -> Result<BlobstoreIsPresent> {
cloned!(self.inner);
let cache_key = CacheKey::from_key(key);
let presence = PresenceData::Get;
if let Ok(Some(KnownToExist)) = inner.cache.check_presence(&cache_key, presence) {
return Ok(true);
return Ok(BlobstoreIsPresent::Present);
}
Ticket::new(ctx, AccessReason::Read).finish().await?;
let exists = inner.blobstore.is_present(ctx, key).await?;
if exists {
let result = inner.blobstore.is_present(ctx, key).await?;
if let BlobstoreIsPresent::Present = &result {
let _ = inner.cache.set_is_present(&cache_key, presence);
}
Ok(exists)
Ok(result)
}
}
@ -1110,8 +1113,8 @@ mod test {
&'a self,
_ctx: &'a CoreContext,
_key: &'a str,
) -> Result<bool> {
Ok(true)
) -> Result<BlobstoreIsPresent> {
Ok(BlobstoreIsPresent::Present)
}
}

View File

@ -10,7 +10,7 @@
use anyhow::Result;
use async_trait::async_trait;
use blobstore::{Blobstore, BlobstoreGetData};
use blobstore::{Blobstore, BlobstoreGetData, BlobstoreIsPresent};
use blobstore_sync_queue::{BlobstoreSyncQueue, BlobstoreSyncQueueEntry};
use context::CoreContext;
use metaconfig_types::MultiplexId;
@ -60,7 +60,11 @@ impl<B: Blobstore> Blobstore for DummyBlobstore<B> {
Ok(())
}
async fn is_present<'a>(&'a self, ctx: &'a CoreContext, key: &'a str) -> Result<bool> {
async fn is_present<'a>(
&'a self,
ctx: &'a CoreContext,
key: &'a str,
) -> Result<BlobstoreIsPresent> {
self.inner.is_present(ctx, key).await
}
}

View File

@ -214,7 +214,10 @@ pub async fn exists<B: Blobstore>(
})?;
match maybe_id {
Some(id) => blobstore.is_present(ctx, &id.blobstore_key()).await,
Some(id) => blobstore
.is_present(ctx, &id.blobstore_key())
.await?
.fail_if_unsure(),
None => Ok(false),
}
}

View File

@ -7,7 +7,7 @@
use anyhow::Result;
use async_trait::async_trait;
use blobstore::{Blobstore, BlobstoreGetData};
use blobstore::{Blobstore, BlobstoreGetData, BlobstoreIsPresent};
use context::CoreContext;
use mononoke_types::BlobstoreBytes;
use rand::{thread_rng, Rng};
@ -67,7 +67,11 @@ impl<B: Blobstore> Blobstore for FailingBlobstore<B> {
}
}
async fn is_present<'a>(&'a self, ctx: &'a CoreContext, key: &'a str) -> Result<bool> {
async fn is_present<'a>(
&'a self,
ctx: &'a CoreContext,
key: &'a str,
) -> Result<BlobstoreIsPresent> {
if thread_rng().gen_bool(self.read_success_probability) {
self.inner.is_present(ctx, key).await
} else {

View File

@ -268,7 +268,8 @@ async fn resolve_internal_object(
let exists = blobstore
.is_present(&ctx.ctx, &content_id.blobstore_key())
.await
.with_context(|| format!("Failed to check for existence of: {:?}", content_id))?;
.with_context(|| format!("Failed to check for existence of: {:?}", content_id))?
.assume_not_found_if_unsure();
if exists {
return Ok(Some(InternalObject::new(content_id, oid, None)));

View File

@ -85,11 +85,19 @@ impl HgRepoContext {
async fn is_key_present_in_blobstore(&self, key: &str) -> Result<bool, MononokeError> {
// TODO (liubovd): check in all multiplexes blobstores
self.blob_repo()
.blobstore()
.is_present(self.ctx(), &key)
.await
.map_err(MononokeError::from)
async move {
self.blob_repo()
.blobstore()
.is_present(self.ctx(), &key)
.await
.map(|is_present| {
// if we can't resolve the presence (some blobstores failed, some returned None)
// we can re-upload the blob
is_present.assume_not_found_if_unsure()
})
}
.await
.map_err(MononokeError::from)
}
/// Look up in blobstore by `ContentId`