revisionstore: implement flush for MultiplexStore

Summary:
As we're migrating toward using flush instead of close, the MultiplexStore no
longer requires a "&mut T", we can directly get the ownership of the object and
flush can be properly implemented.

Reviewed By: kulshrax

Differential Revision: D15416704

fbshipit-source-id: 3cf6c9bf829297e07853484913e6260331ab2b3b
This commit is contained in:
Xavier Deguillard 2019-05-21 14:14:06 -07:00 committed by Facebook Github Bot
parent 0efa8b1d33
commit b42a85b427

View File

@ -9,26 +9,39 @@ use types::{Key, NodeInfo};
use crate::datastore::{Delta, Metadata, MutableDeltaStore};
use crate::historystore::MutableHistoryStore;
/// A `MultiplexStore` is a store that will duplicate all the writes to all the
/// stores that it is made of.
pub struct MultiplexStore<'a, T: ?Sized> {
stores: Vec<&'a mut T>,
/// A `MultiplexDeltaStore` is a store that will duplicate all the writes to all the
/// delta stores that it is made of.
pub struct MultiplexDeltaStore<'a> {
stores: Vec<Box<dyn MutableDeltaStore + 'a>>,
}
pub type MultiplexDeltaStore<'a, T> = MultiplexStore<'a, T>;
pub type MultiplexHistoryStore<'a, T> = MultiplexStore<'a, T>;
/// A `MultiplexHistoryStore` is a store that will duplicate all the writes to all the
/// history stores that it is made of.
pub struct MultiplexHistoryStore<'a> {
stores: Vec<Box<dyn MutableHistoryStore + 'a>>,
}
impl<'a, T: ?Sized> MultiplexStore<'a, T> {
impl<'a> MultiplexDeltaStore<'a> {
pub fn new() -> Self {
Self { stores: Vec::new() }
}
pub fn add_store(&mut self, store: &'a mut T) {
pub fn add_store(&mut self, store: Box<dyn MutableDeltaStore + 'a>) {
self.stores.push(store)
}
}
impl<'a, T: MutableDeltaStore + ?Sized> MutableDeltaStore for MultiplexDeltaStore<'a, T> {
impl<'a> MultiplexHistoryStore<'a> {
pub fn new() -> Self {
Self { stores: Vec::new() }
}
pub fn add_store(&mut self, store: Box<dyn MutableHistoryStore + 'a>) {
self.stores.push(Box::new(store))
}
}
impl<'a> MutableDeltaStore for MultiplexDeltaStore<'a> {
/// Write the `Delta` and `Metadata` to all the stores
fn add(&mut self, delta: &Delta, metadata: &Metadata) -> Fallible<()> {
for store in self.stores.iter_mut() {
@ -38,6 +51,14 @@ impl<'a, T: MutableDeltaStore + ?Sized> MutableDeltaStore for MultiplexDeltaStor
Ok(())
}
fn flush(&mut self) -> Fallible<Option<PathBuf>> {
for store in self.stores.iter_mut() {
store.flush()?;
}
Ok(None)
}
fn close(self) -> Fallible<Option<PathBuf>> {
// close() cannot be implemented as the concrete types of the stores aren't known
// statically. For now, the user of this MultiplexDeltaStore would have to manually close
@ -46,7 +67,7 @@ impl<'a, T: MutableDeltaStore + ?Sized> MutableDeltaStore for MultiplexDeltaStor
}
}
impl<'a, T: MutableHistoryStore + ?Sized> MutableHistoryStore for MultiplexHistoryStore<'a, T> {
impl<'a> MutableHistoryStore for MultiplexHistoryStore<'a> {
fn add(&mut self, key: &Key, info: &NodeInfo) -> Fallible<()> {
for store in self.stores.iter_mut() {
store.add(key, info)?;
@ -55,6 +76,14 @@ impl<'a, T: MutableHistoryStore + ?Sized> MutableHistoryStore for MultiplexHisto
Ok(())
}
fn flush(&mut self) -> Fallible<Option<PathBuf>> {
for store in self.stores.iter_mut() {
store.flush()?;
}
Ok(None)
}
fn close(self) -> Fallible<Option<PathBuf>> {
// close() cannot be implemented as the concrete types of the stores aren't known
// statically. For now, the user of this MultiplexHistoryStore would have to manually close
@ -85,7 +114,7 @@ mod tests {
let tempdir = TempDir::new()?;
let mut log = IndexedLogDataStore::new(&tempdir)?;
let mut multiplex = MultiplexDeltaStore::new();
multiplex.add_store(&mut log);
multiplex.add_store(Box::new(&mut log));
let delta = Delta {
data: Bytes::from(&[1, 2, 3, 4][..]),
@ -95,6 +124,7 @@ mod tests {
let metadata = Default::default();
multiplex.add(&delta, &metadata)?;
drop(multiplex);
let read_delta = log.get_delta(&delta.key)?;
assert_eq!(delta, read_delta);
log.close()?;
@ -106,9 +136,9 @@ mod tests {
let tempdir = TempDir::new()?;
let mut log = IndexedLogDataStore::new(&tempdir)?;
let mut pack = MutableDataPack::new(&tempdir, DataPackVersion::One)?;
let mut multiplex: MultiplexDeltaStore<dyn MutableDeltaStore> = MultiplexDeltaStore::new();
multiplex.add_store(&mut log);
multiplex.add_store(&mut pack);
let mut multiplex = MultiplexDeltaStore::new();
multiplex.add_store(Box::new(&mut log));
multiplex.add_store(Box::new(&mut pack));
let delta = Delta {
data: Bytes::from(&[1, 2, 3, 4][..]),
@ -118,6 +148,7 @@ mod tests {
let metadata = Default::default();
multiplex.add(&delta, &metadata)?;
drop(multiplex);
let read_delta = log.get_delta(&delta.key)?;
assert_eq!(delta, read_delta);
@ -134,8 +165,8 @@ mod tests {
fn test_history_add_static() -> Fallible<()> {
let tempdir = TempDir::new()?;
let mut pack = MutableHistoryPack::new(&tempdir, HistoryPackVersion::One)?;
let mut multiplex = MultiplexDeltaStore::new();
multiplex.add_store(&mut pack);
let mut multiplex = MultiplexHistoryStore::new();
multiplex.add_store(Box::new(&mut pack));
let k = key("a", "1");
let nodeinfo = NodeInfo {
@ -144,6 +175,8 @@ mod tests {
};
multiplex.add(&k, &nodeinfo)?;
drop(multiplex);
let read_node = pack.get_node_info(&k)?;
assert_eq!(nodeinfo, read_node);
@ -156,10 +189,9 @@ mod tests {
let tempdir = TempDir::new()?;
let mut pack1 = MutableHistoryPack::new(&tempdir, HistoryPackVersion::One)?;
let mut pack2 = MutableHistoryPack::new(&tempdir, HistoryPackVersion::One)?;
let mut multiplex: MultiplexHistoryStore<dyn MutableHistoryStore> =
MultiplexDeltaStore::new();
multiplex.add_store(&mut pack1);
multiplex.add_store(&mut pack2);
let mut multiplex = MultiplexHistoryStore::new();
multiplex.add_store(Box::new(&mut pack1));
multiplex.add_store(Box::new(&mut pack2));
let k = key("a", "1");
let nodeinfo = NodeInfo {
@ -168,6 +200,8 @@ mod tests {
};
multiplex.add(&k, &nodeinfo)?;
drop(multiplex);
let read_node = pack1.get_node_info(&k)?;
assert_eq!(nodeinfo, read_node);