indexedlog: move log::LogMetaData to a standalone module

Summary: The goal is to split log.rs to smaller modules so it's easier to reason about.

Reviewed By: DurhamG

Differential Revision: D19431785

fbshipit-source-id: b9a598b3018267ff6e43ef57dc807cb4e880467c
This commit is contained in:
Jun Wu 2020-01-17 03:52:22 -08:00 committed by Facebook Github Bot
parent dc762cc630
commit f6d3fd3d39
2 changed files with 149 additions and 118 deletions

View File

@ -0,0 +1,144 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* This software may be used and distributed according to the terms of the
* GNU General Public License version 2.
*/
use crate::errors::IoResultExt;
use crate::utils::{atomic_write, xxhash};
use std::collections::BTreeMap;
use std::fs;
use std::io::{self, Cursor, Read, Write};
use std::path::Path;
use vlqencoding::{VLQDecode, VLQEncode};
/// Metadata about index names, logical [`Log`] and [`Index`] file lengths.
#[derive(PartialEq, Eq, Debug, Clone)]
pub struct LogMetadata {
/// Length of the primary log file.
pub(crate) primary_len: u64,
/// Lengths of index files. Name => Length.
pub(crate) indexes: BTreeMap<String, u64>,
/// Used to detect non-append-only changes.
/// Conceptually similar to "create time".
pub(crate) epoch: u64,
}
impl LogMetadata {
const HEADER: &'static [u8] = b"meta\0";
/// Read metadata from a reader.
pub fn read<R: Read>(reader: &mut R) -> io::Result<Self> {
let mut header = vec![0; Self::HEADER.len()];
reader.read_exact(&mut header)?;
if header != Self::HEADER {
let msg = "invalid metadata header";
return Err(io::Error::new(io::ErrorKind::InvalidData, msg));
}
let hash = reader.read_vlq()?;
let buf_len = reader.read_vlq()?;
let mut buf = vec![0; buf_len];
reader.read_exact(&mut buf)?;
if xxhash(&buf) != hash {
let msg = "metadata integrity check failed";
return Err(io::Error::new(io::ErrorKind::InvalidData, msg));
}
let mut reader = Cursor::new(buf);
let primary_len = reader.read_vlq()?;
let index_count: usize = reader.read_vlq()?;
let mut indexes = BTreeMap::new();
for _ in 0..index_count {
let name_len = reader.read_vlq()?;
let mut name = vec![0; name_len];
reader.read_exact(&mut name)?;
let name = String::from_utf8(name).map_err(|_e| {
let msg = "non-utf8 index name";
io::Error::new(io::ErrorKind::InvalidData, msg)
})?;
let len = reader.read_vlq()?;
indexes.insert(name, len);
}
// 'epoch' is optional - it does not exist in a previous serialization
// format. So not being able to read it (because EOF) is not fatal.
let epoch = reader.read_vlq().unwrap_or_default();
Ok(Self {
primary_len,
indexes,
epoch,
})
}
/// Write metadata to a writer.
pub fn write<W: Write>(&self, writer: &mut W) -> io::Result<()> {
let mut buf = Vec::new();
buf.write_vlq(self.primary_len)?;
buf.write_vlq(self.indexes.len())?;
for (name, len) in self.indexes.iter() {
let name = name.as_bytes();
buf.write_vlq(name.len())?;
buf.write_all(name)?;
buf.write_vlq(*len)?;
}
buf.write_vlq(self.epoch)?;
writer.write_all(Self::HEADER)?;
writer.write_vlq(xxhash(&buf))?;
writer.write_vlq(buf.len())?;
writer.write_all(&buf)?;
Ok(())
}
/// Read metadata from a file.
pub fn read_file<P: AsRef<Path>>(path: P) -> io::Result<Self> {
let mut file = fs::OpenOptions::new().read(true).open(path)?;
let mut buf = Vec::new();
file.read_to_end(&mut buf)?;
let mut cur = Cursor::new(buf);
Self::read(&mut cur)
}
/// Atomically write metadata to a file.
pub fn write_file<P: AsRef<Path>>(&self, path: P, fsync: bool) -> crate::Result<()> {
let mut buf = Vec::new();
self.write(&mut buf).infallible()?;
atomic_write(path, &buf, fsync)?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use quickcheck::quickcheck;
use tempfile::tempdir;
quickcheck! {
fn test_roundtrip_meta(primary_len: u64, indexes: BTreeMap<String, u64>, epoch: u64) -> bool {
let mut buf = Vec::new();
let meta = LogMetadata { primary_len, indexes, epoch };
meta.write(&mut buf).expect("write");
let mut cur = Cursor::new(buf);
let meta_read = LogMetadata::read(&mut cur).expect("read");
meta_read == meta
}
fn test_roundtrip_meta_file(primary_len: u64, indexes: BTreeMap<String, u64>, epoch: u64) -> bool {
let dir = tempdir().unwrap();
let meta = LogMetadata { primary_len, indexes, epoch };
let path = dir.path().join("meta");
meta.write_file(&path, false).expect("write_file");
let meta_read = LogMetadata::read_file(&path).expect("read_file");
meta_read == meta
}
}
}

View File

@ -45,13 +45,16 @@ use std::borrow::Cow;
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::fmt::{self, Debug, Formatter}; use std::fmt::{self, Debug, Formatter};
use std::fs::{self, File}; use std::fs::{self, File};
use std::io::{self, BufRead, Cursor, Read, Seek, SeekFrom, Write}; use std::io::{self, BufRead, Read, Seek, SeekFrom, Write};
use std::ops::{Range, RangeBounds}; use std::ops::{Range, RangeBounds};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::pin::Pin; use std::pin::Pin;
use std::sync::Arc; use std::sync::Arc;
use tracing::debug_span; use tracing::debug_span;
use vlqencoding::{VLQDecode, VLQDecodeAt, VLQEncode}; use vlqencoding::{VLQDecodeAt, VLQEncode};
mod meta;
pub use self::meta::LogMetadata;
// Constants about file names // Constants about file names
pub(crate) const PRIMARY_FILE: &str = "log"; pub(crate) const PRIMARY_FILE: &str = "log";
@ -212,20 +215,6 @@ pub struct LogRangeIter<'a> {
index: &'a Index, index: &'a Index,
} }
/// Metadata about index names, logical [`Log`] and [`Index`] file lengths.
#[derive(PartialEq, Eq, Debug, Clone)]
pub struct LogMetadata {
/// Length of the primary log file.
primary_len: u64,
/// Lengths of index files. Name => Length.
indexes: BTreeMap<String, u64>,
/// Used to detect non-append-only changes.
/// Conceptually similar to "create time".
epoch: u64,
}
/// Options used to configured how an [`Log`] is opened. /// Options used to configured how an [`Log`] is opened.
#[derive(Clone)] #[derive(Clone)]
pub struct OpenOptions { pub struct OpenOptions {
@ -2212,90 +2201,6 @@ impl<'a> DoubleEndedIterator for LogRangeIter<'a> {
} }
} }
impl LogMetadata {
const HEADER: &'static [u8] = b"meta\0";
pub fn read<R: Read>(reader: &mut R) -> io::Result<Self> {
let mut header = vec![0; Self::HEADER.len()];
reader.read_exact(&mut header)?;
if header != Self::HEADER {
let msg = "invalid metadata header";
return Err(io::Error::new(io::ErrorKind::InvalidData, msg));
}
let hash = reader.read_vlq()?;
let buf_len = reader.read_vlq()?;
let mut buf = vec![0; buf_len];
reader.read_exact(&mut buf)?;
if xxhash(&buf) != hash {
let msg = "metadata integrity check failed";
return Err(io::Error::new(io::ErrorKind::InvalidData, msg));
}
let mut reader = Cursor::new(buf);
let primary_len = reader.read_vlq()?;
let index_count: usize = reader.read_vlq()?;
let mut indexes = BTreeMap::new();
for _ in 0..index_count {
let name_len = reader.read_vlq()?;
let mut name = vec![0; name_len];
reader.read_exact(&mut name)?;
let name = String::from_utf8(name).map_err(|_e| {
let msg = "non-utf8 index name";
io::Error::new(io::ErrorKind::InvalidData, msg)
})?;
let len = reader.read_vlq()?;
indexes.insert(name, len);
}
// 'epoch' is optional - it does not exist in a previous serialization
// format. So not being able to read it (because EOF) is not fatal.
let epoch = reader.read_vlq().unwrap_or_default();
Ok(Self {
primary_len,
indexes,
epoch,
})
}
pub fn write<W: Write>(&self, writer: &mut W) -> io::Result<()> {
let mut buf = Vec::new();
buf.write_vlq(self.primary_len)?;
buf.write_vlq(self.indexes.len())?;
for (name, len) in self.indexes.iter() {
let name = name.as_bytes();
buf.write_vlq(name.len())?;
buf.write_all(name)?;
buf.write_vlq(*len)?;
}
buf.write_vlq(self.epoch)?;
writer.write_all(Self::HEADER)?;
writer.write_vlq(xxhash(&buf))?;
writer.write_vlq(buf.len())?;
writer.write_all(&buf)?;
Ok(())
}
pub fn read_file<P: AsRef<Path>>(path: P) -> io::Result<Self> {
let mut file = fs::OpenOptions::new().read(true).open(path)?;
let mut buf = Vec::new();
file.read_to_end(&mut buf)?;
let mut cur = Cursor::new(buf);
Self::read(&mut cur)
}
pub fn write_file<P: AsRef<Path>>(&self, path: P, fsync: bool) -> crate::Result<()> {
let mut buf = Vec::new();
self.write(&mut buf).infallible()?;
atomic_write(path, &buf, fsync)?;
Ok(())
}
}
impl IndexOutput { impl IndexOutput {
fn into_cow(self, data: &[u8]) -> crate::Result<Cow<[u8]>> { fn into_cow(self, data: &[u8]) -> crate::Result<Cow<[u8]>> {
Ok(match self { Ok(match self {
@ -3731,24 +3636,6 @@ Rebuilt index "c""#
} }
quickcheck! { quickcheck! {
fn test_roundtrip_meta(primary_len: u64, indexes: BTreeMap<String, u64>, epoch: u64) -> bool {
let mut buf = Vec::new();
let meta = LogMetadata { primary_len, indexes, epoch };
meta.write(&mut buf).expect("write");
let mut cur = Cursor::new(buf);
let meta_read = LogMetadata::read(&mut cur).expect("read");
meta_read == meta
}
fn test_roundtrip_meta_file(primary_len: u64, indexes: BTreeMap<String, u64>, epoch: u64) -> bool {
let dir = tempdir().unwrap();
let meta = LogMetadata { primary_len, indexes, epoch };
let path = dir.path().join("meta");
meta.write_file(&path, false).expect("write_file");
let meta_read = LogMetadata::read_file(&path).expect("read_file");
meta_read == meta
}
fn test_roundtrip_entries(entries: Vec<(Vec<u8>, bool, bool)>) -> bool { fn test_roundtrip_entries(entries: Vec<(Vec<u8>, bool, bool)>) -> bool {
let dir = tempdir().unwrap(); let dir = tempdir().unwrap();
let mut log = Log::open(dir.path(), Vec::new()).unwrap(); let mut log = Log::open(dir.path(), Vec::new()).unwrap();