From 03f0365d4d39134aed4f83381348f84be940f7a3 Mon Sep 17 00:00:00 2001 From: Max Brunsfeld Date: Thu, 31 Aug 2023 15:52:16 -0700 Subject: [PATCH] Remove local timestamps from CRDT operations Use lamport timestamps for everything. --- crates/clock/src/clock.rs | 81 ++-------- crates/collab/src/db/queries/buffers.rs | 62 +++----- crates/collab/src/db/queries/users.rs | 2 - crates/language/src/buffer.rs | 10 +- crates/language/src/proto.rs | 118 +++++++------- crates/rpc/proto/zed.proto | 12 +- crates/text/Cargo.toml | 1 + crates/text/src/anchor.rs | 6 +- crates/text/src/text.rs | 196 +++++++++--------------- crates/text/src/undo_map.rs | 12 +- 10 files changed, 186 insertions(+), 314 deletions(-) diff --git a/crates/clock/src/clock.rs b/crates/clock/src/clock.rs index bc936fcb99..3cbf8d6594 100644 --- a/crates/clock/src/clock.rs +++ b/crates/clock/src/clock.rs @@ -2,70 +2,17 @@ use smallvec::SmallVec; use std::{ cmp::{self, Ordering}, fmt, iter, - ops::{Add, AddAssign}, }; pub type ReplicaId = u16; pub type Seq = u32; -#[derive(Clone, Copy, Default, Eq, Hash, PartialEq, Ord, PartialOrd)] -pub struct Local { - pub replica_id: ReplicaId, - pub value: Seq, -} - #[derive(Clone, Copy, Default, Eq, Hash, PartialEq)] pub struct Lamport { pub replica_id: ReplicaId, pub value: Seq, } -impl Local { - pub const MIN: Self = Self { - replica_id: ReplicaId::MIN, - value: Seq::MIN, - }; - pub const MAX: Self = Self { - replica_id: ReplicaId::MAX, - value: Seq::MAX, - }; - - pub fn new(replica_id: ReplicaId) -> Self { - Self { - replica_id, - value: 1, - } - } - - pub fn tick(&mut self) -> Self { - let timestamp = *self; - self.value += 1; - timestamp - } - - pub fn observe(&mut self, timestamp: Self) { - if timestamp.replica_id == self.replica_id { - self.value = cmp::max(self.value, timestamp.value + 1); - } - } -} - -impl<'a> Add<&'a Self> for Local { - type Output = Local; - - fn add(self, other: &'a Self) -> Self::Output { - *cmp::max(&self, other) - } -} - -impl<'a> AddAssign<&'a Local> for Local { - fn add_assign(&mut self, other: &Self) { - if *self < *other { - *self = *other; - } - } -} - /// A vector clock #[derive(Clone, Default, Hash, Eq, PartialEq)] pub struct Global(SmallVec<[u32; 8]>); @@ -79,7 +26,7 @@ impl Global { self.0.get(replica_id as usize).copied().unwrap_or(0) as Seq } - pub fn observe(&mut self, timestamp: Local) { + pub fn observe(&mut self, timestamp: Lamport) { if timestamp.value > 0 { let new_len = timestamp.replica_id as usize + 1; if new_len > self.0.len() { @@ -126,7 +73,7 @@ impl Global { self.0.resize(new_len, 0); } - pub fn observed(&self, timestamp: Local) -> bool { + pub fn observed(&self, timestamp: Lamport) -> bool { self.get(timestamp.replica_id) >= timestamp.value } @@ -178,16 +125,16 @@ impl Global { false } - pub fn iter(&self) -> impl Iterator + '_ { - self.0.iter().enumerate().map(|(replica_id, seq)| Local { + pub fn iter(&self) -> impl Iterator + '_ { + self.0.iter().enumerate().map(|(replica_id, seq)| Lamport { replica_id: replica_id as ReplicaId, value: *seq, }) } } -impl FromIterator for Global { - fn from_iter>(locals: T) -> Self { +impl FromIterator for Global { + fn from_iter>(locals: T) -> Self { let mut result = Self::new(); for local in locals { result.observe(local); @@ -212,6 +159,16 @@ impl PartialOrd for Lamport { } impl Lamport { + pub const MIN: Self = Self { + replica_id: ReplicaId::MIN, + value: Seq::MIN, + }; + + pub const MAX: Self = Self { + replica_id: ReplicaId::MAX, + value: Seq::MAX, + }; + pub fn new(replica_id: ReplicaId) -> Self { Self { value: 1, @@ -230,12 +187,6 @@ impl Lamport { } } -impl fmt::Debug for Local { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "Local {{{}: {}}}", self.replica_id, self.value) - } -} - impl fmt::Debug for Lamport { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "Lamport {{{}: {}}}", self.replica_id, self.value) diff --git a/crates/collab/src/db/queries/buffers.rs b/crates/collab/src/db/queries/buffers.rs index 354accc01a..f120aea1c5 100644 --- a/crates/collab/src/db/queries/buffers.rs +++ b/crates/collab/src/db/queries/buffers.rs @@ -1,6 +1,6 @@ use super::*; use prost::Message; -use text::{EditOperation, InsertionTimestamp, UndoOperation}; +use text::{EditOperation, UndoOperation}; impl Database { pub async fn join_channel_buffer( @@ -182,7 +182,6 @@ impl Database { .await } - #[cfg(debug_assertions)] pub async fn get_channel_buffer_collaborators( &self, channel_id: ChannelId, @@ -370,7 +369,6 @@ fn operation_to_storage( operation.replica_id, operation.lamport_timestamp, storage::Operation { - local_timestamp: operation.local_timestamp, version: version_to_storage(&operation.version), is_undo: false, edit_ranges: operation @@ -389,7 +387,6 @@ fn operation_to_storage( operation.replica_id, operation.lamport_timestamp, storage::Operation { - local_timestamp: operation.local_timestamp, version: version_to_storage(&operation.version), is_undo: true, edit_ranges: Vec::new(), @@ -399,7 +396,7 @@ fn operation_to_storage( .iter() .map(|entry| storage::UndoCount { replica_id: entry.replica_id, - local_timestamp: entry.local_timestamp, + lamport_timestamp: entry.lamport_timestamp, count: entry.count, }) .collect(), @@ -427,7 +424,6 @@ fn operation_from_storage( Ok(if operation.is_undo { proto::operation::Variant::Undo(proto::operation::Undo { replica_id: row.replica_id as u32, - local_timestamp: operation.local_timestamp as u32, lamport_timestamp: row.lamport_timestamp as u32, version, counts: operation @@ -435,7 +431,7 @@ fn operation_from_storage( .iter() .map(|entry| proto::UndoCount { replica_id: entry.replica_id, - local_timestamp: entry.local_timestamp, + lamport_timestamp: entry.lamport_timestamp, count: entry.count, }) .collect(), @@ -443,7 +439,6 @@ fn operation_from_storage( } else { proto::operation::Variant::Edit(proto::operation::Edit { replica_id: row.replica_id as u32, - local_timestamp: operation.local_timestamp as u32, lamport_timestamp: row.lamport_timestamp as u32, version, ranges: operation @@ -483,10 +478,9 @@ fn version_from_storage(version: &Vec) -> Vec Option { match operation.variant? { proto::operation::Variant::Edit(edit) => Some(text::Operation::Edit(EditOperation { - timestamp: InsertionTimestamp { + timestamp: clock::Lamport { replica_id: edit.replica_id as text::ReplicaId, - local: edit.local_timestamp, - lamport: edit.lamport_timestamp, + value: edit.lamport_timestamp, }, version: version_from_wire(&edit.version), ranges: edit @@ -498,32 +492,26 @@ pub fn operation_from_wire(operation: proto::Operation) -> Option Some(text::Operation::Undo { - lamport_timestamp: clock::Lamport { + proto::operation::Variant::Undo(undo) => Some(text::Operation::Undo(UndoOperation { + timestamp: clock::Lamport { replica_id: undo.replica_id as text::ReplicaId, value: undo.lamport_timestamp, }, - undo: UndoOperation { - id: clock::Local { - replica_id: undo.replica_id as text::ReplicaId, - value: undo.local_timestamp, - }, - version: version_from_wire(&undo.version), - counts: undo - .counts - .into_iter() - .map(|c| { - ( - clock::Local { - replica_id: c.replica_id as text::ReplicaId, - value: c.local_timestamp, - }, - c.count, - ) - }) - .collect(), - }, - }), + version: version_from_wire(&undo.version), + counts: undo + .counts + .into_iter() + .map(|c| { + ( + clock::Lamport { + replica_id: c.replica_id as text::ReplicaId, + value: c.lamport_timestamp, + }, + c.count, + ) + }) + .collect(), + })), _ => None, } } @@ -531,7 +519,7 @@ pub fn operation_from_wire(operation: proto::Operation) -> Option clock::Global { let mut version = clock::Global::new(); for entry in message { - version.observe(clock::Local { + version.observe(clock::Lamport { replica_id: entry.replica_id as text::ReplicaId, value: entry.timestamp, }); @@ -546,8 +534,6 @@ mod storage { #[derive(Message)] pub struct Operation { - #[prost(uint32, tag = "1")] - pub local_timestamp: u32, #[prost(message, repeated, tag = "2")] pub version: Vec, #[prost(bool, tag = "3")] @@ -581,7 +567,7 @@ mod storage { #[prost(uint32, tag = "1")] pub replica_id: u32, #[prost(uint32, tag = "2")] - pub local_timestamp: u32, + pub lamport_timestamp: u32, #[prost(uint32, tag = "3")] pub count: u32, } diff --git a/crates/collab/src/db/queries/users.rs b/crates/collab/src/db/queries/users.rs index bd7c3e9ffd..5cb1ef6ea3 100644 --- a/crates/collab/src/db/queries/users.rs +++ b/crates/collab/src/db/queries/users.rs @@ -241,7 +241,6 @@ impl Database { result } - #[cfg(debug_assertions)] pub async fn create_user_flag(&self, flag: &str) -> Result { self.transaction(|tx| async move { let flag = feature_flag::Entity::insert(feature_flag::ActiveModel { @@ -257,7 +256,6 @@ impl Database { .await } - #[cfg(debug_assertions)] pub async fn add_user_flag(&self, user: UserId, flag: FlagId) -> Result<()> { self.transaction(|tx| async move { user_feature::Entity::insert(user_feature::ActiveModel { diff --git a/crates/language/src/buffer.rs b/crates/language/src/buffer.rs index 1ded955cd7..38b2842c12 100644 --- a/crates/language/src/buffer.rs +++ b/crates/language/src/buffer.rs @@ -439,7 +439,7 @@ impl Buffer { operations.extend( text_operations .iter() - .filter(|(_, op)| !since.observed(op.local_timestamp())) + .filter(|(_, op)| !since.observed(op.timestamp())) .map(|(_, op)| proto::serialize_operation(&Operation::Buffer(op.clone()))), ); operations.sort_unstable_by_key(proto::lamport_timestamp_for_operation); @@ -1304,7 +1304,7 @@ impl Buffer { pub fn wait_for_edits( &mut self, - edit_ids: impl IntoIterator, + edit_ids: impl IntoIterator, ) -> impl Future> { self.text.wait_for_edits(edit_ids) } @@ -1362,7 +1362,7 @@ impl Buffer { } } - pub fn set_text(&mut self, text: T, cx: &mut ModelContext) -> Option + pub fn set_text(&mut self, text: T, cx: &mut ModelContext) -> Option where T: Into>, { @@ -1375,7 +1375,7 @@ impl Buffer { edits_iter: I, autoindent_mode: Option, cx: &mut ModelContext, - ) -> Option + ) -> Option where I: IntoIterator, T)>, S: ToOffset, @@ -1412,7 +1412,7 @@ impl Buffer { .and_then(|mode| self.language.as_ref().map(|_| (self.snapshot(), mode))); let edit_operation = self.text.edit(edits.iter().cloned()); - let edit_id = edit_operation.local_timestamp(); + let edit_id = edit_operation.timestamp(); if let Some((before_edit, mode)) = autoindent_request { let mut delta = 0isize; diff --git a/crates/language/src/proto.rs b/crates/language/src/proto.rs index c88abc08ac..80eb972f42 100644 --- a/crates/language/src/proto.rs +++ b/crates/language/src/proto.rs @@ -41,24 +41,22 @@ pub fn serialize_operation(operation: &crate::Operation) -> proto::Operation { proto::operation::Variant::Edit(serialize_edit_operation(edit)) } - crate::Operation::Buffer(text::Operation::Undo { - undo, - lamport_timestamp, - }) => proto::operation::Variant::Undo(proto::operation::Undo { - replica_id: undo.id.replica_id as u32, - local_timestamp: undo.id.value, - lamport_timestamp: lamport_timestamp.value, - version: serialize_version(&undo.version), - counts: undo - .counts - .iter() - .map(|(edit_id, count)| proto::UndoCount { - replica_id: edit_id.replica_id as u32, - local_timestamp: edit_id.value, - count: *count, - }) - .collect(), - }), + crate::Operation::Buffer(text::Operation::Undo(undo)) => { + proto::operation::Variant::Undo(proto::operation::Undo { + replica_id: undo.timestamp.replica_id as u32, + lamport_timestamp: undo.timestamp.value, + version: serialize_version(&undo.version), + counts: undo + .counts + .iter() + .map(|(edit_id, count)| proto::UndoCount { + replica_id: edit_id.replica_id as u32, + lamport_timestamp: edit_id.value, + count: *count, + }) + .collect(), + }) + } crate::Operation::UpdateSelections { selections, @@ -101,8 +99,7 @@ pub fn serialize_operation(operation: &crate::Operation) -> proto::Operation { pub fn serialize_edit_operation(operation: &EditOperation) -> proto::operation::Edit { proto::operation::Edit { replica_id: operation.timestamp.replica_id as u32, - local_timestamp: operation.timestamp.local, - lamport_timestamp: operation.timestamp.lamport, + lamport_timestamp: operation.timestamp.value, version: serialize_version(&operation.version), ranges: operation.ranges.iter().map(serialize_range).collect(), new_text: operation @@ -114,7 +111,7 @@ pub fn serialize_edit_operation(operation: &EditOperation) -> proto::operation:: } pub fn serialize_undo_map_entry( - (edit_id, counts): (&clock::Local, &[(clock::Local, u32)]), + (edit_id, counts): (&clock::Lamport, &[(clock::Lamport, u32)]), ) -> proto::UndoMapEntry { proto::UndoMapEntry { replica_id: edit_id.replica_id as u32, @@ -123,7 +120,7 @@ pub fn serialize_undo_map_entry( .iter() .map(|(undo_id, count)| proto::UndoCount { replica_id: undo_id.replica_id as u32, - local_timestamp: undo_id.value, + lamport_timestamp: undo_id.value, count: *count, }) .collect(), @@ -197,7 +194,7 @@ pub fn serialize_diagnostics<'a>( pub fn serialize_anchor(anchor: &Anchor) -> proto::Anchor { proto::Anchor { replica_id: anchor.timestamp.replica_id as u32, - local_timestamp: anchor.timestamp.value, + timestamp: anchor.timestamp.value, offset: anchor.offset as u64, bias: match anchor.bias { Bias::Left => proto::Bias::Left as i32, @@ -218,32 +215,26 @@ pub fn deserialize_operation(message: proto::Operation) -> Result { - crate::Operation::Buffer(text::Operation::Undo { - lamport_timestamp: clock::Lamport { + crate::Operation::Buffer(text::Operation::Undo(UndoOperation { + timestamp: clock::Lamport { replica_id: undo.replica_id as ReplicaId, value: undo.lamport_timestamp, }, - undo: UndoOperation { - id: clock::Local { - replica_id: undo.replica_id as ReplicaId, - value: undo.local_timestamp, - }, - version: deserialize_version(&undo.version), - counts: undo - .counts - .into_iter() - .map(|c| { - ( - clock::Local { - replica_id: c.replica_id as ReplicaId, - value: c.local_timestamp, - }, - c.count, - ) - }) - .collect(), - }, - }) + version: deserialize_version(&undo.version), + counts: undo + .counts + .into_iter() + .map(|c| { + ( + clock::Lamport { + replica_id: c.replica_id as ReplicaId, + value: c.lamport_timestamp, + }, + c.count, + ) + }) + .collect(), + })) } proto::operation::Variant::UpdateSelections(message) => { let selections = message @@ -298,10 +289,9 @@ pub fn deserialize_operation(message: proto::Operation) -> Result EditOperation { EditOperation { - timestamp: InsertionTimestamp { + timestamp: clock::Lamport { replica_id: edit.replica_id as ReplicaId, - local: edit.local_timestamp, - lamport: edit.lamport_timestamp, + value: edit.lamport_timestamp, }, version: deserialize_version(&edit.version), ranges: edit.ranges.into_iter().map(deserialize_range).collect(), @@ -311,9 +301,9 @@ pub fn deserialize_edit_operation(edit: proto::operation::Edit) -> EditOperation pub fn deserialize_undo_map_entry( entry: proto::UndoMapEntry, -) -> (clock::Local, Vec<(clock::Local, u32)>) { +) -> (clock::Lamport, Vec<(clock::Lamport, u32)>) { ( - clock::Local { + clock::Lamport { replica_id: entry.replica_id as u16, value: entry.local_timestamp, }, @@ -322,9 +312,9 @@ pub fn deserialize_undo_map_entry( .into_iter() .map(|undo_count| { ( - clock::Local { + clock::Lamport { replica_id: undo_count.replica_id as u16, - value: undo_count.local_timestamp, + value: undo_count.lamport_timestamp, }, undo_count.count, ) @@ -384,9 +374,9 @@ pub fn deserialize_diagnostics( pub fn deserialize_anchor(anchor: proto::Anchor) -> Option { Some(Anchor { - timestamp: clock::Local { + timestamp: clock::Lamport { replica_id: anchor.replica_id as ReplicaId, - value: anchor.local_timestamp, + value: anchor.timestamp, }, offset: anchor.offset as usize, bias: match proto::Bias::from_i32(anchor.bias)? { @@ -500,12 +490,12 @@ pub fn deserialize_code_action(action: proto::CodeAction) -> Result pub fn serialize_transaction(transaction: &Transaction) -> proto::Transaction { proto::Transaction { - id: Some(serialize_local_timestamp(transaction.id)), + id: Some(serialize_timestamp(transaction.id)), edit_ids: transaction .edit_ids .iter() .copied() - .map(serialize_local_timestamp) + .map(serialize_timestamp) .collect(), start: serialize_version(&transaction.start), } @@ -513,7 +503,7 @@ pub fn serialize_transaction(transaction: &Transaction) -> proto::Transaction { pub fn deserialize_transaction(transaction: proto::Transaction) -> Result { Ok(Transaction { - id: deserialize_local_timestamp( + id: deserialize_timestamp( transaction .id .ok_or_else(|| anyhow!("missing transaction id"))?, @@ -521,21 +511,21 @@ pub fn deserialize_transaction(transaction: proto::Transaction) -> Result proto::LocalTimestamp { - proto::LocalTimestamp { +pub fn serialize_timestamp(timestamp: clock::Lamport) -> proto::LamportTimestamp { + proto::LamportTimestamp { replica_id: timestamp.replica_id as u32, value: timestamp.value, } } -pub fn deserialize_local_timestamp(timestamp: proto::LocalTimestamp) -> clock::Local { - clock::Local { +pub fn deserialize_timestamp(timestamp: proto::LamportTimestamp) -> clock::Lamport { + clock::Lamport { replica_id: timestamp.replica_id as ReplicaId, value: timestamp.value, } @@ -555,7 +545,7 @@ pub fn deserialize_range(range: proto::Range) -> Range { pub fn deserialize_version(message: &[proto::VectorClockEntry]) -> clock::Global { let mut version = clock::Global::new(); for entry in message { - version.observe(clock::Local { + version.observe(clock::Lamport { replica_id: entry.replica_id as ReplicaId, value: entry.timestamp, }); diff --git a/crates/rpc/proto/zed.proto b/crates/rpc/proto/zed.proto index 5e96ea043c..61c25f8f84 100644 --- a/crates/rpc/proto/zed.proto +++ b/crates/rpc/proto/zed.proto @@ -861,12 +861,12 @@ message ProjectTransaction { } message Transaction { - LocalTimestamp id = 1; - repeated LocalTimestamp edit_ids = 2; + LamportTimestamp id = 1; + repeated LamportTimestamp edit_ids = 2; repeated VectorClockEntry start = 3; } -message LocalTimestamp { +message LamportTimestamp { uint32 replica_id = 1; uint32 value = 2; } @@ -1280,7 +1280,7 @@ message Excerpt { message Anchor { uint32 replica_id = 1; - uint32 local_timestamp = 2; + uint32 timestamp = 2; uint64 offset = 3; Bias bias = 4; optional uint64 buffer_id = 5; @@ -1324,7 +1324,6 @@ message Operation { message Edit { uint32 replica_id = 1; - uint32 local_timestamp = 2; uint32 lamport_timestamp = 3; repeated VectorClockEntry version = 4; repeated Range ranges = 5; @@ -1333,7 +1332,6 @@ message Operation { message Undo { uint32 replica_id = 1; - uint32 local_timestamp = 2; uint32 lamport_timestamp = 3; repeated VectorClockEntry version = 4; repeated UndoCount counts = 5; @@ -1362,7 +1360,7 @@ message UndoMapEntry { message UndoCount { uint32 replica_id = 1; - uint32 local_timestamp = 2; + uint32 lamport_timestamp = 2; uint32 count = 3; } diff --git a/crates/text/Cargo.toml b/crates/text/Cargo.toml index 65e9b6fcec..d1bc6cc8f8 100644 --- a/crates/text/Cargo.toml +++ b/crates/text/Cargo.toml @@ -31,6 +31,7 @@ regex.workspace = true [dev-dependencies] collections = { path = "../collections", features = ["test-support"] } gpui = { path = "../gpui", features = ["test-support"] } +util = { path = "../util", features = ["test-support"] } ctor.workspace = true env_logger.workspace = true rand.workspace = true diff --git a/crates/text/src/anchor.rs b/crates/text/src/anchor.rs index b5f4fb24ec..084be0e336 100644 --- a/crates/text/src/anchor.rs +++ b/crates/text/src/anchor.rs @@ -8,7 +8,7 @@ use sum_tree::Bias; #[derive(Copy, Clone, Eq, PartialEq, Debug, Hash, Default)] pub struct Anchor { - pub timestamp: clock::Local, + pub timestamp: clock::Lamport, pub offset: usize, pub bias: Bias, pub buffer_id: Option, @@ -16,14 +16,14 @@ pub struct Anchor { impl Anchor { pub const MIN: Self = Self { - timestamp: clock::Local::MIN, + timestamp: clock::Lamport::MIN, offset: usize::MIN, bias: Bias::Left, buffer_id: None, }; pub const MAX: Self = Self { - timestamp: clock::Local::MAX, + timestamp: clock::Lamport::MAX, offset: usize::MAX, bias: Bias::Right, buffer_id: None, diff --git a/crates/text/src/text.rs b/crates/text/src/text.rs index 2fabb0f87f..c05ea1109c 100644 --- a/crates/text/src/text.rs +++ b/crates/text/src/text.rs @@ -46,18 +46,16 @@ lazy_static! { static ref LINE_SEPARATORS_REGEX: Regex = Regex::new("\r\n|\r|\u{2028}|\u{2029}").unwrap(); } -pub type TransactionId = clock::Local; +pub type TransactionId = clock::Lamport; pub struct Buffer { snapshot: BufferSnapshot, history: History, deferred_ops: OperationQueue, deferred_replicas: HashSet, - replica_id: ReplicaId, - local_clock: clock::Local, pub lamport_clock: clock::Lamport, subscriptions: Topic, - edit_id_resolvers: HashMap>>, + edit_id_resolvers: HashMap>>, wait_for_version_txs: Vec<(clock::Global, oneshot::Sender<()>)>, } @@ -85,7 +83,7 @@ pub struct HistoryEntry { #[derive(Clone, Debug)] pub struct Transaction { pub id: TransactionId, - pub edit_ids: Vec, + pub edit_ids: Vec, pub start: clock::Global, } @@ -97,8 +95,8 @@ impl HistoryEntry { struct History { base_text: Rope, - operations: TreeMap, - insertion_slices: HashMap>, + operations: TreeMap, + insertion_slices: HashMap>, undo_stack: Vec, redo_stack: Vec, transaction_depth: usize, @@ -107,7 +105,7 @@ struct History { #[derive(Clone, Debug)] struct InsertionSlice { - insertion_id: clock::Local, + insertion_id: clock::Lamport, range: Range, } @@ -129,18 +127,18 @@ impl History { } fn push(&mut self, op: Operation) { - self.operations.insert(op.local_timestamp(), op); + self.operations.insert(op.timestamp(), op); } fn start_transaction( &mut self, start: clock::Global, now: Instant, - local_clock: &mut clock::Local, + clock: &mut clock::Lamport, ) -> Option { self.transaction_depth += 1; if self.transaction_depth == 1 { - let id = local_clock.tick(); + let id = clock.tick(); self.undo_stack.push(HistoryEntry { transaction: Transaction { id, @@ -251,7 +249,7 @@ impl History { self.redo_stack.clear(); } - fn push_undo(&mut self, op_id: clock::Local) { + fn push_undo(&mut self, op_id: clock::Lamport) { assert_ne!(self.transaction_depth, 0); if let Some(Operation::Edit(_)) = self.operations.get(&op_id) { let last_transaction = self.undo_stack.last_mut().unwrap(); @@ -412,37 +410,14 @@ impl Edit<(D1, D2)> { } } -#[derive(Copy, Clone, Debug, Default, Eq, PartialEq, PartialOrd, Ord)] -pub struct InsertionTimestamp { - pub replica_id: ReplicaId, - pub local: clock::Seq, - pub lamport: clock::Seq, -} - -impl InsertionTimestamp { - pub fn local(&self) -> clock::Local { - clock::Local { - replica_id: self.replica_id, - value: self.local, - } - } - - pub fn lamport(&self) -> clock::Lamport { - clock::Lamport { - replica_id: self.replica_id, - value: self.lamport, - } - } -} - #[derive(Eq, PartialEq, Clone, Debug)] pub struct Fragment { pub id: Locator, - pub insertion_timestamp: InsertionTimestamp, + pub timestamp: clock::Lamport, pub insertion_offset: usize, pub len: usize, pub visible: bool, - pub deletions: HashSet, + pub deletions: HashSet, pub max_undos: clock::Global, } @@ -470,29 +445,26 @@ impl<'a> sum_tree::Dimension<'a, FragmentSummary> for FragmentTextSummary { #[derive(Eq, PartialEq, Clone, Debug)] struct InsertionFragment { - timestamp: clock::Local, + timestamp: clock::Lamport, split_offset: usize, fragment_id: Locator, } #[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord)] struct InsertionFragmentKey { - timestamp: clock::Local, + timestamp: clock::Lamport, split_offset: usize, } #[derive(Clone, Debug, Eq, PartialEq)] pub enum Operation { Edit(EditOperation), - Undo { - undo: UndoOperation, - lamport_timestamp: clock::Lamport, - }, + Undo(UndoOperation), } #[derive(Clone, Debug, Eq, PartialEq)] pub struct EditOperation { - pub timestamp: InsertionTimestamp, + pub timestamp: clock::Lamport, pub version: clock::Global, pub ranges: Vec>, pub new_text: Vec>, @@ -500,9 +472,9 @@ pub struct EditOperation { #[derive(Clone, Debug, Eq, PartialEq)] pub struct UndoOperation { - pub id: clock::Local, - pub counts: HashMap, + pub timestamp: clock::Lamport, pub version: clock::Global, + pub counts: HashMap, } impl Buffer { @@ -514,24 +486,21 @@ impl Buffer { let mut fragments = SumTree::new(); let mut insertions = SumTree::new(); - let mut local_clock = clock::Local::new(replica_id); let mut lamport_clock = clock::Lamport::new(replica_id); let mut version = clock::Global::new(); let visible_text = history.base_text.clone(); if !visible_text.is_empty() { - let insertion_timestamp = InsertionTimestamp { + let insertion_timestamp = clock::Lamport { replica_id: 0, - local: 1, - lamport: 1, + value: 1, }; - local_clock.observe(insertion_timestamp.local()); - lamport_clock.observe(insertion_timestamp.lamport()); - version.observe(insertion_timestamp.local()); + lamport_clock.observe(insertion_timestamp); + version.observe(insertion_timestamp); let fragment_id = Locator::between(&Locator::min(), &Locator::max()); let fragment = Fragment { id: fragment_id, - insertion_timestamp, + timestamp: insertion_timestamp, insertion_offset: 0, len: visible_text.len(), visible: true, @@ -557,8 +526,6 @@ impl Buffer { history, deferred_ops: OperationQueue::new(), deferred_replicas: HashSet::default(), - replica_id, - local_clock, lamport_clock, subscriptions: Default::default(), edit_id_resolvers: Default::default(), @@ -575,7 +542,7 @@ impl Buffer { } pub fn replica_id(&self) -> ReplicaId { - self.local_clock.replica_id + self.lamport_clock.replica_id } pub fn remote_id(&self) -> u64 { @@ -602,16 +569,12 @@ impl Buffer { .map(|(range, new_text)| (range, new_text.into())); self.start_transaction(); - let timestamp = InsertionTimestamp { - replica_id: self.replica_id, - local: self.local_clock.tick().value, - lamport: self.lamport_clock.tick().value, - }; + let timestamp = self.lamport_clock.tick(); let operation = Operation::Edit(self.apply_local_edit(edits, timestamp)); self.history.push(operation.clone()); - self.history.push_undo(operation.local_timestamp()); - self.snapshot.version.observe(operation.local_timestamp()); + self.history.push_undo(operation.timestamp()); + self.snapshot.version.observe(operation.timestamp()); self.end_transaction(); operation } @@ -619,7 +582,7 @@ impl Buffer { fn apply_local_edit>>( &mut self, edits: impl ExactSizeIterator, T)>, - timestamp: InsertionTimestamp, + timestamp: clock::Lamport, ) -> EditOperation { let mut edits_patch = Patch::default(); let mut edit_op = EditOperation { @@ -696,7 +659,7 @@ impl Buffer { .item() .map_or(&Locator::max(), |old_fragment| &old_fragment.id), ), - insertion_timestamp: timestamp, + timestamp, insertion_offset, len: new_text.len(), deletions: Default::default(), @@ -726,7 +689,7 @@ impl Buffer { intersection.insertion_offset += fragment_start - old_fragments.start().visible; intersection.id = Locator::between(&new_fragments.summary().max_id, &intersection.id); - intersection.deletions.insert(timestamp.local()); + intersection.deletions.insert(timestamp); intersection.visible = false; } if intersection.len > 0 { @@ -781,7 +744,7 @@ impl Buffer { self.subscriptions.publish_mut(&edits_patch); self.history .insertion_slices - .insert(timestamp.local(), insertion_slices); + .insert(timestamp, insertion_slices); edit_op } @@ -808,28 +771,23 @@ impl Buffer { fn apply_op(&mut self, op: Operation) -> Result<()> { match op { Operation::Edit(edit) => { - if !self.version.observed(edit.timestamp.local()) { + if !self.version.observed(edit.timestamp) { self.apply_remote_edit( &edit.version, &edit.ranges, &edit.new_text, edit.timestamp, ); - self.snapshot.version.observe(edit.timestamp.local()); - self.local_clock.observe(edit.timestamp.local()); - self.lamport_clock.observe(edit.timestamp.lamport()); - self.resolve_edit(edit.timestamp.local()); + self.snapshot.version.observe(edit.timestamp); + self.lamport_clock.observe(edit.timestamp); + self.resolve_edit(edit.timestamp); } } - Operation::Undo { - undo, - lamport_timestamp, - } => { - if !self.version.observed(undo.id) { + Operation::Undo(undo) => { + if !self.version.observed(undo.timestamp) { self.apply_undo(&undo)?; - self.snapshot.version.observe(undo.id); - self.local_clock.observe(undo.id); - self.lamport_clock.observe(lamport_timestamp); + self.snapshot.version.observe(undo.timestamp); + self.lamport_clock.observe(undo.timestamp); } } } @@ -849,7 +807,7 @@ impl Buffer { version: &clock::Global, ranges: &[Range], new_text: &[Arc], - timestamp: InsertionTimestamp, + timestamp: clock::Lamport, ) { if ranges.is_empty() { return; @@ -916,9 +874,7 @@ impl Buffer { // Skip over insertions that are concurrent to this edit, but have a lower lamport // timestamp. while let Some(fragment) = old_fragments.item() { - if fragment_start == range.start - && fragment.insertion_timestamp.lamport() > timestamp.lamport() - { + if fragment_start == range.start && fragment.timestamp > timestamp { new_ropes.push_fragment(fragment, fragment.visible); new_fragments.push(fragment.clone(), &None); old_fragments.next(&cx); @@ -955,7 +911,7 @@ impl Buffer { .item() .map_or(&Locator::max(), |old_fragment| &old_fragment.id), ), - insertion_timestamp: timestamp, + timestamp, insertion_offset, len: new_text.len(), deletions: Default::default(), @@ -986,7 +942,7 @@ impl Buffer { fragment_start - old_fragments.start().0.full_offset(); intersection.id = Locator::between(&new_fragments.summary().max_id, &intersection.id); - intersection.deletions.insert(timestamp.local()); + intersection.deletions.insert(timestamp); intersection.visible = false; insertion_slices.push(intersection.insertion_slice()); } @@ -1038,13 +994,13 @@ impl Buffer { self.snapshot.insertions.edit(new_insertions, &()); self.history .insertion_slices - .insert(timestamp.local(), insertion_slices); + .insert(timestamp, insertion_slices); self.subscriptions.publish_mut(&edits_patch) } fn fragment_ids_for_edits<'a>( &'a self, - edit_ids: impl Iterator, + edit_ids: impl Iterator, ) -> Vec<&'a Locator> { // Get all of the insertion slices changed by the given edits. let mut insertion_slices = Vec::new(); @@ -1105,7 +1061,7 @@ impl Buffer { let fragment_was_visible = fragment.visible; fragment.visible = fragment.is_visible(&self.undo_map); - fragment.max_undos.observe(undo.id); + fragment.max_undos.observe(undo.timestamp); let old_start = old_fragments.start().1; let new_start = new_fragments.summary().text.visible; @@ -1159,10 +1115,10 @@ impl Buffer { if self.deferred_replicas.contains(&op.replica_id()) { false } else { - match op { - Operation::Edit(edit) => self.version.observed_all(&edit.version), - Operation::Undo { undo, .. } => self.version.observed_all(&undo.version), - } + self.version.observed_all(match op { + Operation::Edit(edit) => &edit.version, + Operation::Undo(undo) => &undo.version, + }) } } @@ -1180,7 +1136,7 @@ impl Buffer { pub fn start_transaction_at(&mut self, now: Instant) -> Option { self.history - .start_transaction(self.version.clone(), now, &mut self.local_clock) + .start_transaction(self.version.clone(), now, &mut self.lamport_clock) } pub fn end_transaction(&mut self) -> Option<(TransactionId, clock::Global)> { @@ -1209,7 +1165,7 @@ impl Buffer { &self.history.base_text } - pub fn operations(&self) -> &TreeMap { + pub fn operations(&self) -> &TreeMap { &self.history.operations } @@ -1289,16 +1245,13 @@ impl Buffer { } let undo = UndoOperation { - id: self.local_clock.tick(), + timestamp: self.lamport_clock.tick(), version: self.version(), counts, }; self.apply_undo(&undo)?; - let operation = Operation::Undo { - undo, - lamport_timestamp: self.lamport_clock.tick(), - }; - self.snapshot.version.observe(operation.local_timestamp()); + self.snapshot.version.observe(undo.timestamp); + let operation = Operation::Undo(undo); self.history.push(operation.clone()); Ok(operation) } @@ -1363,7 +1316,7 @@ impl Buffer { pub fn wait_for_edits( &mut self, - edit_ids: impl IntoIterator, + edit_ids: impl IntoIterator, ) -> impl 'static + Future> { let mut futures = Vec::new(); for edit_id in edit_ids { @@ -1435,7 +1388,7 @@ impl Buffer { self.wait_for_version_txs.clear(); } - fn resolve_edit(&mut self, edit_id: clock::Local) { + fn resolve_edit(&mut self, edit_id: clock::Lamport) { for mut tx in self .edit_id_resolvers .remove(&edit_id) @@ -1513,7 +1466,7 @@ impl Buffer { .insertions .get( &InsertionFragmentKey { - timestamp: fragment.insertion_timestamp.local(), + timestamp: fragment.timestamp, split_offset: fragment.insertion_offset, }, &(), @@ -1996,7 +1949,7 @@ impl BufferSnapshot { let fragment = fragment_cursor.item().unwrap(); let overshoot = offset - *fragment_cursor.start(); Anchor { - timestamp: fragment.insertion_timestamp.local(), + timestamp: fragment.timestamp, offset: fragment.insertion_offset + overshoot, bias, buffer_id: Some(self.remote_id), @@ -2188,15 +2141,14 @@ impl<'a, D: TextDimension + Ord, F: FnMut(&FragmentSummary) -> bool> Iterator fo break; } - let timestamp = fragment.insertion_timestamp.local(); let start_anchor = Anchor { - timestamp, + timestamp: fragment.timestamp, offset: fragment.insertion_offset, bias: Bias::Right, buffer_id: Some(self.buffer_id), }; let end_anchor = Anchor { - timestamp, + timestamp: fragment.timestamp, offset: fragment.insertion_offset + fragment.len, bias: Bias::Left, buffer_id: Some(self.buffer_id), @@ -2269,19 +2221,17 @@ impl<'a, D: TextDimension + Ord, F: FnMut(&FragmentSummary) -> bool> Iterator fo impl Fragment { fn insertion_slice(&self) -> InsertionSlice { InsertionSlice { - insertion_id: self.insertion_timestamp.local(), + insertion_id: self.timestamp, range: self.insertion_offset..self.insertion_offset + self.len, } } fn is_visible(&self, undos: &UndoMap) -> bool { - !undos.is_undone(self.insertion_timestamp.local()) - && self.deletions.iter().all(|d| undos.is_undone(*d)) + !undos.is_undone(self.timestamp) && self.deletions.iter().all(|d| undos.is_undone(*d)) } fn was_visible(&self, version: &clock::Global, undos: &UndoMap) -> bool { - (version.observed(self.insertion_timestamp.local()) - && !undos.was_undone(self.insertion_timestamp.local(), version)) + (version.observed(self.timestamp) && !undos.was_undone(self.timestamp, version)) && self .deletions .iter() @@ -2294,14 +2244,14 @@ impl sum_tree::Item for Fragment { fn summary(&self) -> Self::Summary { let mut max_version = clock::Global::new(); - max_version.observe(self.insertion_timestamp.local()); + max_version.observe(self.timestamp); for deletion in &self.deletions { max_version.observe(*deletion); } max_version.join(&self.max_undos); let mut min_insertion_version = clock::Global::new(); - min_insertion_version.observe(self.insertion_timestamp.local()); + min_insertion_version.observe(self.timestamp); let max_insertion_version = min_insertion_version.clone(); if self.visible { FragmentSummary { @@ -2378,7 +2328,7 @@ impl sum_tree::KeyedItem for InsertionFragment { impl InsertionFragment { fn new(fragment: &Fragment) -> Self { Self { - timestamp: fragment.insertion_timestamp.local(), + timestamp: fragment.timestamp, split_offset: fragment.insertion_offset, fragment_id: fragment.id.clone(), } @@ -2501,10 +2451,10 @@ impl Operation { operation_queue::Operation::lamport_timestamp(self).replica_id } - pub fn local_timestamp(&self) -> clock::Local { + pub fn timestamp(&self) -> clock::Lamport { match self { - Operation::Edit(edit) => edit.timestamp.local(), - Operation::Undo { undo, .. } => undo.id, + Operation::Edit(edit) => edit.timestamp, + Operation::Undo(undo) => undo.timestamp, } } @@ -2523,10 +2473,8 @@ impl Operation { impl operation_queue::Operation for Operation { fn lamport_timestamp(&self) -> clock::Lamport { match self { - Operation::Edit(edit) => edit.timestamp.lamport(), - Operation::Undo { - lamport_timestamp, .. - } => *lamport_timestamp, + Operation::Edit(edit) => edit.timestamp, + Operation::Undo(undo) => undo.timestamp, } } } diff --git a/crates/text/src/undo_map.rs b/crates/text/src/undo_map.rs index ff1b241e73..f95809c02e 100644 --- a/crates/text/src/undo_map.rs +++ b/crates/text/src/undo_map.rs @@ -26,8 +26,8 @@ impl sum_tree::KeyedItem for UndoMapEntry { #[derive(Copy, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord)] struct UndoMapKey { - edit_id: clock::Local, - undo_id: clock::Local, + edit_id: clock::Lamport, + undo_id: clock::Lamport, } impl sum_tree::Summary for UndoMapKey { @@ -50,7 +50,7 @@ impl UndoMap { sum_tree::Edit::Insert(UndoMapEntry { key: UndoMapKey { edit_id: *edit_id, - undo_id: undo.id, + undo_id: undo.timestamp, }, undo_count: *count, }) @@ -59,11 +59,11 @@ impl UndoMap { self.0.edit(edits, &()); } - pub fn is_undone(&self, edit_id: clock::Local) -> bool { + pub fn is_undone(&self, edit_id: clock::Lamport) -> bool { self.undo_count(edit_id) % 2 == 1 } - pub fn was_undone(&self, edit_id: clock::Local, version: &clock::Global) -> bool { + pub fn was_undone(&self, edit_id: clock::Lamport, version: &clock::Global) -> bool { let mut cursor = self.0.cursor::(); cursor.seek( &UndoMapKey { @@ -88,7 +88,7 @@ impl UndoMap { undo_count % 2 == 1 } - pub fn undo_count(&self, edit_id: clock::Local) -> u32 { + pub fn undo_count(&self, edit_id: clock::Lamport) -> u32 { let mut cursor = self.0.cursor::(); cursor.seek( &UndoMapKey {