indexedlog: implement flush for the main index

Summary:
The flush method will write buffered data to disk.

A mistake in Root entry serialization is fixed - it needs to translate dirty
offsets to non-dirty ones.

Reviewed By: DurhamG

Differential Revision: D7223729

fbshipit-source-id: baeaab27627d6cfb7c5798d3a39be4d2b8811e5f
This commit is contained in:
Jun Wu 2018-03-22 19:30:51 -07:00 committed by Saurabh Singh
parent 8f5c35c8d2
commit 3859d00394

View File

@ -41,7 +41,7 @@
use std::collections::HashMap;
use std::fs::{File, OpenOptions};
use std::io::{self, Write};
use std::io::{self, Seek, SeekFrom, Write};
use std::io::ErrorKind::InvalidData;
use std::path::Path;
@ -262,6 +262,43 @@ impl Root {
}
}
/// Represent an offset to a dirty entry of a variant type.
/// The lowest 3 bits are used to represnet the actual type.
enum DirtyOffset {
Radix(usize),
Leaf(usize),
Link(usize),
Key(usize),
}
impl From<u64> for DirtyOffset {
fn from(x: u64) -> DirtyOffset {
debug_assert!(x >= DIRTY_OFFSET);
let x = x - DIRTY_OFFSET;
let typeint = (x & 7) as u8;
let index = (x >> 3) as usize;
match typeint {
TYPE_RADIX => DirtyOffset::Radix(index),
TYPE_LEAF => DirtyOffset::Leaf(index),
TYPE_LINK => DirtyOffset::Link(index),
TYPE_KEY => DirtyOffset::Key(index),
_ => panic!("bug: unexpected dirty offset"),
}
}
}
impl Into<u64> for DirtyOffset {
fn into(self) -> u64 {
let v = match self {
DirtyOffset::Radix(x) => (TYPE_RADIX as u64 + ((x as u64) << 3)),
DirtyOffset::Leaf(x) => (TYPE_LEAF as u64 + ((x as u64) << 3)),
DirtyOffset::Link(x) => (TYPE_LINK as u64 + ((x as u64) << 3)),
DirtyOffset::Key(x) => (TYPE_KEY as u64 + ((x as u64) << 3)),
};
v + DIRTY_OFFSET
}
}
//// Main Index
pub struct Index {
@ -321,7 +358,8 @@ impl Index {
// Automatically locate the root entry
if len == 0 {
// Empty file. Create root radix entry as an dirty entry
(vec![Radix::default()], Root { radix_offset: 1 })
let radix_offset = DirtyOffset::Radix(0).into();
(vec![Radix::default()], Root { radix_offset })
} else {
// Load root entry from the end of file.
(vec![], Root::read_from_end(&mmap, len)?)
@ -342,6 +380,93 @@ impl Index {
dirty_keys: vec![],
})
}
/// Flush dirty parts to disk.
///
/// Return 0 if nothing needs to be written. Otherwise return the
/// new offset to the root entry.
///
/// Return `PermissionDenied` if the file is read-only.
pub fn flush(&mut self) -> io::Result<u64> {
if self.read_only {
return Err(io::ErrorKind::PermissionDenied.into());
}
let mut root_offset = 0;
if self.root.radix_offset < DIRTY_OFFSET {
// Nothing changed
return Ok(root_offset);
}
// Critical section: need write lock
{
let estimated_dirty_bytes = self.dirty_links.len() * 50;
let estimated_dirty_offsets = self.dirty_links.len() + self.dirty_keys.len()
+ self.dirty_leafs.len()
+ self.dirty_radixes.len();
let mut lock = ScopedFileLock::new(&mut self.file, true)?;
let len = lock.as_mut().seek(SeekFrom::End(0))?;
let mut buf = Vec::with_capacity(estimated_dirty_bytes);
let mut offset_map = HashMap::with_capacity(estimated_dirty_offsets);
// Write in the following order:
// header, keys, links, leafs, radixes, root.
// Latter entries depend on former entries.
if len == 0 {
buf.write_all(&[TYPE_HEAD])?;
}
for (i, entry) in self.dirty_keys.iter().enumerate() {
let offset = buf.len() as u64 + len;
entry.write_to(&mut buf, &offset_map)?;
offset_map.insert(DirtyOffset::Key(i).into(), offset);
}
for (i, entry) in self.dirty_links.iter().enumerate() {
let offset = buf.len() as u64 + len;
entry.write_to(&mut buf, &offset_map)?;
offset_map.insert(DirtyOffset::Link(i).into(), offset);
}
for (i, entry) in self.dirty_leafs.iter().enumerate() {
let offset = buf.len() as u64 + len;
entry.write_to(&mut buf, &offset_map)?;
offset_map.insert(DirtyOffset::Leaf(i).into(), offset);
}
for (i, entry) in self.dirty_radixes.iter().enumerate() {
let offset = buf.len() as u64 + len;
entry.write_to(&mut buf, &offset_map)?;
offset_map.insert(DirtyOffset::Radix(i).into(), offset);
}
root_offset = buf.len() as u64 + len;
self.root.write_to(&mut buf, &offset_map)?;
lock.as_mut().write_all(&buf)?;
// Remap and update root since length has changed
let (mmap, new_len) = mmap_readonly(lock.as_ref())?;
self.buf = mmap;
// Sanity check - the length should be expected. Otherwise, the lock
// is somehow ineffective.
if new_len != buf.len() as u64 + len {
return Err(io::ErrorKind::UnexpectedEof.into());
}
self.root = Root::read_from_end(&self.buf, new_len)?;
}
// Outside critical section
self.dirty_radixes.clear();
self.dirty_leafs.clear();
self.dirty_links.clear();
self.dirty_keys.clear();
Ok(root_offset)
}
}
#[cfg(test)]