mononoke: try to compress values if they are above the cachelib limit

Summary: If a value is above the cachelib limit, let's try to compress it.

Reviewed By: krallin

Differential Revision: D22139644

fbshipit-source-id: 9eb366e8ec94fe66529d27892a988b035989332a
Author: Stanislau Hlebik (committed by Facebook GitHub Bot)
Date: 2020-06-20 01:04:21 -07:00
parent 09c2e232f7
commit dc84f9741d
9 changed files with 102 additions and 11 deletions
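
The scheme in this diff frames every cachelib value with a one-byte marker: b'0' for raw abomonation bytes, b'1' for a zstd-compressed stream, with compression applied only when the encoded value would otherwise hit the cachelib limit. A minimal self-contained sketch of that framing, with hypothetical helper names `encode_with_limit` and `decode_value` (the real code below operates on abomonation-serialized `BlobstoreGetData`):

    use std::io::{Cursor, Error, ErrorKind};

    const UNCOMPRESSED: u8 = b'0';
    const COMPRESSED: u8 = b'1';

    // Prefix the payload with a marker byte; compress with zstd only when the
    // framed value would meet or exceed `limit` (the cachelib value ceiling).
    fn encode_with_limit(payload: &[u8], limit: usize) -> std::io::Result<Vec<u8>> {
        let mut bytes = vec![UNCOMPRESSED];
        bytes.extend_from_slice(payload);
        if bytes.len() >= limit {
            let mut compressed = vec![COMPRESSED];
            // Skip the marker byte: only the payload itself is compressed.
            let mut cursor = Cursor::new(bytes);
            cursor.set_position(1);
            zstd::stream::copy_encode(cursor, &mut compressed, 0 /* default level */)?;
            Ok(compressed)
        } else {
            Ok(bytes)
        }
    }

    // Strip the marker, then either decompress or return the payload as-is.
    fn decode_value(mut bytes: Vec<u8>) -> std::io::Result<Vec<u8>> {
        if bytes.is_empty() {
            return Err(Error::new(ErrorKind::InvalidData, "missing marker byte"));
        }
        let marker = bytes.remove(0);
        if marker == COMPRESSED {
            zstd::decode_all(Cursor::new(bytes))
        } else {
            Ok(bytes)
        }
    }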


@@ -25,8 +25,10 @@ bytes = { version = "0.5", features = ["serde"] }
 futures = { version = "0.3", features = ["async-await", "compat"] }
 futures-old = { package = "futures", version = "0.1" }
 thiserror = "1.0"
+zstd = "=0.4.23"

 [dev-dependencies]
+cacheblob = { path = "cacheblob" }
 fileblob = { path = "fileblob" }
 memblob = { path = "memblob" }
 mononoke_types = { path = "../mononoke_types" }


@@ -22,6 +22,8 @@ use crate::dummy::DummyLease;
 use crate::in_process_lease::InProcessLease;
 use crate::locking_cache::CacheBlobstore;

+const MAX_CACHELIB_VALUE_SIZE: u64 = 4 * 1024 * 1024;
+
 /// A caching layer over an existing blobstore, backed by cachelib
 #[derive(Clone)]
 pub struct CachelibOps {
@@ -87,7 +89,7 @@ impl CacheOps for CachelibOps {
         let _ = self.presence_pool.set(key, Bytes::from(b"P".as_ref()));

         value
-            .encode()
+            .encode(MAX_CACHELIB_VALUE_SIZE)
             .and_then(|bytes| self.blob_pool.set(key, bytes).map(|_| ()).map_err(|_| ()))
             .into_future()
             .boxify()
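
The threshold matters here: the diff pins the cachelib value ceiling at 4 MiB (MAX_CACHELIB_VALUE_SIZE), so an oversized blob would previously just fail to cache, while now `encode` compresses anything that would cross that limit and small values skip the zstd pass entirely. Exercising the hypothetical sketch above under the same limit:

    let limit = 4 * 1024 * 1024; // mirrors MAX_CACHELIB_VALUE_SIZE
    let small = encode_with_limit(b"smalldata", limit)?;
    assert_eq!(small[0], b'0'); // under the limit: stored raw, marker intact

    let big = encode_with_limit(&vec![b'a'; 5 * 1024 * 1024], limit)?;
    assert_eq!(big[0], b'1'); // over the limit: zstd-compressed
    assert!(big.len() < limit); // 5 MiB of 'a' shrinks far below 4 MiB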


@@ -20,7 +20,7 @@ anyhow = "1.0"
 bytes = { version = "0.5", features = ["serde"] }
 futures = { version = "0.3", features = ["async-await", "compat"] }
 futures-old = { package = "futures", version = "0.1" }
-once_cell = "1.2"
+once_cell = "1.4"
 rand = { version = "0.7", features = ["small_rng"] }
 tokio = { version = "=0.2.13", features = ["full"] }
 twox-hash = "1.5"


@@ -16,6 +16,7 @@ use anyhow::Error;
 use futures::future::BoxFuture;
 use futures_ext::{BoxFuture as BoxFuture01, FutureExt as FutureExt01};
 use futures_old::future::{self as future01, Future as Future01};
+use std::io::Cursor;
 use thiserror::Error;

 use context::CoreContext;
@@ -35,6 +36,9 @@ pub struct BlobstoreGetData {
     bytes: BlobstoreBytes,
 }

+const UNCOMPRESSED: u8 = b'0';
+const COMPRESSED: u8 = b'1';
+
 impl BlobstoreGetData {
     #[inline]
     pub fn new(meta: BlobstoreMetadata, bytes: BlobstoreBytes) -> Self {
@@ -79,16 +83,39 @@ impl BlobstoreGetData {
         self.meta.ctime = None;
     }

-    pub fn encode(self) -> Result<Bytes, ()> {
-        let mut bytes = Vec::new();
+    pub fn encode(self, encode_limit: u64) -> Result<Bytes, ()> {
+        let mut bytes = vec![UNCOMPRESSED];
         let get_data = BlobstoreGetDataSerialisable::from(self);
         unsafe { abomonation::encode(&get_data, &mut bytes).map_err(|_| ())? };

-        Ok(Bytes::from(bytes))
+        if bytes.len() as u64 >= encode_limit {
+            let mut compressed = Vec::with_capacity(bytes.len());
+            compressed.push(COMPRESSED);
+            let mut cursor = Cursor::new(bytes);
+            cursor.set_position(1);
+            zstd::stream::copy_encode(cursor, &mut compressed, 0 /* use default */)
+                .map_err(|_| ())?;
+            Ok(Bytes::from(compressed))
+        } else {
+            Ok(Bytes::from(bytes))
+        }
     }

-    pub fn decode(bytes: Bytes) -> Result<Self, ()> {
-        let mut bytes: Vec<u8> = bytes.bytes().into();
+    pub fn decode(mut bytes: Bytes) -> Result<Self, ()> {
+        let prefix_size = 1;
+        if bytes.len() < prefix_size {
+            return Err(());
+        }
+
+        let is_compressed = bytes.split_to(prefix_size);
+        let mut bytes: Vec<u8> = if is_compressed[0] == COMPRESSED {
+            let cursor = Cursor::new(bytes);
+            zstd::decode_all(cursor).map_err(|_| ())?
+        } else {
+            bytes.bytes().into()
+        };

         let get_data_serialisable =
             unsafe { abomonation::decode::<BlobstoreGetDataSerialisable>(&mut bytes) };
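
The decode side mirrors the framing: split off the marker byte, decompress if it reads b'1', and only then hand the bytes to abomonation. With the hypothetical sketch helpers from above, the round-trip property looks like this:

    let payload = vec![b'x'; 6 * 1024 * 1024];
    let encoded = encode_with_limit(&payload, 4 * 1024 * 1024)?;
    assert_eq!(encoded[0], b'1'); // forced through the compressed path
    assert_eq!(decode_value(encoded)?, payload); // lossless round trip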


@@ -161,3 +161,63 @@ blobstore_test_impl! {
         has_ctime: true,
     }
 }
+
+#[cfg(fbcode_build)]
+fn create_cache(fb: FacebookInit) -> Result<(), Error> {
+    let config = cachelib::LruCacheConfig::new(128 * 1024 * 1024);
+    cachelib::init_cache_once(fb, config)?;
+    Ok(())
+}
+
+#[cfg(fbcode_build)]
+#[fbinit::compat_test]
+async fn test_cache_blob(fb: FacebookInit) -> Result<(), Error> {
+    let ctx = CoreContext::test_mock(fb);
+    create_cache(fb)?;
+    let blob_pool = cachelib::get_or_create_pool("blob_pool", 20 * 1024 * 1024)?;
+    let presence_pool = cachelib::get_or_create_pool("presence_pool", 20 * 1024 * 1024)?;
+
+    let inner = LazyMemblob::new();
+    let cache_blob =
+        cacheblob::new_cachelib_blobstore(inner, Arc::new(blob_pool), Arc::new(presence_pool));
+
+    let small_key = "small_key".to_string();
+    let value = BlobstoreBytes::from_bytes(Bytes::copy_from_slice(b"smalldata"));
+    cache_blob
+        .put(ctx.clone(), small_key.clone(), value.clone())
+        .compat()
+        .await?;
+
+    assert_eq!(
+        cache_blob
+            .get(ctx.clone(), small_key)
+            .compat()
+            .await?
+            .map(|bytes| bytes.into_bytes()),
+        Some(value)
+    );
+
+    let large_key = "large_key".to_string();
+    let size = 5 * 1024 * 1024;
+    let mut large_value = Vec::with_capacity(size);
+    for _ in 0..size {
+        large_value.push(b'a');
+    }
+    let large_value = BlobstoreBytes::from_bytes(large_value);
+
+    cache_blob
+        .put(ctx.clone(), large_key.clone(), large_value.clone())
+        .compat()
+        .await?;
+
+    assert_eq!(
+        cache_blob
+            .get(ctx, large_key)
+            .compat()
+            .await?
+            .map(|bytes| bytes.into_bytes()),
+        Some(large_value)
+    );
+
+    Ok(())
+}


@@ -31,7 +31,7 @@ gotham_derive = "=0.5.0-dev"
 http = "0.2"
 hyper = "0.13"
 mime = "0.3.14"
-once_cell = "1.2"
+once_cell = "1.4"
 serde = { version = "1.0", features = ["derive", "rc"] }
 serde_cbor = "0.11"
 serde_json = "1.0"


@@ -11,7 +11,7 @@ anyhow = "1.0"
 faster-hex = "0.4"
 http = "0.2"
 mime = "0.3.14"
-once_cell = "1.2"
+once_cell = "1.4"
 quickcheck = "0.9"
 serde = { version = "1.0", features = ["derive", "rc"] }


@@ -13,7 +13,7 @@ cached_config = { git = "https://github.com/facebookexperimental/rust-shed.git",
 anyhow = "1.0"
 arc-swap = "0.4"
 futures = { version = "0.3", features = ["async-await", "compat"] }
-once_cell = "1.2"
+once_cell = "1.4"
 serde_json = "1.0"
 slog = { version = "2.5", features = ["max_level_debug"] }


@@ -50,7 +50,7 @@ hex = "0.4"
 inlinable_string = "0.1"
 itertools = "0.8"
 lazy_static = "1.0"
-once_cell = "1.2"
+once_cell = "1.4"
 percent-encoding = "2.1"
 regex = "1.3.7"
 slog = { version = "2.5", features = ["max_level_debug"] }