dag: make sure Dag has complete high-level segments when SyncableDag gets dropped

Summary:
Previously, `SyncableDag` and `Dag` could co-exist, with `SyncableDag` holding
a mutable borrow of the `Dag`. Dropping `SyncableDag` involved error handling
and was not panic-free. If we wanted to make sure `Dag` has complete high-level
segments, that logic would have had to be implemented in `SyncableDag::drop`,
making it even more sensitive to panics.

Change the API so `SyncableDag` is independent of `Dag`: `Dag` always has
complete segments, and changes made through `SyncableDag` are invisible to
`Dag` until it is explicitly reloaded, so `SyncableDag` cannot corrupt existing
`Dag` structures.
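
For reference, the new calling pattern (mirroring the updated callers and the
`test_sync_reload` test below) is roughly the sketch that follows. The
`dag::{Dag, Id}` import path, the `failure::Fallible` alias, and the
`get_parents` helper are illustrative assumptions; the `Dag`/`SyncableDag`
calls themselves are the ones introduced by this diff:

    use dag::{Dag, Id};     // import path is illustrative
    use failure::Fallible;  // error alias used by this code at the time
    use tempfile::tempdir;

    // Illustrative parent function: a simple linear graph.
    fn get_parents(id: Id) -> Fallible<Vec<Id>> {
        Ok(if id == 0 { vec![] } else { vec![id - 1] })
    }

    fn build_and_flush() -> Fallible<()> {
        let dir = tempdir()?;
        let mut dag = Dag::open(dir.path())?;

        // `prepare_filesystem_sync` now takes `&self` and returns a
        // `SyncableDag` that owns its own copy of the on-disk state.
        let mut syncable = dag.prepare_filesystem_sync()?;
        syncable.build_segments_persistent(1000, &get_parents)?;

        // `sync` consumes the `SyncableDag`, writes to disk, reloads the
        // listed `Dag`s while still holding the lock, then releases it.
        syncable.sync(std::iter::once(&mut dag))?;

        // `dag` now sees the newly written segments.
        Ok(())
    }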

Reviewed By: sfilipco

Differential Revision: D17000969

fbshipit-source-id: 1ceed4ea335d3d64848b7430d48076846b90695d
Jun Wu 2019-09-17 12:34:26 -07:00 committed by Facebook Github Bot
parent c28623ef7d
commit d10dab5342
4 changed files with 97 additions and 44 deletions


@@ -108,6 +108,7 @@ py_class!(class dagindex |py| {
Self::create_instance(py, RefCell::new(dag), RefCell::new(map), segment_size, max_segment_level)
}
/// Build segments on disk. This discards changes by `buildmem`.
def builddisk(&self, nodes: Vec<PyBytes>, parentfunc: PyObject) -> PyResult<Option<u8>> {
// Build indexes towards `node`. Save state on disk.
// Must be called from a clean state (ex. `build_mem` is not called).
@@ -130,13 +131,15 @@ py_class!(class dagindex |py| {
let mut dag = self.dag(py).borrow_mut();
{
let mut dag = dag.prepare_filesystem_sync().map_pyerr::<exc::IOError>(py)?;
dag.build_segments_persistent(id, &get_parents).map_pyerr::<exc::IOError>(py)?;
dag.sync().map_pyerr::<exc::IOError>(py)?;
use std::ops::DerefMut;
let mut syncable = dag.prepare_filesystem_sync().map_pyerr::<exc::IOError>(py)?;
syncable.build_segments_persistent(id, &get_parents).map_pyerr::<exc::IOError>(py)?;
syncable.sync(std::iter::once(dag.deref_mut())).map_pyerr::<exc::IOError>(py)?;
}
Ok(None)
}
/// Build segments in memory. Note: This gets discarded by `builddisk`.
def buildmem(&self, nodes: Vec<PyBytes>, parentfunc: PyObject) -> PyResult<Option<u8>> {
// Build indexes towards `node`. Do not save state to disk.
if nodes.is_empty() {


@@ -44,12 +44,11 @@ fn main() {
// Write segments to filesystem.
let mut dag = Dag::open(&dag_dir.path()).unwrap();
{
let mut dag = dag.prepare_filesystem_sync().unwrap();
dag.build_segments_persistent(head_id, &parents_by_id)
.unwrap();
dag.sync().unwrap();
}
let mut syncable = dag.prepare_filesystem_sync().unwrap();
syncable
.build_segments_persistent(head_id, &parents_by_id)
.unwrap();
syncable.sync(std::iter::once(&mut dag)).unwrap();
let sample_two_ids: Vec<SpanSet> = (0..parents.len() as u64)
.step_by(10079)


@@ -44,12 +44,12 @@ fn main() {
built = true;
measure::Both::<measure::WallClock, String>::measure(|| {
let mut dag = Dag::open(&dag_dir.path()).unwrap();
dag.set_segment_size(segment_size);
let mut dag = dag.prepare_filesystem_sync().unwrap();
let segment_len = dag
dag.set_new_segment_size(segment_size);
let mut syncable = dag.prepare_filesystem_sync().unwrap();
let segment_len = syncable
.build_segments_persistent(head_id, &parents_by_id)
.unwrap();
dag.sync().unwrap();
syncable.sync(std::iter::once(&mut dag)).unwrap();
let log_len = dag_dir.path().join("log").metadata().unwrap().len();
format!("segments: {} log len: {}", segment_len, log_len)
})


@@ -6,6 +6,11 @@
//! # segment
//!
//! Segmented DAG. See [`Dag`] for the main structure.
//!
//! There are 2 flavors of DAG: [`Dag`] and [`SyncableDag`]. [`Dag`] loads
//! from the filesystem, is responsible for all kinds of queries, and can
//! have in-memory-only changes. [`SyncableDag`] is the only way to update
//! the filesystem state, and does not support queries.
use crate::spanset::Span;
use crate::spanset::SpanSet;
@@ -45,8 +50,8 @@ pub struct Dag {
}
/// Guard to make sure [`Dag`] on-disk writes are race-free.
pub struct SyncableDag<'a> {
dag: &'a mut Dag,
pub struct SyncableDag {
dag: Dag,
lock_file: File,
}
@@ -83,11 +88,7 @@ impl Dag {
)]
})
.open(path)?;
// The first byte of the largest key is the maximum level.
let max_level = match log.lookup_range(Self::INDEX_HEAD, ..)?.rev().nth(0) {
None => 0,
Some(key) => key?.0.get(0).cloned().unwrap_or(0),
};
let max_level = Self::max_level_from_log(&log)?;
let mut dag = Self {
log,
path: path.to_path_buf(),
@@ -98,6 +99,15 @@
Ok(dag)
}
fn max_level_from_log(log: &log::Log) -> Fallible<Level> {
// The first byte of the largest key is the maximum level.
let max_level = match log.lookup_range(Self::INDEX_HEAD, ..)?.rev().nth(0) {
None => 0,
Some(key) => key?.0.get(0).cloned().unwrap_or(0),
};
Ok(max_level)
}
/// Find segment by level and head.
pub(crate) fn find_segment_by_head(&self, head: Id, level: u8) -> Fallible<Option<Segment>> {
let key = Self::serialize_lookup_key(head, level);
@@ -188,9 +198,7 @@ impl Dag {
/// actually writes changes to disk.
///
/// Block if another instance is taking the lock.
///
/// Pending in-memory writes will be dropped.
pub fn prepare_filesystem_sync(&mut self) -> Fallible<SyncableDag> {
pub fn prepare_filesystem_sync(&self) -> Fallible<SyncableDag> {
// Take a filesystem lock. The file name 'lock' is taken by indexedlog
// running on Windows, so we choose another file name here.
let lock_file = {
@@ -205,12 +213,20 @@
};
lock_file.lock_exclusive()?;
// Reload. So we get latest data.
self.log.clear_dirty()?;
self.log.sync()?;
// Clone. But drop in-memory data.
let mut log = self.log.clone(false)?;
// Read new entries from filesystem.
log.sync()?;
let max_level = Self::max_level_from_log(&log)?;
Ok(SyncableDag {
dag: self,
dag: Dag {
log,
path: self.path.clone(),
max_level,
new_seg_size: self.new_seg_size,
},
lock_file,
})
}
@@ -489,6 +505,18 @@ impl Dag {
}
}
// Reload.
impl Dag {
/// Reload from the filesystem. Discard pending changes.
pub fn reload(&mut self) -> Fallible<()> {
self.log.clear_dirty()?;
self.log.sync()?;
self.max_level = Self::max_level_from_log(&self.log)?;
self.build_all_high_level_segments(false)?;
Ok(())
}
}
// Algorithms using SpanSet as output.
impl Dag {
/// Calculate all ancestors reachable from any id from the given set.
@@ -903,9 +931,9 @@ impl Dag {
}
}
impl<'a> SyncableDag<'a> {
/// Make sure the [`Dag`] contains the given id (and all ids smaller than
/// `high`) by building up segments on demand.
impl SyncableDag {
/// Make sure the [`SyncableDag`] contains the given id (and all ids smaller
/// than `high`) by building up segments on demand.
///
/// This is similar to [`Dag::build_segments_volatile`]. However, the build
/// result is intended to be written to the filesystem. Therefore high-level
@@ -920,26 +948,22 @@ impl<'a> SyncableDag<'a> {
Ok(count)
}
/// Write pending changes to disk.
/// Write pending changes to disk. Release the exclusive lock.
///
/// This method must be called if there are new entries inserted.
/// Otherwise [`SyncableDag`] will panic once it gets dropped.
pub fn sync(&mut self) -> Fallible<()> {
/// The newly written entries can be fetched by [`Dag::reload`].
///
/// To avoid races, [`Dag`]s in the `reload_dags` list will be
/// reloaded while [`SyncableDag`] still holds the lock.
pub fn sync<'a>(mut self, reload_dags: impl IntoIterator<Item = &'a mut Dag>) -> Fallible<()> {
self.dag.log.sync()?;
for dag in reload_dags {
dag.reload()?;
}
let _lock_file = self.lock_file; // Make sure lock is not dropped until here.
Ok(())
}
}
impl<'a> Drop for SyncableDag<'a> {
fn drop(&mut self) {
// TODO: handles `sync` failures gracefully.
assert!(
self.dag.log.iter_dirty().next().is_none(),
"programming error: sync must be called before dropping WritableIdMap"
);
}
}
bitflags! {
pub struct SegmentFlags: u8 {
/// This segment has roots (i.e. there is at least one id in
@@ -1201,4 +1225,31 @@ mod tests {
assert_eq!(low_by_id(150, 0), 101);
assert_eq!(low_by_id(151, 0), -1);
}
fn get_parents(id: Id) -> Fallible<Vec<Id>> {
match id {
0 | 1 | 2 => Ok(Vec::new()),
_ => Ok(vec![id - 1, id / 2]),
}
}
#[test]
fn test_sync_reload() {
let dir = tempdir().unwrap();
let mut dag = Dag::open(dir.path()).unwrap();
assert_eq!(dag.next_free_id(0).unwrap(), 0);
let mut syncable = dag.prepare_filesystem_sync().unwrap();
syncable
.build_segments_persistent(1001, &get_parents)
.unwrap();
syncable.sync(std::iter::once(&mut dag)).unwrap();
assert_eq!(dag.max_level, 3);
assert_eq!(
dag.children(1000).unwrap().iter().collect::<Vec<Id>>(),
vec![1001]
);
}
}