From f38bbfd92eef1d4fed74445efa21dfde193cd2c3 Mon Sep 17 00:00:00 2001 From: Jun Wu Date: Mon, 1 Apr 2019 17:08:43 -0700 Subject: [PATCH] logrotate: partially implement flush Summary: Implement the basic flush logic. Missing bits are listed as TODO items. Differential Revision: D14688177 fbshipit-source-id: 3613009ec2c216398af6eaff44487a20ceeb97ef --- lib/indexedlog/src/rotate.rs | 65 ++++++++++++++++++++++++++++++++++++ 1 file changed, 65 insertions(+) diff --git a/lib/indexedlog/src/rotate.rs b/lib/indexedlog/src/rotate.rs index 72f1ae2887..7934ca74a1 100644 --- a/lib/indexedlog/src/rotate.rs +++ b/lib/indexedlog/src/rotate.rs @@ -142,6 +142,36 @@ impl LogRotate { }) } + /// Write in-memory entries to disk. + /// + /// Return the index of the latest [Log]. + pub fn flush(&mut self) -> io::Result { + let mut lock_file = open_dir(&self.dir)?; + let _lock = ScopedFileLock::new(&mut lock_file, true)?; + + let latest = read_latest(&self.dir)?; + if latest != self.latest { + // Latest changed. Make sure write to the correct Log. + unimplemented!(); + } + + // TODO: Scan directory and delete old logs. + let size = self.writable_log().flush()?; + + if size >= self.open_options.max_bytes_per_log { + // Create a new Log. Bump latest. + let next = self.latest.wrapping_add(1); + let log = create_empty_log(&self.dir, &self.open_options, next)?; + if self.logs.len() as u64 >= self.open_options.max_log_count { + self.logs.pop(); + } + self.logs.insert(0, log); + self.latest = next; + } + + Ok(self.latest) + } + // `writable_log` is public for advanced use-cases. Ex. if a Log is used to // store file contents chained with deltas. It might be desirable to make // sure the delta parent is within a same log. That can be done by using @@ -291,4 +321,39 @@ mod tests { assert_eq!(lookup(&rotate, b"ac"), Vec::<&[u8]>::new()); } + #[test] + fn test_simple_rotate() { + let dir = TempDir::new("log").unwrap(); + let mut rotate = OpenOptions::new() + .create(true) + .max_bytes_per_log(100) + .max_log_count(2) + .index_defs(vec![IndexDef::new("first-byte", |_| { + vec![IndexOutput::Reference(0..1)] + })]) + .open(&dir) + .unwrap(); + + // No rotate. + rotate.append(b"a").unwrap(); + assert_eq!(rotate.flush().unwrap(), 0); + rotate.append(b"a").unwrap(); + assert_eq!(rotate.flush().unwrap(), 0); + + // Trigger rotate. "a" is still accessible. + rotate.append(vec![b'b'; 100]).unwrap(); + assert_eq!(rotate.flush().unwrap(), 1); + assert_eq!(lookup(&rotate, b"a").len(), 2); + + // Trigger rotate again. Only new entries are accessible. + rotate.append(vec![b'c'; 50]).unwrap(); + assert_eq!(rotate.flush().unwrap(), 1); + rotate.append(vec![b'd'; 50]).unwrap(); + assert_eq!(rotate.flush().unwrap(), 2); + assert_eq!(lookup(&rotate, b"a").len(), 0); + assert_eq!(lookup(&rotate, b"b").len(), 0); + assert_eq!(lookup(&rotate, b"c").len(), 1); + assert_eq!(lookup(&rotate, b"d").len(), 1); + } + }