revisionstore: Add an efficient pack writer.

Summary:
The packfiles are currently being written via an unbuffered file. This is
inefficient as every write to the file results in a write(2) syscall.
By buffering these writes we can reduce the number of syscalls and thus
increase the throughput of pack writing operations.

Reviewed By: markbt

Differential Revision: D13603758

fbshipit-source-id: 649186a852d427a1473695b1d32cc9cd87a74a75
This commit is contained in:
Xavier Deguillard 2019-01-16 09:43:50 -08:00 committed by Facebook Github Bot
parent c6c99b4777
commit e6a60b68f3
2 changed files with 107 additions and 0 deletions

View File

@ -36,6 +36,7 @@ pub mod key;
pub mod loosefile;
pub mod mutabledatapack;
pub mod mutablehistorypack;
pub mod packwriter;
pub mod repack;
pub mod uniondatastore;
pub mod unionhistorystore;

View File

@ -0,0 +1,106 @@
// Copyright Facebook, Inc. 2019
use error::Result;
use std::cell::{RefCell, RefMut};
use std::fmt::Debug;
use std::io::{self, BufWriter, Write};
/// A `PackWriter` buffers all the writes to `T` and counts the total number of bytes written.
///
/// The byte count is updated by the `Write::write` implementation and therefore includes bytes
/// that are still sitting in the buffer and have not yet reached `T`.
pub struct PackWriter<T: Write> {
// Buffered writer around the underlying `T`. Kept in a `RefCell` so that the `&self` methods
// (`flush_inner`, `get_mut`) can still obtain mutable access to it.
data: RefCell<BufWriter<T>>,
// Running total of the byte counts returned by successful `write` calls on this `PackWriter`.
bytes_written: u64,
}
impl<T: 'static + Write + Debug + Send + Sync> PackWriter<T> {
    /// Wrap `value` in a buffered writer, starting with a byte count of zero.
    pub fn new(value: T) -> PackWriter<T> {
        let data = RefCell::new(BufWriter::new(value));
        PackWriter {
            data,
            bytes_written: 0,
        }
    }

    /// Push any buffered bytes down to the underlying writer.
    ///
    /// # Errors
    ///
    /// Fails if the internal buffer is already borrowed (for instance through a live guard
    /// returned by `get_mut`), or if the flush itself reports an I/O error.
    pub fn flush_inner(&self) -> Result<()> {
        let mut buffered = self.data.try_borrow_mut()?;
        buffered.flush()?;
        Ok(())
    }

    /// Total number of bytes accepted through `write` so far.
    ///
    /// Because writes are buffered, some of these bytes may not have reached the underlying
    /// writer yet.
    pub fn bytes_written(&self) -> u64 {
        self.bytes_written
    }

    /// Borrow the underlying writer mutably, bypassing the buffer.
    ///
    /// Writing through the returned guard is discouraged: those bytes are not added to
    /// `bytes_written`, and data still held in the buffer has not been written to it yet.
    ///
    /// # Panics
    ///
    /// Panics if the internal buffer is already borrowed, e.g. while a guard from a previous
    /// `get_mut` call is still alive.
    pub fn get_mut(&self) -> RefMut<T> {
        let guard = self.data.borrow_mut();
        RefMut::map(guard, |buffered| buffered.get_mut())
    }

    /// Consume the `PackWriter`, flushing buffered data, and hand back the underlying writer.
    ///
    /// # Errors
    ///
    /// Fails if the buffered data cannot be flushed to the underlying writer.
    pub fn into_inner(self) -> Result<T> {
        let buffered = self.data.into_inner();
        let inner = buffered.into_inner()?;
        Ok(inner)
    }
}
impl<T: Write> Write for PackWriter<T> {
    /// Forward `buf` to the buffered writer and add the number of bytes it accepted to the
    /// running total. A failed write leaves the total untouched.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        // `&mut self` gives exclusive access, so the `RefCell` can be bypassed without a
        // runtime borrow check.
        let buffered = self.data.get_mut();
        let accepted = buffered.write(buf)?;
        self.bytes_written += accepted as u64;
        Ok(accepted)
    }

    /// Flush the buffered writer.
    fn flush(&mut self) -> io::Result<()> {
        let buffered = self.data.get_mut();
        buffered.flush()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use byteorder::{ReadBytesExt, WriteBytesExt};
    use std::io::{Seek, SeekFrom};
    use tempfile::tempfile;

    /// A single byte written through the `PackWriter` is reflected in the byte count.
    #[test]
    fn test_bytes_written() {
        let mut writer = PackWriter::new(tempfile().unwrap());
        writer.write_u8(10).unwrap();

        assert_eq!(writer.bytes_written(), 1);
    }

    /// Data written through the `PackWriter` is readable from the file once it is unwrapped.
    #[test]
    fn test_write() {
        let mut writer = PackWriter::new(tempfile().unwrap());
        writer.write_u8(10).unwrap();

        // into_inner() flushes its internal buffer.
        let mut backing = writer.into_inner().unwrap();
        backing.seek(SeekFrom::Start(0)).unwrap();

        assert_eq!(backing.read_u8().unwrap(), 10);
    }

    /// Without a flush, the byte is still buffered, so reading the file directly fails.
    #[test]
    fn test_read_without_drain() {
        let mut writer = PackWriter::new(tempfile().unwrap());
        writer.write_u8(10).unwrap();

        let mut backing = writer.get_mut();
        backing.seek(SeekFrom::Start(0)).unwrap();

        let read_back = backing.read_u8();
        assert!(read_back.is_err());
    }

    /// After `flush_inner`, the byte is visible through the underlying file.
    #[test]
    fn test_flush_inner() {
        let mut writer = PackWriter::new(tempfile().unwrap());
        writer.write_u8(10).unwrap();
        writer.flush_inner().unwrap();

        let mut backing = writer.get_mut();
        backing.seek(SeekFrom::Start(0)).unwrap();

        assert_eq!(backing.read_u8().unwrap(), 10);
    }
}