From 954695f5fed0e6d0dbf7e37c868e9ae1a948b711 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Tue, 23 Aug 2022 16:05:56 +0200 Subject: [PATCH] Stream buffer ops in the background when creating buffer for peers --- crates/editor/src/multi_buffer.rs | 12 ++- crates/language/src/buffer.rs | 18 ++-- crates/language/src/proto.rs | 89 ++++++++++---------- crates/language/src/tests.rs | 39 +++++++-- crates/project/src/project.rs | 133 +++++++++++++++++++++++------- crates/rpc/proto/zed.proto | 16 +++- 6 files changed, 209 insertions(+), 98 deletions(-) diff --git a/crates/editor/src/multi_buffer.rs b/crates/editor/src/multi_buffer.rs index 53021870c6..d7c1526cd4 100644 --- a/crates/editor/src/multi_buffer.rs +++ b/crates/editor/src/multi_buffer.rs @@ -3304,8 +3304,16 @@ mod tests { fn test_remote_multibuffer(cx: &mut MutableAppContext) { let host_buffer = cx.add_model(|cx| Buffer::new(0, "a", cx)); let guest_buffer = cx.add_model(|cx| { - let message = host_buffer.read(cx).to_proto(); - Buffer::from_proto(1, message, None, cx).unwrap() + let (state, ops) = host_buffer.read(cx).to_proto(); + let mut buffer = Buffer::from_proto(1, state, None).unwrap(); + buffer + .apply_ops( + ops.into_iter() + .map(|op| language::proto::deserialize_operation(op).unwrap()), + cx, + ) + .unwrap(); + buffer }); let multibuffer = cx.add_model(|cx| MultiBuffer::singleton(guest_buffer.clone(), cx)); let snapshot = multibuffer.read(cx).snapshot(cx); diff --git a/crates/language/src/buffer.rs b/crates/language/src/buffer.rs index 86b0b0b92b..99efcc55b0 100644 --- a/crates/language/src/buffer.rs +++ b/crates/language/src/buffer.rs @@ -359,9 +359,8 @@ impl Buffer { pub fn from_proto( replica_id: ReplicaId, - message: proto::Buffer, + message: proto::BufferState, file: Option>, - cx: &mut ModelContext, ) -> Result { let buffer = TextBuffer::new(replica_id, message.id, message.base_text); let mut this = Self::build(buffer, file); @@ -369,17 +368,10 @@ impl Buffer { proto::LineEnding::from_i32(message.line_ending) .ok_or_else(|| anyhow!("missing line_ending"))?, )); - let ops = message - .operations - .into_iter() - .map(proto::deserialize_operation) - .collect::>>()?; - this.apply_ops(ops, cx)?; - Ok(this) } - pub fn to_proto(&self) -> proto::Buffer { + pub fn to_proto(&self) -> (proto::BufferState, Vec) { let mut operations = self .text .history() @@ -406,13 +398,13 @@ impl Buffer { ))) .collect::>(); operations.sort_unstable_by_key(proto::lamport_timestamp_for_operation); - proto::Buffer { + let state = proto::BufferState { id: self.remote_id(), file: self.file.as_ref().map(|f| f.to_proto()), base_text: self.base_text().to_string(), - operations, line_ending: proto::serialize_line_ending(self.line_ending()) as i32, - } + }; + (state, operations) } pub fn with_language(mut self, language: Arc, cx: &mut ModelContext) -> Self { diff --git a/crates/language/src/proto.rs b/crates/language/src/proto.rs index c01973b1c6..fddfb7961f 100644 --- a/crates/language/src/proto.rs +++ b/crates/language/src/proto.rs @@ -1,6 +1,5 @@ use crate::{ diagnostic_set::DiagnosticEntry, CodeAction, CodeLabel, Completion, Diagnostic, Language, - Operation, }; use anyhow::{anyhow, Result}; use clock::ReplicaId; @@ -9,7 +8,7 @@ use rpc::proto; use std::{ops::Range, sync::Arc}; use text::*; -pub use proto::{Buffer, LineEnding, SelectionSet}; +pub use proto::{BufferState, LineEnding, Operation, SelectionSet}; pub fn deserialize_line_ending(message: proto::LineEnding) -> text::LineEnding { match message { @@ -25,13 +24,13 @@ pub fn serialize_line_ending(message: text::LineEnding) -> proto::LineEnding { } } -pub fn serialize_operation(operation: &Operation) -> proto::Operation { +pub fn serialize_operation(operation: &crate::Operation) -> proto::Operation { proto::Operation { variant: Some(match operation { - Operation::Buffer(text::Operation::Edit(edit)) => { + crate::Operation::Buffer(text::Operation::Edit(edit)) => { proto::operation::Variant::Edit(serialize_edit_operation(edit)) } - Operation::Buffer(text::Operation::Undo { + crate::Operation::Buffer(text::Operation::Undo { undo, lamport_timestamp, }) => proto::operation::Variant::Undo(proto::operation::Undo { @@ -49,7 +48,7 @@ pub fn serialize_operation(operation: &Operation) -> proto::Operation { }) .collect(), }), - Operation::UpdateSelections { + crate::Operation::UpdateSelections { selections, line_mode, lamport_timestamp, @@ -59,7 +58,7 @@ pub fn serialize_operation(operation: &Operation) -> proto::Operation { selections: serialize_selections(selections), line_mode: *line_mode, }), - Operation::UpdateDiagnostics { + crate::Operation::UpdateDiagnostics { diagnostics, lamport_timestamp, } => proto::operation::Variant::UpdateDiagnostics(proto::UpdateDiagnostics { @@ -67,7 +66,7 @@ pub fn serialize_operation(operation: &Operation) -> proto::Operation { lamport_timestamp: lamport_timestamp.value, diagnostics: serialize_diagnostics(diagnostics.iter()), }), - Operation::UpdateCompletionTriggers { + crate::Operation::UpdateCompletionTriggers { triggers, lamport_timestamp, } => proto::operation::Variant::UpdateCompletionTriggers( @@ -165,41 +164,43 @@ pub fn serialize_anchor(anchor: &Anchor) -> proto::Anchor { } } -pub fn deserialize_operation(message: proto::Operation) -> Result { +pub fn deserialize_operation(message: proto::Operation) -> Result { Ok( match message .variant .ok_or_else(|| anyhow!("missing operation variant"))? { proto::operation::Variant::Edit(edit) => { - Operation::Buffer(text::Operation::Edit(deserialize_edit_operation(edit))) + crate::Operation::Buffer(text::Operation::Edit(deserialize_edit_operation(edit))) } - proto::operation::Variant::Undo(undo) => Operation::Buffer(text::Operation::Undo { - lamport_timestamp: clock::Lamport { - replica_id: undo.replica_id as ReplicaId, - value: undo.lamport_timestamp, - }, - undo: UndoOperation { - id: clock::Local { + proto::operation::Variant::Undo(undo) => { + crate::Operation::Buffer(text::Operation::Undo { + lamport_timestamp: clock::Lamport { replica_id: undo.replica_id as ReplicaId, - value: undo.local_timestamp, + value: undo.lamport_timestamp, }, - version: deserialize_version(undo.version), - counts: undo - .counts - .into_iter() - .map(|c| { - ( - clock::Local { - replica_id: c.replica_id as ReplicaId, - value: c.local_timestamp, - }, - c.count, - ) - }) - .collect(), - }, - }), + undo: UndoOperation { + id: clock::Local { + replica_id: undo.replica_id as ReplicaId, + value: undo.local_timestamp, + }, + version: deserialize_version(undo.version), + counts: undo + .counts + .into_iter() + .map(|c| { + ( + clock::Local { + replica_id: c.replica_id as ReplicaId, + value: c.local_timestamp, + }, + c.count, + ) + }) + .collect(), + }, + }) + } proto::operation::Variant::UpdateSelections(message) => { let selections = message .selections @@ -215,7 +216,7 @@ pub fn deserialize_operation(message: proto::Operation) -> Result { }) .collect::>(); - Operation::UpdateSelections { + crate::Operation::UpdateSelections { lamport_timestamp: clock::Lamport { replica_id: message.replica_id as ReplicaId, value: message.lamport_timestamp, @@ -224,15 +225,17 @@ pub fn deserialize_operation(message: proto::Operation) -> Result { line_mode: message.line_mode, } } - proto::operation::Variant::UpdateDiagnostics(message) => Operation::UpdateDiagnostics { - diagnostics: deserialize_diagnostics(message.diagnostics), - lamport_timestamp: clock::Lamport { - replica_id: message.replica_id as ReplicaId, - value: message.lamport_timestamp, - }, - }, + proto::operation::Variant::UpdateDiagnostics(message) => { + crate::Operation::UpdateDiagnostics { + diagnostics: deserialize_diagnostics(message.diagnostics), + lamport_timestamp: clock::Lamport { + replica_id: message.replica_id as ReplicaId, + value: message.lamport_timestamp, + }, + } + } proto::operation::Variant::UpdateCompletionTriggers(message) => { - Operation::UpdateCompletionTriggers { + crate::Operation::UpdateCompletionTriggers { triggers: message.triggers, lamport_timestamp: clock::Lamport { replica_id: message.replica_id as ReplicaId, diff --git a/crates/language/src/tests.rs b/crates/language/src/tests.rs index 44c15d1a3b..e6686d1761 100644 --- a/crates/language/src/tests.rs +++ b/crates/language/src/tests.rs @@ -2,6 +2,7 @@ use super::*; use clock::ReplicaId; use collections::BTreeMap; use gpui::{ModelHandle, MutableAppContext}; +use proto::deserialize_operation; use rand::prelude::*; use settings::Settings; use std::{ @@ -1047,8 +1048,18 @@ fn test_serialization(cx: &mut gpui::MutableAppContext) { }); assert_eq!(buffer1.read(cx).text(), "abcDF"); - let message = buffer1.read(cx).to_proto(); - let buffer2 = cx.add_model(|cx| Buffer::from_proto(1, message, None, cx).unwrap()); + let (state, ops) = buffer1.read(cx).to_proto(); + let buffer2 = cx.add_model(|cx| { + let mut buffer = Buffer::from_proto(1, state, None).unwrap(); + buffer + .apply_ops( + ops.into_iter() + .map(|op| proto::deserialize_operation(op).unwrap()), + cx, + ) + .unwrap(); + buffer + }); assert_eq!(buffer2.read(cx).text(), "abcDF"); } @@ -1075,9 +1086,15 @@ fn test_random_collaboration(cx: &mut MutableAppContext, mut rng: StdRng) { for i in 0..rng.gen_range(min_peers..=max_peers) { let buffer = cx.add_model(|cx| { - let mut buffer = - Buffer::from_proto(i as ReplicaId, base_buffer.read(cx).to_proto(), None, cx) - .unwrap(); + let (state, ops) = base_buffer.read(cx).to_proto(); + let mut buffer = Buffer::from_proto(i as ReplicaId, state, None).unwrap(); + buffer + .apply_ops( + ops.into_iter() + .map(|op| proto::deserialize_operation(op).unwrap()), + cx, + ) + .unwrap(); buffer.set_group_interval(Duration::from_millis(rng.gen_range(0..=200))); let network = network.clone(); cx.subscribe(&cx.handle(), move |buffer, _, event, _| { @@ -1164,7 +1181,7 @@ fn test_random_collaboration(cx: &mut MutableAppContext, mut rng: StdRng) { mutation_count -= 1; } 50..=59 if replica_ids.len() < max_peers => { - let old_buffer = buffer.read(cx).to_proto(); + let (old_buffer_state, old_buffer_ops) = buffer.read(cx).to_proto(); let new_replica_id = (0..=replica_ids.len() as ReplicaId) .filter(|replica_id| *replica_id != buffer.read(cx).replica_id()) .choose(&mut rng) @@ -1176,7 +1193,15 @@ fn test_random_collaboration(cx: &mut MutableAppContext, mut rng: StdRng) { ); new_buffer = Some(cx.add_model(|cx| { let mut new_buffer = - Buffer::from_proto(new_replica_id, old_buffer, None, cx).unwrap(); + Buffer::from_proto(new_replica_id, old_buffer_state, None).unwrap(); + new_buffer + .apply_ops( + old_buffer_ops + .into_iter() + .map(|op| deserialize_operation(op).unwrap()), + cx, + ) + .unwrap(); log::info!( "New replica {} text: {:?}", new_buffer.replica_id(), diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index dcab44e373..89cbb868a0 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -123,6 +123,7 @@ pub struct Project { loading_local_worktrees: HashMap, Shared, Arc>>>>, opened_buffers: HashMap, + incomplete_buffers: HashMap>, buffer_snapshots: HashMap>, nonce: u128, initialized_persistent_state: bool, @@ -144,7 +145,7 @@ pub enum JoinProjectError { enum OpenBuffer { Strong(ModelHandle), Weak(WeakModelHandle), - Loading(Vec), + Operations(Vec), } enum WorktreeHandle { @@ -461,6 +462,7 @@ impl Project { collaborators: Default::default(), opened_buffers: Default::default(), shared_buffers: Default::default(), + incomplete_buffers: Default::default(), loading_buffers: Default::default(), loading_local_worktrees: Default::default(), buffer_snapshots: Default::default(), @@ -550,6 +552,7 @@ impl Project { loading_buffers: Default::default(), opened_buffer: watch::channel(), shared_buffers: Default::default(), + incomplete_buffers: Default::default(), loading_local_worktrees: Default::default(), active_entry: None, collaborators: Default::default(), @@ -1331,7 +1334,7 @@ impl Project { *open_buffer = OpenBuffer::Strong(buffer); } } - OpenBuffer::Loading(_) => unreachable!(), + OpenBuffer::Operations(_) => unreachable!(), } } @@ -1456,6 +1459,10 @@ impl Project { } cx.emit(Event::DisconnectedFromHost); cx.notify(); + + // Wake up all futures currently waiting on a buffer to get opened, + // to give them a chance to fail now that we've disconnected. + *self.opened_buffer.0.borrow_mut() = (); } } @@ -1757,7 +1764,7 @@ impl Project { match self.opened_buffers.insert(remote_id, open_buffer) { None => {} - Some(OpenBuffer::Loading(operations)) => { + Some(OpenBuffer::Operations(operations)) => { buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx))? } Some(OpenBuffer::Weak(existing_handle)) => { @@ -5107,7 +5114,7 @@ impl Project { OpenBuffer::Strong(buffer) => { buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?; } - OpenBuffer::Loading(operations) => operations.extend_from_slice(&ops), + OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops), OpenBuffer::Weak(_) => {} }, hash_map::Entry::Vacant(e) => { @@ -5116,7 +5123,7 @@ impl Project { "received buffer update from {:?}", envelope.original_sender_id ); - e.insert(OpenBuffer::Loading(ops)); + e.insert(OpenBuffer::Operations(ops)); } } Ok(()) @@ -5130,24 +5137,52 @@ impl Project { mut cx: AsyncAppContext, ) -> Result<()> { this.update(&mut cx, |this, cx| { - let mut buffer = envelope + match envelope .payload - .buffer - .ok_or_else(|| anyhow!("invalid buffer"))?; - let mut buffer_file = None; - if let Some(file) = buffer.file.take() { - let worktree_id = WorktreeId::from_proto(file.worktree_id); - let worktree = this - .worktree_for_id(worktree_id, cx) - .ok_or_else(|| anyhow!("no worktree found for id {}", file.worktree_id))?; - buffer_file = Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?) - as Arc); - } + .variant + .ok_or_else(|| anyhow!("missing variant"))? + { + proto::create_buffer_for_peer::Variant::State(mut state) => { + let mut buffer_file = None; + if let Some(file) = state.file.take() { + let worktree_id = WorktreeId::from_proto(file.worktree_id); + let worktree = this.worktree_for_id(worktree_id, cx).ok_or_else(|| { + anyhow!("no worktree found for id {}", file.worktree_id) + })?; + buffer_file = Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?) + as Arc); + } - let buffer = cx.add_model(|cx| { - Buffer::from_proto(this.replica_id(), buffer, buffer_file, cx).unwrap() - }); - this.register_buffer(&buffer, cx)?; + let buffer_id = state.id; + let buffer = cx.add_model(|_| { + Buffer::from_proto(this.replica_id(), state, buffer_file).unwrap() + }); + this.incomplete_buffers.insert(buffer_id, buffer); + } + proto::create_buffer_for_peer::Variant::Chunk(chunk) => { + let buffer = this + .incomplete_buffers + .get(&chunk.buffer_id) + .ok_or_else(|| { + anyhow!( + "received chunk for buffer {} without initial state", + chunk.buffer_id + ) + })? + .clone(); + let operations = chunk + .operations + .into_iter() + .map(language::proto::deserialize_operation) + .collect::>>()?; + buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx))?; + + if chunk.is_last { + this.incomplete_buffers.remove(&chunk.buffer_id); + this.register_buffer(&buffer, cx)?; + } + } + } Ok(()) }) @@ -5658,13 +5693,50 @@ impl Project { if let Some(project_id) = self.remote_id() { let shared_buffers = self.shared_buffers.entry(peer_id).or_default(); if shared_buffers.insert(buffer_id) { - self.client - .send(proto::CreateBufferForPeer { - project_id, - peer_id: peer_id.0, - buffer: Some(buffer.read(cx).to_proto()), - }) - .log_err(); + let (state, mut operations) = buffer.read(cx).to_proto(); + let client = self.client.clone(); + cx.background() + .spawn( + async move { + client.send(proto::CreateBufferForPeer { + project_id, + peer_id: peer_id.0, + variant: Some(proto::create_buffer_for_peer::Variant::State(state)), + })?; + + loop { + #[cfg(any(test, feature = "test-support"))] + const CHUNK_SIZE: usize = 5; + + #[cfg(not(any(test, feature = "test-support")))] + const CHUNK_SIZE: usize = 100; + + let chunk = operations + .drain(..cmp::min(CHUNK_SIZE, operations.len())) + .collect(); + let is_last = operations.is_empty(); + client.send(proto::CreateBufferForPeer { + project_id, + peer_id: peer_id.0, + variant: Some(proto::create_buffer_for_peer::Variant::Chunk( + proto::BufferChunk { + buffer_id, + operations: chunk, + is_last, + }, + )), + })?; + + if is_last { + break; + } + } + + Ok(()) + } + .log_err(), + ) + .detach(); } } @@ -5686,7 +5758,10 @@ impl Project { }); if let Some(buffer) = buffer { break buffer; + } else if this.read_with(&cx, |this, _| this.is_read_only()) { + return Err(anyhow!("disconnected before buffer {} could be opened", id)); } + opened_buffer_rx .next() .await @@ -6026,7 +6101,7 @@ impl OpenBuffer { match self { OpenBuffer::Strong(handle) => Some(handle.clone()), OpenBuffer::Weak(handle) => handle.upgrade(cx), - OpenBuffer::Loading(_) => None, + OpenBuffer::Operations(_) => None, } } } diff --git a/crates/rpc/proto/zed.proto b/crates/rpc/proto/zed.proto index 0da6d4d360..4cf7e38a13 100644 --- a/crates/rpc/proto/zed.proto +++ b/crates/rpc/proto/zed.proto @@ -370,7 +370,10 @@ message OpenBufferResponse { message CreateBufferForPeer { uint64 project_id = 1; uint32 peer_id = 2; - Buffer buffer = 3; + oneof variant { + BufferState state = 3; + BufferChunk chunk = 4; + } } message UpdateBuffer { @@ -808,12 +811,17 @@ message Entry { bool is_ignored = 7; } -message Buffer { +message BufferState { uint64 id = 1; optional File file = 2; string base_text = 3; - repeated Operation operations = 4; - LineEnding line_ending = 5; + LineEnding line_ending = 4; +} + +message BufferChunk { + uint64 buffer_id = 1; + repeated Operation operations = 2; + bool is_last = 3; } enum LineEnding {