From bef575e30a7d95adbf97a10f5c9a400593377be3 Mon Sep 17 00:00:00 2001 From: Conrad Irwin Date: Tue, 27 Aug 2024 15:36:38 -0600 Subject: [PATCH] Simplify project syncing (#16976) As part of allowing LSPs to run remotely, we need to move LSP stuff out of project. To do that we'd like to simplify the concurrency story on project syncing. Co-Authored-By: Max Release Notes: - N/A Co-authored-by: Max --- crates/project/src/project.rs | 174 +++++++++++++--------------------- 1 file changed, 67 insertions(+), 107 deletions(-) diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index 33c8af9a58..f676f72843 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -286,22 +286,11 @@ enum BufferOrderedMessage { Resync, } -#[derive(Debug)] -enum LocalProjectUpdate { - WorktreesChanged, - CreateBufferForPeer { - peer_id: proto::PeerId, - buffer_id: BufferId, - }, -} - #[derive(Debug)] enum ProjectClientState { Local, Shared { remote_id: u64, - updates_tx: mpsc::UnboundedSender, - _send_updates: Task>, }, Remote { sharing_has_stopped: bool, @@ -1448,12 +1437,48 @@ impl Project { } fn metadata_changed(&mut self, cx: &mut ModelContext) { - if let ProjectClientState::Shared { updates_tx, .. } = &mut self.client_state { - updates_tx - .unbounded_send(LocalProjectUpdate::WorktreesChanged) - .ok(); - } cx.notify(); + let ProjectClientState::Shared { remote_id } = self.client_state else { + return; + }; + let worktrees = self.worktrees(cx).collect::>(); + let project_id = remote_id; + + let update_project = self.client.request(proto::UpdateProject { + project_id, + worktrees: self.worktree_metadata_protos(cx), + }); + cx.spawn(|this, mut cx| async move { + update_project.await?; + + this.update(&mut cx, |this, cx| { + let client = this.client.clone(); + for worktree in worktrees { + worktree.update(cx, |worktree, cx| { + if let Some(summaries) = this.diagnostic_summaries.get(&worktree.id()) { + for (path, summaries) in summaries { + for (&server_id, summary) in summaries { + this.client.send(proto::UpdateDiagnosticSummary { + project_id, + worktree_id: worktree.id().to_proto(), + summary: Some(summary.to_proto(server_id, path)), + })?; + } + } + } + + worktree.observe_updates(project_id, cx, { + let client = client.clone(); + move |update| client.request(update).map(|result| result.is_ok()) + }); + + anyhow::Ok(()) + })?; + } + anyhow::Ok(()) + }) + }) + .detach_and_log_err(cx); } pub fn task_inventory(&self) -> &Model { @@ -1688,95 +1713,8 @@ impl Project { } } - let (updates_tx, mut updates_rx) = mpsc::unbounded(); - let client = self.client.clone(); self.client_state = ProjectClientState::Shared { remote_id: project_id, - updates_tx, - _send_updates: cx.spawn(move |this, mut cx| async move { - while let Some(update) = updates_rx.next().await { - match update { - LocalProjectUpdate::WorktreesChanged => { - let worktrees = this.update(&mut cx, |this, cx| { - this.worktrees(cx).collect::>() - })?; - - let update_project = this - .update(&mut cx, |this, cx| { - this.client.request(proto::UpdateProject { - project_id, - worktrees: this.worktree_metadata_protos(cx), - }) - })? - .await; - if update_project.log_err().is_none() { - continue; - } - - this.update(&mut cx, |this, cx| { - for worktree in worktrees { - worktree.update(cx, |worktree, cx| { - if let Some(summaries) = - this.diagnostic_summaries.get(&worktree.id()) - { - for (path, summaries) in summaries { - for (&server_id, summary) in summaries { - this.client.send( - proto::UpdateDiagnosticSummary { - project_id, - worktree_id: worktree.id().to_proto(), - summary: Some( - summary.to_proto(server_id, path), - ), - }, - )?; - } - } - } - - worktree.observe_updates(project_id, cx, { - let client = client.clone(); - move |update| { - client.request(update).map(|result| result.is_ok()) - } - }); - - anyhow::Ok(()) - })?; - } - anyhow::Ok(()) - })??; - } - LocalProjectUpdate::CreateBufferForPeer { peer_id, buffer_id } => { - let Some(buffer_store) = this.update(&mut cx, |this, _| { - if this - .shared_buffers - .entry(peer_id) - .or_default() - .insert(buffer_id) - { - Some(this.buffer_store.clone()) - } else { - None - } - })? - else { - continue; - }; - BufferStore::create_buffer_for_peer( - buffer_store, - peer_id, - buffer_id, - project_id, - client.clone().into(), - &mut cx, - ) - .await?; - } - } - } - Ok(()) - }), }; self.metadata_changed(cx); @@ -9857,11 +9795,33 @@ impl Project { cx: &mut AppContext, ) -> BufferId { let buffer_id = buffer.read(cx).remote_id(); - if let ProjectClientState::Shared { updates_tx, .. } = &self.client_state { - updates_tx - .unbounded_send(LocalProjectUpdate::CreateBufferForPeer { peer_id, buffer_id }) - .ok(); + if !self + .shared_buffers + .entry(peer_id) + .or_default() + .insert(buffer_id) + { + return buffer_id; } + let ProjectClientState::Shared { remote_id } = self.client_state else { + return buffer_id; + }; + let buffer_store = self.buffer_store.clone(); + let client = self.client().clone(); + + cx.spawn(|mut cx| async move { + BufferStore::create_buffer_for_peer( + buffer_store, + peer_id, + buffer_id, + remote_id, + client.clone().into(), + &mut cx, + ) + .await?; + anyhow::Ok(()) + }) + .detach_and_log_err(cx); buffer_id }