mirror of
https://github.com/facebook/sapling.git
synced 2024-10-09 16:31:02 +03:00
mononoke/virtually_sharded_blobstore: refactor for better testability
Summary: See the next diff for motivation: this makes it easier to implement. Differential Revision: D28350388 fbshipit-source-id: 026605cf8296a945d6cc81b7f36d9198325bf13c
This commit is contained in:
parent
aa95a51112
commit
9bd8e54a9f
@ -63,6 +63,7 @@ impl AsRef<[u8]> for CacheKey {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
enum CacheData {
|
||||
/// Represents data that was found in cache.
|
||||
Stored(BlobstoreGetData),
|
||||
@ -176,14 +177,14 @@ impl<T> VirtuallyShardedBlobstore<T> {
|
||||
shards: NonZeroUsize,
|
||||
cachelib_options: CachelibBlobstoreOptions,
|
||||
) -> Self {
|
||||
let inner = Inner::new(
|
||||
blobstore,
|
||||
let cache = Cache {
|
||||
blob_pool,
|
||||
presence_pool,
|
||||
shards,
|
||||
allow_all_filter,
|
||||
cache_filter: allow_all_filter,
|
||||
cachelib_options,
|
||||
);
|
||||
};
|
||||
|
||||
let inner = Inner::new(blobstore, shards, cache);
|
||||
|
||||
Self {
|
||||
inner: Arc::new(inner),
|
||||
@ -201,38 +202,14 @@ impl<T: fmt::Debug> fmt::Debug for VirtuallyShardedBlobstore<T> {
|
||||
}
|
||||
}
|
||||
|
||||
struct Inner<T> {
|
||||
blobstore: T,
|
||||
write_shards: Shards,
|
||||
read_shards: Shards,
|
||||
struct Cache {
|
||||
presence_pool: VolatileLruCachePool,
|
||||
blob_pool: VolatileLruCachePool,
|
||||
cache_filter: fn(&Bytes) -> Result<()>,
|
||||
cachelib_options: CachelibBlobstoreOptions,
|
||||
}
|
||||
|
||||
impl<T> Inner<T> {
|
||||
pub fn new(
|
||||
blobstore: T,
|
||||
blob_pool: VolatileLruCachePool,
|
||||
presence_pool: VolatileLruCachePool,
|
||||
shards: NonZeroUsize,
|
||||
cache_filter: fn(&Bytes) -> Result<()>,
|
||||
cachelib_options: CachelibBlobstoreOptions,
|
||||
) -> Self {
|
||||
Self {
|
||||
blobstore,
|
||||
write_shards: Shards::new(shards, PerfCounterType::BlobPutsShardAccessWait),
|
||||
read_shards: Shards::new(shards, PerfCounterType::BlobGetsShardAccessWait),
|
||||
blob_pool,
|
||||
presence_pool,
|
||||
cache_filter,
|
||||
cachelib_options,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Inner<T> {
|
||||
impl Cache {
|
||||
fn get_from_cache(&self, key: &CacheKey) -> Result<Option<CacheData>> {
|
||||
let val = match self.blob_pool.get(key)? {
|
||||
Some(val) => val,
|
||||
@ -332,6 +309,24 @@ impl<T> Inner<T> {
|
||||
}
|
||||
}
|
||||
|
||||
struct Inner<T> {
|
||||
blobstore: T,
|
||||
write_shards: Shards,
|
||||
read_shards: Shards,
|
||||
cache: Cache,
|
||||
}
|
||||
|
||||
impl<T> Inner<T> {
|
||||
fn new(blobstore: T, shards: NonZeroUsize, cache: Cache) -> Self {
|
||||
Self {
|
||||
blobstore,
|
||||
write_shards: Shards::new(shards, PerfCounterType::BlobPutsShardAccessWait),
|
||||
read_shards: Shards::new(shards, PerfCounterType::BlobGetsShardAccessWait),
|
||||
cache,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn report_deduplicated_put(ctx: &CoreContext, key: &str) {
|
||||
STATS::puts_deduped.add_value(1);
|
||||
|
||||
@ -363,7 +358,7 @@ impl<T: Blobstore + 'static> Blobstore for VirtuallyShardedBlobstore<T> {
|
||||
|
||||
// First, check the cache, and acquire a permit for this key if necessary.
|
||||
|
||||
let take_lease = match inner.get_from_cache(&cache_key)? {
|
||||
let take_lease = match inner.cache.get_from_cache(&cache_key)? {
|
||||
Some(CacheData::Stored(v)) => {
|
||||
ctx.perf_counters()
|
||||
.increment_counter(PerfCounterType::CachelibHits);
|
||||
@ -387,7 +382,9 @@ impl<T: Blobstore + 'static> Blobstore for VirtuallyShardedBlobstore<T> {
|
||||
let permit = if take_lease {
|
||||
let acq = inner
|
||||
.read_shards
|
||||
.acquire(&ctx, &key, ticket, || inner.get_from_cache(&cache_key))
|
||||
.acquire(&ctx, &key, ticket, || {
|
||||
inner.cache.get_from_cache(&cache_key)
|
||||
})
|
||||
.await?;
|
||||
|
||||
match acq {
|
||||
@ -425,7 +422,9 @@ impl<T: Blobstore + 'static> Blobstore for VirtuallyShardedBlobstore<T> {
|
||||
|
||||
// And finally, attempt to cache what we got back.
|
||||
if let Some(ref data) = res {
|
||||
let _ = inner.set_in_cache(&cache_key, PresenceData::Get, data.clone());
|
||||
let _ = inner
|
||||
.cache
|
||||
.set_in_cache(&cache_key, PresenceData::Get, data.clone());
|
||||
}
|
||||
|
||||
Ok(res)
|
||||
@ -446,7 +445,7 @@ impl<T: Blobstore + 'static> Blobstore for VirtuallyShardedBlobstore<T> {
|
||||
let cache_key = CacheKey::from_key(&key);
|
||||
let presence = PresenceData::from_put(&value);
|
||||
|
||||
if let Ok(Some(KnownToExist)) = inner.check_presence(&cache_key, presence) {
|
||||
if let Ok(Some(KnownToExist)) = inner.cache.check_presence(&cache_key, presence) {
|
||||
report_deduplicated_put(&ctx, &key);
|
||||
return Ok(());
|
||||
}
|
||||
@ -457,7 +456,7 @@ impl<T: Blobstore + 'static> Blobstore for VirtuallyShardedBlobstore<T> {
|
||||
let acq = inner
|
||||
.write_shards
|
||||
.acquire(&ctx, &key, ticket, || {
|
||||
inner.check_presence(&cache_key, presence)
|
||||
inner.cache.check_presence(&cache_key, presence)
|
||||
})
|
||||
.await?;
|
||||
|
||||
@ -475,7 +474,7 @@ impl<T: Blobstore + 'static> Blobstore for VirtuallyShardedBlobstore<T> {
|
||||
let res = inner.blobstore.put(&ctx, key, value.clone()).await?;
|
||||
|
||||
let value = BlobstoreGetData::new(BlobstoreMetadata::default(), value);
|
||||
let _ = inner.set_in_cache(&cache_key, presence, value);
|
||||
let _ = inner.cache.set_in_cache(&cache_key, presence, value);
|
||||
|
||||
Ok(res)
|
||||
};
|
||||
@ -489,7 +488,7 @@ impl<T: Blobstore + 'static> Blobstore for VirtuallyShardedBlobstore<T> {
|
||||
let cache_key = CacheKey::from_key(key);
|
||||
let presence = PresenceData::Get;
|
||||
|
||||
if let Ok(Some(KnownToExist)) = inner.check_presence(&cache_key, presence) {
|
||||
if let Ok(Some(KnownToExist)) = inner.cache.check_presence(&cache_key, presence) {
|
||||
return Ok(true);
|
||||
}
|
||||
|
||||
@ -498,7 +497,7 @@ impl<T: Blobstore + 'static> Blobstore for VirtuallyShardedBlobstore<T> {
|
||||
let exists = inner.blobstore.is_present(ctx, key).await?;
|
||||
|
||||
if exists {
|
||||
let _ = inner.set_is_present(&cache_key, presence);
|
||||
let _ = inner.cache.set_is_present(&cache_key, presence);
|
||||
}
|
||||
|
||||
Ok(exists)
|
||||
@ -516,11 +515,25 @@ mod test {
|
||||
|
||||
fn make_blobstore<B: Blobstore>(
|
||||
fb: FacebookInit,
|
||||
blob: B,
|
||||
blobstore: B,
|
||||
blob_pool_name: &str,
|
||||
cache_shards: NonZeroUsize,
|
||||
cache_filter: fn(&Bytes) -> Result<()>,
|
||||
) -> Result<VirtuallyShardedBlobstore<B>> {
|
||||
let cache = make_cache(fb, blob_pool_name, cache_filter)?;
|
||||
|
||||
let inner = Inner::new(blobstore, cache_shards, cache);
|
||||
|
||||
Ok(VirtuallyShardedBlobstore {
|
||||
inner: Arc::new(inner),
|
||||
})
|
||||
}
|
||||
|
||||
fn make_cache(
|
||||
fb: FacebookInit,
|
||||
blob_pool_name: &str,
|
||||
cache_filter: fn(&Bytes) -> Result<()>,
|
||||
) -> Result<Cache> {
|
||||
static INSTANCE: OnceCell<()> = OnceCell::new();
|
||||
INSTANCE.get_or_init(|| {
|
||||
let config = cachelib::LruCacheConfig::new(64 * 1024 * 1024);
|
||||
@ -530,17 +543,11 @@ mod test {
|
||||
let blob_pool = cachelib::get_or_create_volatile_pool(blob_pool_name, 8 * 1024 * 1024)?;
|
||||
let presence_pool = cachelib::get_or_create_volatile_pool("presence", 8 * 1024 * 1024)?;
|
||||
|
||||
let inner = Inner::new(
|
||||
blob,
|
||||
blob_pool,
|
||||
Ok(Cache {
|
||||
presence_pool,
|
||||
cache_shards,
|
||||
blob_pool,
|
||||
cache_filter,
|
||||
CachelibBlobstoreOptions::default(),
|
||||
);
|
||||
|
||||
Ok(VirtuallyShardedBlobstore {
|
||||
inner: Arc::new(inner),
|
||||
cachelib_options: CachelibBlobstoreOptions::default(),
|
||||
})
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user