dag: preserve VerLink on open or persist

Summary:
On IdDag/IdMap open, try to reuse previously assigned VerLink.
On IdDag/IdMap persist/flush, cache the storage version with the current VerLink.

This should reduce "compatibility" issues when the Python world invalidates and
re-creates the changelog, as in `dbsh`:

  In [4]: v1=repo.changelog.dag.version()
  In [5]: repo.invalidatechangelog()
  In [6]: v2=repo.changelog.dag.version()
  In [7]: v1.cmp(v2)
  Out[7]: 0

For example, take the example in D47383634 summary, now

  r1.cmp(r2)

evaluates to `-1` instead of `None` meaning that r2 has
"append-only" changes on top of r1. Although we cannot
yet get rid of D47376881.

Reviewed By: zzl0

Differential Revision: D47488534

fbshipit-source-id: 33e9ec6294ce77f85b04d5d3af02d81a1a6c9282
This commit is contained in:
Jun Wu 2023-07-17 11:51:18 -07:00 committed by Facebook GitHub Bot
parent 069dfdebf5
commit 9b060a3c6e
5 changed files with 61 additions and 4 deletions

View File

@ -136,10 +136,11 @@ impl IdDag<InProcessStore> {
impl<Store: IdDagStore> IdDag<Store> {
pub(crate) fn open_from_store(store: Store) -> Result<Self> {
let version = store.verlink();
let dag = Self {
store,
new_seg_size: default_seg_size(),
version: VerLink::new(),
version,
};
Ok(dag)
}
@ -1854,7 +1855,7 @@ impl<Store: IdDagStore> IdDag<Store> {
}
}
impl<Store: Persist> Persist for IdDag<Store> {
impl<Store: Persist + IdDagStore> Persist for IdDag<Store> {
type Lock = <Store as Persist>::Lock;
fn lock(&mut self) -> Result<Self::Lock> {
@ -1866,7 +1867,9 @@ impl<Store: Persist> Persist for IdDag<Store> {
}
fn persist(&mut self, lock: &Self::Lock) -> Result<()> {
self.store.persist(lock)
self.store.persist(lock)?;
self.store.cache_verlink(&self.version);
Ok(())
}
}

View File

@ -17,6 +17,7 @@ use crate::spanset::Span;
use crate::IdSet;
use crate::Level;
use crate::Result;
use crate::VerLink;
mod in_process_store;
@ -351,6 +352,16 @@ pub trait IdDagStore: Send + Sync + 'static {
Ok(Some(merged))
}
/// Obtain the `VerLink` associated with this store.
fn verlink(&self) -> VerLink {
VerLink::new()
}
/// Associate the verlink with the storage version.
fn cache_verlink(&self, verlink: &VerLink) {
let _ = verlink;
}
}
/// Used by `resize_flat_segment` functions.

View File

@ -39,6 +39,7 @@ use crate::spanset::Span;
use crate::IdSet;
use crate::Level;
use crate::Result;
use crate::VerLink;
pub struct IndexedLogStore {
log: log::Log,
@ -373,6 +374,18 @@ impl IdDagStore for IndexedLogStore {
let iter = self.iter_flat_segments_with_parent_span(parent.into())?;
Ok(Box::new(iter.map(|item| item.map(|(_, seg)| seg))))
}
fn verlink(&self) -> VerLink {
let str_id = self.path.display().to_string();
let version = self.log.version();
VerLink::from_storage_version_or_new(&str_id, version)
}
fn cache_verlink(&self, verlink: &VerLink) {
let str_id = self.path.display().to_string();
let version = self.log.version();
verlink.associate_storage_version(str_id, version);
}
}
impl Persist for IndexedLogStore {

View File

@ -95,11 +95,12 @@ impl IdMap {
pub(crate) fn open_from_log(log: log::Log) -> Result<Self> {
let path = log.path().as_opt_path().unwrap().to_path_buf();
let map_id = format!("ilog:{}", path.display());
let map_version = VerLink::from_storage_version_or_new(&map_id, log.version());
Ok(Self {
log,
path,
map_id,
map_version: VerLink::new(),
map_version,
})
}
@ -484,6 +485,8 @@ impl Persist for IdMap {
fn persist(&mut self, _lock: &Self::Lock) -> Result<()> {
self.log.sync()?;
self.map_version
.associate_storage_version(self.map_id.clone(), self.log.version());
Ok(())
}
}

View File

@ -6,6 +6,8 @@
*/
use super::TestDag;
use crate::ops::DagAlgorithm;
use crate::ops::IdConvert;
#[tokio::test]
async fn test_strip_basic() {
@ -191,3 +193,28 @@ async fn test_reinsert_then_create_higher_level() {
Lv3: |N0 N1 N2| |N4 N5 N6 N7 N8 N9 N10 N11|"#
);
}
#[tokio::test]
async fn test_strip_update_version() {
let mut t = TestDag::draw_client("A..E").await;
let dag_version = t.dag.dag_version().clone();
let map_version = t.dag.map_version().clone();
// Version is not changed after reopen.
t.reopen();
assert_eq!(&dag_version, t.dag.dag_version());
assert_eq!(&map_version, t.dag.map_version());
// Version is compatible after appending.
t.drawdag("E-F", &[]);
t.flush("").await;
assert!(&dag_version < t.dag.dag_version());
assert!(&map_version < t.dag.map_version());
// Version is incompatible after strip.
t.strip("D").await;
assert_eq!(dag_version.partial_cmp(t.dag.dag_version()), None);
assert_eq!(map_version.partial_cmp(t.dag.map_version()), None);
}