Refactor MemcacheBlobstore to split into common and memcache parts

Summary:
When we add cachelib bindings to Rust, we're going to want to implement a
cachelib blobstore that's more or less the same as the memcache version, but
backed by a cachelib pool instead of a memcache instance.

Split this code up so that we don't duplicate functionality

Reviewed By: StanislavGlebik

Differential Revision: D8523713

fbshipit-source-id: 882298abab8c208103f6d8c74fee60a768c877f6
This commit is contained in:
Simon Farnsworth 2018-07-10 10:00:20 -07:00 committed by Facebook Github Bot
parent fc880f518b
commit d40a725f7b
7 changed files with 427 additions and 236 deletions

View File

@ -26,7 +26,8 @@ use stats::Timeseries;
use time_ext::DurationExt;
use uuid::Uuid;
use blobstore::{Blobstore, EagerMemblob, MemcacheBlobstore, MemoizedBlobstore, PrefixBlobstore};
use blobstore::{new_memcache_blobstore, Blobstore, EagerMemblob, MemoizedBlobstore,
PrefixBlobstore};
use bookmarks::{self, Bookmark, BookmarkPrefix, Bookmarks};
use changesets::{CachingChangests, ChangesetInsert, Changesets, MysqlChangesets, SqliteChangesets};
use dbbookmarks::{MysqlDbBookmarks, SqliteDbBookmarks};
@ -268,7 +269,7 @@ impl BlobRepo {
io_remotes.iter().collect(),
args.max_concurrent_requests_per_io_thread,
);
let blobstore = MemcacheBlobstore::new(blobstore, "manifold", args.bucket.as_ref())?;
let blobstore = new_memcache_blobstore(blobstore, "manifold", args.bucket.as_ref())?;
let blobstore = MemoizedBlobstore::new(blobstore, usize::MAX, args.blobstore_cache_size);
let filenodes = MysqlFilenodes::open(&args.db_address, DEFAULT_INSERT_CHUNK_SIZE)

View File

