diff --git a/crates/language/src/buffer.rs b/crates/language/src/buffer.rs index dc7a50151b..127b6831f4 100644 --- a/crates/language/src/buffer.rs +++ b/crates/language/src/buffer.rs @@ -147,12 +147,12 @@ pub enum Operation { lamport_timestamp: clock::Lamport, }, UpdateSelections { - replica_id: ReplicaId, selections: Arc<[Selection]>, lamport_timestamp: clock::Lamport, }, UpdateCompletionTriggers { triggers: Vec, + lamport_timestamp: clock::Lamport, }, } @@ -447,27 +447,19 @@ impl Buffer { file: Option>, cx: &mut ModelContext, ) -> Result { - let fragments_len = message.fragments.len(); - let buffer = TextBuffer::from_parts( + let buffer = TextBuffer::new( replica_id, message.id, - &message.visible_text, - &message.deleted_text, - message - .undo_map - .into_iter() - .map(proto::deserialize_undo_map_entry), - message - .fragments - .into_iter() - .enumerate() - .map(|(i, fragment)| { - proto::deserialize_buffer_fragment(fragment, i, fragments_len) - }), - message.lamport_timestamp, - From::from(message.version), + History::new(Arc::from(message.base_text)), ); let mut this = Self::build(buffer, file); + let ops = message + .operations + .into_iter() + .map(proto::deserialize_operation) + .collect::>>()?; + this.apply_ops(ops, cx)?; + for selection_set in message.selections { this.remote_selections.insert( selection_set.replica_id as ReplicaId, @@ -486,37 +478,24 @@ impl Buffer { DiagnosticSet::from_sorted_entries(entries.into_iter().cloned(), &snapshot), cx, ); - this.completion_triggers = message.completion_triggers; - let deferred_ops = message - .deferred_operations - .into_iter() - .map(proto::deserialize_operation) - .collect::>>()?; - this.apply_ops(deferred_ops, cx)?; - Ok(this) } pub fn to_proto(&self) -> proto::BufferState { + let mut operations = self + .text + .history() + .map(|op| proto::serialize_operation(&Operation::Buffer(op.clone()))) + .chain(self.deferred_ops.iter().map(proto::serialize_operation)) + .collect::>(); + operations.sort_unstable_by_key(proto::lamport_timestamp_for_operation); proto::BufferState { id: self.remote_id(), file: self.file.as_ref().map(|f| f.to_proto()), - visible_text: self.text.text(), - deleted_text: self.text.deleted_text(), - undo_map: self - .text - .undo_history() - .map(proto::serialize_undo_map_entry) - .collect(), - version: From::from(&self.version), - lamport_timestamp: self.lamport_clock.value, - fragments: self - .text - .fragments() - .map(proto::serialize_buffer_fragment) - .collect(), + base_text: self.base_text().to_string(), + operations, selections: self .remote_selections .iter() @@ -527,16 +506,6 @@ impl Buffer { }) .collect(), diagnostics: proto::serialize_diagnostics(self.diagnostics.iter()), - deferred_operations: self - .deferred_ops - .iter() - .map(proto::serialize_operation) - .chain( - self.text - .deferred_ops() - .map(|op| proto::serialize_operation(&Operation::Buffer(op.clone()))), - ) - .collect(), completion_triggers: self.completion_triggers.clone(), } } @@ -708,9 +677,13 @@ impl Buffer { .and_then(|c| c.trigger_characters) .unwrap_or_default(); this.update(&mut cx, |this, cx| { + let lamport_timestamp = this.text.lamport_clock.tick(); this.completion_triggers = triggers.clone(); this.send_operation( - Operation::UpdateCompletionTriggers { triggers }, + Operation::UpdateCompletionTriggers { + triggers, + lamport_timestamp, + }, cx, ); cx.notify(); @@ -1404,7 +1377,6 @@ impl Buffer { ); self.send_operation( Operation::UpdateSelections { - replica_id: self.text.replica_id(), selections, lamport_timestamp, }, @@ -1526,7 +1498,7 @@ impl Buffer { let new_text_len = new_text.len(); let edit = self.text.edit(ranges.iter().cloned(), new_text); - let edit_id = edit.timestamp.local(); + let edit_id = edit.local_timestamp(); if let Some((before_edit, edited)) = autoindent_request { let mut inserted = None; @@ -1555,7 +1527,7 @@ impl Buffer { } self.end_transaction(cx); - self.send_operation(Operation::Buffer(text::Operation::Edit(edit)), cx); + self.send_operation(Operation::Buffer(edit), cx); Some(edit_id) } @@ -1702,18 +1674,17 @@ impl Buffer { ); } Operation::UpdateSelections { - replica_id, selections, lamport_timestamp, } => { - if let Some(set) = self.remote_selections.get(&replica_id) { + if let Some(set) = self.remote_selections.get(&lamport_timestamp.replica_id) { if set.lamport_timestamp > lamport_timestamp { return; } } self.remote_selections.insert( - replica_id, + lamport_timestamp.replica_id, SelectionSet { selections, lamport_timestamp, @@ -1722,8 +1693,12 @@ impl Buffer { self.text.lamport_clock.observe(lamport_timestamp); self.selections_update_count += 1; } - Operation::UpdateCompletionTriggers { triggers } => { + Operation::UpdateCompletionTriggers { + triggers, + lamport_timestamp, + } => { self.completion_triggers = triggers; + self.text.lamport_clock.observe(lamport_timestamp); } } } @@ -2812,10 +2787,10 @@ impl operation_queue::Operation for Operation { } | Operation::UpdateSelections { lamport_timestamp, .. - } => *lamport_timestamp, - Operation::UpdateCompletionTriggers { .. } => { - unreachable!("updating completion triggers should never be deferred") } + | Operation::UpdateCompletionTriggers { + lamport_timestamp, .. + } => *lamport_timestamp, } } } diff --git a/crates/language/src/proto.rs b/crates/language/src/proto.rs index 2f23a1242e..62691583fb 100644 --- a/crates/language/src/proto.rs +++ b/crates/language/src/proto.rs @@ -45,11 +45,10 @@ pub fn serialize_operation(operation: &Operation) -> proto::Operation { version: From::from(&undo.version), }), Operation::UpdateSelections { - replica_id, selections, lamport_timestamp, } => proto::operation::Variant::UpdateSelections(proto::operation::UpdateSelections { - replica_id: *replica_id as u32, + replica_id: lamport_timestamp.replica_id as u32, lamport_timestamp: lamport_timestamp.value, selections: serialize_selections(selections), }), @@ -61,13 +60,16 @@ pub fn serialize_operation(operation: &Operation) -> proto::Operation { lamport_timestamp: lamport_timestamp.value, diagnostics: serialize_diagnostics(diagnostics.iter()), }), - Operation::UpdateCompletionTriggers { triggers } => { - proto::operation::Variant::UpdateCompletionTriggers( - proto::operation::UpdateCompletionTriggers { - triggers: triggers.clone(), - }, - ) - } + Operation::UpdateCompletionTriggers { + triggers, + lamport_timestamp, + } => proto::operation::Variant::UpdateCompletionTriggers( + proto::operation::UpdateCompletionTriggers { + replica_id: lamport_timestamp.replica_id as u32, + lamport_timestamp: lamport_timestamp.value, + triggers: triggers.clone(), + }, + ), }), } } @@ -233,7 +235,6 @@ pub fn deserialize_operation(message: proto::Operation) -> Result { .collect::>(); Operation::UpdateSelections { - replica_id: message.replica_id as ReplicaId, lamport_timestamp: clock::Lamport { replica_id: message.replica_id as ReplicaId, value: message.lamport_timestamp, @@ -251,6 +252,10 @@ pub fn deserialize_operation(message: proto::Operation) -> Result { proto::operation::Variant::UpdateCompletionTriggers(message) => { Operation::UpdateCompletionTriggers { triggers: message.triggers, + lamport_timestamp: clock::Lamport { + replica_id: message.replica_id as ReplicaId, + value: message.lamport_timestamp, + }, } } }, @@ -381,6 +386,38 @@ pub fn deserialize_anchor(anchor: proto::Anchor) -> Option { }) } +pub fn lamport_timestamp_for_operation(operation: &proto::Operation) -> Option { + let replica_id; + let value; + match operation.variant.as_ref()? { + proto::operation::Variant::Edit(op) => { + replica_id = op.replica_id; + value = op.lamport_timestamp; + } + proto::operation::Variant::Undo(op) => { + replica_id = op.replica_id; + value = op.lamport_timestamp; + } + proto::operation::Variant::UpdateDiagnostics(op) => { + replica_id = op.replica_id; + value = op.lamport_timestamp; + } + proto::operation::Variant::UpdateSelections(op) => { + replica_id = op.replica_id; + value = op.lamport_timestamp; + } + proto::operation::Variant::UpdateCompletionTriggers(op) => { + replica_id = op.replica_id; + value = op.lamport_timestamp; + } + } + + Some(clock::Lamport { + replica_id: replica_id as ReplicaId, + value, + }) +} + pub fn serialize_completion(completion: &Completion) -> proto::Completion { proto::Completion { old_start: Some(serialize_anchor(&completion.old_range.start)), diff --git a/crates/language/src/tests.rs b/crates/language/src/tests.rs index b852680bd6..0e436b495b 100644 --- a/crates/language/src/tests.rs +++ b/crates/language/src/tests.rs @@ -1072,15 +1072,15 @@ fn test_random_collaboration(cx: &mut MutableAppContext, mut rng: StdRng) { for buffer in &buffers { let buffer = buffer.read(cx).snapshot(); + let actual_remote_selections = buffer + .remote_selections_in_range(Anchor::min()..Anchor::max()) + .map(|(replica_id, selections)| (replica_id, selections.collect::>())) + .collect::>(); let expected_remote_selections = active_selections .iter() .filter(|(replica_id, _)| **replica_id != buffer.replica_id()) .map(|(replica_id, selections)| (*replica_id, selections.iter().collect::>())) .collect::>(); - let actual_remote_selections = buffer - .remote_selections_in_range(Anchor::min()..Anchor::max()) - .map(|(replica_id, selections)| (replica_id, selections.collect::>())) - .collect::>(); assert_eq!(actual_remote_selections, expected_remote_selections); } } diff --git a/crates/rpc/proto/zed.proto b/crates/rpc/proto/zed.proto index f61aaf38e6..5e92c7b22f 100644 --- a/crates/rpc/proto/zed.proto +++ b/crates/rpc/proto/zed.proto @@ -406,16 +406,11 @@ message Buffer { message BufferState { uint64 id = 1; optional File file = 2; - string visible_text = 3; - string deleted_text = 4; - repeated BufferFragment fragments = 5; - repeated UndoMapEntry undo_map = 6; - repeated VectorClockEntry version = 7; - repeated SelectionSet selections = 8; - repeated Diagnostic diagnostics = 9; - uint32 lamport_timestamp = 10; - repeated Operation deferred_operations = 11; - repeated string completion_triggers = 12; + string base_text = 3; + repeated Operation operations = 4; + repeated SelectionSet selections = 5; + repeated Diagnostic diagnostics = 6; + repeated string completion_triggers = 7; } message BufferFragment { @@ -514,7 +509,9 @@ message Operation { } message UpdateCompletionTriggers { - repeated string triggers = 1; + uint32 replica_id = 1; + uint32 lamport_timestamp = 2; + repeated string triggers = 3; } } diff --git a/crates/text/src/tests.rs b/crates/text/src/tests.rs index b4ac7d46d4..0c6c95cb4a 100644 --- a/crates/text/src/tests.rs +++ b/crates/text/src/tests.rs @@ -550,12 +550,12 @@ fn test_concurrent_edits() { let buf3_op = buffer3.edit(vec![5..6], "56"); assert_eq!(buffer3.text(), "abcde56"); - buffer1.apply_op(Operation::Edit(buf2_op.clone())).unwrap(); - buffer1.apply_op(Operation::Edit(buf3_op.clone())).unwrap(); - buffer2.apply_op(Operation::Edit(buf1_op.clone())).unwrap(); - buffer2.apply_op(Operation::Edit(buf3_op.clone())).unwrap(); - buffer3.apply_op(Operation::Edit(buf1_op.clone())).unwrap(); - buffer3.apply_op(Operation::Edit(buf2_op.clone())).unwrap(); + buffer1.apply_op(buf2_op.clone()).unwrap(); + buffer1.apply_op(buf3_op.clone()).unwrap(); + buffer2.apply_op(buf1_op.clone()).unwrap(); + buffer2.apply_op(buf3_op.clone()).unwrap(); + buffer3.apply_op(buf1_op.clone()).unwrap(); + buffer3.apply_op(buf2_op.clone()).unwrap(); assert_eq!(buffer1.text(), "a12c34e56"); assert_eq!(buffer2.text(), "a12c34e56"); diff --git a/crates/text/src/text.rs b/crates/text/src/text.rs index b1f513c375..37e5c72402 100644 --- a/crates/text/src/text.rs +++ b/crates/text/src/text.rs @@ -130,7 +130,7 @@ impl Transaction { pub struct History { // TODO: Turn this into a String or Rope, maybe. pub base_text: Arc, - ops: HashMap, + operations: HashMap, undo_stack: Vec, redo_stack: Vec, transaction_depth: usize, @@ -142,7 +142,7 @@ impl History { pub fn new(base_text: Arc) -> Self { Self { base_text, - ops: Default::default(), + operations: Default::default(), undo_stack: Vec::new(), redo_stack: Vec::new(), transaction_depth: 0, @@ -151,8 +151,8 @@ impl History { } } - fn push(&mut self, op: EditOperation) { - self.ops.insert(op.timestamp.local(), op); + fn push(&mut self, op: Operation) { + self.operations.insert(op.local_timestamp(), op); } fn start_transaction(&mut self, start: clock::Global, now: Instant) -> Option { @@ -216,7 +216,7 @@ impl History { if let Some(last_transaction) = transactions_to_keep.last_mut() { for transaction in &*transactions_to_merge { for edit_id in &transaction.edits { - last_transaction.push_edit(&self.ops[edit_id]); + last_transaction.push_edit(self.operations[edit_id].as_edit().unwrap()); } } @@ -240,8 +240,11 @@ impl History { assert_eq!(self.transaction_depth, 0); let mut edit_ids = edit_ids.into_iter().peekable(); - if let Some(first_edit_id) = edit_ids.peek() { - let version = self.ops[first_edit_id].version.clone(); + if let Some(first_edit) = edit_ids + .peek() + .and_then(|e| self.operations.get(&e)?.as_edit()) + { + let version = first_edit.version.clone(); self.start_transaction(version, now); for edit_id in edit_ids { self.push_undo(edit_id); @@ -250,10 +253,12 @@ impl History { } } - fn push_undo(&mut self, edit_id: clock::Local) { + fn push_undo(&mut self, op_id: clock::Local) { assert_ne!(self.transaction_depth, 0); - let last_transaction = self.undo_stack.last_mut().unwrap(); - last_transaction.push_edit(&self.ops[&edit_id]); + if let Some(Operation::Edit(edit)) = self.operations.get(&op_id) { + let last_transaction = self.undo_stack.last_mut().unwrap(); + last_transaction.push_edit(&edit); + } } fn pop_undo(&mut self) -> Option<&Transaction> { @@ -545,56 +550,6 @@ impl Buffer { } } - pub fn from_parts( - replica_id: u16, - remote_id: u64, - visible_text: &str, - deleted_text: &str, - undo_map: impl Iterator)>, - fragments: impl ExactSizeIterator, - lamport_timestamp: u32, - version: clock::Global, - ) -> Self { - let visible_text = visible_text.into(); - let deleted_text = deleted_text.into(); - let fragments = SumTree::from_iter(fragments, &None); - let mut insertions = fragments - .iter() - .map(|fragment| InsertionFragment { - timestamp: fragment.insertion_timestamp.local(), - split_offset: fragment.insertion_offset, - fragment_id: fragment.id.clone(), - }) - .collect::>(); - insertions.sort_unstable_by_key(|i| (i.timestamp, i.split_offset)); - Self { - remote_id, - replica_id, - history: History::new("".into()), - deferred_ops: OperationQueue::new(), - deferred_replicas: Default::default(), - local_clock: clock::Local { - replica_id, - value: version.get(replica_id) + 1, - }, - lamport_clock: clock::Lamport { - replica_id, - value: lamport_timestamp, - }, - subscriptions: Default::default(), - edit_id_resolvers: Default::default(), - snapshot: BufferSnapshot { - replica_id, - visible_text, - deleted_text, - undo_map: UndoMap(undo_map.collect()), - fragments, - insertions: SumTree::from_iter(insertions, &()), - version, - }, - } - } - pub fn version(&self) -> clock::Global { self.version.clone() } @@ -619,7 +574,7 @@ impl Buffer { self.history.group_interval } - pub fn edit(&mut self, ranges: R, new_text: T) -> EditOperation + pub fn edit(&mut self, ranges: R, new_text: T) -> Operation where R: IntoIterator, I: ExactSizeIterator>, @@ -640,13 +595,14 @@ impl Buffer { local: self.local_clock.tick().value, lamport: self.lamport_clock.tick().value, }; - let edit = self.apply_local_edit(ranges.into_iter(), new_text, timestamp); + let operation = + Operation::Edit(self.apply_local_edit(ranges.into_iter(), new_text, timestamp)); - self.history.push(edit.clone()); - self.history.push_undo(edit.timestamp.local()); - self.snapshot.version.observe(edit.timestamp.local()); + self.history.push(operation.clone()); + self.history.push_undo(operation.local_timestamp()); + self.snapshot.version.observe(operation.local_timestamp()); self.end_transaction(); - edit + operation } fn apply_local_edit( @@ -814,6 +770,7 @@ impl Buffer { pub fn apply_ops>(&mut self, ops: I) -> Result<()> { let mut deferred_ops = Vec::new(); for op in ops { + self.history.push(op.clone()); if self.can_apply_op(&op) { self.apply_op(op)?; } else { @@ -838,7 +795,6 @@ impl Buffer { ); self.snapshot.version.observe(edit.timestamp.local()); self.resolve_edit(edit.timestamp.local()); - self.history.push(edit); } } Operation::Undo { @@ -1141,10 +1097,6 @@ impl Buffer { Ok(()) } - pub fn deferred_ops(&self) -> impl Iterator { - self.deferred_ops.iter() - } - fn flush_deferred_ops(&mut self) -> Result<()> { self.deferred_replicas.clear(); let mut deferred_ops = Vec::new(); @@ -1205,8 +1157,8 @@ impl Buffer { &self.history.base_text } - pub fn history(&self) -> impl Iterator { - self.history.ops.values() + pub fn history(&self) -> impl Iterator { + self.history.operations.values() } pub fn undo_history(&self) -> impl Iterator { @@ -1271,12 +1223,13 @@ impl Buffer { version: transaction.start.clone(), }; self.apply_undo(&undo)?; - self.snapshot.version.observe(undo.id); - - Ok(Operation::Undo { + let operation = Operation::Undo { undo, lamport_timestamp: self.lamport_clock.tick(), - }) + }; + self.snapshot.version.observe(operation.local_timestamp()); + self.history.push(operation.clone()); + Ok(operation) } pub fn push_transaction( @@ -1403,7 +1356,7 @@ impl Buffer { new_text ); let op = self.edit(old_ranges.iter().cloned(), new_text.as_str()); - (old_ranges, new_text, Operation::Edit(op)) + (old_ranges, new_text, op) } pub fn randomly_undo_redo(&mut self, rng: &mut impl rand::Rng) -> Vec { @@ -2181,6 +2134,20 @@ impl Operation { operation_queue::Operation::lamport_timestamp(self).replica_id } + pub fn local_timestamp(&self) -> clock::Local { + match self { + Operation::Edit(edit) => edit.timestamp.local(), + Operation::Undo { undo, .. } => undo.id, + } + } + + pub fn as_edit(&self) -> Option<&EditOperation> { + match self { + Operation::Edit(edit) => Some(edit), + _ => None, + } + } + pub fn is_edit(&self) -> bool { match self { Operation::Edit { .. } => true,