indexedlog: change IndexDef.lag_threshold from bytes to entries

Summary:
This is more friendly for indexedlog users - deciding lag_threshold by number
of entries is easier than by bytes.

Initially, I thought checking `bytes` is cheaper and checking `entries` is more
expensive. However, practically we will have to build indexes for `entries`
anyway. So we do know the number of entries lagging behind.

Reviewed By: DurhamG

Differential Revision: D20042045

fbshipit-source-id: 73042e406bd8b262d5ef9875e45a3fd5f29f78cf
This commit is contained in:
Jun Wu 2020-02-28 09:19:41 -08:00 committed by Facebook Github Bot
parent 55363a78a7
commit afb24f8a8a
4 changed files with 107 additions and 35 deletions

View File

@ -99,6 +99,9 @@ pub struct Log {
pub(crate) mem_buf: Pin<Box<Vec<u8>>>,
meta: LogMetadata,
indexes: Vec<Index>,
// On-disk indexes are lagging. How many entries do they lag?
// This affects whether `sync()` updates indexes.
index_lags: Vec<usize>,
// Whether the index and the log is out-of-sync. In which case, index-based reads (lookups)
// should return errors because it can no longer be trusted.
// This could be improved to be per index. For now, it's a single state for simplicity. It's
@ -360,12 +363,14 @@ impl Log {
}
// Create the new Log.
let index_lags = self.index_lags.clone();
let mut log = Log {
dir: self.dir.clone(),
disk_buf,
mem_buf,
meta: self.meta.clone(),
indexes,
index_lags,
index_corrupted: false,
open_options: self.open_options.clone(),
};
@ -565,19 +570,6 @@ impl Log {
meta.primary_len += self.mem_buf.len() as u64;
self.mem_buf.clear();
// Decide what indexes need to be updated on disk.
let indexes_to_flush: Vec<usize> = self
.open_options
.index_defs
.iter()
.enumerate()
.filter(|&(_i, def)| {
let indexed = self.meta.indexes.get(&def.metaname()).cloned().unwrap_or(0);
indexed.saturating_add(def.lag_threshold) < meta.primary_len
})
.map(|(i, _def)| i)
.collect();
// Step 3: Reload primary log and indexes to get the latest view.
let (disk_buf, indexes) = Self::load_log_and_indexes(
&self.dir,
@ -599,6 +591,7 @@ impl Log {
Self::set_index_log_len(self.indexes.iter_mut(), meta.primary_len);
Some(&self.indexes)
},
&mut self.index_lags,
self.open_options.fsync,
)?;
@ -608,11 +601,16 @@ impl Log {
// Step 4: Update the indexes. Optionally flush them.
self.update_indexes_for_on_disk_entries()?;
for i in indexes_to_flush {
for (i, count) in self.index_lags.clone().into_iter().enumerate() {
if self.open_options.index_defs[i].lag_threshold >= count {
continue;
}
let new_length = self.indexes[i].flush();
let new_length = self.maybe_set_index_error(new_length.map_err(Into::into))?;
let name = self.open_options.index_defs[i].metaname();
self.meta.indexes.insert(name, new_length);
// The index will be no longer lagging, since meta is being updated.
self.index_lags[i] = 0;
}
// Step 5: Write the updated meta file.
@ -997,7 +995,13 @@ impl Log {
offset: u64,
data_offset: u64,
) -> crate::Result<()> {
for (index, def) in self.indexes.iter_mut().zip(&self.open_options.index_defs) {
for (i, (index, def)) in self
.indexes
.iter_mut()
.zip(&self.open_options.index_defs)
.enumerate()
{
self.index_lags[i] += 1;
for index_output in (def.func)(data) {
match index_output {
IndexOutput::Reference(range) => {
@ -1024,6 +1028,8 @@ impl Log {
}
/// Build in-memory index so they cover all entries stored in `self.disk_buf`.
///
/// The number of entries built per index is accumulated into `index_lags`.
fn update_indexes_for_on_disk_entries(&mut self) -> crate::Result<()> {
let result = self.update_indexes_for_on_disk_entries_unchecked();
self.maybe_set_index_error(result)
@ -1031,15 +1037,20 @@ impl Log {
fn update_indexes_for_on_disk_entries_unchecked(&mut self) -> crate::Result<()> {
// It's a programming error to call this when mem_buf is not empty.
assert!(self.mem_buf.is_empty());
for (index, def) in self.indexes.iter_mut().zip(&self.open_options.index_defs) {
Self::update_index_for_on_disk_entry_unchecked(
for (i, (index, def)) in self
.indexes
.iter_mut()
.zip(&self.open_options.index_defs)
.enumerate()
{
let count = Self::update_index_for_on_disk_entry_unchecked(
&self.dir,
index,
def,
&self.disk_buf,
self.meta.primary_len,
)?;
self.index_lags[i] += count;
}
Ok(())
}
@ -1050,9 +1061,11 @@ impl Log {
def: &IndexDef,
disk_buf: &Bytes,
primary_len: u64,
) -> crate::Result<()> {
) -> crate::Result<usize> {
// The index meta is used to store the next offset the index should be built.
let mut offset = Self::get_index_log_len(index)?;
// Number of times the index function has been called.
let mut count = 0;
// PERF: might be worthwhile to cache xxhash verification result.
while let Some(entry_result) =
Self::read_entry_from_buf(&path, disk_buf, offset).context(|| {
@ -1062,6 +1075,7 @@ impl Log {
)
})?
{
count += 1;
let data = entry_result.data;
for index_output in (def.func)(data) {
match index_output {
@ -1090,7 +1104,7 @@ impl Log {
// The index now contains all entries. Write "next_offset" as the index meta.
Self::set_index_log_len(std::iter::once(index), primary_len);
Ok(())
Ok(count)
}
/// Read [`LogMetadata`] from the given directory. If `create` is `true`,
@ -1164,12 +1178,16 @@ impl Log {
/// If `reuse_indexes` is not None, they are existing indexes that match `index_defs`
/// order. This should only be used in `sync` code path when the on-disk `meta` matches
/// the in-memory `meta`. Otherwise it is not a sound use.
///
/// The indexes loaded by this function can be lagging.
/// Use `update_indexes_for_on_disk_entries` to update them.
fn load_log_and_indexes(
dir: &GenericPath,
meta: &LogMetadata,
index_defs: &[IndexDef],
mem_buf: &Pin<Box<Vec<u8>>>,
reuse_indexes: Option<&Vec<Index>>,
index_lags: &mut Vec<usize>,
fsync: bool,
) -> crate::Result<(Bytes, Vec<Index>)> {
let primary_buf = match dir.as_opt_path() {
@ -1189,7 +1207,10 @@ impl Log {
None => {
// No indexes are reused, reload them.
let mut indexes = Vec::with_capacity(index_defs.len());
for def in index_defs.iter() {
for (i, def) in index_defs.iter().enumerate() {
// Re-calculate index_lag since the index is freshly loaded
// without any pending entries.
index_lags[i] = 0;
let index_len = meta.indexes.get(&def.metaname()).cloned().unwrap_or(0);
indexes.push(Self::load_index(
dir,
@ -1203,14 +1224,19 @@ impl Log {
}
Some(indexes) => {
assert_eq!(index_defs.len(), indexes.len());
assert_eq!(index_lags.len(), indexes.len());
let mut new_indexes = Vec::with_capacity(indexes.len());
// Avoid reloading the index from disk.
// Update their ExternalKeyBuffer so they have the updated meta.primary_len.
for (index, def) in indexes.iter().zip(index_defs) {
for (i, (index, def)) in indexes.iter().zip(index_defs).enumerate() {
let index_len = meta.indexes.get(&def.metaname()).cloned().unwrap_or(0);
let index = if index_len > Self::get_index_log_len(index).unwrap_or(0) {
// The on-disk index covers more entries. Loading it is probably
// better than reusing the existing in-memory index.
//
// Re-calculate index_lag since the index is freshly loaded
// without any pending entries.
index_lags[i] = 0;
Self::load_index(dir, &def, index_len, key_buf.clone(), fsync)?
} else {
let mut index = index.try_clone()?;
@ -1231,6 +1257,7 @@ impl Log {
}
/// Load a single index.
/// Callsites should set related index_lag to 0.
fn load_index(
dir: &GenericPath,
def: &IndexDef,

View File

@ -51,7 +51,7 @@ pub struct IndexDef {
/// `name` is used so the existing index won't be reused incorrectly.
pub(crate) name: &'static str,
/// How many bytes (as counted in the file backing [`Log`]) could be left not
/// How many entries (as counted in the file backing [`Log`]) could be left not
/// indexed on-disk.
///
/// This is related to [`Index`] implementation detail. Since it's append-only
@ -62,7 +62,7 @@ pub struct IndexDef {
/// [`Log::open`].
///
/// Practically, this correlates to how fast `func` is.
pub(crate) lag_threshold: u64,
pub(crate) lag_threshold: usize,
}
/// Output of an index function. Bytes that can be used for lookups.
@ -175,9 +175,12 @@ impl IndexDef {
func: index_func,
name,
// For a typical commit hash index (20-byte). IndexedLog insertion
// overhead is about 1500 entries per millisecond. Allow about 3ms
// lagging in that case.
lag_threshold: 5000,
// overhead is about 1500 entries per millisecond. For other things
// the xxhash check might take some time. 500 entries takes <1ms
// for commit hash index, and might be okay for non-commit-hash
// indexes. Users should customize the value if the default is not
// good enough.
lag_threshold: 500,
}
}
@ -192,7 +195,7 @@ impl IndexDef {
/// [`Log::open`].
///
/// Practically, this correlates to how fast `func` is.
pub fn lag_threshold(self, lag_threshold: u64) -> Self {
pub fn lag_threshold(self, lag_threshold: usize) -> Self {
Self {
func: self.func,
name: self.name,
@ -338,21 +341,23 @@ impl OpenOptions {
let result: crate::Result<_> = (|| {
let meta = LogMetadata::new_with_primary_len(PRIMARY_START_OFFSET);
let mem_buf = Box::pin(Vec::new());
let mut index_lags = vec![0; self.index_defs.len()];
let (disk_buf, indexes) = Log::load_log_and_indexes(
&dir,
&meta,
&self.index_defs,
&mem_buf,
None,
&mut index_lags,
self.fsync,
)?;
Ok(Log {
dir,
disk_buf,
mem_buf,
meta,
indexes,
index_lags,
index_corrupted: false,
open_options: self.clone(),
})
@ -399,12 +404,14 @@ impl OpenOptions {
})?;
let mem_buf = Box::pin(Vec::new());
let mut index_lags = vec![0; self.index_defs.len()];
let (disk_buf, indexes) = Log::load_log_and_indexes(
dir,
&meta,
&self.index_defs,
&mem_buf,
reuse_indexes,
&mut index_lags,
self.fsync,
)?;
let mut log = Log {
@ -413,6 +420,7 @@ impl OpenOptions {
mem_buf,
meta,
indexes,
index_lags,
index_corrupted: false,
open_options: self.clone(),
};

View File

@ -189,7 +189,7 @@ fn test_iter_and_iter_dirty() {
);
}
fn get_index_defs(lag_threshold: u64) -> Vec<IndexDef> {
fn get_index_defs(lag_threshold: usize) -> Vec<IndexDef> {
// Two index functions. First takes every 2 bytes as references. The second takes every 3
// bytes as owned slices.
// Keys starting with '-' are considered as "deletion" requests.
@ -287,9 +287,9 @@ fn test_slice_to_bytes() {
fn test_index_manual() {
// Test index lookups with these combinations:
// - Index key: Reference and Owned.
// - Index lag_threshold: 0, 20, 1000.
// - Index lag_threshold: 0, 1, ....
// - Entries: Mixed on-disk and in-memory ones.
for lag in [0u64, 20, 1000].iter().cloned() {
for lag in [0usize, 1, 2, 5, 1000].iter().cloned() {
let dir = tempdir().unwrap();
let mut log = Log::open(dir.path(), get_index_defs(lag)).unwrap();
let entries: [&[u8]; 7] = [b"1", b"", b"2345", b"", b"78", b"3456", b"35"];
@ -518,6 +518,40 @@ fn test_index_func() {
assert_eq!(log.iter().count(), log.lookup(2, b"x").unwrap().count());
}
#[test]
fn test_index_lag_threshold() {
for lag in vec![0, 1, 2, 5, 8] {
let dir = tempdir().unwrap();
let def = IndexDef::new("a", |_| vec![IndexOutput::Reference(0..1)]).lag_threshold(lag);
let index_filename = def.filename();
let open_opts = OpenOptions::new().create(true).index_defs(vec![def]);
let get_index_size = || -> u64 {
let index_path = dir.path().join(&index_filename);
index_path.metadata().map(|m| m.len()).unwrap_or(0)
};
let mut unindexed_entries = 0;
let mut log = open_opts.open(dir.path()).unwrap();
for i in 0..100 {
let index_size_before = get_index_size();
if i % 3 == 0 {
// This is optional, but makes the test more interesting.
log = open_opts.open(dir.path()).unwrap();
}
log.append(&vec![i as u8; (i + 1) as usize]).unwrap();
log.sync().unwrap();
let index_size_after = get_index_size();
let index_should_change = unindexed_entries >= lag;
if index_should_change {
unindexed_entries = 0;
} else {
unindexed_entries += 1;
}
let index_changed = index_size_after != index_size_before;
assert_eq!(index_should_change, index_changed);
}
}
}
#[test]
fn test_flush_filter() {
let dir = tempdir().unwrap();
@ -564,7 +598,7 @@ fn test_flush_filter() {
}
/// Get a `Log` with index defined on first 8 bytes.
fn log_with_index(path: &Path, lag: u64) -> Log {
fn log_with_index(path: &Path, lag: usize) -> Log {
let index_func = |_data: &[u8]| vec![IndexOutput::Reference(0..8)];
let index_def = IndexDef::new("i", index_func).lag_threshold(lag);
Log::open(path, vec![index_def]).unwrap()
@ -831,7 +865,10 @@ fn test_repair_and_delete_content() {
let path = dir.path();
let open_opts = OpenOptions::new()
.create(true)
.index("c", |_| vec![IndexOutput::Reference(0..1)]);
.index_defs(vec![IndexDef::new("c", |_| {
vec![IndexOutput::Reference(0..1)]
})
.lag_threshold(1)]);
let long_lived_log = RefCell::new(open_opts.open(()).unwrap());
let open = || open_opts.open(path);

View File

@ -1248,7 +1248,7 @@ mod tests {
.index_defs(vec![IndexDef::new("idx", |_| {
vec![IndexOutput::Reference(0..2)]
})
.lag_threshold(u64::max_value())])
.lag_threshold(usize::max_value())])
.max_bytes_per_log(100)
.max_log_count(3);