indexedlog: move checksum_type to OpenOptions

Summary:
The motivation for this change is that LogRotate might copy dirty
(non-flushed) entries from one Log to another, and it cannot preserve the
checksum type for those entries. There are 2 solutions:

- Make `iter_dirty` return checksum type.
- Make checksum type known by Log directly.

The second choice provides a simpler public API: `append_advanced` can be
removed, and `iter_dirty` stays consistent with `iter`. This change therefore
implements the second choice.

Differential Revision: D14688174

fbshipit-source-id: 09e07d64c886a5ce9bc48dce8e29d036af1c0381
This commit is contained in:
Jun Wu 2019-04-01 17:08:43 -07:00 committed by Facebook Github Bot
parent 8fc9742997
commit 277d25b581
3 changed files with 89 additions and 46 deletions

View File

@ -8,7 +8,7 @@ extern crate minibench;
extern crate rand;
extern crate tempdir;
use indexedlog::log::{ChecksumType, IndexDef, IndexOutput, Log};
use indexedlog::log::{self, ChecksumType, IndexDef, IndexOutput, Log};
use minibench::{bench, elapsed};
use rand::{ChaChaRng, Rng};
use tempdir::TempDir;
@ -36,12 +36,15 @@ fn main() {
bench("log insertion (no checksum)", || {
let dir = TempDir::new("log").expect("TempDir::new");
let mut log = Log::open(dir.path(), vec![]).unwrap();
let mut log = log::OpenOptions::new()
.create(true)
.checksum_type(ChecksumType::None)
.open(dir.path())
.unwrap();
let buf = gen_buf(N * 20);
elapsed(move || {
for i in 0..N {
log.append_advanced(&buf[20 * i..20 * (i + 1)], ChecksumType::None)
.unwrap();
log.append(&buf[20 * i..20 * (i + 1)]).unwrap();
}
})
});

View File

@ -152,8 +152,11 @@ pub enum IndexOutput {
}
/// What checksum function to use for an entry.
#[derive(Copy, Clone, Debug)]
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum ChecksumType {
/// Choose xxhash64 or xxhash32 automatically based on data size.
Auto,
/// No checksum. Suitable for data that have their own checksum logic.
/// For example, source control commit data might have SHA1 that can
/// verify themselves.
@ -210,6 +213,7 @@ struct LogMetadata {
pub struct OpenOptions {
/// Index definitions for the log being opened.
index_defs: Vec<IndexDef>,
/// NOTE(review): presumably whether to create the log if it does not
/// exist yet - confirm against `open`.
create: bool,
/// Checksum function used for appended entries. With
/// [ChecksumType::Auto], `Log::append` chooses xxhash32 or xxhash64
/// per entry based on the data size.
checksum_type: ChecksumType,
}
// Some design notes:
@ -239,40 +243,35 @@ impl Log {
///
/// To write in-memory entries and indexes to disk, call [Log::flush].
pub fn append<T: AsRef<[u8]>>(&mut self, data: T) -> io::Result<()> {
// xxhash64 is slower for smaller data. A quick benchmark on x64 platform shows:
//
// bytes xxhash32 xxhash64 (MB/s)
// 32 1882 1600
// 40 1739 1538
// 48 2285 1846
// 56 2153 2000
// 64 2666 2782
// 72 2400 2322
// 80 2962 2758
// 88 2750 2750
// 96 3200 3692
// 104 2810 3058
// 112 3393 3500
// 120 3000 3428
// 128 3459 4266
const XXHASH64_THRESHOLD: usize = 88;
let data = data.as_ref();
let checksum_type = if data.len() >= XXHASH64_THRESHOLD {
ChecksumType::Xxhash64
} else {
ChecksumType::Xxhash32
};
self.append_advanced(data, checksum_type)
}
/// Advanced version of [Log::append], with more controls, like specifying
/// the checksum algorithm.
pub fn append_advanced<T: AsRef<[u8]>>(
&mut self,
data: T,
checksum_type: ChecksumType,
) -> io::Result<()> {
let data = data.as_ref();
let checksum_type = if self.open_options.checksum_type == ChecksumType::Auto {
// xxhash64 is slower for smaller data. A quick benchmark on x64 platform shows:
//
// bytes xxhash32 xxhash64 (MB/s)
// 32 1882 1600
// 40 1739 1538
// 48 2285 1846
// 56 2153 2000
// 64 2666 2782
// 72 2400 2322
// 80 2962 2758
// 88 2750 2750
// 96 3200 3692
// 104 2810 3058
// 112 3393 3500
// 120 3000 3428
// 128 3459 4266
const XXHASH64_THRESHOLD: usize = 88;
if data.len() >= XXHASH64_THRESHOLD {
ChecksumType::Xxhash64
} else {
ChecksumType::Xxhash32
}
} else {
self.open_options.checksum_type
};
let offset = self.meta.primary_len + self.mem_buf.len() as u64;
// Design note: Currently checksum_type is the only thing that decides
@ -287,6 +286,7 @@ impl Log {
ChecksumType::None => 0,
ChecksumType::Xxhash64 => ENTRY_FLAG_HAS_XXHASH64,
ChecksumType::Xxhash32 => ENTRY_FLAG_HAS_XXHASH32,
ChecksumType::Auto => unreachable!(),
};
self.mem_buf.write_vlq(entry_flags)?;
@ -300,6 +300,7 @@ impl Log {
ChecksumType::Xxhash32 => {
self.mem_buf.write_u32::<LittleEndian>(xxhash32(data))?;
}
ChecksumType::Auto => unreachable!(),
};
self.mem_buf.write_all(data)?;
@ -813,6 +814,7 @@ impl OpenOptions {
Self {
create: false,
index_defs: Vec::new(),
checksum_type: ChecksumType::Auto,
}
}
@ -834,6 +836,14 @@ impl OpenOptions {
self
}
/// Choose the checksum function used for entries appended to the log.
///
/// The default set by `OpenOptions::new` is [ChecksumType::Auto].
/// See [ChecksumType] for details.
pub fn checksum_type(self, checksum_type: ChecksumType) -> Self {
    // Builder-style setter: take ownership, update one field, hand it back.
    let mut options = self;
    options.checksum_type = checksum_type;
    options
}
/// Construct [Log] at given directory. Incrementally build up specified
/// indexes.
///
@ -1139,27 +1149,41 @@ mod tests {
}
#[test]
fn test_append_advanced() {
fn test_checksum_type() {
let dir = TempDir::new("log").unwrap();
let log_path = dir.path().join("log");
let mut log = Log::open(&log_path, Vec::new()).unwrap();
let open = |checksum_type| {
OpenOptions::new()
.checksum_type(checksum_type)
.create(true)
.open(&log_path)
.unwrap()
};
let short_bytes = vec![12; 20];
let long_bytes = vec![24; 200];
let mut expected = Vec::new();
let mut log = open(ChecksumType::Auto);
log.append(&short_bytes).unwrap();
expected.push(short_bytes.clone());
log.append(&long_bytes).unwrap();
expected.push(long_bytes.clone());
log.append_advanced(&short_bytes, ChecksumType::None)
.unwrap();
log.flush().unwrap();
let mut log = open(ChecksumType::None);
log.append(&short_bytes).unwrap();
expected.push(short_bytes.clone());
log.append_advanced(&long_bytes, ChecksumType::Xxhash32)
.unwrap();
log.flush().unwrap();
let mut log = open(ChecksumType::Xxhash32);
log.append(&long_bytes).unwrap();
expected.push(long_bytes.clone());
log.append_advanced(&short_bytes, ChecksumType::Xxhash64)
.unwrap();
log.flush().unwrap();
let mut log = open(ChecksumType::Xxhash64);
log.append(&short_bytes).unwrap();
expected.push(short_bytes.clone());
assert_eq!(

View File

@ -36,6 +36,7 @@ const LATEST_FILE: &str = "latest";
pub struct OpenOptions {
/// NOTE(review): presumably the size limit of a single log before
/// rotating - confirm against the rotation logic.
max_bytes_per_log: u64,
/// NOTE(review): presumably the maximum number of logs kept - confirm.
max_log_count: u64,
/// Checksum type forwarded to the underlying `log::OpenOptions` when
/// logs are created or read.
checksum_type: log::ChecksumType,
/// Whether to create the [LogRotate] structure if it does not exist.
create: bool,
/// Index definitions forwarded (cloned) to each underlying log.
index_defs: Vec<IndexDef>,
}
@ -56,6 +57,7 @@ impl OpenOptions {
max_bytes_per_log,
max_log_count,
index_defs: Vec::new(),
checksum_type: log::ChecksumType::Auto,
create: false,
}
}
@ -74,6 +76,14 @@ impl OpenOptions {
self
}
/// Choose the checksum function used by the underlying logs.
///
/// The default set by `OpenOptions::new` is [log::ChecksumType::Auto].
/// See [log::ChecksumType] for details.
pub fn checksum_type(self, checksum_type: log::ChecksumType) -> Self {
    // Builder-style setter: take ownership, update one field, hand it back.
    let mut options = self;
    options.checksum_type = checksum_type;
    options
}
/// Set whether to create the [LogRotate] structure if it does not exist.
pub fn create(mut self, create: bool) -> Self {
self.create = create;
@ -273,6 +283,7 @@ fn create_empty_log(dir: &Path, open_options: &OpenOptions, latest: u64) -> io::
let log_path = dir.join(&latest_str);
let log = log::OpenOptions::new()
.create(true)
.checksum_type(open_options.checksum_type)
.index_defs(open_options.index_defs.clone())
.open(log_path)?;
AtomicFile::new(&latest_path, AllowOverwrite).write(|f| f.write_all(latest_str.as_bytes()))?;
@ -293,6 +304,7 @@ fn read_logs(dir: &Path, open_options: &OpenOptions, latest: u64) -> io::Result<
let log_path = dir.join(format!("{}", current));
if let Ok(log) = log::OpenOptions::new()
.create(false)
.checksum_type(open_options.checksum_type)
.index_defs(open_options.index_defs.clone())
.open(&log_path)
{
@ -328,7 +340,11 @@ mod tests {
assert!(OpenOptions::new().create(false).open(&path).is_err());
assert!(OpenOptions::new().create(true).open(&path).is_ok());
assert!(OpenOptions::new().create(false).open(&path).is_ok());
assert!(OpenOptions::new()
.checksum_type(log::ChecksumType::None)
.create(false)
.open(&path)
.is_ok());
}
// lookup via index 0