mononoke: add optional compression to packblob put

Summary:
Add optional compression on put, controlled by a command-line option.

Other than some extra CPU time, this may be a good option when populating new stores from existing uncompressed stores.
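
For example, a blobimport into a packblob-backed store could enable compression like this (the repo paths and level shown are illustrative; level 0 selects zstd's default level):

  $ blobimport repo-hg/.hg repo --blobstore-write-zstd-level 3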

Reviewed By: farnz

Differential Revision: D22037756

fbshipit-source-id: e75190ddf9cfd4ed3ea9a18a0ec6d9342a90707b
Alex Hornby 2020-06-17 02:31:22 -07:00 committed by Facebook GitHub Bot
parent 1a3968376c
commit 9c53e07e46
6 changed files with 135 additions and 18 deletions


@@ -21,7 +21,7 @@ use metaconfig_types::{
ShardableRemoteDatabaseConfig,
};
use multiplexedblob::{LoggingScrubHandler, MultiplexedBlobstore, ScrubBlobstore, ScrubHandler};
use packblob::PackBlob;
use packblob::{PackBlob, PackOptions};
use readonlyblob::ReadOnlyBlobstore;
use scuba::ScubaSampleBuilder;
use slog::Logger;
@@ -39,6 +39,7 @@ pub struct BlobstoreOptions {
pub chaos_options: ChaosOptions,
pub throttle_options: ThrottleOptions,
pub manifold_api_key: Option<String>,
pub pack_options: PackOptions,
}
impl BlobstoreOptions {
@@ -46,11 +47,13 @@ impl BlobstoreOptions {
chaos_options: ChaosOptions,
throttle_options: ThrottleOptions,
manifold_api_key: Option<String>,
pack_options: PackOptions,
) -> Self {
Self {
chaos_options,
throttle_options,
manifold_api_key,
pack_options,
}
}
}
@@ -61,6 +64,7 @@ impl Default for BlobstoreOptions {
ChaosOptions::new(None, None),
ThrottleOptions::new(None, None),
None,
PackOptions::default(),
)
}
}
@@ -268,7 +272,8 @@ pub fn make_blobstore<'a>(
)
.await?;
Arc::new(PackBlob::new(store)) as Arc<dyn Blobstore>
Arc::new(PackBlob::new(store, blobstore_options.pack_options.clone()))
as Arc<dyn Blobstore>
}
};


@@ -11,6 +11,7 @@ mod facebook;
mod sql;
pub use chaosblob::ChaosOptions;
pub use packblob::PackOptions;
pub use throttledblob::ThrottleOptions;
pub use crate::blobstore::{make_blobstore, make_blobstore_multiplexed, BlobstoreOptions};


@@ -9,4 +9,4 @@ mod envelope;
mod pack;
mod store;
pub use store::PackBlob;
pub use store::{PackBlob, PackOptions};


@@ -10,26 +10,52 @@ use crate::pack;
use anyhow::{format_err, Context, Error};
use blobstore::{Blobstore, BlobstoreGetData, BlobstoreWithLink};
use bytes::Bytes;
use context::CoreContext;
use futures::{
compat::Future01CompatExt,
stream::{FuturesUnordered, TryStreamExt},
FutureExt, TryFutureExt,
};
use futures_ext::{BoxFuture as BoxFuture01, FutureExt as Future01Ext};
use futures_ext::{try_boxfuture, BoxFuture as BoxFuture01, FutureExt as Future01Ext};
use mononoke_types::BlobstoreBytes;
use packblob_thrift::{PackedEntry, SingleValue, StorageEnvelope, StorageFormat};
use std::convert::TryInto;
use std::{convert::TryInto, io::Cursor};
#[derive(Clone, Debug, Default)]
pub struct PackOptions {
// If Some, this is used as zstd compression level on put.
// Some(0) means use zstd default level.
put_compress_level: Option<i32>,
}
impl PackOptions {
pub fn new(put_compress_level: Option<i32>) -> Self {
Self { put_compress_level }
}
}
/// A layer over an existing blobstore that uses thrift blob wrappers to allow packing and compression
#[derive(Clone, Debug)]
pub struct PackBlob<T: Blobstore + Clone> {
inner: T,
options: PackOptions,
}
impl<T: Blobstore + Clone> PackBlob<T> {
pub fn new(inner: T) -> Self {
Self { inner }
pub fn new(inner: T, options: PackOptions) -> Self {
Self { inner, options }
}
}
// If compressed version is smaller, use it, otherwise return raw
fn compress_if_worthwhile(value: Bytes, zstd_level: i32) -> Result<SingleValue, Error> {
let cursor = Cursor::new(value.clone());
let compressed = zstd::encode_all(cursor, zstd_level)?;
if compressed.len() < value.len() {
Ok(SingleValue::Zstd(compressed))
} else {
Ok(SingleValue::Raw(value.to_vec()))
}
}
@@ -80,9 +106,18 @@ impl<T: Blobstore + Clone> Blobstore for PackBlob<T> {
value: BlobstoreBytes,
) -> BoxFuture01<(), Error> {
key.push_str(ENVELOPE_SUFFIX);
let value = value.into_bytes();
let single = if let Some(zstd_level) = self.options.put_compress_level {
try_boxfuture!(compress_if_worthwhile(value, zstd_level))
} else {
SingleValue::Raw(value.to_vec())
};
// Wrap in thrift encoding
let envelope: PackEnvelope = PackEnvelope(StorageEnvelope {
storage: StorageFormat::Single(SingleValue::Raw(value.into_bytes().to_vec())),
storage: StorageFormat::Single(single),
});
// pass through the put after wrapping
self.inner.put(ctx, key, envelope.into())
@@ -145,19 +180,70 @@ mod tests {
use fbinit::FacebookInit;
use memblob::EagerMemblob;
use packblob_thrift::{PackedEntry, PackedValue, SingleValue};
use rand::{RngCore, SeedableRng};
use rand_xorshift::XorShiftRng;
use std::sync::Arc;
#[fbinit::compat_test]
async fn simple_roundtrip_test(fb: FacebookInit) -> Result<(), Error> {
let ctx = CoreContext::test_mock(fb);
let inner_blobstore = Arc::new(EagerMemblob::new());
let packblob = PackBlob::new(inner_blobstore.clone(), PackOptions::default());
let outer_key = "repo0000.randomkey".to_string();
let packblob = PackBlob::new(inner_blobstore.clone());
let value = BlobstoreBytes::from_bytes(Bytes::copy_from_slice(b"appleveldata"));
let _ = roundtrip(ctx, inner_blobstore.clone(), &packblob, outer_key, value).await?;
Ok(())
}
#[fbinit::compat_test]
async fn compressible_roundtrip_test(fb: FacebookInit) -> Result<(), Error> {
let ctx = CoreContext::test_mock(fb);
let innerblob = Arc::new(EagerMemblob::new());
let packblob = PackBlob::new(innerblob.clone(), PackOptions::new(Some(0)));
let bytes_in = Bytes::from(vec![7u8; 65535]);
let value = BlobstoreBytes::from_bytes(bytes_in.clone());
let outer_key = "repo0000.compressible".to_string();
let inner_key =
roundtrip(ctx.clone(), innerblob.clone(), &packblob, outer_key, value).await?;
// check inner value is smaller
let inner_value = innerblob.get(ctx.clone(), inner_key).compat().await?;
assert!(inner_value.unwrap().into_bytes().len() < bytes_in.len());
Ok(())
}
#[fbinit::compat_test]
async fn incompressible_roundtrip_test(fb: FacebookInit) -> Result<(), Error> {
let ctx = CoreContext::test_mock(fb);
let innerblob = Arc::new(EagerMemblob::new());
let packblob = PackBlob::new(innerblob.clone(), PackOptions::new(Some(0)));
let mut rng = XorShiftRng::seed_from_u64(0); // reproducible Rng
let mut bytes_in = vec![7u8; 65535];
rng.fill_bytes(&mut bytes_in);
let bytes_in = Bytes::from(bytes_in);
let outer_key = "repo0000.incompressible".to_string();
let value = BlobstoreBytes::from_bytes(bytes_in.clone());
let inner_key =
roundtrip(ctx.clone(), innerblob.clone(), &packblob, outer_key, value).await?;
// check inner value is larger (due to being raw plus thrift encoding)
let inner_value = innerblob.get(ctx.clone(), inner_key).compat().await?;
assert!(inner_value.unwrap().into_bytes().len() > bytes_in.len());
Ok(())
}
async fn roundtrip(
ctx: CoreContext,
inner_blobstore: Arc<EagerMemblob>,
packblob: &PackBlob<Arc<EagerMemblob>>,
outer_key: String,
value: BlobstoreBytes,
) -> Result<String, Error> {
// Put, this will apply the thrift envelope and save to the inner store
packblob
.put(ctx.clone(), outer_key.clone(), value.clone())
@@ -187,7 +273,7 @@ mod tests {
// Check is_present matches
let is_present = inner_blobstore
.is_present(ctx.clone(), inner_key)
.is_present(ctx.clone(), inner_key.clone())
.compat()
.await?;
assert!(is_present);
@@ -199,7 +285,7 @@ mod tests {
.await?;
assert!(is_not_present);
Ok(())
Ok(inner_key)
}
#[fbinit::compat_test]
@@ -222,7 +308,7 @@ mod tests {
let ctx = CoreContext::test_mock(fb);
let inner_blobstore = EagerMemblob::new();
let packblob = PackBlob::new(inner_blobstore.clone());
let packblob = PackBlob::new(inner_blobstore.clone(), PackOptions::default());
// put_packed, this will apply the thrift envelope and save to the inner store
let inner_key = packblob


@@ -37,7 +37,7 @@ use slog_glog_fmt::{kv_categorizer::FacebookCategorizer, kv_defaults::FacebookKV
use blobrepo::BlobRepo;
use blobrepo_factory::{BlobrepoBuilder, Caching, ReadOnlyStorage};
use blobstore_factory::{BlobstoreOptions, ChaosOptions, Scrubbing, ThrottleOptions};
use blobstore_factory::{BlobstoreOptions, ChaosOptions, PackOptions, Scrubbing, ThrottleOptions};
use metaconfig_parser::{RepoConfigs, StorageConfigs};
use metaconfig_types::{BlobConfig, CommonConfig, Redaction, RepoConfig, ScrubAction};
use mononoke_types::RepositoryId;
@@ -75,6 +75,7 @@ const READ_QPS_ARG: &str = "blobstore-read-qps";
const WRITE_QPS_ARG: &str = "blobstore-write-qps";
const READ_CHAOS_ARG: &str = "blobstore-read-chaos-rate";
const WRITE_CHAOS_ARG: &str = "blobstore-write-chaos-rate";
const WRITE_ZSTD_ARG: &str = "blobstore-write-zstd-level";
const MANIFOLD_API_KEY_ARG: &str = "manifold-api-key";
const CRYPTO_PROJECT: &str = "SCM";
@@ -685,6 +686,13 @@ pub fn add_blobstore_args<'a, 'b>(app: App<'a, 'b>) -> App<'a, 'b> {
.required(false)
.help("Rate of errors on writes. Pass N, it will error randomly 1/N times. For multiplexed stores will only apply to the first store in the multiplex."),
)
.arg(
Arg::with_name(WRITE_ZSTD_ARG)
.long(WRITE_ZSTD_ARG)
.takes_value(true)
.required(false)
.help("Set the zstd compression level to be used on writes via the packed blobstore (if configured). Default is None."),
)
.arg(
Arg::with_name(MANIFOLD_API_KEY_ARG)
.long(MANIFOLD_API_KEY_ARG)
@@ -971,10 +979,16 @@ pub fn parse_blobstore_options<'a>(matches: &ArgMatches<'a>) -> BlobstoreOptions
.value_of(MANIFOLD_API_KEY_ARG)
.map(|api_key| api_key.to_string());
let write_zstd_level: Option<i32> = matches.value_of(WRITE_ZSTD_ARG).map(|v| {
v.parse()
.expect("Provided Zstd compression level is not i32")
});
BlobstoreOptions::new(
ChaosOptions::new(read_chaos, write_chaos),
ThrottleOptions::new(read_qps, write_qps),
manifold_api_key,
PackOptions::new(write_zstd_level),
)
}
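
Taken together, a minimal sketch of how the parsed level ends up controlling PackBlob (hedged illustration only; the inner store is a placeholder, Some(0) asks zstd for its default level, and None disables compression on put):

    // Sketch based on the hunks above, not part of the diff itself.
    let blobstore_options = BlobstoreOptions::new(
        ChaosOptions::new(None, None),
        ThrottleOptions::new(None, None),
        None, // manifold_api_key
        PackOptions::new(write_zstd_level),
    );
    // packblob.put(...) will now zstd-compress each value when that actually
    // shrinks it, falling back to raw bytes otherwise (see compress_if_worthwhile).
    let packblob = PackBlob::new(inner_store, blobstore_options.pack_options.clone());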


@@ -6,7 +6,7 @@
$ . "${TEST_FIXTURES}/library.sh"
setup configuration
setup configuration in the usual uncompressed way
$ MULTIPLEXED=1 PACK_BLOB=0 default_setup_blobimport "blob_files"
hg repo
o C [draft;rev=2;26805aba1e60]
@@ -23,6 +23,17 @@ Check the stores have expected counts
$ ls blobstore/1/blobs/ | wc -l
30
Check that the store sizes are different due to the packblob wrappers on store 0
Check that the packed sizes are larger due to the packblob wrappers on store 0
$ PACKED=$(du -s --bytes blobstore/0/blobs/ | cut -f1); UNPACKED=$(du -s --bytes blobstore/1/blobs/ | cut -f1)
$ if [[ "$PACKED" -le "$UNPACKED" ]]; then echo "expected packed $PACKED to be larger than unpacked $UNPACKED"; fi
$ if [[ "$PACKED" -le "$UNPACKED" ]]; then echo "expected packed $PACKED to be larger than unpacked $UNPACKED due to thift wrappers"; fi
Move the uncompressed packed store aside
$ mv "$TESTTMP/blobstore/0" "$TESTTMP/blobstore.raw"
$ rm -rf "$TESTTMP/monsql/sqlite_dbs" "$TESTTMP/blobstore_sync_queue/sqlite_dbs" "$TESTTMP/blobstore"
Blobimport again, but this time enable zstd compression
$ blobimport repo-hg/.hg repo --blobstore-write-zstd-level 0
Check that the packed sizes are smaller due to compression
$ PACKED=$(du -s --bytes blobstore/0/blobs/ | cut -f1); OLDPACKED=$(du -s --bytes blobstore.raw/blobs/ | cut -f1)
$ if [[ "$PACKED" -ge "$OLDPACKED" ]]; then echo "expected packed $PACKED to be smaller than packed $OLDPACKED due to compression"; fi