dag: add ways to remove non-master ids in Dag and IdMap

Summary: First step to implement segment and idmap rewrites for non-master ids.

Reviewed By: sfilipco

Differential Revision: D18838996

fbshipit-source-id: 0380c0193d9078a2a2d7fde9c5f565e8dbc8e713
This commit is contained in:
Jun Wu 2019-12-20 17:41:14 -08:00 committed by Facebook Github Bot
parent 784da69352
commit 61e0f06417
3 changed files with 120 additions and 9 deletions

View File

@ -45,8 +45,8 @@ impl Group {
pub(crate) const COUNT: usize = Self::ALL.len();
// Reserved 1 bit so u64 can be casted to i64 safely.
const BITS: u32 = 65 - ((Self::COUNT - 1) as u64).leading_zeros();
// 1 byte for Group so it's easier to remove everything in a group.
pub(crate) const BITS: u32 = 8;
/// The first [`Id`] in this group.
pub fn min_id(self) -> Id {

View File

@ -26,6 +26,7 @@ pub struct IdMap {
log: log::Log,
path: PathBuf,
cached_next_free_ids: [AtomicU64; Group::COUNT],
pub(crate) need_rebuild_non_master: bool,
}
/// Guard to make sure [`IdMap`] on-disk writes are race-free.
@ -42,6 +43,11 @@ impl IdMap {
const INDEX_ID_TO_SLICE: usize = 0;
const INDEX_SLICE_TO_ID: usize = 1;
/// Magic bytes in `Log` that indicate "remove all non-master id->slice
/// mappings". A valid entry has at least 8 bytes, so it can never be
/// confused with this shorter marker.
const MAGIC_CLEAR_NON_MASTER: &'static [u8] = b"CLRNM";
/// Create an [`IdMap`] backed by the given directory.
///
/// By default, only read-only operations are allowed. For writing
@ -50,9 +56,27 @@ impl IdMap {
let path = path.as_ref();
let log = log::OpenOptions::new()
.create(true)
.index("id", |_| vec![log::IndexOutput::Reference(0..8)])
.index("id", |data| {
assert!(Self::MAGIC_CLEAR_NON_MASTER.len() < 8);
assert!(Group::BITS == 8);
if data.len() < 8 {
if data == Self::MAGIC_CLEAR_NON_MASTER {
vec![log::IndexOutput::RemovePrefix(Box::new([
Group::NON_MASTER.0 as u8,
]))]
} else {
panic!("bug: invalid segment {:?}", &data);
}
} else {
vec![log::IndexOutput::Reference(0..8)]
}
})
.index("slice", |data| {
vec![log::IndexOutput::Reference(8..data.len() as u64)]
if data.len() >= 8 {
vec![log::IndexOutput::Reference(8..data.len() as u64)]
} else {
Vec::new()
}
})
.flush_filter(Some(|_, _| {
panic!("programming error: idmap changed by other process")
@ -63,6 +87,7 @@ impl IdMap {
log,
path,
cached_next_free_ids: Default::default(),
need_rebuild_non_master: false,
})
}
@ -132,7 +157,17 @@ impl IdMap {
match key {
Some(Ok(mut entry)) => {
ensure!(entry.len() >= 8, "index key should have 8 bytes at least");
Ok(Some(Id(entry.read_u64::<BigEndian>().unwrap())))
let id = Id(entry.read_u64::<BigEndian>().unwrap());
// Double check: a valid id must be < next_free_id. This is useful for
// 'remove_non_master' followed by re-inserting ids.
// This is because 'remove_non_master' can only (efficiently) affect the id->slice
// index, not the slice->id index.
let group = id.group();
if group != Group::MASTER && self.next_free_id(group)? <= id {
Ok(None)
} else {
Ok(Some(id))
}
}
None => Ok(None),
Some(Err(err)) => Err(err.into()),
@ -193,6 +228,10 @@ impl IdMap {
slice
);
}
// Mark "need_rebuild_non_master". This prevents "sync" until
// the callsite uses "remove_non_master" to remove and re-insert
// non-master ids.
self.need_rebuild_non_master = true;
}
let mut data = Vec::with_capacity(8 + slice.len());
@ -381,9 +420,29 @@ impl IdMap {
}
}
// Remove data.
impl IdMap {
/// Mark non-master ids as "removed".
pub fn remove_non_master(&mut self) -> Result<()> {
self.log.append(IdMap::MAGIC_CLEAR_NON_MASTER)?;
self.need_rebuild_non_master = false;
// Invalidate the next free id cache.
self.cached_next_free_ids = Default::default();
ensure!(
self.next_free_id(Group::NON_MASTER)? == Group::NON_MASTER.min_id(),
"bug: remove_non_master did not take effect"
);
Ok(())
}
}
impl<'a> SyncableIdMap<'a> {
/// Write pending changes to disk.
///
/// Returns an error without touching the log if `need_rebuild_non_master`
/// is still set; otherwise syncs the underlying log and propagates any
/// error it reports.
pub fn sync(&mut self) -> Result<()> {
// Refuse to sync while the flag is set. `IdMap::remove_non_master`
// is what clears it.
ensure!(
!self.need_rebuild_non_master,
"bug: cannot sync with re-assigned ids unresolved"
);
self.map.log.sync()?;
Ok(())
}
@ -489,6 +548,8 @@ mod tests {
assert_eq!(map.find_id_by_slice(b"jkl").unwrap().unwrap(), id);
assert_eq!(map.find_id_by_slice(b"jkl2").unwrap().unwrap().0, 15);
assert!(map.find_id_by_slice(b"jkl3").unwrap().is_none());
// HACK: allow sync with re-assigned ids.
map.need_rebuild_non_master = false;
map.sync().unwrap();
}

View File

@ -23,6 +23,7 @@ use byteorder::{BigEndian, ByteOrder, WriteBytesExt};
use fs2::FileExt;
use indexedlog::log;
use indexmap::set::IndexSet;
use std::collections::HashMap;
use std::collections::HashSet;
use std::collections::{BTreeSet, BinaryHeap};
use std::fmt::{self, Debug, Formatter};
@ -80,22 +81,51 @@ impl Dag {
const INDEX_PARENT: usize = 1;
const KEY_LEVEL_HEAD_LEN: usize = Segment::OFFSET_DELTA - Segment::OFFSET_LEVEL;
/// Magic bytes in `Log` that indicate "remove all non-master segments".
/// A Segment entry has at least KEY_LEVEL_HEAD_LEN (9) bytes, so it can
/// never be confused with this shorter marker.
const MAGIC_CLEAR_NON_MASTER: &'static [u8] = b"CLRNM";
/// Open [`Dag`] at the given directory. Create it on demand.
pub fn open(path: impl AsRef<Path>) -> Result<Self> {
let path = path.as_ref();
let log = log::OpenOptions::new()
.create(true)
.index("level-head", |_| {
.index("level-head", |data| {
// (level, high)
vec![log::IndexOutput::Reference(
Segment::OFFSET_LEVEL as u64..Segment::OFFSET_DELTA as u64,
)]
assert!(Self::MAGIC_CLEAR_NON_MASTER.len() < Segment::OFFSET_DELTA);
assert!(Group::BITS == 8);
if data.len() < Segment::OFFSET_DELTA {
if data == Self::MAGIC_CLEAR_NON_MASTER {
let max_level = 255;
(0..=max_level)
.map(|level| {
log::IndexOutput::RemovePrefix(Box::new([
level,
Group::NON_MASTER.0 as u8,
]))
})
.collect()
} else {
panic!("bug: invalid segment {:?}", &data);
}
} else {
vec![log::IndexOutput::Reference(
Segment::OFFSET_LEVEL as u64..Segment::OFFSET_DELTA as u64,
)]
}
})
.index("parent", |data| {
// parent -> child for flat segments
let seg = Segment(data);
let mut result = Vec::new();
if seg.level().ok() == Some(0) {
// The magic entry should never reach this branch since MAGIC_CLEAR_NON_MASTER[0] != 0.
assert_ne!(
data,
Self::MAGIC_CLEAR_NON_MASTER,
"bug: MAGIC_CLEAR_NON_MASTER conflicts with data"
);
if let Ok(parents) = seg.parents() {
for id in parents {
let mut bytes = Vec::with_capacity(8);
@ -565,6 +595,21 @@ impl Dag {
}
}
// Remove data.
impl Dag {
/// Mark non-master ids as "removed".
pub fn remove_non_master(&mut self) -> Result<()> {
self.log.append(Self::MAGIC_CLEAR_NON_MASTER)?;
for level in 0..=self.max_level {
ensure!(
self.next_free_id(level, Group::NON_MASTER)? == Group::NON_MASTER.min_id(),
"bug: remove_non_master did not take effect"
);
}
Ok(())
}
}
// User-facing DAG-related algorithms.
impl Dag {
/// Return a [`SpanSet`] that covers all ids stored in this [`Dag`].
@ -1271,6 +1316,11 @@ impl SyncableDag {
let _lock_file = self.lock_file; // Make sure lock is not dropped until here.
Ok(())
}
/// Mark non-master segments as "removed".
///
/// Delegates to `remove_non_master` on the wrapped `dag` field and returns
/// its result unchanged.
pub fn remove_non_master(&mut self) -> Result<()> {
self.dag.remove_non_master()
}
}
bitflags! {