mononoke: ensure filestore chunks aren't zstd-encoded when cached

Summary:
Those blobs are designed to fit in cachelib, so we shouldn't attempt to
zstd-encode them (not to mention that they usually don't compress very well,
since many of those blobs come from binary files, though that's not true of
all of them).

However, until now we weren't actually doing this, for a few reasons:

- Our threshold didn't allow enough headroom. I don't know exactly when this
  got introduced (or indeed whether it has ever worked since we introduced
  cachelib compression). See the sketch below this list for the arithmetic.
- We serialized a bunch of extra metadata that we don't really need, as it's a
  bit meaningless once it's gone through the cache (notably, we don't serialize
  it in Memcache). This diff updates us to just store bytes here.
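
To make the arithmetic concrete, here is a minimal sketch (not the actual
implementation) of why a filestore chunk tripped the compression path before
this change. The two limits mirror the constants in the diff below; the
per-blob serialization overheads are illustrative assumptions, not measured
values.

  // Filestore chunk size, chosen to fit in cachelib (see MAX_CACHELIB_VALUE_SIZE below).
  const FILESTORE_CHUNK_SIZE: u64 = 4 * 1024 * 1024 - 1024; // 4193280 bytes
  const OLD_ENCODE_LIMIT: u64 = FILESTORE_CHUNK_SIZE + 8;
  const NEW_ENCODE_LIMIT: u64 = FILESTORE_CHUNK_SIZE + 128;

  // Mirrors the check in encode(): anything at or over the limit gets zstd-compressed.
  fn would_zstd_compress(serialized_len: u64, encode_limit: u64) -> bool {
      serialized_len >= encode_limit
  }

  fn main() {
      // Before: a 1-byte compression prefix plus abomonation-serialized metadata
      // (assumed ~40 bytes here) blew through the 8 bytes of headroom.
      let old_serialized = FILESTORE_CHUNK_SIZE + 1 + 40;
      assert!(would_zstd_compress(old_serialized, OLD_ENCODE_LIMIT));

      // After: only the raw bytes are stored (prefix plus a small abomonation
      // length header, assumed 8 bytes), which fits within the 128-byte headroom.
      let new_serialized = FILESTORE_CHUNK_SIZE + 1 + 8;
      assert!(!would_zstd_compress(new_serialized, NEW_ENCODE_LIMIT));
  }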

Differential Revision: D28350387

fbshipit-source-id: 4d684ab58cea137044e20951ec4cbb21240b8dfc
Thomas Orozco 2021-05-12 02:22:12 -07:00 committed by Facebook GitHub Bot
parent 9bd8e54a9f
commit 3296132710
4 changed files with 103 additions and 85 deletions

@@ -6,7 +6,7 @@
*/
use async_trait::async_trait;
-use blobstore::{Blobstore, BlobstoreGetData, CountedBlobstore};
+use blobstore::{Blobstore, BlobstoreBytes, BlobstoreGetData, CountedBlobstore};
use bytes::Bytes;
use cachelib::LruCachePool;
use context::PerfCounterType;
@@ -122,7 +122,9 @@ impl CacheOps for CachelibOps {
async fn get(&self, key: &str) -> Option<BlobstoreGetData> {
let blob = self.blob_pool.get(key);
-blob.ok()?.map(BlobstoreGetData::decode).transpose().ok()?
+let blob = blob.ok()??;
+let blob = BlobstoreBytes::decode(blob).ok()?;
+Some(blob.into())
}
async fn put(&self, key: &str, value: BlobstoreGetData) {
@@ -134,7 +136,7 @@ impl CacheOps for CachelibOps {
} else {
None
};
-if let Ok(bytes) = value.encode(encode_limit) {
+if let Ok(bytes) = value.into_bytes().encode(encode_limit) {
let _ = self.blob_pool.set(key, bytes);
}
}

@@ -96,59 +96,6 @@ impl BlobstoreGetData {
pub fn remove_ctime(&mut self) {
self.meta.ctime = None;
}
-pub fn encode(self, encode_limit: Option<u64>) -> Result<Bytes, ()> {
-let mut bytes = vec![UNCOMPRESSED];
-let get_data = BlobstoreGetDataSerialisable::from(self);
-unsafe {
-abomonation::encode(&get_data, &mut bytes).map_err(|_| ())?
-};
-match encode_limit {
-Some(encode_limit) if bytes.len() as u64 >= encode_limit => {
-let mut compressed = Vec::with_capacity(bytes.len());
-compressed.push(COMPRESSED);
-let mut cursor = Cursor::new(bytes);
-cursor.set_position(1);
-zstd::stream::copy_encode(cursor, &mut compressed, 0 /* use default */)
-.map_err(|_| ())?;
-Ok(Bytes::from(compressed))
-}
-_ => Ok(Bytes::from(bytes)),
-}
-}
-pub fn decode(mut bytes: Bytes) -> Result<Self, ()> {
-let prefix_size = 1;
-if bytes.len() < prefix_size {
-return Err(());
-}
-let is_compressed = bytes.split_to(prefix_size);
-let mut bytes: Vec<u8> = if is_compressed[0] == COMPRESSED {
-let cursor = Cursor::new(bytes);
-zstd::decode_all(cursor).map_err(|_| ())?
-} else {
-bytes.bytes().into()
-};
-let get_data_serialisable =
-unsafe { abomonation::decode::<BlobstoreGetDataSerialisable>(&mut bytes) };
-let result = get_data_serialisable.and_then(|(get_data_serialisable, tail)| {
-if tail.is_empty() {
-Some(get_data_serialisable.clone().into())
-} else {
-None
-}
-});
-match result {
-Some(val) => Ok(val),
-None => Err(()),
-}
-}
}
impl From<BlobstoreBytes> for BlobstoreGetData {
@@ -248,14 +195,59 @@ impl BlobstoreBytes {
pub fn as_bytes(&self) -> &Bytes {
&self.0
}
-}
-/// Serialisable counterpart of BlobstoreGetDataSerialisable fields mimic exactly
-/// its original struct except in types that cannot be serialised
-#[derive(Abomonation, Clone)]
-struct BlobstoreGetDataSerialisable {
-meta: BlobstoreMetadata,
-bytes: BlobstoreBytesSerialisable,
+pub fn encode(self, encode_limit: Option<u64>) -> Result<Bytes, ()> {
+let mut bytes = vec![UNCOMPRESSED];
+let prepared = BlobstoreBytesSerialisable::from(self);
+unsafe {
+abomonation::encode(&prepared, &mut bytes).map_err(|_| ())?
+};
+match encode_limit {
+Some(encode_limit) if bytes.len() as u64 >= encode_limit => {
+let mut compressed = Vec::with_capacity(bytes.len());
+compressed.push(COMPRESSED);
+let mut cursor = Cursor::new(bytes);
+cursor.set_position(1);
+zstd::stream::copy_encode(cursor, &mut compressed, 0 /* use default */)
+.map_err(|_| ())?;
+Ok(Bytes::from(compressed))
+}
+_ => Ok(Bytes::from(bytes)),
+}
+}
+pub fn decode(mut bytes: Bytes) -> Result<Self, ()> {
+let prefix_size = 1;
+if bytes.len() < prefix_size {
+return Err(());
+}
+let is_compressed = bytes.split_to(prefix_size);
+let mut bytes: Vec<u8> = if is_compressed[0] == COMPRESSED {
+let cursor = Cursor::new(bytes);
+zstd::decode_all(cursor).map_err(|_| ())?
+} else {
+bytes.bytes().into()
+};
+let get_data_serialisable =
+unsafe { abomonation::decode::<BlobstoreBytesSerialisable>(&mut bytes) };
+let result = get_data_serialisable.and_then(|(get_data_serialisable, tail)| {
+if tail.is_empty() {
+Some(get_data_serialisable.clone().into())
+} else {
+None
+}
+});
+match result {
+Some(val) => Ok(val),
+None => Err(()),
+}
+}
}
/// Serialisable counterpart of BlobstoreBytes fields mimic exactly its original
@@ -263,24 +255,6 @@ struct BlobstoreGetDataSerialisable {
#[derive(Abomonation, Clone)]
struct BlobstoreBytesSerialisable(Vec<u8>);
-impl From<BlobstoreGetData> for BlobstoreGetDataSerialisable {
-fn from(blob: BlobstoreGetData) -> Self {
-BlobstoreGetDataSerialisable {
-meta: blob.meta,
-bytes: blob.bytes.into(),
-}
-}
-}
-impl From<BlobstoreGetDataSerialisable> for BlobstoreGetData {
-fn from(blob: BlobstoreGetDataSerialisable) -> Self {
-BlobstoreGetData {
-meta: blob.meta,
-bytes: blob.bytes.into(),
-}
-}
-}
impl From<BlobstoreBytes> for BlobstoreBytesSerialisable {
fn from(blob_bytes: BlobstoreBytes) -> Self {
BlobstoreBytesSerialisable(blob_bytes.into_bytes().bytes().into())

@@ -26,6 +26,7 @@ tunables = { version = "0.1.0", path = "../../tunables" }
twox-hash = "1.5"
[dev-dependencies]
assert_matches = "1.5"
+borrowed = { version = "0.1.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
fbinit = { version = "0.1.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
fbinit-tokio = { version = "0.1.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }

@@ -41,9 +41,9 @@ define_stats! {
}
// 4MiB, minus a little space for the STORED prefix and the key. Note that we also arbitrarily add
-// 8 bytes because 4 * 1024 * 1024 - 1024 is also our Filestore threshold so it's a good idea to
+// 128 bytes because 4 * 1024 * 1024 - 1024 is also our Filestore threshold so it's a good idea to
// not attempt to recompress that given it was chosen to fit in cachelib.
-const MAX_CACHELIB_VALUE_SIZE: u64 = 4 * 1024 * 1024 - 1024 + 8;
+const MAX_CACHELIB_VALUE_SIZE: u64 = 4 * 1024 * 1024 - 1024 + 128;
const NOT_STORABLE: Bytes = Bytes::from_static(&[0]);
const STORED: Bytes = Bytes::from_static(&[1]);
@@ -81,8 +81,9 @@ impl CacheData {
}
if prefix.as_ref() == STORED {
-let val = BlobstoreGetData::decode(val)
-.map_err(|()| anyhow!("Invalid data in blob cache"))?;
+let val = BlobstoreBytes::decode(val)
+.map_err(|()| anyhow!("Invalid data in blob cache"))?
+.into();
return Ok(Self::Stored(val));
}
@@ -249,6 +250,7 @@ impl Cache {
None
};
let stored = value
+.into_bytes()
.encode(encode_limit)
.map_err(|()| anyhow!("Could not encode"))
.and_then(|encoded| {
@@ -555,6 +557,45 @@ mod test {
Err(anyhow!("Rejected!"))
}
+mod caching {
+use super::*;
+use mononoke_types::{content_chunk::new_blob_and_pointer, MononokeId};
+#[fbinit::test]
+fn test_filestore_chunk_is_not_compressed(fb: FacebookInit) -> Result<()> {
+fn assert_below_threshold(b: &Bytes) -> Result<()> {
+assert!(
+(b.len() as u64) < MAX_CACHELIB_VALUE_SIZE,
+"Blob of size {} would have triggered compression!",
+b.len()
+);
+Ok(())
+}
+let mut cache = make_cache(fb, "blobs", assert_below_threshold)?;
+cache.cachelib_options.attempt_zstd = false;
+let (blob, _) = new_blob_and_pointer(vec![0; 4193280]);
+// Use a key that looks reasonably close to what we'd actually use in prod.
+let key = format!("repo1234.{}", blob.id().blobstore_key());
+let key = CacheKey::from_key(&key);
+let bytes = BlobstoreBytes::from(blob);
+cache.set_in_cache(&key, PresenceData::Get, bytes.clone().into())?;
+let cached = cache.get_from_cache(&key)?.context("Blob not in cache")?;
+assert_matches::assert_matches!(cached, CacheData::Stored(cached) => {
+let expected = BlobstoreGetData::new(BlobstoreMetadata::default(), bytes);
+assert_eq!(expected, cached);
+});
+Ok(())
+}
+}
mod sharding {
use super::*;
use borrowed::borrowed;