use std::{fmt::Debug, ops::Add}; use sum_tree::{Dimension, Edit, Item, KeyedItem, SumTree, Summary}; pub trait Operation: Clone + Debug { fn lamport_timestamp(&self) -> clock::Lamport; } #[derive(Clone, Debug)] struct OperationItem(T); #[derive(Clone, Debug)] pub struct OperationQueue(SumTree>); #[derive(Clone, Copy, Debug, Default, Eq, Ord, PartialEq, PartialOrd)] pub struct OperationKey(clock::Lamport); #[derive(Clone, Copy, Debug, Default, Eq, PartialEq)] pub struct OperationSummary { pub key: OperationKey, pub len: usize, } impl OperationKey { pub fn new(timestamp: clock::Lamport) -> Self { Self(timestamp) } } impl Default for OperationQueue { fn default() -> Self { OperationQueue::new() } } impl OperationQueue { pub fn new() -> Self { OperationQueue(SumTree::default()) } pub fn len(&self) -> usize { self.0.summary().len } pub fn is_empty(&self) -> bool { self.len() == 0 } pub fn insert(&mut self, mut ops: Vec) { ops.sort_by_key(|op| op.lamport_timestamp()); ops.dedup_by_key(|op| op.lamport_timestamp()); self.0.edit( ops.into_iter() .map(|op| Edit::Insert(OperationItem(op))) .collect(), &(), ); } pub fn drain(&mut self) -> Self { let clone = self.clone(); self.0 = SumTree::default(); clone } pub fn iter(&self) -> impl Iterator { self.0.iter().map(|i| &i.0) } } impl Summary for OperationSummary { type Context = (); fn zero(_cx: &()) -> Self { Default::default() } fn add_summary(&mut self, other: &Self, _: &()) { assert!(self.key < other.key); self.key = other.key; self.len += other.len; } } impl<'a> Add<&'a Self> for OperationSummary { type Output = Self; fn add(self, other: &Self) -> Self { assert!(self.key < other.key); OperationSummary { key: other.key, len: self.len + other.len, } } } impl<'a> Dimension<'a, OperationSummary> for OperationKey { fn zero(_cx: &()) -> Self { Default::default() } fn add_summary(&mut self, summary: &OperationSummary, _: &()) { assert!(*self <= summary.key); *self = summary.key; } } impl Item for OperationItem { type Summary = OperationSummary; fn summary(&self, _cx: &()) -> Self::Summary { OperationSummary { key: OperationKey::new(self.0.lamport_timestamp()), len: 1, } } } impl KeyedItem for OperationItem { type Key = OperationKey; fn key(&self) -> Self::Key { OperationKey::new(self.0.lamport_timestamp()) } } #[cfg(test)] mod tests { use super::*; #[test] fn test_len() { let mut clock = clock::Lamport::new(0); let mut queue = OperationQueue::new(); assert_eq!(queue.len(), 0); queue.insert(vec![ TestOperation(clock.tick()), TestOperation(clock.tick()), ]); assert_eq!(queue.len(), 2); queue.insert(vec![TestOperation(clock.tick())]); assert_eq!(queue.len(), 3); drop(queue.drain()); assert_eq!(queue.len(), 0); queue.insert(vec![TestOperation(clock.tick())]); assert_eq!(queue.len(), 1); } #[derive(Clone, Debug, Eq, PartialEq)] struct TestOperation(clock::Lamport); impl Operation for TestOperation { fn lamport_timestamp(&self) -> clock::Lamport { self.0 } } }