cacheblob: close the persist hole in MemWritesBlobstore

Summary:
As described in D32638849 (ec370e31b7), there is a footgun in `MemWritesBlobstore::persist`.
For the duration of the persist, the items being persisted are temporarily
invisible from outside.

Previous attempts to fix this were made by trying to stop people from accessing
the blobstore while the persist was ongoing.  Instead, let's just extend
`MemWritesBlobstore` so it doesn't have the hole in the first place.  While we
are persisting the items, we keep a reference to the items being persisted in
the cache.  Any key lookups that fail for the live cache can also be checked
against the flushing cache.  Once the flush has completed, we can clear those
items out.

A downside here is that we must clone the keys and values: there need to be two
copies, one left in the flushing portion of the cache, and one that is passed
by value to `Blobstore::put`.  However, since this is a `String` and a
`BlobstoreBytes` (which reference-counts the bytes), cloning should be
relatively cheap.

Reviewed By: mitrandir77

Differential Revision: D34074710

fbshipit-source-id: 4562833266b80bf95ed21a01b4f871e341b2f357
This commit is contained in:
Mark Juggurnauth-Thomas 2022-02-15 10:46:57 -08:00 committed by Facebook GitHub Bot
parent 95ece5fa91
commit 1b339b37b8
2 changed files with 177 additions and 20 deletions

View File

@ -9,6 +9,7 @@ license = "GPLv2+"
[dependencies]
anyhow = "1.0.51"
async-stream = "0.3"
async-trait = "0.1.52"
auto_impl = "0.4"
blobstore = { version = "0.1.0", path = ".." }

View File

