Serialize buffer in terms of operations rather than state

This is required because, after joining, we want to be able to refer
to operations that have happened prior to joining, which are not
captured by the state. There is probably a way of reconstructing operations
from the state, but that seems unnecessary and we've already talked about
wanting to have the server store operations rather than state once we start
persisting worktrees.
This commit is contained in:
Antonio Scandurra 2022-02-08 14:50:47 +01:00
parent dca974c7d4
commit 624eb5907e
6 changed files with 146 additions and 170 deletions

View File

@ -147,12 +147,12 @@ pub enum Operation {
lamport_timestamp: clock::Lamport,
},
UpdateSelections {
replica_id: ReplicaId,
selections: Arc<[Selection<Anchor>]>,
lamport_timestamp: clock::Lamport,
},
UpdateCompletionTriggers {
triggers: Vec<String>,
lamport_timestamp: clock::Lamport,
},
}
@ -447,27 +447,19 @@ impl Buffer {
file: Option<Box<dyn File>>,
cx: &mut ModelContext<Self>,
) -> Result<Self> {
let fragments_len = message.fragments.len();
let buffer = TextBuffer::from_parts(
let buffer = TextBuffer::new(
replica_id,
message.id,
&message.visible_text,
&message.deleted_text,
message
.undo_map
.into_iter()
.map(proto::deserialize_undo_map_entry),
message
.fragments
.into_iter()
.enumerate()
.map(|(i, fragment)| {
proto::deserialize_buffer_fragment(fragment, i, fragments_len)
}),
message.lamport_timestamp,
From::from(message.version),
History::new(Arc::from(message.base_text)),
);
let mut this = Self::build(buffer, file);
let ops = message
.operations
.into_iter()
.map(proto::deserialize_operation)
.collect::<Result<Vec<_>>>()?;
this.apply_ops(ops, cx)?;
for selection_set in message.selections {
this.remote_selections.insert(
selection_set.replica_id as ReplicaId,
@ -486,37 +478,24 @@ impl Buffer {
DiagnosticSet::from_sorted_entries(entries.into_iter().cloned(), &snapshot),
cx,
);
this.completion_triggers = message.completion_triggers;
let deferred_ops = message
.deferred_operations
.into_iter()
.map(proto::deserialize_operation)
.collect::<Result<Vec<_>>>()?;
this.apply_ops(deferred_ops, cx)?;
Ok(this)
}
pub fn to_proto(&self) -> proto::BufferState {
let mut operations = self
.text
.history()
.map(|op| proto::serialize_operation(&Operation::Buffer(op.clone())))
.chain(self.deferred_ops.iter().map(proto::serialize_operation))
.collect::<Vec<_>>();
operations.sort_unstable_by_key(proto::lamport_timestamp_for_operation);
proto::BufferState {
id: self.remote_id(),
file: self.file.as_ref().map(|f| f.to_proto()),
visible_text: self.text.text(),
deleted_text: self.text.deleted_text(),
undo_map: self
.text
.undo_history()
.map(proto::serialize_undo_map_entry)
.collect(),
version: From::from(&self.version),
lamport_timestamp: self.lamport_clock.value,
fragments: self
.text
.fragments()
.map(proto::serialize_buffer_fragment)
.collect(),
base_text: self.base_text().to_string(),
operations,
selections: self
.remote_selections
.iter()
@ -527,16 +506,6 @@ impl Buffer {
})
.collect(),
diagnostics: proto::serialize_diagnostics(self.diagnostics.iter()),
deferred_operations: self
.deferred_ops
.iter()
.map(proto::serialize_operation)
.chain(
self.text
.deferred_ops()
.map(|op| proto::serialize_operation(&Operation::Buffer(op.clone()))),
)
.collect(),
completion_triggers: self.completion_triggers.clone(),
}
}
@ -708,9 +677,13 @@ impl Buffer {
.and_then(|c| c.trigger_characters)
.unwrap_or_default();
this.update(&mut cx, |this, cx| {
let lamport_timestamp = this.text.lamport_clock.tick();
this.completion_triggers = triggers.clone();
this.send_operation(
Operation::UpdateCompletionTriggers { triggers },
Operation::UpdateCompletionTriggers {
triggers,
lamport_timestamp,
},
cx,
);
cx.notify();
@ -1404,7 +1377,6 @@ impl Buffer {
);
self.send_operation(
Operation::UpdateSelections {
replica_id: self.text.replica_id(),
selections,
lamport_timestamp,
},
@ -1526,7 +1498,7 @@ impl Buffer {
let new_text_len = new_text.len();
let edit = self.text.edit(ranges.iter().cloned(), new_text);
let edit_id = edit.timestamp.local();
let edit_id = edit.local_timestamp();
if let Some((before_edit, edited)) = autoindent_request {
let mut inserted = None;
@ -1555,7 +1527,7 @@ impl Buffer {
}
self.end_transaction(cx);
self.send_operation(Operation::Buffer(text::Operation::Edit(edit)), cx);
self.send_operation(Operation::Buffer(edit), cx);
Some(edit_id)
}
@ -1702,18 +1674,17 @@ impl Buffer {
);
}
Operation::UpdateSelections {
replica_id,
selections,
lamport_timestamp,
} => {
if let Some(set) = self.remote_selections.get(&replica_id) {
if let Some(set) = self.remote_selections.get(&lamport_timestamp.replica_id) {
if set.lamport_timestamp > lamport_timestamp {
return;
}
}
self.remote_selections.insert(
replica_id,
lamport_timestamp.replica_id,
SelectionSet {
selections,
lamport_timestamp,
@ -1722,8 +1693,12 @@ impl Buffer {
self.text.lamport_clock.observe(lamport_timestamp);
self.selections_update_count += 1;
}
Operation::UpdateCompletionTriggers { triggers } => {
Operation::UpdateCompletionTriggers {
triggers,
lamport_timestamp,
} => {
self.completion_triggers = triggers;
self.text.lamport_clock.observe(lamport_timestamp);
}
}
}
@ -2812,10 +2787,10 @@ impl operation_queue::Operation for Operation {
}
| Operation::UpdateSelections {
lamport_timestamp, ..
} => *lamport_timestamp,
Operation::UpdateCompletionTriggers { .. } => {
unreachable!("updating completion triggers should never be deferred")
}
| Operation::UpdateCompletionTriggers {
lamport_timestamp, ..
} => *lamport_timestamp,
}
}
}

View File

@ -45,11 +45,10 @@ pub fn serialize_operation(operation: &Operation) -> proto::Operation {
version: From::from(&undo.version),
}),
Operation::UpdateSelections {
replica_id,
selections,
lamport_timestamp,
} => proto::operation::Variant::UpdateSelections(proto::operation::UpdateSelections {
replica_id: *replica_id as u32,
replica_id: lamport_timestamp.replica_id as u32,
lamport_timestamp: lamport_timestamp.value,
selections: serialize_selections(selections),
}),
@ -61,13 +60,16 @@ pub fn serialize_operation(operation: &Operation) -> proto::Operation {
lamport_timestamp: lamport_timestamp.value,
diagnostics: serialize_diagnostics(diagnostics.iter()),
}),
Operation::UpdateCompletionTriggers { triggers } => {
proto::operation::Variant::UpdateCompletionTriggers(
proto::operation::UpdateCompletionTriggers {
triggers: triggers.clone(),
},
)
}
Operation::UpdateCompletionTriggers {
triggers,
lamport_timestamp,
} => proto::operation::Variant::UpdateCompletionTriggers(
proto::operation::UpdateCompletionTriggers {
replica_id: lamport_timestamp.replica_id as u32,
lamport_timestamp: lamport_timestamp.value,
triggers: triggers.clone(),
},
),
}),
}
}
@ -233,7 +235,6 @@ pub fn deserialize_operation(message: proto::Operation) -> Result<Operation> {
.collect::<Vec<_>>();
Operation::UpdateSelections {
replica_id: message.replica_id as ReplicaId,
lamport_timestamp: clock::Lamport {
replica_id: message.replica_id as ReplicaId,
value: message.lamport_timestamp,
@ -251,6 +252,10 @@ pub fn deserialize_operation(message: proto::Operation) -> Result<Operation> {
proto::operation::Variant::UpdateCompletionTriggers(message) => {
Operation::UpdateCompletionTriggers {
triggers: message.triggers,
lamport_timestamp: clock::Lamport {
replica_id: message.replica_id as ReplicaId,
value: message.lamport_timestamp,
},
}
}
},
@ -381,6 +386,38 @@ pub fn deserialize_anchor(anchor: proto::Anchor) -> Option<Anchor> {
})
}
pub fn lamport_timestamp_for_operation(operation: &proto::Operation) -> Option<clock::Lamport> {
let replica_id;
let value;
match operation.variant.as_ref()? {
proto::operation::Variant::Edit(op) => {
replica_id = op.replica_id;
value = op.lamport_timestamp;
}
proto::operation::Variant::Undo(op) => {
replica_id = op.replica_id;
value = op.lamport_timestamp;
}
proto::operation::Variant::UpdateDiagnostics(op) => {
replica_id = op.replica_id;
value = op.lamport_timestamp;
}
proto::operation::Variant::UpdateSelections(op) => {
replica_id = op.replica_id;
value = op.lamport_timestamp;
}
proto::operation::Variant::UpdateCompletionTriggers(op) => {
replica_id = op.replica_id;
value = op.lamport_timestamp;
}
}
Some(clock::Lamport {
replica_id: replica_id as ReplicaId,
value,
})
}
pub fn serialize_completion(completion: &Completion<Anchor>) -> proto::Completion {
proto::Completion {
old_start: Some(serialize_anchor(&completion.old_range.start)),

View File

@ -1072,15 +1072,15 @@ fn test_random_collaboration(cx: &mut MutableAppContext, mut rng: StdRng) {
for buffer in &buffers {
let buffer = buffer.read(cx).snapshot();
let actual_remote_selections = buffer
.remote_selections_in_range(Anchor::min()..Anchor::max())
.map(|(replica_id, selections)| (replica_id, selections.collect::<Vec<_>>()))
.collect::<Vec<_>>();
let expected_remote_selections = active_selections
.iter()
.filter(|(replica_id, _)| **replica_id != buffer.replica_id())
.map(|(replica_id, selections)| (*replica_id, selections.iter().collect::<Vec<_>>()))
.collect::<Vec<_>>();
let actual_remote_selections = buffer
.remote_selections_in_range(Anchor::min()..Anchor::max())
.map(|(replica_id, selections)| (replica_id, selections.collect::<Vec<_>>()))
.collect::<Vec<_>>();
assert_eq!(actual_remote_selections, expected_remote_selections);
}
}

View File

@ -406,16 +406,11 @@ message Buffer {
message BufferState {
uint64 id = 1;
optional File file = 2;
string visible_text = 3;
string deleted_text = 4;
repeated BufferFragment fragments = 5;
repeated UndoMapEntry undo_map = 6;
repeated VectorClockEntry version = 7;
repeated SelectionSet selections = 8;
repeated Diagnostic diagnostics = 9;
uint32 lamport_timestamp = 10;
repeated Operation deferred_operations = 11;
repeated string completion_triggers = 12;
string base_text = 3;
repeated Operation operations = 4;
repeated SelectionSet selections = 5;
repeated Diagnostic diagnostics = 6;
repeated string completion_triggers = 7;
}
message BufferFragment {
@ -514,7 +509,9 @@ message Operation {
}
message UpdateCompletionTriggers {
repeated string triggers = 1;
uint32 replica_id = 1;
uint32 lamport_timestamp = 2;
repeated string triggers = 3;
}
}

View File

@ -550,12 +550,12 @@ fn test_concurrent_edits() {
let buf3_op = buffer3.edit(vec![5..6], "56");
assert_eq!(buffer3.text(), "abcde56");
buffer1.apply_op(Operation::Edit(buf2_op.clone())).unwrap();
buffer1.apply_op(Operation::Edit(buf3_op.clone())).unwrap();
buffer2.apply_op(Operation::Edit(buf1_op.clone())).unwrap();
buffer2.apply_op(Operation::Edit(buf3_op.clone())).unwrap();
buffer3.apply_op(Operation::Edit(buf1_op.clone())).unwrap();
buffer3.apply_op(Operation::Edit(buf2_op.clone())).unwrap();
buffer1.apply_op(buf2_op.clone()).unwrap();
buffer1.apply_op(buf3_op.clone()).unwrap();
buffer2.apply_op(buf1_op.clone()).unwrap();
buffer2.apply_op(buf3_op.clone()).unwrap();
buffer3.apply_op(buf1_op.clone()).unwrap();
buffer3.apply_op(buf2_op.clone()).unwrap();
assert_eq!(buffer1.text(), "a12c34e56");
assert_eq!(buffer2.text(), "a12c34e56");

View File

@ -130,7 +130,7 @@ impl Transaction {
pub struct History {
// TODO: Turn this into a String or Rope, maybe.
pub base_text: Arc<str>,
ops: HashMap<clock::Local, EditOperation>,
operations: HashMap<clock::Local, Operation>,
undo_stack: Vec<Transaction>,
redo_stack: Vec<Transaction>,
transaction_depth: usize,
@ -142,7 +142,7 @@ impl History {
pub fn new(base_text: Arc<str>) -> Self {
Self {
base_text,
ops: Default::default(),
operations: Default::default(),
undo_stack: Vec::new(),
redo_stack: Vec::new(),
transaction_depth: 0,
@ -151,8 +151,8 @@ impl History {
}
}
fn push(&mut self, op: EditOperation) {
self.ops.insert(op.timestamp.local(), op);
fn push(&mut self, op: Operation) {
self.operations.insert(op.local_timestamp(), op);
}
fn start_transaction(&mut self, start: clock::Global, now: Instant) -> Option<TransactionId> {
@ -216,7 +216,7 @@ impl History {
if let Some(last_transaction) = transactions_to_keep.last_mut() {
for transaction in &*transactions_to_merge {
for edit_id in &transaction.edits {
last_transaction.push_edit(&self.ops[edit_id]);
last_transaction.push_edit(self.operations[edit_id].as_edit().unwrap());
}
}
@ -240,8 +240,11 @@ impl History {
assert_eq!(self.transaction_depth, 0);
let mut edit_ids = edit_ids.into_iter().peekable();
if let Some(first_edit_id) = edit_ids.peek() {
let version = self.ops[first_edit_id].version.clone();
if let Some(first_edit) = edit_ids
.peek()
.and_then(|e| self.operations.get(&e)?.as_edit())
{
let version = first_edit.version.clone();
self.start_transaction(version, now);
for edit_id in edit_ids {
self.push_undo(edit_id);
@ -250,10 +253,12 @@ impl History {
}
}
fn push_undo(&mut self, edit_id: clock::Local) {
fn push_undo(&mut self, op_id: clock::Local) {
assert_ne!(self.transaction_depth, 0);
let last_transaction = self.undo_stack.last_mut().unwrap();
last_transaction.push_edit(&self.ops[&edit_id]);
if let Some(Operation::Edit(edit)) = self.operations.get(&op_id) {
let last_transaction = self.undo_stack.last_mut().unwrap();
last_transaction.push_edit(&edit);
}
}
fn pop_undo(&mut self) -> Option<&Transaction> {
@ -545,56 +550,6 @@ impl Buffer {
}
}
pub fn from_parts(
replica_id: u16,
remote_id: u64,
visible_text: &str,
deleted_text: &str,
undo_map: impl Iterator<Item = (clock::Local, Vec<(clock::Local, u32)>)>,
fragments: impl ExactSizeIterator<Item = Fragment>,
lamport_timestamp: u32,
version: clock::Global,
) -> Self {
let visible_text = visible_text.into();
let deleted_text = deleted_text.into();
let fragments = SumTree::from_iter(fragments, &None);
let mut insertions = fragments
.iter()
.map(|fragment| InsertionFragment {
timestamp: fragment.insertion_timestamp.local(),
split_offset: fragment.insertion_offset,
fragment_id: fragment.id.clone(),
})
.collect::<Vec<_>>();
insertions.sort_unstable_by_key(|i| (i.timestamp, i.split_offset));
Self {
remote_id,
replica_id,
history: History::new("".into()),
deferred_ops: OperationQueue::new(),
deferred_replicas: Default::default(),
local_clock: clock::Local {
replica_id,
value: version.get(replica_id) + 1,
},
lamport_clock: clock::Lamport {
replica_id,
value: lamport_timestamp,
},
subscriptions: Default::default(),
edit_id_resolvers: Default::default(),
snapshot: BufferSnapshot {
replica_id,
visible_text,
deleted_text,
undo_map: UndoMap(undo_map.collect()),
fragments,
insertions: SumTree::from_iter(insertions, &()),
version,
},
}
}
pub fn version(&self) -> clock::Global {
self.version.clone()
}
@ -619,7 +574,7 @@ impl Buffer {
self.history.group_interval
}
pub fn edit<R, I, S, T>(&mut self, ranges: R, new_text: T) -> EditOperation
pub fn edit<R, I, S, T>(&mut self, ranges: R, new_text: T) -> Operation
where
R: IntoIterator<IntoIter = I>,
I: ExactSizeIterator<Item = Range<S>>,
@ -640,13 +595,14 @@ impl Buffer {
local: self.local_clock.tick().value,
lamport: self.lamport_clock.tick().value,
};
let edit = self.apply_local_edit(ranges.into_iter(), new_text, timestamp);
let operation =
Operation::Edit(self.apply_local_edit(ranges.into_iter(), new_text, timestamp));
self.history.push(edit.clone());
self.history.push_undo(edit.timestamp.local());
self.snapshot.version.observe(edit.timestamp.local());
self.history.push(operation.clone());
self.history.push_undo(operation.local_timestamp());
self.snapshot.version.observe(operation.local_timestamp());
self.end_transaction();
edit
operation
}
fn apply_local_edit<S: ToOffset>(
@ -814,6 +770,7 @@ impl Buffer {
pub fn apply_ops<I: IntoIterator<Item = Operation>>(&mut self, ops: I) -> Result<()> {
let mut deferred_ops = Vec::new();
for op in ops {
self.history.push(op.clone());
if self.can_apply_op(&op) {
self.apply_op(op)?;
} else {
@ -838,7 +795,6 @@ impl Buffer {
);
self.snapshot.version.observe(edit.timestamp.local());
self.resolve_edit(edit.timestamp.local());
self.history.push(edit);
}
}
Operation::Undo {
@ -1141,10 +1097,6 @@ impl Buffer {
Ok(())
}
pub fn deferred_ops(&self) -> impl Iterator<Item = &Operation> {
self.deferred_ops.iter()
}
fn flush_deferred_ops(&mut self) -> Result<()> {
self.deferred_replicas.clear();
let mut deferred_ops = Vec::new();
@ -1205,8 +1157,8 @@ impl Buffer {
&self.history.base_text
}
pub fn history(&self) -> impl Iterator<Item = &EditOperation> {
self.history.ops.values()
pub fn history(&self) -> impl Iterator<Item = &Operation> {
self.history.operations.values()
}
pub fn undo_history(&self) -> impl Iterator<Item = (&clock::Local, &[(clock::Local, u32)])> {
@ -1271,12 +1223,13 @@ impl Buffer {
version: transaction.start.clone(),
};
self.apply_undo(&undo)?;
self.snapshot.version.observe(undo.id);
Ok(Operation::Undo {
let operation = Operation::Undo {
undo,
lamport_timestamp: self.lamport_clock.tick(),
})
};
self.snapshot.version.observe(operation.local_timestamp());
self.history.push(operation.clone());
Ok(operation)
}
pub fn push_transaction(
@ -1403,7 +1356,7 @@ impl Buffer {
new_text
);
let op = self.edit(old_ranges.iter().cloned(), new_text.as_str());
(old_ranges, new_text, Operation::Edit(op))
(old_ranges, new_text, op)
}
pub fn randomly_undo_redo(&mut self, rng: &mut impl rand::Rng) -> Vec<Operation> {
@ -2181,6 +2134,20 @@ impl Operation {
operation_queue::Operation::lamport_timestamp(self).replica_id
}
pub fn local_timestamp(&self) -> clock::Local {
match self {
Operation::Edit(edit) => edit.timestamp.local(),
Operation::Undo { undo, .. } => undo.id,
}
}
pub fn as_edit(&self) -> Option<&EditOperation> {
match self {
Operation::Edit(edit) => Some(edit),
_ => None,
}
}
pub fn is_edit(&self) -> bool {
match self {
Operation::Edit { .. } => true,