@ -11,7 +11,7 @@ use stats::DynamicTimeseries;
use mononoke_types::BlobstoreBytes;
use {Blobstore, MemcacheBlobstoreExt};
use {Blobstore, CacheBlobstoreExt};
define_stats! {
prefix = "mononoke.blobstore";
@ -117,14 +117,14 @@ impl<T: Blobstore> Blobstore for CountedBlobstore<T> {
}
}
impl<T: MemcacheBlobstoreExt> MemcacheBlobstoreExt for CountedBlobstore<T> {
impl<T: CacheBlobstoreExt> CacheBlobstoreExt for CountedBlobstore<T> {
#[inline]
fn get_no_cache_fill(&self, key: String) -> BoxFuture<Option<BlobstoreBytes>, Error> {
self.as_inner().get_no_cache_fill(key)
}
#[inline]
fn get_memcache_only(&self, key: String) -> BoxFuture<Option<BlobstoreBytes>, Error> {
self.as_inner().get_memcache_only(key)
fn get_cache_only(&self, key: String) -> BoxFuture<Option<BlobstoreBytes>, Error> {
self.as_inner().get_cache_only(key)
}
}

View File

@ -40,11 +40,14 @@ pub use counted_blobstore::CountedBlobstore;
mod in_memory_cache;
pub use in_memory_cache::MemoizedBlobstore;
mod locking_cache;
pub use locking_cache::{CacheBlobstore, CacheBlobstoreExt, CacheOps, LeaseOps};
mod memblob;
pub use memblob::{EagerMemblob, LazyMemblob};
mod memcache_cache;
pub use memcache_cache::{MemcacheBlobstore, MemcacheBlobstoreExt};
pub use memcache_cache::new_memcache_blobstore;
mod prefix;
pub use prefix::PrefixBlobstore;

View File

@ -0,0 +1,322 @@
// Copyright (c) 2018-present, Facebook, Inc.
// All Rights Reserved.
//
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2 or any later version.
use std::sync::Arc;
use failure::Error;
use futures::{future, Future, IntoFuture, future::Either};
use futures_ext::{BoxFuture, FutureExt};
use tokio;
use mononoke_types::BlobstoreBytes;
use Blobstore;
/// Extra operations that can be performed on a cache. Other wrappers can implement this trait for
/// e.g. all `WrapperBlobstore<CacheBlobstore<T>>`.
///
/// This is primarily used by the admin command to manually check memcache.
pub trait CacheBlobstoreExt: Blobstore {
fn get_no_cache_fill(&self, key: String) -> BoxFuture<Option<BlobstoreBytes>, Error>;
fn get_cache_only(&self, key: String) -> BoxFuture<Option<BlobstoreBytes>, Error>;
}
/// The operations a cache must provide in order to be usable as the caching layer for a
/// caching blobstore that caches blob contents and blob presence.
/// For caches that do no I/O (e.g. in-memory caches), use Result::into_future() to create the
/// return types - it is up to CacheBlobstore to use future::lazy where this would be unacceptable
/// Errors returned by the cache are always ignored.
///
/// The cache is expected to act as-if each entry is in one of four states:
/// 1. Empty, implying that the cache has no knowledge of the backing store state for this key.
/// 2. Leased, implying that the cache is aware that there is an attempt being made to update the
/// backing store for this key.
/// 3. Present, implying that the cache is aware that a backing store entry exists for this key
/// but does not have a copy of the blob.
/// 4. Known, implying that the cache has a copy of the blob for this key.
///
/// When the cache engages in eviction, it demotes entries according to the following plan:
/// Present and Leased can only demote to Empty.
/// Known can demote to Present or Empty.
/// No state is permitted to demote to Leased.
/// Caches that do not support LeaseOps do not have the Leased state.
pub trait CacheOps: Send + Sync + 'static {
/// Fetch the blob from the cache, if possible. Return `None` if the cache does not have a
/// copy of the blob (i.e. the cache entry is not in Known state).
fn get(&self, key: &str) -> BoxFuture<Option<BlobstoreBytes>, ()>;
/// Tell the cache that the backing store value for this `key` is `value`. This should put the
/// cache entry for this `key` into Known state or a demotion of Known state (Present, Empty).
fn put(&self, key: &str, value: BlobstoreBytes) -> BoxFuture<(), ()>;
/// Ask the cache if it knows whether the backing store has a value for this key. Returns
/// `true` if there is definitely a value (i.e. cache entry in Present or Known state), `false`
/// otherwise (Empty or Leased states).
fn check_present(&self, key: &str) -> BoxFuture<bool, ()>;
}
/// The operations a cache must provide to take part in the update lease protocol. This reduces the
/// thundering herd on writes by using the Leased state to ensure that only one user of this cache
/// can write to the backing store at any time. Note that this is not a guarantee that there will
/// be only one writer to the backing store for any given key - notably, the cache can demote
/// Leased to Empty, thus letting another writer that shares the same cache through to the backing
/// store.
pub trait LeaseOps: Send + Sync + 'static {
/// Ask the cache to attempt to lock out other users of this cache for a particular key.
/// This is an atomic test-and-set of the cache entry; it tests that the entry is Empty, and if
/// the entry is Empty, it changes it to the Leased state.
/// The result is `true` if the test-and-set changed the entry to Leased state, `false`
/// otherwise
fn try_add_put_lease(&self, key: &str) -> BoxFuture<bool, ()>;
/// Wait for a suitable (cache-defined) period between `try_add_put_lease` attempts.
/// For caches without a notification method, this should just be a suitable delay.
/// For caches that can notify on key change, this should wait for that notification.
/// It is acceptable to return from this future without checking the state of the cache entry.
fn wait_for_other_leases(&self, key: &str) -> BoxFuture<(), ()>;
/// Releases any leases held on `key`. `put_success` is a hint; if it is `true`, the entry
/// can transition from Leased to either Present or Empty, while if it is `false`, the entry
/// must transition from Leased to Empty.
fn release_lease(&self, key: &str, put_success: bool) -> BoxFuture<(), ()>;
}
impl<C> CacheOps for Arc<C>
where
C: CacheOps,
{
fn get(&self, key: &str) -> BoxFuture<Option<BlobstoreBytes>, ()> {
self.as_ref().get(key)
}
fn put(&self, key: &str, value: BlobstoreBytes) -> BoxFuture<(), ()> {
self.as_ref().put(key, value)
}
fn check_present(&self, key: &str) -> BoxFuture<bool, ()> {
self.as_ref().check_present(key)
}
}
impl<L> LeaseOps for Arc<L>
where
L: LeaseOps,
{
fn try_add_put_lease(&self, key: &str) -> BoxFuture<bool, ()> {
self.as_ref().try_add_put_lease(key)
}
fn wait_for_other_leases(&self, key: &str) -> BoxFuture<(), ()> {
self.as_ref().wait_for_other_leases(key)
}
fn release_lease(&self, key: &str, put_success: bool) -> BoxFuture<(), ()> {
self.as_ref().release_lease(key, put_success)
}
}
/// A caching layer over a blobstore, using a cache defined by its CacheOps. The idea is that
/// generic code that any caching layer needs is defined here, while code that's cache-specific
/// goes into CacheOps
#[derive(Clone)]
pub struct CacheBlobstore<C, L, T>
where
C: CacheOps + Clone,
L: LeaseOps + Clone,
T: Blobstore + Clone,
{
blobstore: T,
cache: C,
lease: L,
}
impl<C, L, T> CacheBlobstore<C, L, T>
where
C: CacheOps + Clone,
L: LeaseOps + Clone,
T: Blobstore + Clone,
{
pub fn new(cache: C, lease: L, blobstore: T) -> Self {
Self {
blobstore,
cache,
lease,
}
}
fn cache_get(
&self,
key: &str,
) -> impl Future<Item = Option<BlobstoreBytes>, Error = Error> + Send {
self.cache.get(key).or_else(|_| Ok(None))
}
fn cache_put_closure(
&self,
key: &str,
) -> impl FnOnce(Option<BlobstoreBytes>) -> Option<BlobstoreBytes> {
let key = key.to_string();
let cache = self.cache.clone();
move |value| {
if let Some(ref value) = value {
tokio::spawn(cache.put(&key, value.clone()));
}
value
}
}
fn cache_put(
&self,
key: &str,
value: BlobstoreBytes,
) -> impl Future<Item = (), Error = Error> + Send {
let key = key.to_string();
let cache = self.cache.clone();
future::lazy(move || cache.put(&key, value).or_else(|_| Ok(()).into_future()))
}
fn cache_is_present(&self, key: &str) -> impl Future<Item = bool, Error = Error> + Send {
self.cache.check_present(key).or_else(|_| Ok(false))
}
fn take_put_lease(&self, key: &str) -> impl Future<Item = bool, Error = Error> + Send {
self.lease
.try_add_put_lease(key)
.or_else(|_| Ok(false))
.and_then({
let cache = self.cache.clone();
let lease = self.lease.clone();
let this = self.clone();
let key = key.to_string();
move |leased| {
if leased {
Either::A(Ok(true).into_future())
} else {
Either::B(cache.check_present(&key).or_else(|_| Ok(false)).and_then(
move |present| {
if present {
Either::A(Ok(false).into_future())
} else {
Either::B(
lease
.wait_for_other_leases(&key)
.then(move |_| this.take_put_lease(&key).boxify()),
)
}
},
))
}
}
})
}
}
impl<C, L, T> Blobstore for CacheBlobstore<C, L, T>
where
C: CacheOps + Clone,
L: LeaseOps + Clone,
T: Blobstore + Clone,
{
fn get(&self, key: String) -> BoxFuture<Option<BlobstoreBytes>, Error> {
let cache_get = self.cache_get(&key);
let cache_put = self.cache_put_closure(&key);
let blobstore_get = future::lazy({
let blobstore = self.blobstore.clone();
move || blobstore.get(key)
});
cache_get
.and_then({
move |blob| {
if blob.is_some() {
future::Either::A(Ok(blob).into_future())
} else {
future::Either::B(blobstore_get.map(cache_put))
}
}
})
.boxify()
}
fn put(&self, key: String, value: BlobstoreBytes) -> BoxFuture<(), Error> {
let can_put = self.take_put_lease(&key);
let cache_put = self.cache_put(&key, value.clone())
.join(future::lazy({
let lease = self.lease.clone();
let key = key.clone();
move || lease.release_lease(&key, false).or_else(|_| Ok(()))
}))
.then(|_| Ok(()));
let blobstore_put = future::lazy({
let blobstore = self.blobstore.clone();
let lease = self.lease.clone();
let key = key.clone();
move || {
blobstore
.put(key.clone(), value)
.or_else(move |r| lease.release_lease(&key, false).then(|_| Err(r)))
}
});
can_put
.and_then(move |can_put| {
if can_put {
Either::A(blobstore_put.and_then(move |_| cache_put))
} else {
Either::B(Ok(()).into_future())
}
})
.boxify()
}
fn is_present(&self, key: String) -> BoxFuture<bool, Error> {
let cache_check = self.cache_is_present(&key);
let blobstore_check = future::lazy({
let blobstore = self.blobstore.clone();
move || blobstore.is_present(key)
});
cache_check
.and_then(|present| {
if present {
Either::A(Ok(true).into_future())
} else {
Either::B(blobstore_check)
}
})
.boxify()
}
}
impl<C, L, T> CacheBlobstoreExt for CacheBlobstore<C, L, T>
where
C: CacheOps + Clone,
L: LeaseOps + Clone,
T: Blobstore + Clone,
{
fn get_no_cache_fill(&self, key: String) -> BoxFuture<Option<BlobstoreBytes>, Error> {
let cache_get = self.cache_get(&key);
let blobstore_get = self.blobstore.get(key);
cache_get
.and_then(move |blob| {
if blob.is_some() {
Ok(blob).into_future().boxify()
} else {
blobstore_get.boxify()
}
})
.boxify()
}
fn get_cache_only(&self, key: String) -> BoxFuture<Option<BlobstoreBytes>, Error> {
self.cache_get(&key).boxify()
}
}

