diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index 44db1aa40d..5ede90f230 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -21,7 +21,10 @@ use copilot::Copilot; use debounced_delay::DebouncedDelay; use fs::repository::GitRepository; use futures::{ - channel::mpsc::{self, UnboundedReceiver}, + channel::{ + mpsc::{self, UnboundedReceiver}, + oneshot, + }, future::{try_join_all, Shared}, select, stream::FuturesUnordered, @@ -93,7 +96,7 @@ use text::{Anchor, BufferId}; use util::{ debug_panic, defer, http::HttpClient, - merge_json_value_into, + maybe, merge_json_value_into, paths::{ LOCAL_SETTINGS_RELATIVE_PATH, LOCAL_TASKS_RELATIVE_PATH, LOCAL_VSCODE_TASKS_RELATIVE_PATH, }, @@ -131,6 +134,13 @@ pub trait Item { fn project_path(&self, cx: &AppContext) -> Option; } +#[derive(Clone)] +pub enum OpenedBufferEvent { + Disconnected, + Ok(BufferId), + Err(BufferId, Arc), +} + pub struct Project { worktrees: Vec, active_entry: Option, @@ -158,7 +168,8 @@ pub struct Project { client_subscriptions: Vec, _subscriptions: Vec, next_buffer_id: BufferId, - opened_buffer: (watch::Sender<()>, watch::Receiver<()>), + loading_buffers: HashMap, anyhow::Error>>>>, + incomplete_remote_buffers: HashMap>, shared_buffers: HashMap>, #[allow(clippy::type_complexity)] loading_buffers_by_path: HashMap< @@ -171,9 +182,6 @@ pub struct Project { opened_buffers: HashMap, local_buffer_ids_by_path: HashMap, local_buffer_ids_by_entry_id: HashMap, - /// A mapping from a buffer ID to None means that we've started waiting for an ID but haven't finished loading it. - /// Used for re-issuing buffer requests when peers temporarily disconnect - incomplete_remote_buffers: HashMap>>, buffer_snapshots: HashMap>>, // buffer_id -> server_id -> vec of snapshots buffers_being_formatted: HashSet, buffers_needing_diff: HashSet>, @@ -607,7 +615,6 @@ impl Project { next_buffer_id: BufferId::new(1).unwrap(), opened_buffers: Default::default(), shared_buffers: Default::default(), - incomplete_remote_buffers: Default::default(), loading_buffers_by_path: Default::default(), loading_local_worktrees: Default::default(), local_buffer_ids_by_path: Default::default(), @@ -615,7 +622,8 @@ impl Project { buffer_snapshots: Default::default(), join_project_response_message_id: 0, client_state: ProjectClientState::Local, - opened_buffer: watch::channel(), + loading_buffers: HashMap::default(), + incomplete_remote_buffers: HashMap::default(), client_subscriptions: Vec::new(), _subscriptions: vec![ cx.observe_global::(Self::on_settings_changed), @@ -721,7 +729,7 @@ impl Project { flush_language_server_update: None, loading_buffers_by_path: Default::default(), next_buffer_id: BufferId::new(1).unwrap(), - opened_buffer: watch::channel(), + loading_buffers: Default::default(), shared_buffers: Default::default(), incomplete_remote_buffers: Default::default(), loading_local_worktrees: Default::default(), @@ -1711,7 +1719,8 @@ impl Project { // Wake up all futures currently waiting on a buffer to get opened, // to give them a chance to fail now that we've disconnected. - *self.opened_buffer.0.borrow_mut() = (); + self.loading_buffers.clear(); + // self.opened_buffer.send(OpenedBufferEvent::Disconnected); } } @@ -2144,7 +2153,11 @@ impl Project { }) .detach(); - *self.opened_buffer.0.borrow_mut() = (); + if let Some(senders) = self.loading_buffers.remove(&remote_id) { + for sender in senders { + sender.send(Ok(buffer.clone())).ok(); + } + } Ok(()) } @@ -7777,26 +7790,36 @@ impl Project { .ok_or_else(|| anyhow!("missing variant"))? { proto::create_buffer_for_peer::Variant::State(mut state) => { - let mut buffer_file = None; - if let Some(file) = state.file.take() { - let worktree_id = WorktreeId::from_proto(file.worktree_id); - let worktree = this.worktree_for_id(worktree_id, cx).ok_or_else(|| { - anyhow!("no worktree found for id {}", file.worktree_id) - })?; - buffer_file = Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?) - as Arc); - } - let buffer_id = BufferId::new(state.id)?; - let buffer = Buffer::from_proto( - this.replica_id(), - this.capability(), - state, - buffer_file, - )?; - let buffer = cx.new_model(|_| buffer); - this.incomplete_remote_buffers - .insert(buffer_id, Some(buffer)); + + let buffer_result = maybe!({ + let mut buffer_file = None; + if let Some(file) = state.file.take() { + let worktree_id = WorktreeId::from_proto(file.worktree_id); + let worktree = + this.worktree_for_id(worktree_id, cx).ok_or_else(|| { + anyhow!("no worktree found for id {}", file.worktree_id) + })?; + buffer_file = + Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?) + as Arc); + } + Buffer::from_proto(this.replica_id(), this.capability(), state, buffer_file) + }); + + match buffer_result { + Ok(buffer) => { + let buffer = cx.new_model(|_| buffer); + this.incomplete_remote_buffers.insert(buffer_id, buffer); + } + Err(error) => { + if let Some(listeners) = this.loading_buffers.remove(&buffer_id) { + for listener in listeners { + listener.send(Err(anyhow!(error.cloned()))).ok(); + } + } + } + }; } proto::create_buffer_for_peer::Variant::Chunk(chunk) => { let buffer_id = BufferId::new(chunk.buffer_id)?; @@ -7804,23 +7827,34 @@ impl Project { .incomplete_remote_buffers .get(&buffer_id) .cloned() - .flatten() .ok_or_else(|| { anyhow!( "received chunk for buffer {} without initial state", chunk.buffer_id ) })?; - let operations = chunk - .operations - .into_iter() - .map(language::proto::deserialize_operation) - .collect::>>()?; - buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx))?; - if chunk.is_last { + let result = maybe!({ + let operations = chunk + .operations + .into_iter() + .map(language::proto::deserialize_operation) + .collect::>>()?; + buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx)) + }); + + if let Err(error) = result { this.incomplete_remote_buffers.remove(&buffer_id); - this.register_buffer(&buffer, cx)?; + if let Some(listeners) = this.loading_buffers.remove(&buffer_id) { + for listener in listeners { + listener.send(Err(error.cloned())).ok(); + } + } + } else { + if chunk.is_last { + this.incomplete_remote_buffers.remove(&buffer_id); + this.register_buffer(&buffer, cx)?; + } } } } @@ -7843,12 +7877,7 @@ impl Project { .opened_buffers .get_mut(&buffer_id) .and_then(|b| b.upgrade()) - .or_else(|| { - this.incomplete_remote_buffers - .get(&buffer_id) - .cloned() - .flatten() - }) + .or_else(|| this.incomplete_remote_buffers.get(&buffer_id).cloned()) { buffer.update(cx, |buffer, cx| buffer.set_diff_base(diff_base, cx)); } @@ -7871,12 +7900,7 @@ impl Project { .opened_buffers .get(&buffer_id) .and_then(|b| b.upgrade()) - .or_else(|| { - this.incomplete_remote_buffers - .get(&buffer_id) - .cloned() - .flatten() - }) + .or_else(|| this.incomplete_remote_buffers.get(&buffer_id).cloned()) { let file = payload.file.ok_or_else(|| anyhow!("invalid file"))?; let worktree = this @@ -8623,39 +8647,19 @@ impl Project { id: BufferId, cx: &mut ModelContext, ) -> Task>> { - let mut opened_buffer_rx = self.opened_buffer.1.clone(); + let buffer = self + .opened_buffers + .get(&id) + .and_then(|buffer| buffer.upgrade()); - cx.spawn(move |this, mut cx| async move { - let buffer = loop { - let Some(this) = this.upgrade() else { - return Err(anyhow!("project dropped")); - }; + if let Some(buffer) = buffer { + return Task::ready(Ok(buffer)); + } - let buffer = this.update(&mut cx, |this, _cx| { - this.opened_buffers - .get(&id) - .and_then(|buffer| buffer.upgrade()) - })?; + let (tx, rx) = oneshot::channel(); + self.loading_buffers.entry(id).or_default().push(tx); - if let Some(buffer) = buffer { - break buffer; - } else if this.update(&mut cx, |this, _| this.is_disconnected())? { - return Err(anyhow!("disconnected before buffer {} could be opened", id)); - } - - this.update(&mut cx, |this, _| { - this.incomplete_remote_buffers.entry(id).or_default(); - })?; - drop(this); - - opened_buffer_rx - .next() - .await - .ok_or_else(|| anyhow!("project dropped while waiting for buffer"))?; - }; - - Ok(buffer) - }) + cx.background_executor().spawn(async move { rx.await? }) } fn synchronize_remote_buffers(&mut self, cx: &mut ModelContext) -> Task> { @@ -8904,11 +8908,7 @@ impl Project { .opened_buffers .get(&buffer_id) .and_then(|buffer| buffer.upgrade()) - .or_else(|| { - this.incomplete_remote_buffers - .get(&buffer_id) - .and_then(|b| b.clone()) - }); + .or_else(|| this.incomplete_remote_buffers.get(&buffer_id).cloned()); if let Some(buffer) = buffer { buffer.update(cx, |buffer, cx| { buffer.did_save(version, fingerprint, mtime, cx); @@ -8938,12 +8938,7 @@ impl Project { .opened_buffers .get(&buffer_id) .and_then(|buffer| buffer.upgrade()) - .or_else(|| { - this.incomplete_remote_buffers - .get(&buffer_id) - .cloned() - .flatten() - }); + .or_else(|| this.incomplete_remote_buffers.get(&buffer_id).cloned()); if let Some(buffer) = buffer { buffer.update(cx, |buffer, cx| { buffer.did_reload(version, fingerprint, line_ending, mtime, cx);