diff --git a/eden/mononoke/blobstore/Cargo.toml b/eden/mononoke/blobstore/Cargo.toml index a09b163fca..202499d3ad 100644 --- a/eden/mononoke/blobstore/Cargo.toml +++ b/eden/mononoke/blobstore/Cargo.toml @@ -36,6 +36,5 @@ fileblob = { path = "fileblob" } memblob = { path = "memblob" } mononoke_types = { path = "../mononoke_types" } fbinit = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" } -itertools = "0.8" tempdir = "0.3" tokio-compat = "0.1" diff --git a/eden/mononoke/blobstore/memblob/src/lib.rs b/eden/mononoke/blobstore/memblob/src/lib.rs index b93b7dfb95..77ca5b2658 100644 --- a/eden/mononoke/blobstore/memblob/src/lib.rs +++ b/eden/mononoke/blobstore/memblob/src/lib.rs @@ -10,9 +10,12 @@ use std::fmt; use std::sync::{Arc, Mutex}; use anyhow::{format_err, Error}; -use futures::future::{self, lazy, BoxFuture, FutureExt}; +use futures::future::{self, lazy, BoxFuture, FutureExt, TryFutureExt}; -use blobstore::{Blobstore, BlobstoreGetData, BlobstoreWithLink}; +use blobstore::{ + Blobstore, BlobstoreGetData, BlobstorePutOps, BlobstoreWithLink, OverwriteStatus, PutBehaviour, + DEFAULT_PUT_BEHAVIOUR, +}; use context::CoreContext; use mononoke_types::BlobstoreBytes; @@ -25,11 +28,34 @@ struct MemState { } impl MemState { - fn put(&mut self, key: String, value: BlobstoreBytes) { - let id = self.next_id; - self.data.insert(id, value); - self.links.insert(key, id); - self.next_id += 1; + fn put( + &mut self, + key: String, + value: BlobstoreBytes, + put_behaviour: PutBehaviour, + ) -> OverwriteStatus { + match put_behaviour { + PutBehaviour::Overwrite => { + let id = self.next_id; + self.data.insert(id, value); + self.links.insert(key, id); + self.next_id += 1; + OverwriteStatus::NotChecked + } + PutBehaviour::IfAbsent | PutBehaviour::OverwriteAndLog => { + if self.links.contains_key(&key) { + if put_behaviour.should_overwrite() { + self.put(key, value, PutBehaviour::Overwrite); + OverwriteStatus::Overwrote + } else { + OverwriteStatus::Prevented + } + } else { + self.put(key, value, PutBehaviour::Overwrite); + OverwriteStatus::New + } + } + } } fn link(&mut self, existing_key: String, link_key: String) -> Result<(), Error> { @@ -60,18 +86,21 @@ impl MemState { #[derive(Clone)] pub struct EagerMemblob { state: Arc>, + put_behaviour: PutBehaviour, } /// As EagerMemblob, but methods are lazy - they wait until polled to do anything. #[derive(Clone)] pub struct LazyMemblob { state: Arc>, + put_behaviour: PutBehaviour, } impl EagerMemblob { - pub fn new() -> Self { + pub fn new(put_behaviour: PutBehaviour) -> Self { Self { state: Arc::new(Mutex::new(MemState::default())), + put_behaviour, } } @@ -83,14 +112,15 @@ impl EagerMemblob { impl Default for EagerMemblob { fn default() -> Self { - Self::new() + Self::new(DEFAULT_PUT_BEHAVIOUR) } } impl LazyMemblob { - pub fn new() -> Self { + pub fn new(put_behaviour: PutBehaviour) -> Self { Self { state: Arc::new(Mutex::new(MemState::default())), + put_behaviour, } } @@ -107,32 +137,48 @@ impl LazyMemblob { impl Default for LazyMemblob { fn default() -> Self { - Self::new() + Self::new(DEFAULT_PUT_BEHAVIOUR) } } -impl Blobstore for EagerMemblob { - fn put( +impl BlobstorePutOps for EagerMemblob { + fn put_explicit( &self, _ctx: CoreContext, key: String, value: BlobstoreBytes, - ) -> BoxFuture<'static, Result<(), Error>> { + put_behaviour: PutBehaviour, + ) -> BoxFuture<'static, Result> { let mut inner = self.state.lock().expect("lock poison"); - inner.put(key, value); - future::ok(()).boxed() + future::ok(inner.put(key, value, put_behaviour)).boxed() } + fn put_behaviour(&self) -> PutBehaviour { + self.put_behaviour + } +} + +impl Blobstore for EagerMemblob { fn get( &self, _ctx: CoreContext, key: String, ) -> BoxFuture<'static, Result, Error>> { let inner = self.state.lock().expect("lock poison"); - future::ok(inner.get(&key).map(|blob_ref| blob_ref.clone().into())).boxed() } + + fn put( + &self, + ctx: CoreContext, + key: String, + value: BlobstoreBytes, + ) -> BoxFuture<'static, Result<(), Error>> { + BlobstorePutOps::put_with_status(self, ctx, key, value) + .map_ok(|_| ()) + .boxed() + } } impl BlobstoreWithLink for EagerMemblob { @@ -147,24 +193,29 @@ impl BlobstoreWithLink for EagerMemblob { } } -impl Blobstore for LazyMemblob { - fn put( +impl BlobstorePutOps for LazyMemblob { + fn put_explicit( &self, _ctx: CoreContext, key: String, value: BlobstoreBytes, - ) -> BoxFuture<'static, Result<(), Error>> { + put_behaviour: PutBehaviour, + ) -> BoxFuture<'static, Result> { let state = self.state.clone(); lazy(move |_| { let mut inner = state.lock().expect("lock poison"); - - inner.put(key, value); - Ok(()) + Ok(inner.put(key, value, put_behaviour)) }) .boxed() } + fn put_behaviour(&self) -> PutBehaviour { + self.put_behaviour + } +} + +impl Blobstore for LazyMemblob { fn get( &self, _ctx: CoreContext, @@ -178,6 +229,17 @@ impl Blobstore for LazyMemblob { }) .boxed() } + + fn put( + &self, + ctx: CoreContext, + key: String, + value: BlobstoreBytes, + ) -> BoxFuture<'static, Result<(), Error>> { + BlobstorePutOps::put_with_status(self, ctx, key, value) + .map_ok(|_| ()) + .boxed() + } } impl BlobstoreWithLink for LazyMemblob { diff --git a/eden/mononoke/blobstore/test/main.rs b/eden/mononoke/blobstore/test/main.rs index 998f206af5..61dba9cf39 100644 --- a/eden/mononoke/blobstore/test/main.rs +++ b/eden/mononoke/blobstore/test/main.rs @@ -15,7 +15,6 @@ use std::sync::Arc; use anyhow::Error; use bytes::Bytes; use fbinit::FacebookInit; -use itertools::Either; use strum::IntoEnumIterator; use tempdir::TempDir; @@ -122,7 +121,6 @@ macro_rules! blobstore_test_impl { new: $new_cb: expr, persistent: $persistent: expr, has_ctime: $has_ctime: expr, - honors_put_behaviour: $honors_put_behaviour: expr, }) => { mod $mod_name { use super::*; @@ -131,16 +129,8 @@ macro_rules! blobstore_test_impl { async fn test_overwrite(fb: FacebookInit) -> Result<(), Error> { let state = $state; let has_ctime = $has_ctime; - let honors_put_behaviour = $honors_put_behaviour; - let put_behaviours = if honors_put_behaviour { - // try all variants - Either::Left(PutBehaviour::iter()) - } else { - // only try one, as it store doesn't support them yet - Either::Right(vec![PutBehaviour::Overwrite].into_iter()) - }; let factory = $new_cb; - for b in put_behaviours { + for b in PutBehaviour::iter() { overwrite(fb, factory(state.clone(), b)?, has_ctime, b).await? } Ok(()) @@ -181,30 +171,27 @@ macro_rules! blobstore_test_impl { blobstore_test_impl! { eager_memblob_test => { state: (), - new: move |_, _| Ok::<_,Error>(EagerMemblob::new()), + new: move |_, put_behaviour| Ok::<_,Error>(EagerMemblob::new(put_behaviour)), persistent: false, has_ctime: false, - honors_put_behaviour: false, } } blobstore_test_impl! { box_blobstore_test => { state: (), - new: move |_, _| Ok::<_,Error>(Box::new(EagerMemblob::new())), + new: move |_, put_behaviour| Ok::<_,Error>(Box::new(EagerMemblob::new(put_behaviour))), persistent: false, has_ctime: false, - honors_put_behaviour: false, } } blobstore_test_impl! { lazy_memblob_test => { state: (), - new: move |_, _| Ok::<_,Error>(LazyMemblob::new()), + new: move |_, put_behaviour| Ok::<_,Error>(LazyMemblob::new(put_behaviour)), persistent: false, has_ctime: false, - honors_put_behaviour: false, } } @@ -214,7 +201,6 @@ blobstore_test_impl! { new: move |dir: Arc, put_behaviour,| Fileblob::open(&*dir, put_behaviour), persistent: true, has_ctime: true, - honors_put_behaviour: true, } } @@ -253,7 +239,7 @@ async fn cache_blob_tests(fb: FacebookInit, expect_zstd: bool) -> Result<(), Err 20 * 1024 * 1024, )?); - let inner = Arc::new(LazyMemblob::new()); + let inner = Arc::new(LazyMemblob::new(PutBehaviour::Overwrite)); let cache_blob = cacheblob::new_cachelib_blobstore(inner.clone(), blob_pool.clone(), presence_pool, options);