Correctly defer undo operations when messages arrive out of order

Co-Authored-By: Nathan Sobo <nathan@zed.dev>
Co-Authored-By: Max Brunsfeld <max@zed.dev>
This commit is contained in:
Antonio Scandurra 2022-03-03 18:07:59 +01:00
parent 410ee124af
commit 556d9cc53f
5 changed files with 31 additions and 31 deletions

View File

@ -25,7 +25,13 @@ pub fn serialize_operation(operation: &Operation) -> proto::Operation {
replica_id: undo.id.replica_id as u32,
local_timestamp: undo.id.value,
lamport_timestamp: lamport_timestamp.value,
ranges: undo.ranges.iter().map(serialize_range).collect(),
version: From::from(&undo.version),
transaction_ranges: undo
.transaction_ranges
.iter()
.map(serialize_range)
.collect(),
transaction_version: From::from(&undo.transaction_version),
counts: undo
.counts
.iter()
@ -35,7 +41,6 @@ pub fn serialize_operation(operation: &Operation) -> proto::Operation {
count: *count,
})
.collect(),
version: From::from(&undo.version),
}),
Operation::UpdateSelections {
selections,
@ -183,6 +188,7 @@ pub fn deserialize_operation(message: proto::Operation) -> Result<Operation> {
replica_id: undo.replica_id as ReplicaId,
value: undo.local_timestamp,
},
version: undo.version.into(),
counts: undo
.counts
.into_iter()
@ -196,8 +202,12 @@ pub fn deserialize_operation(message: proto::Operation) -> Result<Operation> {
)
})
.collect(),
ranges: undo.ranges.into_iter().map(deserialize_range).collect(),
version: undo.version.into(),
transaction_ranges: undo
.transaction_ranges
.into_iter()
.map(deserialize_range)
.collect(),
transaction_version: undo.transaction_version.into(),
},
}),
proto::operation::Variant::UpdateSelections(message) => {

View File

@ -629,9 +629,10 @@ message Operation {
uint32 replica_id = 1;
uint32 local_timestamp = 2;
uint32 lamport_timestamp = 3;
repeated Range ranges = 4;
repeated VectorClockEntry version = 5;
repeated UndoCount counts = 6;
repeated VectorClockEntry version = 4;
repeated Range transaction_ranges = 5;
repeated VectorClockEntry transaction_version = 6;
repeated UndoCount counts = 7;
}
message UpdateSelections {

View File

@ -5,4 +5,4 @@ pub mod proto;
pub use conn::Connection;
pub use peer::*;
pub const PROTOCOL_VERSION: u32 = 8;
pub const PROTOCOL_VERSION: u32 = 9;

View File

@ -520,7 +520,8 @@ pub struct EditOperation {
pub struct UndoOperation {
pub id: clock::Local,
pub counts: HashMap<clock::Local, u32>,
pub ranges: Vec<Range<FullOffset>>,
pub transaction_ranges: Vec<Range<FullOffset>>,
pub transaction_version: clock::Global,
pub version: clock::Global,
}
@ -1039,7 +1040,7 @@ impl Buffer {
let mut edits = Patch::default();
self.snapshot.undo_map.insert(undo);
let mut cx = undo.version.clone();
let mut cx = undo.transaction_version.clone();
for edit_id in undo.counts.keys().copied() {
cx.observe(edit_id);
}
@ -1047,7 +1048,7 @@ impl Buffer {
let mut old_fragments = self.fragments.cursor::<(VersionedFullOffset, usize)>();
let mut new_fragments = old_fragments.slice(
&VersionedFullOffset::Offset(undo.ranges[0].start),
&VersionedFullOffset::Offset(undo.transaction_ranges[0].start),
Bias::Right,
&cx,
);
@ -1055,7 +1056,7 @@ impl Buffer {
RopeBuilder::new(self.visible_text.cursor(0), self.deleted_text.cursor(0));
new_ropes.push_tree(new_fragments.summary().text);
for range in &undo.ranges {
for range in &undo.transaction_ranges {
let mut end_offset = old_fragments.end(&cx).0.full_offset();
if end_offset < range.start {
@ -1073,7 +1074,7 @@ impl Buffer {
let mut fragment = fragment.clone();
let fragment_was_visible = fragment.visible;
if fragment.was_visible(&undo.version, &self.undo_map)
if fragment.was_visible(&undo.transaction_version, &self.undo_map)
|| undo
.counts
.contains_key(&fragment.insertion_timestamp.local())
@ -1264,9 +1265,10 @@ impl Buffer {
let undo = UndoOperation {
id: self.local_clock.tick(),
version: self.version(),
counts,
ranges: transaction.ranges,
version: transaction.start.clone(),
transaction_ranges: transaction.ranges,
transaction_version: transaction.start.clone(),
};
self.apply_undo(&undo)?;
let operation = Operation::Undo {

View File

@ -40,23 +40,10 @@ impl<T: Clone, R: rand::Rng> Network<T, R> {
for (replica, inbox) in self.inboxes.iter_mut() {
if *replica != sender {
for message in &messages {
let min_index = inbox
.iter()
.enumerate()
.rev()
.find_map(|(index, envelope)| {
if sender == envelope.sender {
Some(index + 1)
} else {
None
}
})
.unwrap_or(0);
// Insert one or more duplicates of this message *after* the previous
// message delivered by this replica.
// Insert one or more duplicates of this message, potentially *before* the previous
// message sent by this peer to simulate out-of-order delivery.
for _ in 0..self.rng.gen_range(1..4) {
let insertion_index = self.rng.gen_range(min_index..inbox.len() + 1);
let insertion_index = self.rng.gen_range(0..inbox.len() + 1);
inbox.insert(
insertion_index,
Envelope {