indexedlog: support in-memory-only RotateLog

Summary: This makes it possible to create RotateLog without writing to filesystem.

Reviewed By: xavierd

Differential Revision: D16044037

fbshipit-source-id: 451adaf8c2f97f0bd46bb2e5f42a33047d2ddfb0
This commit is contained in:
Jun Wu 2019-07-18 15:04:30 -07:00 committed by Facebook Github Bot
parent 322d6990fd
commit 6b52d9c55c

View File

@ -19,7 +19,7 @@ use std::path::{Path, PathBuf};
///
/// Writes go to the active [`Log`]. Reads scan through all [`Log`]s.
pub struct RotateLog {
dir: PathBuf,
dir: Option<PathBuf>,
open_options: OpenOptions,
logs: Vec<Log>,
latest: u8,
@ -131,17 +131,28 @@ impl OpenOptions {
let latest = read_latest(dir)?;
(latest, read_logs(dir, &self, latest)?)
} else {
(0, vec![create_empty_log(dir, &self, 0)?])
(0, vec![create_empty_log(Some(dir), &self, 0)?])
}
};
Ok(RotateLog {
dir: dir.into(),
dir: Some(dir.into()),
open_options: self,
logs,
latest,
})
}
/// Open an-empty [`RotateLog`] in memory. The [`RotateLog`] cannot [`sync`].
pub fn create_in_memory(self) -> Fallible<RotateLog> {
let logs = vec![create_empty_log(None, &self, 0)?];
Ok(RotateLog {
dir: None,
open_options: self,
logs,
latest: 0,
})
}
}
impl RotateLog {
@ -190,24 +201,29 @@ impl RotateLog {
/// Read latest data from disk. Write in-memory entries to disk.
///
/// Return the index of the latest [`Log`].
///
/// For in-memory [`RotateLog`], this function always returns 0.
pub fn sync(&mut self) -> Fallible<u8> {
let latest = read_latest(&self.dir)?;
if self.dir.is_none() {
return Ok(0);
}
let latest = read_latest(self.dir.as_ref().unwrap())?;
if self.writable_log().iter_dirty().nth(0).is_none() {
// Read-only path, no need to take directory lock.
if latest != self.latest {
// Latest changed. Re-load and write to the real latest Log.
// PERF(minor): This can be smarter by avoiding reloading some logs.
self.logs = read_logs(&self.dir, &self.open_options, latest)?;
self.logs = read_logs(self.dir.as_ref().unwrap(), &self.open_options, latest)?;
self.latest = latest;
}
} else {
// Read-write path. Take the directory lock.
let mut lock_file = open_dir(&self.dir)?;
let mut lock_file = open_dir(self.dir.as_ref().unwrap())?;
let _lock = ScopedFileLock::new(&mut lock_file, true)?;
// Re-read latest, since it might have changed after taking the lock.
let latest = read_latest(&self.dir)?;
let latest = read_latest(self.dir.as_ref().unwrap())?;
if latest != self.latest {
// Latest changed. Re-load and write to the real latest Log.
//
@ -216,7 +232,8 @@ impl RotateLog {
// non-latest logs automatically.
// PERF(minor): This can be smarter by avoiding reloading some logs.
let mut new_logs = read_logs(&self.dir, &self.open_options, latest)?;
let mut new_logs =
read_logs(self.dir.as_ref().unwrap(), &self.open_options, latest)?;
if let Some(filter) = self.open_options.log_open_options.flush_filter {
let log = &mut new_logs[0];
for entry in self.writable_log().iter_dirty() {
@ -257,7 +274,7 @@ impl RotateLog {
fn rotate_assume_locked(&mut self) -> Fallible<()> {
// Create a new Log. Bump latest.
let next = self.latest.wrapping_add(1);
let log = create_empty_log(&self.dir, &self.open_options, next)?;
let log = create_empty_log(Some(self.dir.as_ref().unwrap()), &self.open_options, next)?;
if self.logs.len() >= self.open_options.max_log_count as usize {
self.logs.pop();
}
@ -273,7 +290,7 @@ impl RotateLog {
}
fn try_remove_old_logs(&self) {
if let Ok(read_dir) = self.dir.read_dir() {
if let Ok(read_dir) = self.dir.as_ref().unwrap().read_dir() {
let latest = self.latest;
let earliest = latest.wrapping_sub(self.open_options.max_log_count - 1);
for entry in read_dir {
@ -333,12 +350,16 @@ impl RotateLowLevelExt for RotateLog {
}
fn force_rotate(&mut self) -> Fallible<()> {
if self.dir.is_none() {
// rotate does not make sense for an in-memory RotateLog.
return Ok(());
}
// Read-write path. Take the directory lock.
let mut lock_file = open_dir(&self.dir)?;
let mut lock_file = open_dir(self.dir.as_ref().unwrap())?;
let _lock = ScopedFileLock::new(&mut lock_file, true)?;
self.latest = read_latest(&self.dir)?;
self.latest = read_latest(self.dir.as_ref().unwrap())?;
self.rotate_assume_locked()?;
self.logs = read_logs(&self.dir, &self.open_options, self.latest)?;
self.logs = read_logs(self.dir.as_ref().unwrap(), &self.open_options, self.latest)?;
Ok(())
}
}
@ -389,17 +410,22 @@ impl<'a> Iterator for RotateLogLookupIter<'a> {
}
}
fn create_empty_log(dir: &Path, open_options: &OpenOptions, latest: u8) -> Fallible<Log> {
let latest_path = dir.join(LATEST_FILE);
let latest_str = format!("{}", latest);
let log_path = dir.join(&latest_str);
let log = open_options
.log_open_options
.clone()
.create(true)
.open(log_path)?;
atomic_write(&latest_path, latest_str.as_bytes())?;
Ok(log)
fn create_empty_log(dir: Option<&Path>, open_options: &OpenOptions, latest: u8) -> Fallible<Log> {
Ok(match dir {
Some(dir) => {
let latest_path = dir.join(LATEST_FILE);
let latest_str = format!("{}", latest);
let log_path = dir.join(&latest_str);
let log = open_options
.log_open_options
.clone()
.create(true)
.open(log_path)?;
atomic_write(&latest_path, latest_str.as_bytes())?;
log
}
None => open_options.log_open_options.clone().create_in_memory()?,
})
}
fn read_latest(dir: &Path) -> io::Result<u8> {
@ -471,21 +497,24 @@ mod tests {
#[test]
fn test_trivial_append_lookup() {
let dir = tempdir().unwrap();
let mut rotate = OpenOptions::new()
let opts = OpenOptions::new()
.create(true)
.index_defs(vec![IndexDef::new("two-bytes", |_| {
vec![IndexOutput::Reference(0..2)]
})])
.open(&dir)
.unwrap();
})]);
rotate.append(b"aaa").unwrap();
rotate.append(b"abbb").unwrap();
rotate.append(b"abc").unwrap();
let rotate = opts.clone().open(&dir).unwrap();
let rotate_mem = opts.clone().create_in_memory().unwrap();
assert_eq!(lookup(&rotate, b"aa"), vec![b"aaa"]);
assert_eq!(lookup(&rotate, b"ab"), vec![&b"abc"[..], b"abbb"]);
assert_eq!(lookup(&rotate, b"ac"), Vec::<&[u8]>::new());
for rotate in &mut [rotate, rotate_mem] {
rotate.append(b"aaa").unwrap();
rotate.append(b"abbb").unwrap();
rotate.append(b"abc").unwrap();
assert_eq!(lookup(&rotate, b"aa"), vec![b"aaa"]);
assert_eq!(lookup(&rotate, b"ab"), vec![&b"abc"[..], b"abbb"]);
assert_eq!(lookup(&rotate, b"ac"), Vec::<&[u8]>::new());
}
}
#[test]