indexedlog: make Log::sync handle non-append-only changes

Summary:
Add extra metadata to help detect non-append-only changes, and make Log::sync
handle them by automatically reloading the log and indexes. This reduces the
chance of data being incorrectly written to a different log.
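
The detection works roughly like this: the on-disk metadata carries an epoch
value that is replaced whenever the log is rewritten in a non-append-only way
(e.g. truncated or swapped), so sync can compare its in-memory epoch with the
on-disk one and fall back to a full reload instead of assuming the primary log
only grew. Below is a minimal sketch of that decision, for illustration only
(the primary_len/epoch fields mirror the quickcheck round-trip test further
down; sync_sketch, reload_all and catch_up are made-up names, not the crate's
actual API):

    // Sketch only: shows the epoch comparison, not indexedlog's real types.
    #[derive(Clone, PartialEq)]
    struct LogMeta {
        primary_len: u64, // logical length of the primary log file
        epoch: u64,       // replaced on non-append-only rewrites
    }

    fn sync_sketch(in_memory: &mut LogMeta, on_disk: &LogMeta) {
        if *in_memory == *on_disk {
            return; // nothing changed on disk
        }
        if in_memory.epoch != on_disk.epoch {
            // Truncated or replaced: existing indexes and cached offsets
            // can no longer be trusted.
            reload_all(in_memory, on_disk);
        } else {
            // Append-only growth: indexes stay valid; only the new tail
            // of the primary log needs to be scanned.
            catch_up(in_memory, on_disk);
        }
    }

    fn reload_all(in_memory: &mut LogMeta, on_disk: &LogMeta) {
        // Rebuild indexes from scratch and re-append dirty in-memory entries.
        *in_memory = on_disk.clone();
    }

    fn catch_up(in_memory: &mut LogMeta, on_disk: &LogMeta) {
        // Index only the bytes past the previously known primary_len.
        *in_memory = on_disk.clone();
    }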

Reviewed By: xavierd

Differential Revision: D17732137

fbshipit-source-id: 33668913f1695a6c02af5b81a40214e5a521ef09
Jun Wu 2019-10-04 20:33:58 -07:00 committed by Facebook Github Bot
parent 415dad5587
commit 43919375bb


@@ -521,16 +521,20 @@ impl Log {
         // Read-only fast path - no need to take directory lock.
         if self.mem_buf.is_empty() {
             let meta = Self::load_or_create_meta(&self.dir.as_ref().unwrap(), false)?;
-            check_append_only(self, &meta)?;
             let changed = self.meta != meta;
+            let truncated = self.meta.epoch != meta.epoch;
+            if !truncated {
+                check_append_only(self, &meta)?;
+            }
             // No need to reload anything if metadata hasn't changed.
             if changed {
+                // Indexes cannot be reused, if epoch has changed. Otherwise,
                 // Indexes can be reused, since they do not have new in-memory
                 // entries, and the on-disk primary log is append-only (so data
                 // already present in the indexes is valid).
                 *self = self.open_options.clone().open_internal(
                     self.dir.as_ref().unwrap(),
-                    Some(&self.indexes),
+                    if truncated { None } else { Some(&self.indexes) },
                     false, // assume_locked=false
                 )?;
             }
@@ -545,8 +549,12 @@ impl Log {
         // Step 1: Reload metadata to get the latest view of the files.
         let mut meta = Self::load_or_create_meta(&self.dir.as_ref().unwrap(), false)?;
         let changed = self.meta != meta;
-        check_append_only(self, &meta)?;
+        let truncated = self.meta.epoch != meta.epoch;
+        if !truncated {
+            check_append_only(self, &meta)?;
+        }

         // Cases where Log and Indexes need to be reloaded.
         if changed && self.open_options.flush_filter.is_some() {
             let filter = self.open_options.flush_filter.unwrap();
@@ -554,7 +562,8 @@ impl Log {
             let mut log = self
                 .open_options
                 .clone()
-                .open_assume_locked(self.dir.as_ref().unwrap())?;
+                .open_assume_locked(self.dir.as_ref().unwrap())
+                .context("re-open to run flush_filter")?;
             for entry in self.iter_dirty() {
                 let content = entry?;
@@ -569,6 +578,26 @@ impl Log {
                 }
             }
             // Replace "self" so we can continue flushing the updated data.
             *self = log;
+        } else if truncated {
+            // Reload log and indexes, and re-insert entries.
+            let mut log = self
+                .open_options
+                .clone()
+                .open_assume_locked(self.dir.as_ref().unwrap())
+                .context(|| {
+                    format!(
+                        "re-open since epoch has changed ({} to {})",
+                        self.meta.epoch, meta.epoch
+                    )
+                })?;
+            for entry in self.iter_dirty() {
+                let content = entry?;
+                log.append(content)?;
+            }
+            // Replace "self" so we can continue flushing the updated data.
+            *self = log;
         }
@@ -2810,6 +2839,77 @@ mod tests {
         assert!(log.iter().any(|e| e.is_err()));
     }

+    #[cfg(unix)]
+    #[test]
+    fn test_non_append_only() {
+        // Test non-append-only changes can be detected by epoch change.
+        //
+        // In this test, we create 2 logs with different content. Then swap
+        // those 2 logs and call sync.
+        //
+        // This test requires renaming files while mmap is present. That
+        // cannot be done in Windows.
+        //
+        // This test should fail if utils::epoch returns a constant.
+        let dir = tempdir().unwrap();
+        let indexes = vec![IndexDef::new("key1", index_ref).lag_threshold(1)];
+        let open_opts = OpenOptions::new().create(true).index_defs(indexes);
+
+        // Prepare the first log.
+        let mut log1 = open_opts.open(dir.path().join("1")).unwrap();
+        for b in 0..10 {
+            log1.append(&[b; 7][..]).unwrap();
+        }
+        log1.flush().unwrap();
+        for b in 30..40 {
+            log1.append(&[b; 21][..]).unwrap();
+        }
+
+        // Prepare the second log
+        let mut log2 = open_opts.open(dir.path().join("2")).unwrap();
+        for b in 20..30 {
+            log2.append(&[b; 21][..]).unwrap();
+        }
+        log2.flush().unwrap();
+        for b in 10..20 {
+            log2.append(&[b; 7][..]).unwrap();
+        }
+
+        // Rename to emulate the non-append-only change.
+        fs::rename(dir.path().join("1"), dir.path().join("temp")).unwrap();
+        fs::rename(dir.path().join("2"), dir.path().join("1")).unwrap();
+        fs::rename(dir.path().join("temp"), dir.path().join("2")).unwrap();
+
+        log1.sync().unwrap();
+        log2.sync().unwrap();
+
+        // Check their content.
+        fn check_log(log: &Log, range: Range<u8>, len: usize) {
+            assert_eq!(
+                log.iter().map(|b| b.unwrap().to_vec()).collect::<Vec<_>>(),
+                range.clone().map(|i| vec![i; len]).collect::<Vec<_>>(),
+            );
+            assert_eq!(
+                log.lookup_range(0, ..)
+                    .unwrap()
+                    .flat_map(|e| e.unwrap().1.into_vec().unwrap())
+                    .map(|b| b.to_vec())
+                    .collect::<Vec<_>>(),
+                range.map(|i| vec![i; len]).collect::<Vec<_>>(),
+            );
+        }
+
+        check_log(&log1, 20..40, 21);
+        check_log(&log2, 0..20, 7);
+
+        let log1 = open_opts.open(dir.path().join("1")).unwrap();
+        let log2 = open_opts.open(dir.path().join("2")).unwrap();
+        check_log(&log1, 20..40, 21);
+        check_log(&log2, 0..20, 7);
+    }
+
     #[test]
     fn test_clear_dirty() {
         for lag in vec![0, 1000] {
@@ -2861,9 +2961,6 @@ mod tests {
     const WRITE_COUNT_PER_THREAD: u8 = 150;

     // Some indexes. They have different lag_threshold.
-    fn index_ref(data: &[u8]) -> Vec<IndexOutput> {
-        vec![IndexOutput::Reference(0..data.len() as u64)]
-    }
     fn index_copy(data: &[u8]) -> Vec<IndexOutput> {
         vec![IndexOutput::Owned(data.to_vec().into_boxed_slice())]
     }
@@ -2926,6 +3023,10 @@ mod tests {
         assert_eq!(count, THREAD_COUNT as u64 * WRITE_COUNT_PER_THREAD as u64);
     }

+    fn index_ref(data: &[u8]) -> Vec<IndexOutput> {
+        vec![IndexOutput::Reference(0..data.len() as u64)]
+    }
+
     quickcheck! {
         fn test_roundtrip_meta(primary_len: u64, indexes: BTreeMap<String, u64>, epoch: u64) -> bool {
             let mut buf = Vec::new();