mononoke: update memblob to be PutBehaviour aware

Summary: Update memblob to be PutBehaviour aware by changing implementation from Blobstore to BlobstoreOps

Differential Revision: D24021166

fbshipit-source-id: 04dd25c5535769ea507120c1886592b808a7bbc6
This commit is contained in:
Alex Hornby 2020-10-07 12:08:51 -07:00 committed by Facebook GitHub Bot
parent fb1d4515df
commit cad15511f8
3 changed files with 90 additions and 43 deletions

View File

@ -36,6 +36,5 @@ fileblob = { path = "fileblob" }
memblob = { path = "memblob" }
mononoke_types = { path = "../mononoke_types" }
fbinit = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
itertools = "0.8"
tempdir = "0.3"
tokio-compat = "0.1"

View File

@ -10,9 +10,12 @@ use std::fmt;
use std::sync::{Arc, Mutex};
use anyhow::{format_err, Error};
use futures::future::{self, lazy, BoxFuture, FutureExt};
use futures::future::{self, lazy, BoxFuture, FutureExt, TryFutureExt};
use blobstore::{Blobstore, BlobstoreGetData, BlobstoreWithLink};
use blobstore::{
Blobstore, BlobstoreGetData, BlobstorePutOps, BlobstoreWithLink, OverwriteStatus, PutBehaviour,
DEFAULT_PUT_BEHAVIOUR,
};
use context::CoreContext;
use mononoke_types::BlobstoreBytes;
@ -25,11 +28,34 @@ struct MemState {
}
impl MemState {
fn put(&mut self, key: String, value: BlobstoreBytes) {
let id = self.next_id;
self.data.insert(id, value);
self.links.insert(key, id);
self.next_id += 1;
fn put(
&mut self,
key: String,
value: BlobstoreBytes,
put_behaviour: PutBehaviour,
) -> OverwriteStatus {
match put_behaviour {
PutBehaviour::Overwrite => {
let id = self.next_id;
self.data.insert(id, value);
self.links.insert(key, id);
self.next_id += 1;
OverwriteStatus::NotChecked
}
PutBehaviour::IfAbsent | PutBehaviour::OverwriteAndLog => {
if self.links.contains_key(&key) {
if put_behaviour.should_overwrite() {
self.put(key, value, PutBehaviour::Overwrite);
OverwriteStatus::Overwrote
} else {
OverwriteStatus::Prevented
}
} else {
self.put(key, value, PutBehaviour::Overwrite);
OverwriteStatus::New
}
}
}
}
fn link(&mut self, existing_key: String, link_key: String) -> Result<(), Error> {
@ -60,18 +86,21 @@ impl MemState {
#[derive(Clone)]
pub struct EagerMemblob {
state: Arc<Mutex<MemState>>,
put_behaviour: PutBehaviour,
}
/// As EagerMemblob, but methods are lazy - they wait until polled to do anything.
#[derive(Clone)]
pub struct LazyMemblob {
state: Arc<Mutex<MemState>>,
put_behaviour: PutBehaviour,
}
impl EagerMemblob {
pub fn new() -> Self {
pub fn new(put_behaviour: PutBehaviour) -> Self {
Self {
state: Arc::new(Mutex::new(MemState::default())),
put_behaviour,
}
}
@ -83,14 +112,15 @@ impl EagerMemblob {
impl Default for EagerMemblob {
fn default() -> Self {
Self::new()
Self::new(DEFAULT_PUT_BEHAVIOUR)
}
}
impl LazyMemblob {
pub fn new() -> Self {
pub fn new(put_behaviour: PutBehaviour) -> Self {
Self {
state: Arc::new(Mutex::new(MemState::default())),
put_behaviour,
}
}
@ -107,32 +137,48 @@ impl LazyMemblob {
impl Default for LazyMemblob {
fn default() -> Self {
Self::new()
Self::new(DEFAULT_PUT_BEHAVIOUR)
}
}
impl Blobstore for EagerMemblob {
fn put(
impl BlobstorePutOps for EagerMemblob {
fn put_explicit(
&self,
_ctx: CoreContext,
key: String,
value: BlobstoreBytes,
) -> BoxFuture<'static, Result<(), Error>> {
put_behaviour: PutBehaviour,
) -> BoxFuture<'static, Result<OverwriteStatus, Error>> {
let mut inner = self.state.lock().expect("lock poison");
inner.put(key, value);
future::ok(()).boxed()
future::ok(inner.put(key, value, put_behaviour)).boxed()
}
fn put_behaviour(&self) -> PutBehaviour {
self.put_behaviour
}
}
impl Blobstore for EagerMemblob {
fn get(
&self,
_ctx: CoreContext,
key: String,
) -> BoxFuture<'static, Result<Option<BlobstoreGetData>, Error>> {
let inner = self.state.lock().expect("lock poison");
future::ok(inner.get(&key).map(|blob_ref| blob_ref.clone().into())).boxed()
}
fn put(
&self,
ctx: CoreContext,
key: String,
value: BlobstoreBytes,
) -> BoxFuture<'static, Result<(), Error>> {
BlobstorePutOps::put_with_status(self, ctx, key, value)
.map_ok(|_| ())
.boxed()
}
}
impl BlobstoreWithLink for EagerMemblob {
@ -147,24 +193,29 @@ impl BlobstoreWithLink for EagerMemblob {
}
}
impl Blobstore for LazyMemblob {
fn put(
impl BlobstorePutOps for LazyMemblob {
fn put_explicit(
&self,
_ctx: CoreContext,
key: String,
value: BlobstoreBytes,
) -> BoxFuture<'static, Result<(), Error>> {
put_behaviour: PutBehaviour,
) -> BoxFuture<'static, Result<OverwriteStatus, Error>> {
let state = self.state.clone();
lazy(move |_| {
let mut inner = state.lock().expect("lock poison");
inner.put(key, value);
Ok(())
Ok(inner.put(key, value, put_behaviour))
})
.boxed()
}
fn put_behaviour(&self) -> PutBehaviour {
self.put_behaviour
}
}
impl Blobstore for LazyMemblob {
fn get(
&self,
_ctx: CoreContext,
@ -178,6 +229,17 @@ impl Blobstore for LazyMemblob {
})
.boxed()
}
fn put(
&self,
ctx: CoreContext,
key: String,
value: BlobstoreBytes,
) -> BoxFuture<'static, Result<(), Error>> {
BlobstorePutOps::put_with_status(self, ctx, key, value)
.map_ok(|_| ())
.boxed()
}
}
impl BlobstoreWithLink for LazyMemblob {

View File

@ -15,7 +15,6 @@ use std::sync::Arc;
use anyhow::Error;
use bytes::Bytes;
use fbinit::FacebookInit;
use itertools::Either;
use strum::IntoEnumIterator;
use tempdir::TempDir;
@ -122,7 +121,6 @@ macro_rules! blobstore_test_impl {
new: $new_cb: expr,
persistent: $persistent: expr,
has_ctime: $has_ctime: expr,
honors_put_behaviour: $honors_put_behaviour: expr,
}) => {
mod $mod_name {
use super::*;
@ -131,16 +129,8 @@ macro_rules! blobstore_test_impl {
async fn test_overwrite(fb: FacebookInit) -> Result<(), Error> {
let state = $state;
let has_ctime = $has_ctime;
let honors_put_behaviour = $honors_put_behaviour;
let put_behaviours = if honors_put_behaviour {
// try all variants
Either::Left(PutBehaviour::iter())
} else {
// only try one, as it store doesn't support them yet
Either::Right(vec![PutBehaviour::Overwrite].into_iter())
};
let factory = $new_cb;
for b in put_behaviours {
for b in PutBehaviour::iter() {
overwrite(fb, factory(state.clone(), b)?, has_ctime, b).await?
}
Ok(())
@ -181,30 +171,27 @@ macro_rules! blobstore_test_impl {
blobstore_test_impl! {
eager_memblob_test => {
state: (),
new: move |_, _| Ok::<_,Error>(EagerMemblob::new()),
new: move |_, put_behaviour| Ok::<_,Error>(EagerMemblob::new(put_behaviour)),
persistent: false,
has_ctime: false,
honors_put_behaviour: false,
}
}
blobstore_test_impl! {
box_blobstore_test => {
state: (),
new: move |_, _| Ok::<_,Error>(Box::new(EagerMemblob::new())),
new: move |_, put_behaviour| Ok::<_,Error>(Box::new(EagerMemblob::new(put_behaviour))),
persistent: false,
has_ctime: false,
honors_put_behaviour: false,
}
}
blobstore_test_impl! {
lazy_memblob_test => {
state: (),
new: move |_, _| Ok::<_,Error>(LazyMemblob::new()),
new: move |_, put_behaviour| Ok::<_,Error>(LazyMemblob::new(put_behaviour)),
persistent: false,
has_ctime: false,
honors_put_behaviour: false,
}
}
@ -214,7 +201,6 @@ blobstore_test_impl! {
new: move |dir: Arc<TempDir>, put_behaviour,| Fileblob::open(&*dir, put_behaviour),
persistent: true,
has_ctime: true,
honors_put_behaviour: true,
}
}
@ -253,7 +239,7 @@ async fn cache_blob_tests(fb: FacebookInit, expect_zstd: bool) -> Result<(), Err
20 * 1024 * 1024,
)?);
let inner = Arc::new(LazyMemblob::new());
let inner = Arc::new(LazyMemblob::new(PutBehaviour::Overwrite));
let cache_blob =
cacheblob::new_cachelib_blobstore(inner.clone(), blob_pool.clone(), presence_pool, options);