From ee638e6de450f54f6a3ea636af3edc320bfc7d10 Mon Sep 17 00:00:00 2001 From: Jun Wu Date: Mon, 9 Jul 2018 14:14:05 -0700 Subject: [PATCH] indexedlog: log: implement flush Summary: Completes the interface. Reviewed By: DurhamG Differential Revision: D8156511 fbshipit-source-id: 0d4d05aa23c47117da70ec47cf9be3d4fe41df7b --- lib/indexedlog/src/log.rs | 56 +++++++++++++++++++++++++++++++++++++-- 1 file changed, 54 insertions(+), 2 deletions(-) diff --git a/lib/indexedlog/src/log.rs b/lib/indexedlog/src/log.rs index 21dedbad45..5d558f2f28 100644 --- a/lib/indexedlog/src/log.rs +++ b/lib/indexedlog/src/log.rs @@ -41,7 +41,7 @@ use lock::ScopedFileLock; use memmap::Mmap; use std::collections::BTreeMap; use std::fs::{self, File}; -use std::io::{self, Cursor, Read, Write}; +use std::io::{self, Cursor, Read, Seek, SeekFrom, Write}; use std::ops::Range; use std::path::{Path, PathBuf}; use std::rc::Rc; @@ -163,15 +163,50 @@ impl Log { let mut meta = Self::load_or_create_meta(&self.dir)?; // Step 2: Append to the primary log. - unimplemented!(); + let primary_path = self.dir.join(PRIMARY_FILE); + let mut primary_file = fs::OpenOptions::new() + .read(true) + .append(true) + .open(&primary_path)?; + + let physical_len = primary_file.seek(SeekFrom::End(0))?; + if physical_len < meta.primary_len { + let msg = format!( + "corrupted: {} (expected at least {} bytes)", + primary_path.to_string_lossy(), + meta.primary_len + ); + return Err(io::Error::new(io::ErrorKind::UnexpectedEof, msg)); + } + + // Actually write the primary log. Once it's written, we can remove the in-memory buffer. + primary_file.write_all(&self.mem_buf)?; + meta.primary_len += self.mem_buf.len() as u64; // Step 3: Reload primary log and indexes to get the latest view. let (disk_buf, indexes) = Self::load_log_and_indexes(&self.dir, &meta, &self.index_defs)?; + + self.meta = meta; self.disk_buf = disk_buf; self.indexes = indexes; + self.mem_buf.clear(); // Step 4: Update the indexes. Optionally flush them. + let indexes_to_flush: Vec = self.index_defs + .iter() + .enumerate() + .filter(|&(_i, def)| { + let indexed = self.meta.indexes.get(def.name).cloned().unwrap_or(0); + indexed + def.lag_threshold < self.meta.primary_len + }) + .map(|(i, _def)| i) + .collect(); self.update_indexes_for_on_disk_entries()?; + for i in indexes_to_flush { + let new_length = self.indexes[i].flush()?; + let name = self.index_defs[i].name.to_string(); + self.meta.indexes.insert(name, new_length); + } // Step 5: Write the updated meta file. self.meta.write_file(self.dir.join(META_FILE))?; @@ -556,5 +591,22 @@ mod tests { let meta_read = LogMetadata::read_file(&path).expect("read_file"); meta_read == meta } + + fn test_roundtrip_entries(entries: Vec<(Vec, bool, bool)>) -> bool { + let dir = TempDir::new("log").unwrap(); + let mut log = Log::open(dir.path(), Vec::new()).unwrap(); + for &(ref data, flush, reload) in &entries { + log.append(data).expect("append"); + if flush { + log.flush().expect("flush"); + if reload { + log = Log::open(dir.path(), Vec::new()).unwrap(); + } + } + } + let retrived: Vec> = log.iter().map(|v| v.unwrap().to_vec()).collect(); + let entries: Vec> = entries.iter().map(|v| v.0.clone()).collect(); + retrived == entries + } } }