diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index cfea7cbc16..8473e28c66 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -159,9 +159,7 @@ impl Server { let span = info_span!( "handle message", payload_type = envelope.payload_type_name(), - payload = serde_json::to_string_pretty(&envelope.payload) - .unwrap() - .as_str(), + payload = format!("{:?}", envelope.payload).as_str(), ); let future = (handler)(server, *envelope); async move { diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index 7d1aacdde9..26a9ed14d7 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -719,14 +719,14 @@ impl Project { is_directory: false, }) .await?; - worktree.update(&mut cx, |worktree, _| { - let worktree = worktree.as_remote_mut().unwrap(); - worktree.snapshot.insert_entry( - response - .entry - .ok_or_else(|| anyhow!("missing entry in response"))?, - ) - }) + let entry = response + .entry + .ok_or_else(|| anyhow!("missing entry in response"))?; + worktree + .update(&mut cx, |worktree, cx| { + worktree.as_remote().unwrap().insert_entry(entry, cx) + }) + .await })) } } @@ -758,15 +758,14 @@ impl Project { new_path: new_path.as_os_str().as_bytes().to_vec(), }) .await?; - worktree.update(&mut cx, |worktree, _| { - let worktree = worktree.as_remote_mut().unwrap(); - worktree.snapshot.remove_entry(entry_id); - worktree.snapshot.insert_entry( - response - .entry - .ok_or_else(|| anyhow!("missing entry in response"))?, - ) - }) + let entry = response + .entry + .ok_or_else(|| anyhow!("missing entry in response"))?; + worktree + .update(&mut cx, |worktree, cx| { + worktree.as_remote().unwrap().insert_entry(entry, cx) + }) + .await })) } } diff --git a/crates/project/src/worktree.rs b/crates/project/src/worktree.rs index 7b3c700911..2efeea1645 100644 --- a/crates/project/src/worktree.rs +++ b/crates/project/src/worktree.rs @@ -29,6 +29,7 @@ use language::{ use lazy_static::lazy_static; use parking_lot::Mutex; use postage::{ + barrier, prelude::{Sink as _, Stream as _}, watch, }; @@ -79,16 +80,21 @@ pub struct LocalWorktree { } pub struct RemoteWorktree { - pub(crate) snapshot: Snapshot, + pub snapshot: Snapshot, + pub(crate) background_snapshot: Arc>, project_id: u64, - snapshot_rx: watch::Receiver, client: Arc, - updates_tx: UnboundedSender, + updates_tx: UnboundedSender, replica_id: ReplicaId, diagnostic_summaries: TreeMap, visible: bool, } +enum BackgroundUpdate { + Update(proto::UpdateWorktree), + Barrier(barrier::Sender), +} + #[derive(Clone)] pub struct Snapshot { id: WorktreeId, @@ -218,13 +224,14 @@ impl Worktree { }; let (updates_tx, mut updates_rx) = mpsc::unbounded(); - let (mut snapshot_tx, snapshot_rx) = watch::channel_with(snapshot.clone()); + let background_snapshot = Arc::new(Mutex::new(snapshot.clone())); + let (mut snapshot_updated_tx, mut snapshot_updated_rx) = watch::channel(); let worktree_handle = cx.add_model(|_: &mut ModelContext| { Worktree::Remote(RemoteWorktree { project_id: project_remote_id, replica_id, snapshot: snapshot.clone(), - snapshot_rx: snapshot_rx.clone(), + background_snapshot: background_snapshot.clone(), updates_tx, client: client.clone(), diagnostic_summaries: TreeMap::from_ordered_entries( @@ -275,37 +282,40 @@ impl Worktree { .await; { - let mut snapshot = snapshot_tx.borrow_mut(); + let mut snapshot = background_snapshot.lock(); snapshot.entries_by_path = entries_by_path; snapshot.entries_by_id = entries_by_id; + snapshot_updated_tx.send(()).await.ok(); } cx.background() .spawn(async move { while let Some(update) = updates_rx.next().await { - let mut snapshot = snapshot_tx.borrow().clone(); - if let Err(error) = snapshot.apply_remote_update(update) { - log::error!("error applying worktree update: {}", error); + if let BackgroundUpdate::Update(update) = update { + if let Err(error) = + background_snapshot.lock().apply_remote_update(update) + { + log::error!("error applying worktree update: {}", error); + } + snapshot_updated_tx.send(()).await.ok(); } - *snapshot_tx.borrow_mut() = snapshot; } }) .detach(); - { - let mut snapshot_rx = snapshot_rx.clone(); + cx.spawn(|mut cx| { let this = worktree_handle.downgrade(); - cx.spawn(|mut cx| async move { - while let Some(_) = snapshot_rx.recv().await { + async move { + while let Some(_) = snapshot_updated_rx.recv().await { if let Some(this) = this.upgrade(&cx) { this.update(&mut cx, |this, cx| this.poll_snapshot(cx)); } else { break; } } - }) - .detach(); - } + } + }) + .detach(); } }); (worktree_handle, deserialize_task) @@ -411,7 +421,7 @@ impl Worktree { } } Self::Remote(worktree) => { - worktree.snapshot = worktree.snapshot_rx.borrow().clone(); + worktree.snapshot = worktree.background_snapshot.lock().clone(); cx.emit(Event::UpdatedEntries); } }; @@ -923,12 +933,21 @@ impl RemoteWorktree { envelope: TypedEnvelope, ) -> Result<()> { self.updates_tx - .unbounded_send(envelope.payload) + .unbounded_send(BackgroundUpdate::Update(envelope.payload)) .expect("consumer runs to completion"); - Ok(()) } + pub fn finish_pending_remote_updates(&self) -> impl Future { + let (tx, mut rx) = barrier::channel(); + self.updates_tx + .unbounded_send(BackgroundUpdate::Barrier(tx)) + .expect("consumer runs to completion"); + async move { + rx.recv().await; + } + } + pub fn update_diagnostic_summary( &mut self, path: Arc, @@ -945,6 +964,29 @@ impl RemoteWorktree { .insert(PathKey(path.clone()), summary); } } + + pub fn insert_entry( + &self, + entry: proto::Entry, + cx: &mut ModelContext, + ) -> Task> { + cx.spawn(|this, mut cx| async move { + this.update(&mut cx, |worktree, _| { + worktree + .as_remote_mut() + .unwrap() + .finish_pending_remote_updates() + }) + .await; + this.update(&mut cx, |worktree, _| { + let worktree = worktree.as_remote_mut().unwrap(); + let mut snapshot = worktree.background_snapshot.lock(); + let entry = snapshot.insert_entry(entry); + worktree.snapshot = snapshot.clone(); + entry + }) + }) + } } impl Snapshot { @@ -956,17 +998,9 @@ impl Snapshot { self.entries_by_id.get(&entry_id, &()).is_some() } - pub(crate) fn remove_entry(&mut self, entry_id: ProjectEntryId) -> Option { - if let Some(entry) = self.entries_by_id.remove(&entry_id, &()) { - self.entries_by_path.remove(&PathKey(entry.path), &()) - } else { - None - } - } - pub(crate) fn insert_entry(&mut self, entry: proto::Entry) -> Result { let entry = Entry::try_from((&self.root_char_bag, entry))?; - self.entries_by_id.insert_or_replace( + let old_entry = self.entries_by_id.insert_or_replace( PathEntry { id: entry.id, path: entry.path.clone(), @@ -975,6 +1009,9 @@ impl Snapshot { }, &(), ); + if let Some(old_entry) = old_entry { + self.entries_by_path.remove(&PathKey(old_entry.path), &()); + } self.entries_by_path.insert_or_replace(entry.clone(), &()); Ok(entry) } diff --git a/crates/rpc/src/proto.rs b/crates/rpc/src/proto.rs index b2bcaa7d5f..59053997d3 100644 --- a/crates/rpc/src/proto.rs +++ b/crates/rpc/src/proto.rs @@ -6,13 +6,14 @@ use prost::Message as _; use serde::Serialize; use std::any::{Any, TypeId}; use std::{ + fmt::Debug, io, time::{Duration, SystemTime, UNIX_EPOCH}, }; include!(concat!(env!("OUT_DIR"), "/zed.messages.rs")); -pub trait EnvelopedMessage: Clone + Serialize + Sized + Send + Sync + 'static { +pub trait EnvelopedMessage: Clone + Debug + Serialize + Sized + Send + Sync + 'static { const NAME: &'static str; const PRIORITY: MessagePriority; fn into_envelope( diff --git a/crates/sum_tree/src/sum_tree.rs b/crates/sum_tree/src/sum_tree.rs index d524735bac..193786112b 100644 --- a/crates/sum_tree/src/sum_tree.rs +++ b/crates/sum_tree/src/sum_tree.rs @@ -483,17 +483,20 @@ impl PartialEq for SumTree { impl Eq for SumTree {} impl SumTree { - pub fn insert_or_replace(&mut self, item: T, cx: &::Context) -> bool { - let mut replaced = false; + pub fn insert_or_replace( + &mut self, + item: T, + cx: &::Context, + ) -> Option { + let mut replaced = None; *self = { let mut cursor = self.cursor::(); let mut new_tree = cursor.slice(&item.key(), Bias::Left, cx); - if cursor - .item() - .map_or(false, |cursor_item| cursor_item.key() == item.key()) - { - cursor.next(cx); - replaced = true; + if let Some(cursor_item) = cursor.item() { + if cursor_item.key() == item.key() { + replaced = Some(cursor_item.clone()); + cursor.next(cx); + } } new_tree.push(item, cx); new_tree.push_tree(cursor.suffix(cx), cx);