@ -9,26 +9,37 @@ use anyhow::{anyhow, Result};
use async_trait::async_trait;
use blobstore::{Blobstore, BlobstoreGetData};
use context::CoreContext;
use futures::{
future,
stream::{self, StreamExt, TryStreamExt},
};
use futures::future;
use futures::stream::{StreamExt, TryStreamExt};
use lock_ext::LockExt;
use mononoke_types::BlobstoreBytes;
use std::{
collections::HashMap,
mem,
sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex,
},
};
use std::collections::HashMap;
use std::mem;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use tokio::sync::Mutex as AsyncMutex;
#[derive(Default, Debug)]
pub struct Cache {
live: HashMap<String, BlobstoreBytes>,
flushing: Option<Arc<HashMap<String, BlobstoreBytes>>>,
}
impl Cache {
pub fn len(&self) -> usize {
self.live.len() + self.flushing.as_ref().map_or(0, |flushing| flushing.len())
}
}
/// A blobstore wrapper that reads from the underlying blobstore but writes to memory.
#[derive(Clone, Debug)]
pub struct MemWritesBlobstore<T> {
inner: T,
cache: Arc<Mutex<HashMap<String, BlobstoreBytes>>>,
cache: Arc<Mutex<Cache>>,
// Mutex to ensure only one task is flushing the cache at a time.
// Note: this doesn't wrap the cache as read access is permitted while
// the mutex is held.
flush_mutex: Arc<AsyncMutex<()>>,
no_access_to_inner: Arc<AtomicBool>,
}
@ -43,6 +54,7 @@ impl<T: Blobstore + Clone> MemWritesBlobstore<T> {
Self {
inner: blobstore,
cache: Default::default(),
flush_mutex: Default::default(),
no_access_to_inner: Default::default(),
}
}
@ -57,19 +69,47 @@ impl<T: Blobstore + Clone> MemWritesBlobstore<T> {
));
}
let items = self.cache.with(|cache| mem::replace(cache, HashMap::new()));
stream::iter(items)
.map(|(key, value)| self.inner.put(ctx, key, value))
// Obtain the flush mutex. This should ensure that only one persist
// happens at a time.
let _flush_guard = self.flush_mutex.lock().await;
let items = self.cache.with(|cache| {
if cache.flushing.is_some() {
// This should be prevented by the flush guard.
return Err(anyhow!(
"unexpected persist while another persist is ongoing"
));
}
let flushing = Arc::new(mem::take(&mut cache.live));
cache.flushing = Some(flushing.clone());
Ok(flushing)
})?;
let flush = async_stream::stream! {
for (key, value) in items.iter() {
yield self.inner.put(ctx, key.clone(), value.clone());
}
};
let result = flush
.buffered(4096)
.try_for_each(|_| future::ready(Ok(())))
.await
.await;
// Discard flushing items, whether or not we were successful at
// flushing the cache.
self.cache.with(|cache| {
cache.flushing = None;
});
result
}
pub fn get_inner(&self) -> T {
self.inner.clone()
}
pub fn get_cache(&self) -> &Arc<Mutex<HashMap<String, BlobstoreBytes>>> {
pub fn get_cache(&self) -> &Arc<Mutex<Cache>> {
&self.cache
}
@ -87,7 +127,7 @@ impl<T: Blobstore + Clone> Blobstore for MemWritesBlobstore<T> {
key: String,
value: BlobstoreBytes,
) -> Result<()> {
self.cache.with(|cache| cache.insert(key, value));
self.cache.with(|cache| cache.live.insert(key, value));
Ok(())
}
@ -96,7 +136,17 @@ impl<T: Blobstore + Clone> Blobstore for MemWritesBlobstore<T> {
ctx: &'a CoreContext,
key: &'a str,
) -> Result<Option<BlobstoreGetData>> {
match self.cache.with(|cache| cache.get(key).cloned()) {
let value = self.cache.with(|cache| {
if let Some(value) = cache.live.get(key) {
Some(value.clone())
} else if let Some(flushing) = cache.flushing.as_ref() {
flushing.get(key).cloned()
} else {
None
}
});
match value {
Some(value) => Ok(Some(value.into())),
None => {
if self.no_access_to_inner.load(Ordering::Relaxed) {
@ -112,10 +162,14 @@ impl<T: Blobstore + Clone> Blobstore for MemWritesBlobstore<T> {
#[cfg(test)]
mod test {
use super::*;
use blobstore::PutBehaviour;
use borrowed::borrowed;
use bytes::Bytes;
use cloned::cloned;
use fbinit::FacebookInit;
use memblob::Memblob;
use std::time::Duration;
use tokio::sync::watch;
#[fbinit::test]
async fn basic_read(fb: FacebookInit) {
@ -219,4 +273,106 @@ mod test {
Ok(())
}
/// A blobstore wrapper that prevents writes until a flag is set.
#[derive(Clone, Debug)]
pub struct GatedBlobstore<T> {
inner: T,
allow: watch::Receiver<bool>,
}
impl<T: Blobstore + Clone> GatedBlobstore<T> {
pub fn new(blobstore: T, allow: watch::Receiver<bool>) -> Self {
Self {
inner: blobstore,
allow,
}
}
}
impl<T: std::fmt::Display> std::fmt::Display for GatedBlobstore<T> {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "GatedBlobstore<{}>", self.inner)
}
}
#[async_trait]
impl<T: Blobstore + Clone> Blobstore for GatedBlobstore<T> {
async fn put<'a>(
&'a self,
ctx: &'a CoreContext,
key: String,
value: BlobstoreBytes,
) -> Result<()> {
let mut allow = self.allow.clone();
while !*allow.borrow() {
// Wait until we are allowed to continue.
allow.changed().await?;
}
self.inner.put(ctx, key, value).await
}
async fn get<'a>(
&'a self,
ctx: &'a CoreContext,
key: &'a str,
) -> Result<Option<BlobstoreGetData>> {
self.inner.get(ctx, key).await
}
}
#[fbinit::test]
async fn test_persist_concurrency(fb: FacebookInit) -> Result<()> {
let ctx = CoreContext::test_mock(fb);
borrowed!(ctx);
let inner = Memblob::new(PutBehaviour::Overwrite);
let (allow_tx, allow_rx) = watch::channel(false);
let delay = Arc::new(GatedBlobstore::new(inner.clone(), allow_rx));
let outer = MemWritesBlobstore::new(delay);
let key = "key";
let value = BlobstoreBytes::from_bytes("value");
outer.put(ctx, key.to_owned(), value.clone()).await?;
assert!(inner.get(ctx, key).await?.is_none());
let persist = tokio::spawn({
cloned!(ctx, outer);
async move { outer.persist(&ctx).await }
});
// Wait until a write is in flight (there will be more than 1
// receiver, as each write clones the receiver)
while allow_tx.receiver_count() <= 1 {
tokio::time::sleep(Duration::from_millis(10)).await;
}
assert_eq!(inner.get(ctx, key).await?, None);
assert_eq!(outer.get(ctx, key).await?, Some(value.clone().into()));
let key2 = "key2";
let value2 = BlobstoreBytes::from_bytes("value2");
outer.put(ctx, key.to_owned(), value2.clone()).await?;
outer.put(ctx, key2.to_owned(), value2.clone()).await?;
assert_eq!(inner.get(ctx, key).await?, None);
assert_eq!(inner.get(ctx, key2).await?, None);
assert_eq!(outer.get(ctx, key).await?, Some(value2.clone().into()));
assert_eq!(outer.get(ctx, key2).await?, Some(value2.clone().into()));
allow_tx.send(true)?;
persist.await??;
assert_eq!(inner.get(ctx, key).await?, Some(value.into()));
assert_eq!(inner.get(ctx, key2).await?, None);
assert_eq!(outer.get(ctx, key).await?, Some(value2.clone().into()));
assert_eq!(outer.get(ctx, key2).await?, Some(value2.clone().into()));
outer.persist(ctx).await?;
assert_eq!(inner.get(ctx, key).await?, Some(value2.clone().into()));
assert_eq!(inner.get(ctx, key2).await?, Some(value2.into()));
Ok(())
}
}