indexedlog: add a utility function to read files crated by atomic_write

Summary:
This makes it possible to implement atomic_write differently (ex. use a
symlink).

Reviewed By: DurhamG

Differential Revision: D20153865

fbshipit-source-id: 07fa78c2f2dac696668f477c75f65cf70950b73f
This commit is contained in:
Jun Wu 2020-03-04 07:20:52 -08:00 committed by Facebook Github Bot
parent 1e33cd40b6
commit def12896db
6 changed files with 34 additions and 21 deletions

View File

@ -6,9 +6,8 @@
*/
use crate::errors::IoResultExt;
use crate::utils::{self, atomic_write, xxhash};
use crate::utils::{self, atomic_read, atomic_write, xxhash};
use std::collections::BTreeMap;
use std::fs;
use std::io::{self, Cursor, Read, Write};
use std::path::Path;
use vlqencoding::{VLQDecode, VLQEncode};
@ -35,7 +34,7 @@ impl LogMetadata {
const POISONED_HEADER: &'static [u8] = b"pois\0";
/// Read metadata from a reader.
pub fn read<R: Read>(reader: &mut R) -> io::Result<Self> {
pub fn read<R: Read>(mut reader: R) -> io::Result<Self> {
let mut header = vec![0; Self::HEADER.len()];
reader.read_exact(&mut header)?;
if header == Self::POISONED_HEADER {
@ -118,11 +117,8 @@ impl LogMetadata {
/// Read metadata from a file.
pub fn read_file<P: AsRef<Path>>(path: P) -> io::Result<Self> {
let mut file = fs::OpenOptions::new().read(true).open(path)?;
let mut buf = Vec::new();
file.read_to_end(&mut buf)?;
let mut cur = Cursor::new(buf);
Self::read(&mut cur)
let buf = atomic_read(path.as_ref())?;
Self::read(&buf[..])
}
/// Atomically write metadata to a file.

View File

@ -20,7 +20,8 @@ pub enum GenericPath {
/// The [`Log`] is backed by a directory on filesystem.
Filesystem(PathBuf),
/// Metadata is shared. Other parts still use `path`.
/// Metadata is shared (between `Log` and `MultiLog`).
/// Other parts still use `path`.
SharedMeta {
path: Box<GenericPath>,
meta: Arc<Mutex<LogMetadata>>,

View File

@ -1159,7 +1159,7 @@ Rebuilt index "c""#
verify_len(200000);
// Corrupt meta
corrupt(META_FILE, 2);
crate::utils::atomic_write(&path.join(META_FILE), b"xxx", false).unwrap();
corrupt(PRIMARY_FILE, 1000);
verify_corrupted();
assert_eq!(
@ -1171,7 +1171,7 @@ Rebuilt index "c""#
);
verify_len(141);
truncate(META_FILE);
crate::utils::atomic_write(&path.join(META_FILE), b"yyy", false).unwrap();
verify_corrupted();
assert_eq!(
repair(),

View File

@ -12,8 +12,7 @@ use crate::lock::ScopedDirLock;
use crate::log::{self, GenericPath, LogMetadata};
use crate::utils;
use std::collections::BTreeMap;
use std::fs;
use std::io::{self, Read};
use std::io;
use std::mem;
use std::ops;
use std::path::{Path, PathBuf};
@ -296,9 +295,7 @@ impl MultiMeta {
/// Update self with metadata from a file.
pub fn read_file<P: AsRef<Path>>(&mut self, path: P) -> io::Result<()> {
let mut file = fs::OpenOptions::new().read(true).open(path)?;
let mut buf = Vec::new();
file.read_to_end(&mut buf)?;
let buf = utils::atomic_read(path.as_ref())?;
self.read(&buf[..])
}

View File

@ -274,7 +274,8 @@ impl OpenOptions {
| io::ErrorKind::UnexpectedEof => {
let latest = guess_latest(ids);
let content = format!("{}", latest);
fs::write(&latest_path, content).context(&latest_path, "cannot write")?;
let fsync = false;
utils::atomic_write(&latest_path, content, fsync)?;
message += &format!("Reset latest to {}\n", latest);
}
_ => return Err(err).context(&latest_path, "cannot read or parse"),
@ -710,7 +711,13 @@ fn read_latest(dir: &Path) -> crate::Result<u8> {
// Unlike read_latest, this function returns io::Result.
fn read_latest_raw(dir: &Path) -> io::Result<u8> {
let latest_path = dir.join(LATEST_FILE);
let content: String = fs::read_to_string(&latest_path)?;
let data = utils::atomic_read(&latest_path)?;
let content: String = String::from_utf8(data).map_err(|_e| {
io::Error::new(
io::ErrorKind::InvalidData,
format!("{:?}: failed to read as utf8 string", latest_path),
)
})?;
let id: u8 = content.parse().map_err(|_e| {
io::Error::new(
io::ErrorKind::InvalidData,
@ -1015,7 +1022,7 @@ mod tests {
let rotate2 = open_opts.open(&dir).unwrap();
// Break logs by truncating "meta".
fs::write(dir.path().join("0").join(log::META_FILE), "").unwrap();
utils::atomic_write(dir.path().join("0").join(log::META_FILE), "", false).unwrap();
// rotate1 can still use its existing indexes even if "a1"
// might have been deleted (on Unix).
@ -1323,7 +1330,7 @@ mod tests {
// Corrupt "latest".
let latest_path = dir.path().join(LATEST_FILE);
fs::write(&latest_path, "NaN").unwrap();
utils::atomic_write(&latest_path, "NaN", false).unwrap();
assert!(opts.open(&dir).is_err());
assert_eq!(
opts.repair(&dir).unwrap(),

View File

@ -8,7 +8,7 @@
use std::{
fs::{self, File},
hash::Hasher,
io::{self, Write},
io::{self, Read, Write},
path::Path,
sync::atomic::AtomicI64,
};
@ -166,6 +166,18 @@ pub fn atomic_write(
})
}
/// Read the entire file written by `atomic_write`.
///
/// The read itself is only atomic if the file was written by `atomic_write`.
/// This function handles format differences (symlink vs normal files)
/// transparently.
pub fn atomic_read(path: &Path) -> io::Result<Vec<u8>> {
let mut file = fs::OpenOptions::new().read(true).open(path)?;
let mut buf = Vec::new();
file.read_to_end(&mut buf)?;
Ok(buf)
}
/// `uid` and `gid` to `chown` for `mkdir_p`.
/// - x (x < 0): do not chown
/// - x (x >= 0): try to chown to `x`, do nothing if failed