From 924eb622ae791f56dfbc6f797eb3bc84d95e9904 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Wed, 2 Feb 2022 17:01:23 +0100 Subject: [PATCH] Wait for additional edits before pushing transaction in remote buffer --- Cargo.lock | 1 + crates/language/src/buffer.rs | 2 ++ crates/text/Cargo.toml | 1 + crates/text/src/text.rs | 38 ++++++++++++++++++++++++++++++++++- 4 files changed, 41 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 719e231f96..ce979fb313 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4904,6 +4904,7 @@ dependencies = [ "lazy_static", "log", "parking_lot", + "postage", "rand 0.8.3", "smallvec", "sum_tree", diff --git a/crates/language/src/buffer.rs b/crates/language/src/buffer.rs index f4554e0d45..00dee0a57c 100644 --- a/crates/language/src/buffer.rs +++ b/crates/language/src/buffer.rs @@ -1910,6 +1910,8 @@ impl Buffer { ); cx.spawn(|this, mut cx| async move { let edit_ids = apply_edits.await?; + this.update(&mut cx, |this, _| this.text.wait_for_edits(&edit_ids)) + .await; if push_to_history { this.update(&mut cx, |this, _| { this.text diff --git a/crates/text/Cargo.toml b/crates/text/Cargo.toml index edc1ca7846..04648d203f 100644 --- a/crates/text/Cargo.toml +++ b/crates/text/Cargo.toml @@ -18,6 +18,7 @@ arrayvec = "0.7.1" lazy_static = "1.4" log = "0.4" parking_lot = "0.11" +postage = { version = "0.4.1", features = ["futures-traits"] } rand = { version = "0.8.3", optional = true } smallvec = { version = "1.6", features = ["union"] } diff --git a/crates/text/src/text.rs b/crates/text/src/text.rs index ad9857e264..088cd51cbe 100644 --- a/crates/text/src/text.rs +++ b/crates/text/src/text.rs @@ -21,6 +21,7 @@ use operation_queue::OperationQueue; pub use patch::Patch; pub use point::*; pub use point_utf16::*; +use postage::{oneshot, prelude::*}; #[cfg(any(test, feature = "test-support"))] pub use random_char_iter::*; use rope::TextDimension; @@ -28,6 +29,7 @@ pub use rope::{Chunks, Rope, TextSummary}; pub use selection::*; use std::{ cmp::{self, Ordering}, + future::Future, iter::Iterator, ops::{self, Deref, Range, Sub}, str, @@ -50,6 +52,7 @@ pub struct Buffer { local_clock: clock::Local, pub lamport_clock: clock::Lamport, subscriptions: Topic, + edit_id_resolvers: HashMap>>, } #[derive(Clone, Debug)] @@ -538,6 +541,7 @@ impl Buffer { local_clock, lamport_clock, subscriptions: Default::default(), + edit_id_resolvers: Default::default(), } } @@ -579,6 +583,7 @@ impl Buffer { value: lamport_timestamp, }, subscriptions: Default::default(), + edit_id_resolvers: Default::default(), snapshot: BufferSnapshot { replica_id, visible_text, @@ -833,6 +838,7 @@ impl Buffer { edit.timestamp, ); self.snapshot.version.observe(edit.timestamp.local()); + self.resolve_edit(edit.timestamp.local()); self.history.push(edit); } } @@ -1213,7 +1219,6 @@ impl Buffer { pub fn undo(&mut self) -> Option<(TransactionId, Operation)> { if let Some(transaction) = self.history.pop_undo().cloned() { - dbg!(&transaction); let transaction_id = transaction.id; let op = self.undo_or_redo(transaction).unwrap(); Some((transaction_id, op)) @@ -1286,6 +1291,37 @@ impl Buffer { pub fn subscribe(&mut self) -> Subscription { self.subscriptions.subscribe() } + + pub fn wait_for_edits( + &mut self, + edit_ids: &[clock::Local], + ) -> impl 'static + Future { + let mut futures = Vec::new(); + for edit_id in edit_ids { + if !self.version.observed(*edit_id) { + let (tx, rx) = oneshot::channel(); + self.edit_id_resolvers.entry(*edit_id).or_default().push(tx); + futures.push(rx); + } + } + + async move { + for mut future in futures { + future.recv().await; + } + } + } + + fn resolve_edit(&mut self, edit_id: clock::Local) { + for mut tx in self + .edit_id_resolvers + .remove(&edit_id) + .into_iter() + .flatten() + { + let _ = tx.try_send(()); + } + } } #[cfg(any(test, feature = "test-support"))]