indexedlog: add MultiLog

Summary: The MultiLog holds multiple Logs and can atomically sync them.

Reviewed By: DurhamG

Differential Revision: D19432659

fbshipit-source-id: 6ac7dc6f74468f985c6a6b0c419e888722a80037
This commit is contained in:
Jun Wu 2020-01-17 21:47:32 -08:00 committed by Facebook Github Bot
parent 5aa872599c
commit 907aadcdd7
4 changed files with 531 additions and 3 deletions

View File

@ -25,6 +25,7 @@ mod errors;
pub mod index;
pub mod lock;
pub mod log;
pub mod multi;
mod repair;
pub mod rotate;
pub mod utils;

View File

@ -25,15 +25,26 @@ pub struct LogMetadata {
/// Used to detect non-append-only changes.
/// Conceptually similar to "create time".
pub(crate) epoch: u64,
/// Once set. Indicate this LogMetadata shouldn't be read.
pub(crate) poisoned: Option<&'static str>,
}
impl LogMetadata {
const HEADER: &'static [u8] = b"meta\0";
const POISONED_HEADER: &'static [u8] = b"pois\0";
/// Read metadata from a reader.
pub fn read<R: Read>(reader: &mut R) -> io::Result<Self> {
let mut header = vec![0; Self::HEADER.len()];
reader.read_exact(&mut header)?;
if header == Self::POISONED_HEADER {
let message_len: usize = reader.read_vlq()?;
let mut message_bytes = vec![0u8; message_len];
reader.read_exact(&mut message_bytes[..])?;
let msg = String::from_utf8_lossy(&message_bytes);
return Err(io::Error::new(io::ErrorKind::AddrNotAvailable, msg));
}
if header != Self::HEADER {
let msg = "invalid metadata header";
return Err(io::Error::new(io::ErrorKind::InvalidData, msg));
@ -74,11 +85,19 @@ impl LogMetadata {
primary_len,
indexes,
epoch,
poisoned: None,
})
}
/// Write metadata to a writer.
pub fn write<W: Write>(&self, writer: &mut W) -> io::Result<()> {
if let Some(poisoned) = self.poisoned {
writer.write_all(Self::POISONED_HEADER)?;
writer.write_vlq(poisoned.as_bytes().len())?;
writer.write_all(poisoned.as_bytes())?;
return Ok(());
}
let mut buf = Vec::new();
buf.write_vlq(self.primary_len)?;
buf.write_vlq(self.indexes.len())?;
@ -123,6 +142,17 @@ impl LogMetadata {
primary_len: len,
indexes: BTreeMap::new(),
epoch: utils::epoch(),
poisoned: None,
}
}
/// Create a new poisoned LogMetadata.
///
/// `message` is stored in the `poisoned` field. When such a metadata is
/// written, only the poisoned header and the message are emitted; reading
/// that content back yields an error carrying the message instead of a
/// `LogMetadata`.
pub(crate) fn new_poisoned(message: &'static str) -> Self {
    Self {
        // The remaining fields are placeholders: `write` returns right after
        // emitting the poisoned header and message, so they are never
        // serialized.
        primary_len: 0,
        indexes: BTreeMap::new(),
        epoch: 0,
        poisoned: Some(message),
    }
}
}
@ -136,7 +166,7 @@ mod tests {
quickcheck! {
fn test_roundtrip_meta(primary_len: u64, indexes: BTreeMap<String, u64>, epoch: u64) -> bool {
let mut buf = Vec::new();
let meta = LogMetadata { primary_len, indexes, epoch };
let meta = LogMetadata { primary_len, indexes, epoch, poisoned: None };
meta.write(&mut buf).expect("write");
let mut cur = Cursor::new(buf);
let meta_read = LogMetadata::read(&mut cur).expect("read");
@ -145,7 +175,7 @@ mod tests {
fn test_roundtrip_meta_file(primary_len: u64, indexes: BTreeMap<String, u64>, epoch: u64) -> bool {
let dir = tempdir().unwrap();
let meta = LogMetadata { primary_len, indexes, epoch };
let meta = LogMetadata { primary_len, indexes, epoch, poisoned: None };
let path = dir.path().join("meta");
meta.write_file(&path, false).expect("write_file");
let meta_read = LogMetadata::read_file(&path).expect("read_file");

View File

@ -1100,7 +1100,23 @@ impl Log {
///
/// The caller should ensure the directory exists and take a lock on it to
/// avoid filesystem races.
fn load_or_create_meta(path: &GenericPath, create: bool) -> crate::Result<LogMetadata> {
pub(crate) fn load_or_create_meta(
    path: &GenericPath,
    create: bool,
) -> crate::Result<LogMetadata> {
    // Non-shared case (`is_shared = false`): a newly created meta file holds
    // the real metadata rather than a poisoned placeholder.
    Self::load_or_create_meta_internal(path, create, false)
}
/// Used by MultiLog. Write a dummy "meta" file that prevents accidental reading.
///
/// Delegates to `load_or_create_meta_internal` with `create = true` and
/// `is_shared = true`: when the meta file does not exist yet, a poisoned
/// meta is written to `path` while a clean `LogMetadata` is returned to the
/// caller.
pub(crate) fn load_or_create_shared_meta(path: &GenericPath) -> crate::Result<LogMetadata> {
    Self::load_or_create_meta_internal(path, true, true)
}
pub(crate) fn load_or_create_meta_internal(
path: &GenericPath,
create: bool,
is_shared: bool,
) -> crate::Result<LogMetadata> {
match path.read_meta() {
Err(err) => {
if err.io_error_kind() == io::ErrorKind::NotFound && create {
@ -1117,6 +1133,25 @@ impl Log {
let meta = LogMetadata::new_with_primary_len(PRIMARY_START_OFFSET);
// An empty meta file is easy to recreate. No need to use fsync.
path.write_meta(&meta, false)?;
if is_shared {
// If meta is intended to be shared, write a poisoned one to the
// filesystem to prevent loading. But return the clean one.
// The filesystem layout looks like:
// - multilog/this-log/meta # poisoned
// - multilog/multimeta # the right one to use
let poisoned_meta = LogMetadata::new_poisoned(
"This Log is managed by MultiLog. Direct access is forbidden!",
);
if let GenericPath::SharedMeta { .. } = path {
// This path being poisoned should be the path being wrapped in
// a `GenericPath::SharedMeta`, not the shared path itself.
panic!("bug: GenericPath::SharedMeta shouldn't be used here.");
}
path.write_meta(&poisoned_meta, false)?;
} else {
// An empty meta file is easy to recreate. No need to use fsync.
path.write_meta(&meta, false)?;
}
Ok(meta)
} else {
Err(err)

View File

@ -0,0 +1,462 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* This software may be used and distributed according to the terms of the
* GNU General Public License version 2.
*/
//! Atomic `sync` support for multiple [`Log`]s.
use crate::errors::{IoResultExt, ResultExt};
use crate::lock::ScopedDirLock;
use crate::log::{self, GenericPath, LogMetadata};
use crate::utils;
use std::collections::BTreeMap;
use std::fs;
use std::io::{self, Read};
use std::mem;
use std::ops;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use vlqencoding::{VLQDecode, VLQEncode};
/// Options used to configure how a [`MultiLog`] is opened.
///
/// Constructed by [`OpenOptions::from_name_opts`] and consumed by
/// [`OpenOptions::open`].
#[derive(Clone, Default)]
pub struct OpenOptions {
    /// Name (subdir) of the Log and its OpenOptions.
    ///
    /// Logs are opened in this order, so entry `i` here becomes
    /// `multilog[i]` in the opened [`MultiLog`].
    name_open_options: Vec<(&'static str, log::OpenOptions)>,
}
/// A [`MultiLog`] contains multiple [`Log`]s sharing one central metadata
/// file.
///
/// Metadata is "frozen": changes to the metadata on the filesystem are not
/// visible to the Logs until [`MultiLog::lock`] gets called. The only way to
/// write the central metadata back to the filesystem is
/// [`MultiLog::write_meta`].
/// Note: [`MultiLog::sync`] calls the above functions and is another way to
/// exchange data with the filesystem.
///
/// [`Log`]s are accessible via indexing. For example, `multilog[0]`
/// accesses the first [`Log`]. [`Log`]s can also be moved out of this
/// struct by [`MultiLog::detach_logs`].
///
/// [`MultiLog`] ensures data consistency on disk, but not always in memory.
/// If [`Log::sync`] was called but [`MultiLog::write_meta`] was not called
/// (or did not succeed), the data written by that [`Log`] might be
/// overwritten by other processes, breaking the in-memory [`Log`]!
pub struct MultiLog {
    /// Directory containing all the Logs.
    /// Used to write metadata.
    path: PathBuf,
    /// Combined metadata from logs.
    multimeta: MultiMeta,
    /// Logs loaded by MultiLog.
    /// Becomes empty after [`MultiLog::detach_logs`].
    logs: Vec<log::Log>,
}
/// Metadata of all [`Log`]s managed by a [`MultiLog`], serialized together
/// into the single "multimeta" file.
#[derive(Default)]
pub struct MultiMeta {
    /// Per-Log metadata keyed by Log name.
    ///
    /// Each entry is behind `Arc<Mutex<..>>` because the same handle is also
    /// given to the opened Log (via `GenericPath::SharedMeta`), so reloading
    /// the multimeta updates what the Log sees and vice versa.
    metas: BTreeMap<String, Arc<Mutex<LogMetadata>>>,
}
impl OpenOptions {
/// Create [`OpenOptions`] from names and OpenOptions of [`Log`].
pub fn from_name_opts(name_opts: Vec<(&'static str, log::OpenOptions)>) -> Self {
// Sanity check.
for (name, _) in &name_opts {
if name == &"multimeta" {
panic!("MultiLog: cannot use 'multimeta' as Log name");
} else if name.contains('/') || name.contains('\\') {
panic!("MultiLog: cannot use '/' or '\\' in Log name");
}
}
Self {
name_open_options: name_opts,
}
}
/// Open [`MultiLog`] at the given directory.
///
/// This ignores the `create` option per [`Log`]. [`Log`] and their metadata
/// are created on demand.
pub fn open(&self, path: &Path) -> crate::Result<MultiLog> {
let result: crate::Result<_> = (|| {
let meta_path = multi_meta_path(path);
let mut multimeta = MultiMeta::default();
match multimeta.read_file(&meta_path) {
Err(e) => match e.kind() {
io::ErrorKind::NotFound => (), // not fatal.
_ => return Err(e).context(&meta_path, "when opening MultiLog"),
},
Ok(_) => (),
};
let locked = if self
.name_open_options
.iter()
.all(|(name, _)| multimeta.metas.contains_key(AsRef::<str>::as_ref(name)))
{
// All keys exist. No need to write files on disk.
None
} else {
// Need to create some Logs and rewrite the multimeta.
utils::mkdir_p(path)?;
Some(ScopedDirLock::new(path)?)
};
let mut logs = Vec::with_capacity(self.name_open_options.len());
for (name, opts) in self.name_open_options.iter() {
let fspath = path.join(name);
let name_ref: &str = name.as_ref();
if !multimeta.metas.contains_key(name_ref) {
// Create a new Log if it does not exist in MultiMeta.
utils::mkdir_p(&fspath)?;
let meta = log::Log::load_or_create_shared_meta(&fspath.as_path().into())?;
let meta = Arc::new(Mutex::new(meta));
multimeta.metas.insert(name.to_string(), meta);
}
let path = GenericPath::SharedMeta {
path: Box::new(fspath.as_path().into()),
meta: multimeta.metas[name_ref].clone(),
};
let log = opts.open(path)?;
logs.push(log);
}
if locked.is_some() {
multimeta.write_file(&meta_path)?;
}
Ok(MultiLog {
path: path.to_path_buf(),
logs,
multimeta,
})
})();
result.context("in multi::OpenOptions::open")
}
}
impl MultiLog {
/// Lock the MultiLog directory for writing.
///
/// After taking the lock, metadata will be reloaded so [`Log`]s can see the
/// latest metadata on disk and do `sync()` accordingly.
///
/// Once everything is done, use [`MultiLog::write_meta`] to persistent the
/// changed metadata.
pub fn lock(&mut self) -> crate::Result<LockGuard> {
let result: crate::Result<_> = (|| {
let lock = LockGuard(ScopedDirLock::new(&self.path)?);
self.read_meta(&lock)?;
Ok(lock)
})();
result.context("in MultiLog::lock")
}
/// Write meta to disk so they become visible to other processes.
///
/// A lock must be provided to prove that there is no race condition.
/// The lock is usually obtained via `lock()`.
pub fn write_meta(&mut self, lock: &LockGuard) -> crate::Result<()> {
if lock.0.path() != &self.path {
let msg = format!(
"Invalid lock used to write_meta (Lock path = {:?}, MultiLog path = {:?})",
lock.0.path(),
&self.path
);
return Err(crate::Error::programming(msg));
}
let result: crate::Result<_> = (|| {
let meta_path = multi_meta_path(&self.path);
self.multimeta.write_file(&meta_path)?;
Ok(())
})();
result.context("in MultiLog::write_meta")
}
/// Reload meta from disk so they become visible to Logs.
///
/// This is called automatically by `lock` so it's not part of the
/// public interface.
fn read_meta(&mut self, lock: &LockGuard) -> crate::Result<()> {
debug_assert_eq!(lock.0.path(), &self.path);
let meta_path = multi_meta_path(&self.path);
match self.multimeta.read_file(&meta_path) {
Err(err) => {
if err.kind() == io::ErrorKind::NotFound {
Ok(())
} else {
Err(err).context(&meta_path, "reloading meta")
}
}
Ok(()) => Ok(()),
}
}
/// Detach [`Log`]s from this [`MultiLog`].
///
/// Once detached, [`Log`]s will no longer be available via indexing
/// like `multilog[0]`.
///
/// This is useful for places where [`Log`]s are owned by other
/// structured, instead of being accessed via [`MultiLog`].
pub fn detach_logs(&mut self) -> Vec<log::Log> {
let mut result = Vec::new();
mem::swap(&mut result, &mut self.logs);
result
}
/// Sync all [`Log`]s. This is an atomic operation.
///
/// This function simply calls [`MultiLog::lock`], [`Log::sync`] and
/// [`MultiLog::write_meta`]. For more advanced use-cases, call those
/// functions manually.
///
/// (This does not seem very useful practically. So it is private.)
pub fn sync(&mut self) -> crate::Result<()> {
let lock = self.lock()?;
for log in self.logs.iter_mut() {
log.sync()?;
}
self.write_meta(&lock)?;
Ok(())
}
}
/// Structure proving a lock was taken for [`MultiLog`].
///
/// Returned by [`MultiLog::lock`] and required by [`MultiLog::write_meta`],
/// which checks that the wrapped lock's path matches the [`MultiLog`] path.
pub struct LockGuard(ScopedDirLock);
impl ops::Index<usize> for MultiLog {
    type Output = log::Log;

    /// Borrow the `index`-th [`Log`] held by this [`MultiLog`].
    ///
    /// Panics if `index` is out of bounds, which is always the case after
    /// the Logs were moved out by `detach_logs`.
    fn index(&self, index: usize) -> &Self::Output {
        let held: &[log::Log] = self.logs.as_slice();
        &held[index]
    }
}
impl ops::IndexMut<usize> for MultiLog {
fn index_mut(&mut self, index: usize) -> &mut Self::Output {
&mut self.logs[index]
}
}
/// Path of the shared "multimeta" file inside the MultiLog directory `dir`.
fn multi_meta_path(dir: &Path) -> PathBuf {
    let mut meta_path = dir.to_path_buf();
    meta_path.push("multimeta");
    meta_path
}
impl MultiMeta {
/// Update self with content from a reader.
/// Metadata with existing keys are mutated in-place.
fn read(&mut self, mut reader: impl io::Read) -> io::Result<()> {
let version: usize = reader.read_vlq()?;
if version != 0 {
return Err(io::Error::new(
io::ErrorKind::Other,
format!("MultiMeta version is unsupported: {}", version),
));
}
let count: usize = reader.read_vlq()?;
for _ in 0..count {
let name_len = reader.read_vlq()?;
let mut name_buf = vec![0; name_len];
reader.read_exact(&mut name_buf)?;
let name = String::from_utf8(name_buf)
.map_err(|_| io::Error::new(io::ErrorKind::Other, "Log name is not utf-8"))?;
let meta = LogMetadata::read(&mut reader)?;
self.metas
.entry(name.to_string())
.and_modify(|e| {
let mut e = e.lock().unwrap();
let truncated = e.primary_len > meta.primary_len && e.epoch == meta.epoch;
*e = meta.clone();
// Force a different epoch for truncation.
if truncated {
e.epoch = e.epoch.wrapping_add(1);
}
})
.or_insert_with(|| Arc::new(Mutex::new(meta.clone())));
}
Ok(())
}
/// Write metadata to a writer.
fn write(&self, mut writer: impl io::Write) -> io::Result<()> {
let version = 0;
writer.write_vlq(version)?;
writer.write_vlq(self.metas.len())?;
for (name, meta) in self.metas.iter() {
writer.write_vlq(name.len())?;
writer.write_all(name.as_bytes())?;
meta.lock().unwrap().write(&mut writer)?;
}
Ok(())
}
/// Update self with metadata from a file.
pub fn read_file<P: AsRef<Path>>(&mut self, path: P) -> io::Result<()> {
let mut file = fs::OpenOptions::new().read(true).open(path)?;
let mut buf = Vec::new();
file.read_to_end(&mut buf)?;
self.read(&buf[..])
}
/// Atomically write metadata to a file.
pub fn write_file<P: AsRef<Path>>(&self, path: P) -> crate::Result<()> {
let mut buf = Vec::new();
self.write(&mut buf).infallible()?;
utils::atomic_write(path, &buf, false)?;
Ok(())
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use quickcheck::quickcheck;

    /// Create a simple MultiLog containing Log 'a' and 'b' for testing.
    ///
    /// 'a' is reachable as index 0 and 'b' as index 1.
    fn simple_multilog(path: &Path) -> MultiLog {
        let mopts = OpenOptions::from_name_opts(vec![
            ("a", log::OpenOptions::new()),
            ("b", log::OpenOptions::new()),
        ]);
        mopts.open(path).unwrap()
    }

    /// Opening a sub-directory of a MultiLog as a plain Log must fail,
    /// because its on-disk "meta" file is poisoned.
    #[test]
    fn test_individual_log_cannot_be_opened_directly() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path();
        let mut mlog = simple_multilog(path);

        // The last line of the error carries the poison message.
        assert_eq!(
            log::OpenOptions::new()
                .open(path.join("a"))
                .unwrap_err()
                .to_string()
                .lines()
                .last()
                .unwrap(),
            "- This Log is managed by MultiLog. Direct access is forbidden!"
        );
        log::OpenOptions::new().open(path.join("b")).unwrap_err();

        // It's still an error after individual log flush.
        mlog[0].append(b"1").unwrap();
        mlog[0].flush().unwrap();
        log::OpenOptions::new().open(path.join("a")).unwrap_err();
    }

    /// Data synced by an individual Log stays invisible to other MultiLog
    /// instances until the multimeta is written.
    #[test]
    fn test_individual_log_flushes_are_invisible() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path();
        let mut mlog = simple_multilog(path);

        // This is not a proper use of Log::sync, since
        // it's not protected by a lock. But it demonstrates
        // the properties.
        mlog[0].append(b"2").unwrap();
        mlog[0].sync().unwrap();
        mlog[0].append(b"3").unwrap();
        mlog[0].append(b"4").unwrap();

        mlog[1].append(b"y").unwrap();
        mlog[1].sync().unwrap();
        mlog[1].append(b"z").unwrap();
        mlog[1].sync().unwrap();

        assert_eq!(mlog[0].iter().count(), 3);
        assert_eq!(mlog[1].iter().count(), 2);

        // mlog changes are not written via MultiLog::write_meta.
        // Therefore invisible to mlog2.
        let mlog2 = simple_multilog(path);
        assert_eq!(mlog2[0].iter().count(), 0);
        assert_eq!(mlog2[1].iter().count(), 0);

        // mlog.sync reloads multimeta. "Flushed" contents are dropped.
        // But in-memory content is kept and written.
        mlog.sync().unwrap();
        assert_eq!(mlog[0].iter().count(), 2);
        assert_eq!(mlog[1].iter().count(), 0);

        let mlog2 = simple_multilog(path);
        assert_eq!(mlog2[0].iter().count(), 2);
        assert_eq!(mlog2[1].iter().count(), 0);
    }

    /// Detached Logs can still be synced under the MultiLog lock and made
    /// visible through `write_meta`.
    #[test]
    fn test_detach_logs() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path();
        let mut mlog = simple_multilog(path);
        let mut logs = mlog.detach_logs();
        logs[0].append(b"0").unwrap();
        logs[1].append(b"1").unwrap();

        // Although logs are detached. MultiLog can still update multimeta.
        let lock = mlog.lock().unwrap();
        logs[0].sync().unwrap();
        logs[1].sync().unwrap();
        mlog.write_meta(&lock).unwrap();

        let mlog2 = simple_multilog(path);
        assert_eq!(mlog2[0].iter().count(), 1);
        assert_eq!(mlog2[1].iter().count(), 1);
    }

    /// `write_meta` rejects a lock taken on a different MultiLog directory.
    #[test]
    fn test_wrong_locks_cause_errors() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path();
        let mut mlog1 = simple_multilog(&path.join("1"));
        let mut mlog2 = simple_multilog(&path.join("2"));

        let lock1 = mlog1.lock().unwrap();
        let lock2 = mlog2.lock().unwrap();
        assert!(mlog1.write_meta(&lock2).is_err());
        assert!(mlog2.write_meta(&lock1).is_err());
    }

    quickcheck! {
        // Serializing a MultiMeta, reading it back into an empty MultiMeta
        // and serializing again must produce identical bytes.
        fn test_roundtrip_multimeta(name_len_list: Vec<(String, u64)>) -> bool {
            let metas = name_len_list
                .into_iter()
                .map(|(name, len)| {
                    let meta = LogMetadata::new_with_primary_len(len);
                    (name, Arc::new(Mutex::new(meta)))
                })
                .collect();
            let meta = MultiMeta { metas, ..Default::default() };
            let mut buf = Vec::new();
            meta.write(&mut buf).unwrap();
            let mut meta2 = MultiMeta::default();
            meta2.read(&buf[..]).unwrap();
            let mut buf2 = Vec::new();
            meta2.write(&mut buf2).unwrap();
            assert_eq!(buf2, buf);
            buf2 == buf
        }

        // Entries appended to both Logs and synced through the MultiLog must
        // be read back unchanged by a freshly opened MultiLog.
        fn test_roundtrip_multilog(list_a: Vec<Vec<u8>>, list_b: Vec<Vec<u8>>) -> bool {
            let dir = tempfile::tempdir().unwrap();
            let mut mlog = simple_multilog(dir.path());
            for a in &list_a {
                mlog[0].append(a).unwrap();
            }
            for b in &list_b {
                mlog[1].append(b).unwrap();
            }
            mlog.sync().unwrap();

            let mlog_read = simple_multilog(dir.path());
            let list_a_read: Vec<Vec<u8>> = mlog_read[0].iter().map(|e| e.unwrap().to_vec()).collect();
            let list_b_read: Vec<Vec<u8>> = mlog_read[1].iter().map(|e| e.unwrap().to_vec()).collect();

            list_a == list_a_read && list_b == list_b_read
        }
    }
}