mononoke/blobstore: single lookup for is_present multiplex

Summary:
Currently `is_present` performs a lookup across the blobstores, and if it cannot determine whether the key exists, it checks the sync-queue (in case the key was written recently) and may then query the multiplex stores a second time, failing if it is still unsure. This adds unnecessary complexity and makes the multiplexed blobstore less reliable.
More details in: https://fb.quip.com/wOCeAhGx6Oa1
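
As a rough sketch of that old flow (all names here, such as `Presence` and `old_is_present`, are illustrative stand-ins rather than the real MultiplexedBlobstore code):

```rust
// Illustrative stand-ins, not the real Mononoke API.
enum Presence {
    Present,
    Absent,
    // Some stores failed and the remaining ones returned nothing.
    Unsure,
}

fn old_is_present(
    first_lookup: Presence,
    queue_has_key: bool,
    second_lookup: impl FnOnce() -> Presence,
) -> Result<bool, String> {
    match first_lookup {
        Presence::Present => Ok(true),
        Presence::Absent => Ok(false),
        Presence::Unsure => {
            // The key may have been written recently and not yet reached
            // all stores, so consult the sync-queue...
            if queue_has_key {
                // ...and retry the stores before giving up.
                match second_lookup() {
                    Presence::Present => Ok(true),
                    Presence::Absent => Ok(false),
                    Presence::Unsure => Err("still unsure after retry".to_string()),
                }
            } else {
                // The key never reached the queue: safe to report absent.
                Ok(false)
            }
        }
    }
}
```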

This diff gets rid of the sync-queue and second store lookups and moves the decision-making to the callers. The new logic is gated behind a tunable for a safer rollout.
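
Under the new semantics the multiplex returns its first answer as-is and each caller decides what an indeterminate result means for it. A minimal sketch of caller-side handling, assuming the `BlobstoreIsPresent` enum exercised in the tests below and a hypothetical `strict_is_present` helper:

```rust
use anyhow::{anyhow, Error, Result};

// Shape matches the enum the tests below match on: `ProbablyNotPresent`
// carries the reason the multiplex could not decide.
enum BlobstoreIsPresent {
    Present,
    Absent,
    ProbablyNotPresent(Error),
}

// Hypothetical strict caller: reproduces the old fail-if-unsure behaviour
// at the call site instead of inside the multiplex.
fn strict_is_present(result: BlobstoreIsPresent) -> Result<bool> {
    match result {
        BlobstoreIsPresent::Present => Ok(true),
        BlobstoreIsPresent::Absent => Ok(false),
        BlobstoreIsPresent::ProbablyNotPresent(reason) => {
            Err(anyhow!("presence could not be determined: {}", reason))
        }
    }
}
```

A lenient caller could instead map `ProbablyNotPresent` to `false`, e.g. when re-uploading a blob is cheaper than failing the request.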

*This diff is safe to land.*

Reviewed By: StanislavGlebik

Differential Revision: D29428268

fbshipit-source-id: 9fc286ed4290defe16d58b2b9983e3baaf1a3fe4
Aida Getoeva 2021-07-05 11:12:20 -07:00 committed by Facebook GitHub Bot
parent fcaa5c72d6
commit 498416a53c
4 changed files with 147 additions and 3 deletions

@@ -27,6 +27,7 @@ strum_macros = "0.19"
thiserror = "1.0"
time_ext = { version = "0.1.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
tokio = { version = "1.7.1", features = ["full", "test-util"] }
tunables = { version = "0.1.0", path = "../../tunables" }
twox-hash = "1.5"
[dev-dependencies]
@@ -35,6 +36,7 @@ bytes = { version = "1.0", features = ["serde"] }
fbinit = { version = "0.1.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
fbinit-tokio = { version = "0.1.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
lock_ext = { version = "0.1.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
maplit = "1.0"
memblob = { version = "0.1.0", path = "../memblob" }
nonzero_ext = "0.2"
readonlyblob = { version = "0.1.0", path = "../readonlyblob" }

@@ -21,6 +21,7 @@ use scuba_ext::MononokeScubaSampleBuilder;
use std::fmt;
use std::num::{NonZeroU64, NonZeroUsize};
use std::sync::Arc;
use tunables::tunables;
const SYNC_QUEUE: &str = "mysql_sync_queue";
@@ -170,6 +171,11 @@ impl Blobstore for MultiplexedBlobstore {
key: &'a str,
) -> Result<BlobstoreIsPresent> {
let result = self.blobstore.is_present(ctx, key).await?;
if !tunables().get_multiplex_blobstore_is_present_do_queue_lookup() {
// trust the first lookup, don't check the sync-queue
return Ok(result);
}
match &result {
BlobstoreIsPresent::Present | BlobstoreIsPresent::Absent => Ok(result),
BlobstoreIsPresent::ProbablyNotPresent(_) => {

@@ -39,6 +39,7 @@ use futures::{
task::{Context, Poll},
};
use lock_ext::LockExt;
use maplit::hashmap;
use memblob::Memblob;
use metaconfig_types::{BlobstoreId, MultiplexId};
use mononoke_types::{BlobstoreBytes, DateTime};
@@ -46,6 +47,7 @@ use nonzero_ext::nonzero;
use readonlyblob::ReadOnlyBlobstore;
use scuba_ext::MononokeScubaSampleBuilder;
use sql_construct::SqlConstruct;
use tunables::{with_tunables_async, MononokeTunables};
pub struct Tickable<T> {
pub storage: Arc<Mutex<HashMap<String, T>>>,
@@ -490,6 +492,15 @@ async fn multiplexed(fb: FacebookInit) {
nonzero!(1u64),
);
// keep the old `is_present` semantics: consult the sync-queue when unsure
let get_tunables = || {
let tunables = MononokeTunables::default();
tunables.update_bools(
&hashmap! {"multiplex_blobstore_is_present_do_queue_lookup".to_string() => true},
);
tunables
};
// non-existing key when one blobstore failing
{
let k0 = "k0";
@@ -507,7 +518,9 @@ async fn multiplexed(fb: FacebookInit) {
// test `is_present`
-let mut present_fut = bs.is_present(ctx, k0).map_err(|_| ()).boxed();
+let mut present_fut = with_tunables_async(get_tunables(), bs.is_present(ctx, k0).boxed())
+    .map_err(|_| ())
+    .boxed();
assert!(PollOnce::new(Pin::new(&mut present_fut)).await.is_pending());
bs0.tick(None);
@ -558,7 +571,9 @@ async fn multiplexed(fb: FacebookInit) {
assert!(get_fut.await.is_err());
// test `is_present`
-let mut present_fut = bs.is_present(ctx, k1).map_err(|_| ()).boxed();
+let mut present_fut = with_tunables_async(get_tunables(), bs.is_present(ctx, k1).boxed())
+    .map_err(|_| ())
+    .boxed();
assert!(PollOnce::new(Pin::new(&mut present_fut)).await.is_pending());
bs0.tick(Some("case 2: bs0 failed"));
bs1.tick(None);
@@ -592,7 +607,125 @@ async fn multiplexed(fb: FacebookInit) {
assert!(get_fut.await.is_err());
// test `is_present`
-let mut present_fut = bs.is_present(ctx, k2).map_err(|_| ()).boxed();
+let mut present_fut = with_tunables_async(get_tunables(), bs.is_present(ctx, k2).boxed())
+    .map_err(|_| ())
+    .boxed();
assert!(PollOnce::new(Pin::new(&mut present_fut)).await.is_pending());
bs0.tick(Some("case 3: bs0 failed"));
bs1.tick(Some("case 3: bs1 failed"));
assert!(present_fut.await.is_err());
}
}
#[fbinit::test]
async fn multiplexed_new_semantics(fb: FacebookInit) {
let ctx = CoreContext::test_mock(fb);
borrowed!(ctx);
let queue = Arc::new(SqlBlobstoreSyncQueue::with_sqlite_in_memory().unwrap());
let bid0 = BlobstoreId::new(0);
let bs0 = Arc::new(Tickable::new());
let bid1 = BlobstoreId::new(1);
let bs1 = Arc::new(Tickable::new());
let bs = MultiplexedBlobstore::new(
MultiplexId::new(1),
vec![(bid0, bs0.clone()), (bid1, bs1.clone())],
vec![],
nonzero!(1usize),
queue.clone(),
MononokeScubaSampleBuilder::with_discard(),
nonzero!(1u64),
);
// enable new `is_present` semantics
let get_tunables = || {
let tunables = MononokeTunables::default();
tunables.update_bools(
&hashmap! {"multiplex_blobstore_is_present_do_queue_lookup".to_string() => false},
);
tunables
};
// non-existing key when one blobstore failing
{
let k0 = "k0";
// test `is_present`
let mut present_fut = with_tunables_async(get_tunables(), bs.is_present(ctx, k0).boxed())
.map_err(|_| ())
.boxed();
assert!(PollOnce::new(Pin::new(&mut present_fut)).await.is_pending());
bs0.tick(None);
bs1.tick(Some("case 1: bs1 failed"));
let expected =
"Some blobstores failed, and other returned None: {BlobstoreId(1): case 1: bs1 failed}"
.to_owned();
match present_fut.await.unwrap() {
BlobstoreIsPresent::ProbablyNotPresent(er) => {
assert_eq!(er.to_string(), expected);
}
_ => {
panic!("case 1: the presence must not be determined");
}
}
}
// only replica containing key failed
{
let v1 = make_value("v1");
let k1 = "k1";
let mut put_fut = bs
.put(ctx, k1.to_owned(), v1.clone())
.map_err(|_| ())
.boxed();
assert_eq!(PollOnce::new(Pin::new(&mut put_fut)).await, Poll::Pending);
bs0.tick(None);
bs1.tick(Some("case 2: bs1 failed"));
put_fut.await.expect("case 2 put_fut failed");
match queue
.get(ctx, k1)
.await
.expect("case 2 get failed")
.as_slice()
{
[entry] => assert_eq!(entry.blobstore_id, bid0),
_ => panic!("only one entry expected"),
}
// test `is_present`
// With the new semantics only a single round of blobstore lookups is made, without consulting the sync-queue
let mut present_fut = with_tunables_async(get_tunables(), bs.is_present(ctx, k1).boxed())
.map_err(|_| ())
.boxed();
assert!(PollOnce::new(Pin::new(&mut present_fut)).await.is_pending());
bs0.tick(Some("case 2: bs0 failed"));
bs1.tick(None);
let expected =
"Some blobstores failed, and other returned None: {BlobstoreId(0): case 2: bs0 failed}"
.to_owned();
match present_fut.await.unwrap() {
BlobstoreIsPresent::ProbablyNotPresent(er) => {
assert_eq!(er.to_string(), expected);
}
_ => {
panic!("case 1: the key should be absent");
}
}
}
// both replicas fail
{
let k2 = "k2";
// test `is_present`
let mut present_fut = with_tunables_async(get_tunables(), bs.is_present(ctx, k2).boxed())
.map_err(|_| ())
.boxed();
assert!(PollOnce::new(Pin::new(&mut present_fut)).await.is_pending());
bs0.tick(Some("case 3: bs0 failed"));
bs1.tick(Some("case 3: bs1 failed"));

@@ -188,6 +188,9 @@ pub struct MononokeTunables {
derived_data_types_disabled: TunableVecOfStringsByRepo,
// How often to check if derived data is disabled or not
derived_data_disabled_watcher_delay_secs: AtomicI64,
// multiplexed blobstore is_present/get new semantics rollout
multiplex_blobstore_is_present_do_queue_lookup: AtomicBool,
}
fn log_tunables(tunables: &TunablesStruct) -> String {