indexedlog: log: implement flush

Summary: Completes the interface.

Reviewed By: DurhamG

Differential Revision: D8156511

fbshipit-source-id: 0d4d05aa23c47117da70ec47cf9be3d4fe41df7b
This commit is contained in:
Jun Wu 2018-07-09 14:14:05 -07:00 committed by Facebook Github Bot
parent b05631eff7
commit ee638e6de4

View File

@ -41,7 +41,7 @@ use lock::ScopedFileLock;
use memmap::Mmap;
use std::collections::BTreeMap;
use std::fs::{self, File};
use std::io::{self, Cursor, Read, Write};
use std::io::{self, Cursor, Read, Seek, SeekFrom, Write};
use std::ops::Range;
use std::path::{Path, PathBuf};
use std::rc::Rc;
@ -163,15 +163,50 @@ impl Log {
let mut meta = Self::load_or_create_meta(&self.dir)?;
// Step 2: Append to the primary log.
unimplemented!();
let primary_path = self.dir.join(PRIMARY_FILE);
let mut primary_file = fs::OpenOptions::new()
.read(true)
.append(true)
.open(&primary_path)?;
let physical_len = primary_file.seek(SeekFrom::End(0))?;
if physical_len < meta.primary_len {
let msg = format!(
"corrupted: {} (expected at least {} bytes)",
primary_path.to_string_lossy(),
meta.primary_len
);
return Err(io::Error::new(io::ErrorKind::UnexpectedEof, msg));
}
// Actually write the primary log. Once it's written, we can remove the in-memory buffer.
primary_file.write_all(&self.mem_buf)?;
meta.primary_len += self.mem_buf.len() as u64;
// Step 3: Reload primary log and indexes to get the latest view.
let (disk_buf, indexes) = Self::load_log_and_indexes(&self.dir, &meta, &self.index_defs)?;
self.meta = meta;
self.disk_buf = disk_buf;
self.indexes = indexes;
self.mem_buf.clear();
// Step 4: Update the indexes. Optionally flush them.
let indexes_to_flush: Vec<usize> = self.index_defs
.iter()
.enumerate()
.filter(|&(_i, def)| {
let indexed = self.meta.indexes.get(def.name).cloned().unwrap_or(0);
indexed + def.lag_threshold < self.meta.primary_len
})
.map(|(i, _def)| i)
.collect();
self.update_indexes_for_on_disk_entries()?;
for i in indexes_to_flush {
let new_length = self.indexes[i].flush()?;
let name = self.index_defs[i].name.to_string();
self.meta.indexes.insert(name, new_length);
}
// Step 5: Write the updated meta file.
self.meta.write_file(self.dir.join(META_FILE))?;
@ -556,5 +591,22 @@ mod tests {
let meta_read = LogMetadata::read_file(&path).expect("read_file");
meta_read == meta
}
fn test_roundtrip_entries(entries: Vec<(Vec<u8>, bool, bool)>) -> bool {
let dir = TempDir::new("log").unwrap();
let mut log = Log::open(dir.path(), Vec::new()).unwrap();
for &(ref data, flush, reload) in &entries {
log.append(data).expect("append");
if flush {
log.flush().expect("flush");
if reload {
log = Log::open(dir.path(), Vec::new()).unwrap();
}
}
}
let retrived: Vec<Vec<u8>> = log.iter().map(|v| v.unwrap().to_vec()).collect();
let entries: Vec<Vec<u8>> = entries.iter().map(|v| v.0.clone()).collect();
retrived == entries
}
}
}