Move operation serialization off the main thread

This commit is contained in:
Antonio Scandurra 2022-08-23 16:34:25 +02:00
parent 954695f5fe
commit f0d35ccc50
5 changed files with 57 additions and 39 deletions

View File

@ -3304,7 +3304,10 @@ mod tests {
fn test_remote_multibuffer(cx: &mut MutableAppContext) {
let host_buffer = cx.add_model(|cx| Buffer::new(0, "a", cx));
let guest_buffer = cx.add_model(|cx| {
let (state, ops) = host_buffer.read(cx).to_proto();
let state = host_buffer.read(cx).to_proto();
let ops = cx
.background()
.block(host_buffer.read(cx).serialize_ops(cx));
let mut buffer = Buffer::from_proto(1, state, None).unwrap();
buffer
.apply_ops(

View File

@ -371,40 +371,46 @@ impl Buffer {
Ok(this)
}
pub fn to_proto(&self) -> (proto::BufferState, Vec<proto::Operation>) {
let mut operations = self
.text
.history()
.map(|op| proto::serialize_operation(&Operation::Buffer(op.clone())))
.chain(self.deferred_ops.iter().map(proto::serialize_operation))
.chain(self.remote_selections.iter().map(|(_, set)| {
proto::serialize_operation(&Operation::UpdateSelections {
selections: set.selections.clone(),
lamport_timestamp: set.lamport_timestamp,
line_mode: set.line_mode,
})
}))
.chain(Some(proto::serialize_operation(
&Operation::UpdateDiagnostics {
diagnostics: self.diagnostics.iter().cloned().collect(),
lamport_timestamp: self.diagnostics_timestamp,
},
)))
.chain(Some(proto::serialize_operation(
&Operation::UpdateCompletionTriggers {
triggers: self.completion_triggers.clone(),
lamport_timestamp: self.completion_triggers_timestamp,
},
)))
.collect::<Vec<_>>();
operations.sort_unstable_by_key(proto::lamport_timestamp_for_operation);
let state = proto::BufferState {
pub fn to_proto(&self) -> proto::BufferState {
proto::BufferState {
id: self.remote_id(),
file: self.file.as_ref().map(|f| f.to_proto()),
base_text: self.base_text().to_string(),
line_ending: proto::serialize_line_ending(self.line_ending()) as i32,
};
(state, operations)
}
}
pub fn serialize_ops(&self, cx: &AppContext) -> Task<Vec<proto::Operation>> {
let mut operations = Vec::new();
operations.extend(self.deferred_ops.iter().map(proto::serialize_operation));
operations.extend(self.remote_selections.iter().map(|(_, set)| {
proto::serialize_operation(&Operation::UpdateSelections {
selections: set.selections.clone(),
lamport_timestamp: set.lamport_timestamp,
line_mode: set.line_mode,
})
}));
operations.push(proto::serialize_operation(&Operation::UpdateDiagnostics {
diagnostics: self.diagnostics.iter().cloned().collect(),
lamport_timestamp: self.diagnostics_timestamp,
}));
operations.push(proto::serialize_operation(
&Operation::UpdateCompletionTriggers {
triggers: self.completion_triggers.clone(),
lamport_timestamp: self.completion_triggers_timestamp,
},
));
let text_operations = self.text.operations().clone();
cx.background().spawn(async move {
operations.extend(
text_operations
.iter()
.map(|(_, op)| proto::serialize_operation(&Operation::Buffer(op.clone()))),
);
operations.sort_unstable_by_key(proto::lamport_timestamp_for_operation);
operations
})
}
pub fn with_language(mut self, language: Arc<Language>, cx: &mut ModelContext<Self>) -> Self {

View File

@ -1048,7 +1048,8 @@ fn test_serialization(cx: &mut gpui::MutableAppContext) {
});
assert_eq!(buffer1.read(cx).text(), "abcDF");
let (state, ops) = buffer1.read(cx).to_proto();
let state = buffer1.read(cx).to_proto();
let ops = cx.background().block(buffer1.read(cx).serialize_ops(cx));
let buffer2 = cx.add_model(|cx| {
let mut buffer = Buffer::from_proto(1, state, None).unwrap();
buffer
@ -1086,7 +1087,10 @@ fn test_random_collaboration(cx: &mut MutableAppContext, mut rng: StdRng) {
for i in 0..rng.gen_range(min_peers..=max_peers) {
let buffer = cx.add_model(|cx| {
let (state, ops) = base_buffer.read(cx).to_proto();
let state = base_buffer.read(cx).to_proto();
let ops = cx
.background()
.block(base_buffer.read(cx).serialize_ops(cx));
let mut buffer = Buffer::from_proto(i as ReplicaId, state, None).unwrap();
buffer
.apply_ops(
@ -1181,7 +1185,8 @@ fn test_random_collaboration(cx: &mut MutableAppContext, mut rng: StdRng) {
mutation_count -= 1;
}
50..=59 if replica_ids.len() < max_peers => {
let (old_buffer_state, old_buffer_ops) = buffer.read(cx).to_proto();
let old_buffer_state = buffer.read(cx).to_proto();
let old_buffer_ops = cx.background().block(buffer.read(cx).serialize_ops(cx));
let new_replica_id = (0..=replica_ids.len() as ReplicaId)
.filter(|replica_id| *replica_id != buffer.read(cx).replica_id())
.choose(&mut rng)

View File

@ -5693,11 +5693,15 @@ impl Project {
if let Some(project_id) = self.remote_id() {
let shared_buffers = self.shared_buffers.entry(peer_id).or_default();
if shared_buffers.insert(buffer_id) {
let (state, mut operations) = buffer.read(cx).to_proto();
let buffer = buffer.read(cx);
let state = buffer.to_proto();
let operations = buffer.serialize_ops(cx);
let client = self.client.clone();
cx.background()
.spawn(
async move {
let mut operations = operations.await;
client.send(proto::CreateBufferForPeer {
project_id,
peer_id: peer_id.0,

View File

@ -45,7 +45,7 @@ use std::{
};
pub use subscription::*;
pub use sum_tree::Bias;
use sum_tree::{FilterCursor, SumTree};
use sum_tree::{FilterCursor, SumTree, TreeMap};
lazy_static! {
static ref CARRIAGE_RETURNS_REGEX: Regex = Regex::new("\r\n|\r").unwrap();
@ -109,7 +109,7 @@ impl HistoryEntry {
struct History {
// TODO: Turn this into a String or Rope, maybe.
base_text: Arc<str>,
operations: HashMap<clock::Local, Operation>,
operations: TreeMap<clock::Local, Operation>,
insertion_slices: HashMap<clock::Local, Vec<InsertionSlice>>,
undo_stack: Vec<HistoryEntry>,
redo_stack: Vec<HistoryEntry>,
@ -1213,8 +1213,8 @@ impl Buffer {
&self.history.base_text
}
pub fn history(&self) -> impl Iterator<Item = &Operation> {
self.history.operations.values()
pub fn operations(&self) -> &TreeMap<clock::Local, Operation> {
&self.history.operations
}
pub fn undo_history(&self) -> impl Iterator<Item = (&clock::Local, &[(clock::Local, u32)])> {