From d358072c741e62e5c67b091a142ff70c9f748778 Mon Sep 17 00:00:00 2001 From: Max Brunsfeld Date: Mon, 14 Feb 2022 17:13:50 -0800 Subject: [PATCH] Include the desired version in a SaveBuffer RPC request When handling this messages on the host, wait until the desired version has been observed before performing the save. Co-Authored-By: Nathan Sobo --- crates/language/src/buffer.rs | 4 ++++ crates/project/src/project.rs | 16 ++++++++++++---- crates/project/src/worktree.rs | 1 + crates/rpc/proto/zed.proto | 1 + crates/server/src/rpc.rs | 9 ++------- crates/text/src/text.rs | 16 +++++++++++++++- 6 files changed, 35 insertions(+), 12 deletions(-) diff --git a/crates/language/src/buffer.rs b/crates/language/src/buffer.rs index f19ff21081..b4543b02b0 100644 --- a/crates/language/src/buffer.rs +++ b/crates/language/src/buffer.rs @@ -1283,6 +1283,10 @@ impl Buffer { self.text.wait_for_edits(edit_ids) } + pub fn wait_for_version(&mut self, version: clock::Global) -> impl Future { + self.text.wait_for_version(version) + } + pub fn set_active_selections( &mut self, selections: Arc<[Selection]>, diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index e9cbfa808a..027ae52c2f 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -2254,21 +2254,29 @@ impl Project { ) -> Result { let buffer_id = envelope.payload.buffer_id; let sender_id = envelope.original_sender_id()?; - let (project_id, save) = this.update(&mut cx, |this, cx| { + let requested_version = envelope.payload.version.try_into()?; + + let (project_id, buffer) = this.update(&mut cx, |this, _| { let project_id = this.remote_id().ok_or_else(|| anyhow!("not connected"))?; let buffer = this .shared_buffers .get(&sender_id) .and_then(|shared_buffers| shared_buffers.get(&buffer_id).cloned()) .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))?; - Ok::<_, anyhow::Error>((project_id, buffer.update(cx, |buffer, cx| buffer.save(cx)))) + Ok::<_, anyhow::Error>((project_id, buffer)) })?; - let (version, mtime) = save.await?; + buffer + .update(&mut cx, |buffer, _| { + buffer.wait_for_version(requested_version) + }) + .await; + + let (saved_version, mtime) = buffer.update(&mut cx, |buffer, cx| buffer.save(cx)).await?; Ok(proto::BufferSaved { project_id, buffer_id, - version: (&version).into(), + version: (&saved_version).into(), mtime: Some(mtime.into()), }) } diff --git a/crates/project/src/worktree.rs b/crates/project/src/worktree.rs index 675f650342..a5d50466b6 100644 --- a/crates/project/src/worktree.rs +++ b/crates/project/src/worktree.rs @@ -1404,6 +1404,7 @@ impl language::File for File { .request(proto::SaveBuffer { project_id, buffer_id, + version: (&version).into(), }) .await?; let version = response.version.try_into()?; diff --git a/crates/rpc/proto/zed.proto b/crates/rpc/proto/zed.proto index 5803b24c27..febe00e601 100644 --- a/crates/rpc/proto/zed.proto +++ b/crates/rpc/proto/zed.proto @@ -192,6 +192,7 @@ message UpdateBufferFile { message SaveBuffer { uint64 project_id = 1; uint64 buffer_id = 2; + repeated VectorClockEntry version = 3; } message BufferSaved { diff --git a/crates/server/src/rpc.rs b/crates/server/src/rpc.rs index 7ba2420f01..1825bdb7b3 100644 --- a/crates/server/src/rpc.rs +++ b/crates/server/src/rpc.rs @@ -1604,14 +1604,12 @@ mod tests { ) .await .unwrap(); - let worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap()); // Open a buffer as client B let buffer_b = project_b .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)) .await .unwrap(); - let mtime = buffer_b.read_with(&cx_b, |buf, _| buf.file().unwrap().mtime()); buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx)); buffer_b.read_with(&cx_b, |buf, _| { @@ -1623,13 +1621,10 @@ mod tests { .update(&mut cx_b, |buf, cx| buf.save(cx)) .await .unwrap(); - worktree_b - .condition(&cx_b, |_, cx| { - buffer_b.read(cx).file().unwrap().mtime() != mtime - }) + buffer_b + .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty()) .await; buffer_b.read_with(&cx_b, |buf, _| { - assert!(!buf.is_dirty()); assert!(!buf.has_conflict()); }); diff --git a/crates/text/src/text.rs b/crates/text/src/text.rs index 9b7f8dd230..da003b5d44 100644 --- a/crates/text/src/text.rs +++ b/crates/text/src/text.rs @@ -21,7 +21,7 @@ use operation_queue::OperationQueue; pub use patch::Patch; pub use point::*; pub use point_utf16::*; -use postage::{oneshot, prelude::*}; +use postage::{barrier, oneshot, prelude::*}; #[cfg(any(test, feature = "test-support"))] pub use random_char_iter::*; use rope::TextDimension; @@ -53,6 +53,7 @@ pub struct Buffer { pub lamport_clock: clock::Lamport, subscriptions: Topic, edit_id_resolvers: HashMap>>, + version_barriers: Vec<(clock::Global, barrier::Sender)>, } #[derive(Clone, Debug)] @@ -574,6 +575,7 @@ impl Buffer { lamport_clock, subscriptions: Default::default(), edit_id_resolvers: Default::default(), + version_barriers: Default::default(), } } @@ -835,6 +837,8 @@ impl Buffer { } } } + self.version_barriers + .retain(|(version, _)| !self.snapshot.version().observed_all(version)); Ok(()) } @@ -1305,6 +1309,16 @@ impl Buffer { } } + pub fn wait_for_version(&mut self, version: clock::Global) -> impl Future { + let (tx, mut rx) = barrier::channel(); + if !self.snapshot.version.observed_all(&version) { + self.version_barriers.push((version, tx)); + } + async move { + rx.recv().await; + } + } + fn resolve_edit(&mut self, edit_id: clock::Local) { for mut tx in self .edit_id_resolvers