From 53327e2bf06ab9c850c1fe10953d9e39cdf9932d Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Thu, 3 Mar 2022 10:10:53 +0100 Subject: [PATCH] Ensure worktree is registered/shared synchronously --- crates/project/src/project.rs | 11 ++--- crates/project/src/worktree.rs | 75 ++++++++++++++++++++-------------- crates/server/src/rpc.rs | 17 +++++--- 3 files changed, 61 insertions(+), 42 deletions(-) diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index c3915cc8b4..015b5e6ccc 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -2396,17 +2396,18 @@ impl Project { }); if let Some(project_id) = remote_project_id { - worktree - .update(&mut cx, |worktree, cx| { - worktree.as_local_mut().unwrap().register(project_id, cx) - }) - .await?; if is_shared { worktree .update(&mut cx, |worktree, cx| { worktree.as_local_mut().unwrap().share(project_id, cx) }) .await?; + } else { + worktree + .update(&mut cx, |worktree, cx| { + worktree.as_local_mut().unwrap().register(project_id, cx) + }) + .await?; } } diff --git a/crates/project/src/worktree.rs b/crates/project/src/worktree.rs index ffe229abf2..a9ce90ce61 100644 --- a/crates/project/src/worktree.rs +++ b/crates/project/src/worktree.rs @@ -741,8 +741,9 @@ impl LocalWorktree { authorized_logins: self.authorized_logins(), visible: self.visible, }; + let request = client.request(register_message); cx.spawn(|this, mut cx| async move { - let response = client.request(register_message).await; + let response = request.await; this.update(&mut cx, |this, _| { let worktree = this.as_local_mut().unwrap(); match response { @@ -759,44 +760,49 @@ impl LocalWorktree { }) } - pub fn share( - &mut self, - project_id: u64, - cx: &mut ModelContext, - ) -> impl Future> { + pub fn share(&mut self, project_id: u64, cx: &mut ModelContext) -> Task> { + let register = self.register(project_id, cx); let (mut share_tx, mut share_rx) = oneshot::channel(); + let (snapshots_to_send_tx, snapshots_to_send_rx) = + smol::channel::unbounded::(); if self.share.is_some() { let _ = share_tx.try_send(Ok(())); } else { let snapshot = self.snapshot(); let rpc = self.client.clone(); let worktree_id = cx.model_id() as u64; - let (snapshots_to_send_tx, snapshots_to_send_rx) = - smol::channel::unbounded::(); + let maintain_remote_snapshot = cx.background().spawn({ let rpc = rpc.clone(); - let snapshot = snapshot.clone(); let diagnostic_summaries = self.diagnostic_summaries.clone(); async move { - if let Err(error) = rpc - .request(proto::UpdateWorktree { - project_id, - worktree_id, - root_name: snapshot.root_name().to_string(), - updated_entries: snapshot - .entries_by_path - .iter() - .filter(|e| !e.is_ignored) - .map(Into::into) - .collect(), - removed_entries: Default::default(), - }) - .await - { - let _ = share_tx.try_send(Err(error)); - return Err(anyhow!("failed to send initial update worktree")); - } else { - let _ = share_tx.try_send(Ok(())); + match snapshots_to_send_rx.recv().await { + Ok(snapshot) => { + if let Err(error) = rpc + .request(proto::UpdateWorktree { + project_id, + worktree_id, + root_name: snapshot.root_name().to_string(), + updated_entries: snapshot + .entries_by_path + .iter() + .filter(|e| !e.is_ignored) + .map(Into::into) + .collect(), + removed_entries: Default::default(), + }) + .await + { + let _ = share_tx.try_send(Err(error)); + return Err(anyhow!("failed to send initial update worktree")); + } else { + let _ = share_tx.try_send(Ok(())); + } + } + Err(error) => { + let _ = share_tx.try_send(Err(error.into())); + return Err(anyhow!("failed to send initial update worktree")); + } } for (path, summary) in diagnostic_summaries.iter() { @@ -821,17 +827,24 @@ impl LocalWorktree { }); self.share = Some(ShareState { project_id, - snapshots_tx: snapshots_to_send_tx, + snapshots_tx: snapshots_to_send_tx.clone(), _maintain_remote_snapshot: Some(maintain_remote_snapshot), }); } - async move { + cx.spawn_weak(|this, cx| async move { + register.await?; + if let Some(this) = this.upgrade(&cx) { + this.read_with(&cx, |this, _| { + let this = this.as_local().unwrap(); + let _ = snapshots_to_send_tx.try_send(this.snapshot()); + }); + } share_rx .next() .await .unwrap_or_else(|| Err(anyhow!("share ended"))) - } + }) } pub fn unshare(&mut self) { diff --git a/crates/server/src/rpc.rs b/crates/server/src/rpc.rs index 6330e39aef..0f6da796e3 100644 --- a/crates/server/src/rpc.rs +++ b/crates/server/src/rpc.rs @@ -4664,12 +4664,17 @@ mod tests { } log::info!("Host: find/create local worktree {:?}", path); - project - .update(&mut cx, |project, cx| { - project.find_or_create_local_worktree(path, true, cx) - }) - .await - .unwrap(); + let find_or_create_worktree = project.update(&mut cx, |project, cx| { + project.find_or_create_local_worktree(path, true, cx) + }); + let find_or_create_worktree = async move { + find_or_create_worktree.await.unwrap(); + }; + if rng.lock().gen() { + cx.background().spawn(find_or_create_worktree).detach(); + } else { + find_or_create_worktree.await; + } } 10..=80 if !files.lock().is_empty() => { let buffer = if self.buffers.is_empty() || rng.lock().gen() {