View File

@ -7,24 +7,25 @@
use std::time::Duration;
use failure::{err_msg, Error};
use futures::{future, Future, IntoFuture, future::Either};
use futures::{Future, IntoFuture, future::Either};
use futures_ext::{BoxFuture, FutureExt};
use memcache::{KeyGen, MemcacheClient};
use rust_thrift::compact_protocol;
use tokio;
use tokio_timer::Timer;
use fbwhoami::FbWhoAmI;
use mononoke_types::BlobstoreBytes;
use Blobstore;
use CacheBlobstore;
use CacheOps;
use CountedBlobstore;
use LeaseOps;
use memcache_lock_thrift::LockState;
/// A caching layer over an existing blobstore, backed by memcache
#[derive(Clone)]
pub struct MemcacheBlobstore<T: Blobstore + Clone> {
blobstore: T,
pub struct MemcacheOps {
memcache: MemcacheClient,
timer: Timer,
keygen: KeyGen,
@ -32,15 +33,6 @@ pub struct MemcacheBlobstore<T: Blobstore + Clone> {
hostname: String,
}
/// Extra operations that can be performed on memcache. Other wrappers can implement this trait for
/// e.g. all `WrapperBlobstore<MemcacheBlobstore<T>>`.
///
/// This is primarily used by the admin command to manually check memcache.
pub trait MemcacheBlobstoreExt: Blobstore + Clone {
fn get_no_cache_fill(&self, key: String) -> BoxFuture<Option<BlobstoreBytes>, Error>;
fn get_memcache_only(&self, key: String) -> BoxFuture<Option<BlobstoreBytes>, Error>;
}
const MEMCACHE_MAX_SIZE: usize = 1024000;
const MC_CODEVER: u32 = 0;
const MC_SITEVER: u32 = 0;
@ -51,7 +43,7 @@ fn mc_raw_put(
key: String,
value: BlobstoreBytes,
presence_key: String,
) -> impl Future<Item = (), Error = Error> {
) -> impl Future<Item = (), Error = ()> {
let uploaded = compact_protocol::serialize(&LockState::uploaded_key(orig_key));
memcache.set(presence_key, uploaded).then(move |_| {
@ -67,12 +59,8 @@ fn mc_raw_put(
})
}
impl<T: Blobstore + Clone> MemcacheBlobstore<T> {
pub fn new<S>(
blobstore: T,
backing_store_name: S,
backing_store_params: S,
) -> Result<CountedBlobstore<Self>, Error>
impl MemcacheOps {
pub fn new<S>(backing_store_name: S, backing_store_params: S) -> Result<Self, Error>
where
S: AsRef<str>,
{
@ -88,61 +76,19 @@ impl<T: Blobstore + Clone> MemcacheBlobstore<T> {
+ backing_store_name.as_ref() + "."
+ backing_store_params.as_ref();
Ok(CountedBlobstore::new(
"memcache",
MemcacheBlobstore {
blobstore: blobstore,
memcache: MemcacheClient::new(),
timer: Timer::default(),
keygen: KeyGen::new(blob_key, MC_CODEVER, MC_SITEVER),
presence_keygen: KeyGen::new(presence_key, MC_CODEVER, MC_SITEVER),
hostname,
},
))
Ok(Self {
memcache: MemcacheClient::new(),
timer: Timer::default(),
keygen: KeyGen::new(blob_key, MC_CODEVER, MC_SITEVER),
presence_keygen: KeyGen::new(presence_key, MC_CODEVER, MC_SITEVER),
hostname,
})
}
// Turns errors to Ok(None)
fn mc_get(&self, key: &String) -> impl Future<Item = Option<BlobstoreBytes>, Error = Error> {
let mc_key = self.keygen.key(key);
self.memcache
.get(mc_key)
.map(|buf| buf.map(|buf| BlobstoreBytes::from_bytes(buf)))
.or_else(|_| Ok(None).into_future())
}
fn mc_put(&self, key: &String, value: BlobstoreBytes) -> impl Future<Item = (), Error = Error> {
let mc_key = self.keygen.key(key);
let presence_key = self.presence_keygen.key(key);
let orig_key = key.clone();
let mc = self.memcache.clone();
future::lazy(move || mc_raw_put(mc, orig_key, mc_key, value, presence_key))
}
fn mc_put_closure(
&self,
key: &String,
) -> impl FnOnce(Option<BlobstoreBytes>) -> Option<BlobstoreBytes> {
let mc_key = self.keygen.key(key);
let presence_key = self.presence_keygen.key(key);
let orig_key = key.clone();
let memcache = self.memcache.clone();
move |value| {
if let Some(ref value) = value {
tokio::spawn(
mc_raw_put(memcache, orig_key, mc_key, value.clone(), presence_key)
.map_err(|_| ()),
);
}
value
}
}
fn mc_get_lock_state(
fn get_lock_state(
&self,
key: String,
) -> impl Future<Item = Option<LockState>, Error = Error> + Send {
) -> impl Future<Item = Option<LockState>, Error = ()> + Send {
let mc_key = self.presence_keygen.key(key.clone());
self.memcache
.get(mc_key.clone())
@ -151,184 +97,103 @@ impl<T: Blobstore + Clone> MemcacheBlobstore<T> {
move |opt_blob| {
let opt_res = opt_blob
.and_then(|blob| compact_protocol::deserialize(Vec::from(blob)).ok());
if let Some(LockState::uploaded_key(up_key)) = &opt_res {
if key != *up_key {
// The lock state is invalid - fix it up by dropping the lock
return Either::A(mc.del(mc_key).map(|_| None));
}
}
Either::B(Ok(opt_res).into_future())
}
})
.or_else(|_| Ok(None).into_future())
}
}
fn mc_is_present(&self, key: &String) -> impl Future<Item = bool, Error = Error> + Send {
let lock_presence = self.mc_get_lock_state(key.clone())
pub fn new_memcache_blobstore<T, S>(
blobstore: T,
backing_store_name: S,
backing_store_params: S,
) -> Result<CountedBlobstore<CacheBlobstore<MemcacheOps, MemcacheOps, T>>, Error>
where
T: Blobstore + Clone,
S: AsRef<str>,
{
let cache_ops = MemcacheOps::new(backing_store_name, backing_store_params)?;
Ok(CountedBlobstore::new(
"memcache",
CacheBlobstore::new(cache_ops.clone(), cache_ops, blobstore),
))
}
impl CacheOps for MemcacheOps {
// Turns errors to Ok(None)
fn get(&self, key: &str) -> BoxFuture<Option<BlobstoreBytes>, ()> {
let mc_key = self.keygen.key(key);
self.memcache
.get(mc_key)
.map(|buf| buf.map(|buf| BlobstoreBytes::from_bytes(buf)))
.boxify()
}
fn put(&self, key: &str, value: BlobstoreBytes) -> BoxFuture<(), ()> {
let mc_key = self.keygen.key(key);
let presence_key = self.presence_keygen.key(key);
let orig_key = key.to_string();
mc_raw_put(self.memcache.clone(), orig_key, mc_key, value, presence_key).boxify()
}
fn check_present(&self, key: &str) -> BoxFuture<bool, ()> {
let lock_presence = self.get_lock_state(key.to_string())
.map(|lockstate| match lockstate {
// mc_get_lock_state will delete the lock and return None if there's a bad
// get_lock_state will delete the lock and return None if there's a bad
// uploaded_key
Some(LockState::uploaded_key(_)) => true,
_ => false,
});
let blob_presence = self.mc_get(key).map(|blob| blob.is_some());
let mc_key = self.keygen.key(key);
let blob_presence = self.memcache.get(mc_key).map(|blob| blob.is_some());
lock_presence.and_then(move |present| {
if present {
Either::A(Ok(true).into_future())
} else {
Either::B(blob_presence)
}
})
}
fn mc_can_put_to_bs(&self, key: String) -> impl Future<Item = bool, Error = Error> + Send {
// We can't put if the key is present.
// Otherwise, we sleep and retry
self.mc_is_present(&key).and_then({
let mc = self.memcache.clone();
let mc_key = self.presence_keygen.key(key.clone());
let hostname = self.hostname.clone();
let timer = self.timer.clone();
let this = self.clone();
move |present| {
if present {
// It's in the blobstore already
Either::A(Ok(false).into_future())
} else {
let lockstate = compact_protocol::serialize(&LockState::locked_by(hostname));
let lock_ttl = Duration::from_secs(10);
let retry_delay = Duration::from_millis(200);
Either::B(
mc.add_with_ttl(mc_key, lockstate, lock_ttl)
.then(move |locked| {
if Ok(true) == locked {
// We own the lock
Either::A(Ok(true).into_future())
} else {
// Someone else owns the lock, or memcache failed
Either::B(future::lazy(move || {
timer
.sleep(retry_delay)
.then(move |_| this.mc_can_put_to_bs(key))
.boxify()
}))
}
}),
)
}
}
})
}
// The following are used by the admin command to manually check on memcache
pub fn get_no_cache_fill(&self, key: String) -> BoxFuture<Option<BlobstoreBytes>, Error> {
let mc_get = self.mc_get(&key);
let bs_get = self.blobstore.get(key);
mc_get
.and_then(move |blob| {
if blob.is_some() {
Ok(blob).into_future().boxify()
} else {
bs_get.boxify()
}
})
.boxify()
}
pub fn get_memcache_only(&self, key: String) -> BoxFuture<Option<BlobstoreBytes>, Error> {
self.mc_get(&key).boxify()
}
}
impl<T: Blobstore + Clone> Blobstore for MemcacheBlobstore<T> {
fn get(&self, key: String) -> BoxFuture<Option<BlobstoreBytes>, Error> {
let mc_get = self.mc_get(&key);
let mc_put = self.mc_put_closure(&key);
let bs_get = future::lazy({
let blobstore = self.blobstore.clone();
move || blobstore.get(key)
});
mc_get
.and_then({
move |blob| {
if blob.is_some() {
future::Either::A(Ok(blob).into_future())
} else {
future::Either::B(bs_get.map(mc_put))
}
}
})
.boxify()
}
fn put(&self, key: String, value: BlobstoreBytes) -> BoxFuture<(), Error> {
let presence_key = self.presence_keygen.key(key.clone());
let can_put = self.mc_can_put_to_bs(key.clone());
let mc_put = self.mc_put(&key, value.clone());
let bs_put = future::lazy({
let mc = self.memcache.clone();
let blobstore = self.blobstore.clone();
move || {
blobstore
.put(key, value)
.or_else(move |r| mc.del(presence_key).then(|_| Err(r)))
}
});
can_put
.and_then(|can_put| {
if can_put {
Either::A(bs_put.and_then(move |_| mc_put))
} else {
Either::B(Ok(()).into_future())
}
})
.boxify()
}
fn is_present(&self, key: String) -> BoxFuture<bool, Error> {
let mc_check = self.mc_is_present(&key);
let bs_check = future::lazy({
let blobstore = self.blobstore.clone();
move || blobstore.is_present(key)
});
mc_check
.and_then(|present| {
lock_presence
.and_then(move |present| {
if present {
Either::A(Ok(true).into_future())
} else {
Either::B(bs_check)
Either::B(blob_presence)
}
})
.boxify()
}
}
impl<T: Blobstore + Clone> MemcacheBlobstoreExt for MemcacheBlobstore<T> {
fn get_no_cache_fill(&self, key: String) -> BoxFuture<Option<BlobstoreBytes>, Error> {
let mc_get = self.mc_get(&key);
let bs_get = self.blobstore.get(key);
impl LeaseOps for MemcacheOps {
fn try_add_put_lease(&self, key: &str) -> BoxFuture<bool, ()> {
let lockstate = compact_protocol::serialize(&LockState::locked_by(self.hostname.clone()));
let lock_ttl = Duration::from_secs(10);
let mc_key = self.presence_keygen.key(key.clone());
mc_get
.and_then(move |blob| {
if blob.is_some() {
Ok(blob).into_future().boxify()
} else {
bs_get.boxify()
}
})
self.memcache
.add_with_ttl(mc_key, lockstate, lock_ttl)
.boxify()
}
fn get_memcache_only(&self, key: String) -> BoxFuture<Option<BlobstoreBytes>, Error> {
self.mc_get(&key).boxify()
fn wait_for_other_leases(&self, _key: &str) -> BoxFuture<(), ()> {
let retry_delay = Duration::from_millis(200);
self.timer.sleep(retry_delay).map_err(|_| ()).boxify()
}
fn release_lease(&self, key: &str, put_success: bool) -> BoxFuture<(), ()> {
let mc_key = self.presence_keygen.key(key.clone());
if put_success {
let uploaded = compact_protocol::serialize(&LockState::uploaded_key(key.to_string()));
self.memcache.set(mc_key, uploaded).boxify()
} else {
self.memcache.del(mc_key).boxify()
}
}
}

View File

@ -9,7 +9,7 @@ use futures_ext::BoxFuture;
use mononoke_types::BlobstoreBytes;
use {Blobstore, MemcacheBlobstoreExt};
use {Blobstore, CacheBlobstoreExt};
/// A layer over an existing blobstore that prepends a fixed string to each get and put.
#[derive(Clone)]
@ -30,15 +30,15 @@ impl<T: Blobstore + Clone> PrefixBlobstore<T> {
}
}
impl<T: MemcacheBlobstoreExt> MemcacheBlobstoreExt for PrefixBlobstore<T> {
impl<T: CacheBlobstoreExt + Clone> CacheBlobstoreExt for PrefixBlobstore<T> {
#[inline]
fn get_no_cache_fill(&self, key: String) -> BoxFuture<Option<BlobstoreBytes>, Error> {
self.blobstore.get_no_cache_fill(self.prepend(key))
}
#[inline]
fn get_memcache_only(&self, key: String) -> BoxFuture<Option<BlobstoreBytes>, Error> {
self.blobstore.get_memcache_only(self.prepend(key))
fn get_cache_only(&self, key: String) -> BoxFuture<Option<BlobstoreBytes>, Error> {
self.blobstore.get_cache_only(self.prepend(key))
}
}

View File

@ -35,7 +35,7 @@ use futures::stream::iter_ok;
use tokio_core::reactor::Core;
use blobrepo::{BlobRepo, ManifoldArgs};
use blobstore::{Blobstore, MemcacheBlobstore, MemcacheBlobstoreExt, PrefixBlobstore};
use blobstore::{new_memcache_blobstore, Blobstore, CacheBlobstoreExt, PrefixBlobstore};
use cmdlib::args;
use futures_ext::{BoxFuture, FutureExt};
use manifoldblob::ManifoldBlob;
@ -163,13 +163,13 @@ fn fetch_content(
.boxify()
}
fn get_memcache<B: MemcacheBlobstoreExt>(
fn get_cache<B: CacheBlobstoreExt>(
blobstore: &B,
key: String,
mode: String,
) -> BoxFuture<Option<BlobstoreBytes>, Error> {
if mode == "cache-only" {
blobstore.get_memcache_only(key)
blobstore.get_cache_only(key)
} else if mode == "no-fill" {
blobstore.get_no_cache_fill(key)
} else {
@ -213,21 +213,21 @@ fn main() {
}
(None, true) => blobstore.get(key.clone()).boxify(),
(Some(mode), false) => {
let blobstore = MemcacheBlobstore::new(
let blobstore = new_memcache_blobstore(
blobstore,
"manifold",
manifold_args.bucket.as_ref(),
).unwrap();
let blobstore = PrefixBlobstore::new(blobstore, repo_id.prefix());
get_memcache(&blobstore, key.clone(), mode)
get_cache(&blobstore, key.clone(), mode)
}
(Some(mode), true) => {
let blobstore = MemcacheBlobstore::new(
let blobstore = new_memcache_blobstore(
blobstore,
"manifold",
manifold_args.bucket.as_ref(),
).unwrap();
get_memcache(&blobstore, key.clone(), mode)
get_cache(&blobstore, key.clone(), mode)
}
}.map(move |value| {
println!("{:?}", value);