diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index 376c84a9d0..f915d53c01 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -13,7 +13,10 @@ use client::{proto, Client, TypedEnvelope, UserStore}; use clock::ReplicaId; use collections::{hash_map, BTreeMap, HashMap, HashSet}; use futures::{ - channel::{mpsc, oneshot}, + channel::{ + mpsc::{self, UnboundedReceiver}, + oneshot, + }, future::{try_join_all, Shared}, AsyncWriteExt, Future, FutureExt, StreamExt, TryFutureExt, }; @@ -92,6 +95,7 @@ pub trait Item { pub struct Project { worktrees: Vec, active_entry: Option, + buffer_changes_tx: mpsc::UnboundedSender, languages: Arc, language_servers: HashMap, language_server_ids: HashMap<(WorktreeId, LanguageServerName), usize>, @@ -130,6 +134,14 @@ pub struct Project { terminals: Terminals, } +enum BufferMessage { + Operation { + buffer_id: u64, + operation: proto::Operation, + }, + Resync, +} + enum OpenBuffer { Strong(ModelHandle), Weak(WeakModelHandle), @@ -417,39 +429,45 @@ impl Project { fs: Arc, cx: &mut MutableAppContext, ) -> ModelHandle { - cx.add_model(|cx: &mut ModelContext| Self { - worktrees: Default::default(), - collaborators: Default::default(), - opened_buffers: Default::default(), - shared_buffers: Default::default(), - incomplete_remote_buffers: Default::default(), - loading_buffers_by_path: Default::default(), - loading_local_worktrees: Default::default(), - buffer_snapshots: Default::default(), - join_project_response_message_id: 0, - client_state: None, - opened_buffer: watch::channel(), - client_subscriptions: Vec::new(), - _subscriptions: vec![cx.observe_global::(Self::on_settings_changed)], - _maintain_buffer_languages: Self::maintain_buffer_languages(&languages, cx), - _maintain_workspace_config: Self::maintain_workspace_config(languages.clone(), cx), - active_entry: None, - languages, - client, - user_store, - fs, - next_entry_id: Default::default(), - next_diagnostic_group_id: Default::default(), - language_servers: Default::default(), - language_server_ids: Default::default(), - language_server_statuses: Default::default(), - last_workspace_edits_by_language_server: Default::default(), - buffers_being_formatted: Default::default(), - next_language_server_id: 0, - nonce: StdRng::from_entropy().gen(), - terminals: Terminals { - local_handles: Vec::new(), - }, + cx.add_model(|cx: &mut ModelContext| { + let (tx, rx) = mpsc::unbounded(); + cx.spawn_weak(|this, cx| Self::send_buffer_messages(this, rx, cx)) + .detach(); + Self { + worktrees: Default::default(), + buffer_changes_tx: tx, + collaborators: Default::default(), + opened_buffers: Default::default(), + shared_buffers: Default::default(), + incomplete_remote_buffers: Default::default(), + loading_buffers_by_path: Default::default(), + loading_local_worktrees: Default::default(), + buffer_snapshots: Default::default(), + join_project_response_message_id: 0, + client_state: None, + opened_buffer: watch::channel(), + client_subscriptions: Vec::new(), + _subscriptions: vec![cx.observe_global::(Self::on_settings_changed)], + _maintain_buffer_languages: Self::maintain_buffer_languages(&languages, cx), + _maintain_workspace_config: Self::maintain_workspace_config(languages.clone(), cx), + active_entry: None, + languages, + client, + user_store, + fs, + next_entry_id: Default::default(), + next_diagnostic_group_id: Default::default(), + language_servers: Default::default(), + language_server_ids: Default::default(), + language_server_statuses: Default::default(), + last_workspace_edits_by_language_server: Default::default(), + buffers_being_formatted: Default::default(), + next_language_server_id: 0, + nonce: StdRng::from_entropy().gen(), + terminals: Terminals { + local_handles: Vec::new(), + }, + } }) } @@ -480,8 +498,12 @@ impl Project { worktrees.push(worktree); } + let (tx, rx) = mpsc::unbounded(); + cx.spawn_weak(|this, cx| Self::send_buffer_messages(this, rx, cx)) + .detach(); let mut this = Self { worktrees: Vec::new(), + buffer_changes_tx: tx, loading_buffers_by_path: Default::default(), opened_buffer: watch::channel(), shared_buffers: Default::default(), @@ -1084,8 +1106,9 @@ impl Project { ) }) .collect(); - self.synchronize_remote_buffers(cx).detach_and_log_err(cx); - + self.buffer_changes_tx + .unbounded_send(BufferMessage::Resync) + .unwrap(); cx.notify(); Ok(()) } @@ -1635,6 +1658,53 @@ impl Project { }); } + async fn send_buffer_messages( + this: WeakModelHandle, + mut rx: UnboundedReceiver, + mut cx: AsyncAppContext, + ) { + let mut needs_resync_with_host = false; + while let Some(change) = rx.next().await { + if let Some(this) = this.upgrade(&mut cx) { + let is_local = this.read_with(&cx, |this, _| this.is_local()); + match change { + BufferMessage::Operation { + buffer_id, + operation, + } => { + if needs_resync_with_host { + continue; + } + let request = this.read_with(&cx, |this, _| { + let project_id = this.remote_id()?; + Some(this.client.request(proto::UpdateBuffer { + buffer_id, + project_id, + operations: vec![operation], + })) + }); + if let Some(request) = request { + if request.await.is_err() && !is_local { + needs_resync_with_host = true; + } + } + } + BufferMessage::Resync => { + if this + .update(&mut cx, |this, cx| this.synchronize_remote_buffers(cx)) + .await + .is_ok() + { + needs_resync_with_host = false; + } + } + } + } else { + break; + } + } + } + fn on_buffer_event( &mut self, buffer: ModelHandle, @@ -1643,14 +1713,12 @@ impl Project { ) -> Option<()> { match event { BufferEvent::Operation(operation) => { - if let Some(project_id) = self.remote_id() { - let request = self.client.request(proto::UpdateBuffer { - project_id, + self.buffer_changes_tx + .unbounded_send(BufferMessage::Operation { buffer_id: buffer.read(cx).remote_id(), - operations: vec![language::proto::serialize_operation(operation)], - }); - cx.background().spawn(request).detach_and_log_err(cx); - } + operation: language::proto::serialize_operation(operation), + }) + .ok(); } BufferEvent::Edited { .. } => { let language_server = self @@ -4861,7 +4929,9 @@ impl Project { } if is_host { - this.synchronize_remote_buffers(cx).detach_and_log_err(cx); + this.buffer_changes_tx + .unbounded_send(BufferMessage::Resync) + .unwrap(); } cx.emit(Event::CollaboratorUpdated {