Implement blobstore leases in-process

Summary:
It's wasteful to send the same bunch of bytes to a backing blobstore
twice; introduce an in-memory LeaseOps that, when combined with a presence
cache, ensures that we'll never do that.

Reviewed By: StanislavGlebik

Differential Revision: D8820987

fbshipit-source-id: 406a388cb1f08a35daa49151dadd023fde260e07
This commit is contained in:
Simon Farnsworth 2018-07-18 04:34:12 -07:00 committed by Facebook Github Bot
parent f83fd7225a
commit 0d53c4599e
3 changed files with 83 additions and 1 deletions

View File

@ -17,6 +17,7 @@ use mononoke_types::BlobstoreBytes;
use Blobstore;
use counted_blobstore::CountedBlobstore;
use dummy_lease::DummyLease;
use in_process_lease::InProcessLease;
use locking_cache::CacheBlobstore;
const CACHELIB_MAX_SIZE: usize = 1024000;
@ -52,6 +53,21 @@ where
)
}
pub fn new_cachelib_blobstore<T>(
blobstore: T,
blob_pool: Arc<LruCachePool>,
presence_pool: Arc<LruCachePool>,
) -> CountedBlobstore<CacheBlobstore<CachelibOps, InProcessLease, T>>
where
T: Blobstore + Clone,
{
let cache_ops = CachelibOps::new(blob_pool, presence_pool);
CountedBlobstore::new(
"cachelib",
CacheBlobstore::new(cache_ops, InProcessLease::new(), blobstore),
)
}
impl CacheOps for CachelibOps {
fn get(&self, key: &str) -> BoxFuture<Option<BlobstoreBytes>, ()> {
Ok(self.blob_pool.get(key).map(BlobstoreBytes::from_bytes))

View File

@ -0,0 +1,64 @@
// Copyright (c) 2018-present, Facebook, Inc.
// All Rights Reserved.
//
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2 or any later version.
use std::collections::{HashMap, hash_map::Entry};
use std::sync::{Arc, Mutex};
use futures::{Future, IntoFuture, future::Shared};
use futures::sync::oneshot::{channel, Receiver, Sender};
use futures_ext::{BoxFuture, FutureExt};
use LeaseOps;
/// LeaseOps that use in-memory data structures to avoid two separate tasks writing to the same key
#[derive(Clone)]
pub struct InProcessLease {
leases: Arc<Mutex<HashMap<String, (Sender<()>, Shared<Receiver<()>>)>>>,
}
impl InProcessLease {
pub fn new() -> Self {
Self {
leases: Arc::new(Mutex::new(HashMap::new())),
}
}
}
impl LeaseOps for InProcessLease {
fn try_add_put_lease(&self, key: &str) -> BoxFuture<bool, ()> {
let mut in_flight_leases = self.leases.lock().expect("lock poisoned");
let entry = in_flight_leases.entry(key.to_string());
if let Entry::Occupied(_) = entry {
Ok(false).into_future().boxify()
} else {
let (send, recv) = channel();
entry.or_insert((send, recv.shared()));
Ok(true).into_future().boxify()
}
}
fn wait_for_other_leases(&self, key: &str) -> BoxFuture<(), ()> {
let in_flight_leases = self.leases.lock().expect("lock poisoned");
match in_flight_leases.get(key) {
None => Ok(()).into_future().boxify(),
// The map and map_err are just because FUT.shared() has different Item and Error
// types to FUT.
Some((_, fut)) => fut.clone().map(|_| ()).map_err(|_| ()).boxify(),
}
}
fn release_lease(&self, key: &str, _put_success: bool) -> BoxFuture<(), ()> {
let mut in_flight_leases = self.leases.lock().expect("lock poisoned");
if let Some((sender, _)) = in_flight_leases.remove(key) {
// Don't care if there's no-one listening - just want to wake them up if possible.
let _ = sender.send(());
}
Ok(()).into_future().boxify()
}
}

View File

@ -36,7 +36,7 @@ use futures_ext::{BoxFuture, FutureExt};
use mononoke_types::BlobstoreBytes;
mod cachelib_cache;
pub use cachelib_cache::new_cachelib_blobstore_no_lease;
pub use cachelib_cache::{new_cachelib_blobstore, new_cachelib_blobstore_no_lease};
mod counted_blobstore;
pub use counted_blobstore::CountedBlobstore;
@ -46,6 +46,8 @@ mod dummy_lease;
mod in_memory_cache;
pub use in_memory_cache::MemoizedBlobstore;
mod in_process_lease;
mod locking_cache;
pub use locking_cache::{CacheBlobstore, CacheBlobstoreExt, CacheOps, LeaseOps};