diff --git a/crates/project/src/buffer_store.rs b/crates/project/src/buffer_store.rs index ddca456400..61805b9cb2 100644 --- a/crates/project/src/buffer_store.rs +++ b/crates/project/src/buffer_store.rs @@ -4,6 +4,7 @@ use crate::{ Item, NoRepositoryError, ProjectPath, }; use anyhow::{anyhow, Context as _, Result}; +use client::Client; use collections::{hash_map, HashMap, HashSet}; use fs::Fs; use futures::{channel::oneshot, stream::FuturesUnordered, StreamExt}; @@ -17,13 +18,13 @@ use language::{ Buffer, Capability, Event as BufferEvent, File as _, Language, Operation, }; use rpc::{ - proto::{self, AnyProtoClient, EnvelopedMessage, PeerId}, + proto::{self, AnyProtoClient, EnvelopedMessage}, ErrorExt as _, TypedEnvelope, }; use smol::channel::Receiver; use std::{io, path::Path, str::FromStr as _, sync::Arc}; use text::BufferId; -use util::{debug_panic, maybe, ResultExt as _}; +use util::{debug_panic, maybe, ResultExt as _, TryFutureExt}; use worktree::{ File, PathChange, ProjectEntryId, RemoteWorktree, UpdatedGitRepositoriesSet, Worktree, WorktreeId, @@ -45,6 +46,7 @@ pub struct BufferStore { loading_remote_buffers_by_id: HashMap>, remote_buffer_listeners: HashMap, anyhow::Error>>>>, + shared_buffers: HashMap>, } enum OpenBuffer { @@ -55,6 +57,7 @@ enum OpenBuffer { pub enum BufferStoreEvent { BufferAdded(Model), + BufferDropped(BufferId), BufferChangedFilePath { buffer: Model, old_file: Option>, @@ -93,6 +96,7 @@ impl BufferStore { local_buffer_ids_by_path: Default::default(), local_buffer_ids_by_entry_id: Default::default(), loading_buffers_by_path: Default::default(), + shared_buffers: Default::default(), } } @@ -613,6 +617,18 @@ impl BufferStore { OpenBuffer::Weak(buffer.downgrade()) }; + let handle = cx.handle().downgrade(); + buffer.update(cx, move |_, cx| { + cx.on_release(move |buffer, cx| { + handle + .update(cx, |_, cx| { + cx.emit(BufferStoreEvent::BufferDropped(buffer.remote_id())) + }) + .ok(); + }) + .detach() + }); + match self.opened_buffers.entry(remote_id) { hash_map::Entry::Vacant(entry) => { entry.insert(open_buffer); @@ -997,55 +1013,6 @@ impl BufferStore { Some(()) } - pub async fn create_buffer_for_peer( - this: Model, - peer_id: PeerId, - buffer_id: BufferId, - project_id: u64, - client: AnyProtoClient, - cx: &mut AsyncAppContext, - ) -> Result<()> { - let Some(buffer) = this.update(cx, |this, _| this.get(buffer_id))? else { - return Ok(()); - }; - - let operations = buffer.update(cx, |b, cx| b.serialize_ops(None, cx))?; - let operations = operations.await; - let state = buffer.update(cx, |buffer, cx| buffer.to_proto(cx))?; - - let initial_state = proto::CreateBufferForPeer { - project_id, - peer_id: Some(peer_id), - variant: Some(proto::create_buffer_for_peer::Variant::State(state)), - }; - - if client.send(initial_state).log_err().is_some() { - let client = client.clone(); - cx.background_executor() - .spawn(async move { - let mut chunks = split_operations(operations).peekable(); - while let Some(chunk) = chunks.next() { - let is_last = chunks.peek().is_none(); - client.send(proto::CreateBufferForPeer { - project_id, - peer_id: Some(peer_id), - variant: Some(proto::create_buffer_for_peer::Variant::Chunk( - proto::BufferChunk { - buffer_id: buffer_id.into(), - operations: chunk, - is_last, - }, - )), - })?; - } - anyhow::Ok(()) - }) - .await - .log_err(); - } - Ok(()) - } - pub async fn handle_update_buffer( this: Model, envelope: TypedEnvelope, @@ -1075,6 +1042,90 @@ impl BufferStore { })? } + pub fn handle_synchronize_buffers( + &mut self, + envelope: TypedEnvelope, + cx: &mut ModelContext, + client: Arc, + ) -> Result { + let project_id = envelope.payload.project_id; + let mut response = proto::SynchronizeBuffersResponse { + buffers: Default::default(), + }; + let Some(guest_id) = envelope.original_sender_id else { + anyhow::bail!("missing original_sender_id on SynchronizeBuffers request"); + }; + + self.shared_buffers.entry(guest_id).or_default().clear(); + for buffer in envelope.payload.buffers { + let buffer_id = BufferId::new(buffer.id)?; + let remote_version = language::proto::deserialize_version(&buffer.version); + if let Some(buffer) = self.get(buffer_id) { + self.shared_buffers + .entry(guest_id) + .or_default() + .insert(buffer_id); + + let buffer = buffer.read(cx); + response.buffers.push(proto::BufferVersion { + id: buffer_id.into(), + version: language::proto::serialize_version(&buffer.version), + }); + + let operations = buffer.serialize_ops(Some(remote_version), cx); + let client = client.clone(); + if let Some(file) = buffer.file() { + client + .send(proto::UpdateBufferFile { + project_id, + buffer_id: buffer_id.into(), + file: Some(file.to_proto(cx)), + }) + .log_err(); + } + + client + .send(proto::UpdateDiffBase { + project_id, + buffer_id: buffer_id.into(), + diff_base: buffer.diff_base().map(ToString::to_string), + }) + .log_err(); + + client + .send(proto::BufferReloaded { + project_id, + buffer_id: buffer_id.into(), + version: language::proto::serialize_version(buffer.saved_version()), + mtime: buffer.saved_mtime().map(|time| time.into()), + line_ending: language::proto::serialize_line_ending(buffer.line_ending()) + as i32, + }) + .log_err(); + + cx.background_executor() + .spawn( + async move { + let operations = operations.await; + for chunk in split_operations(operations) { + client + .request(proto::UpdateBuffer { + project_id, + buffer_id: buffer_id.into(), + operations: chunk, + }) + .await?; + } + anyhow::Ok(()) + } + .log_err(), + ) + .detach(); + } + } + Ok(response) + } + pub fn handle_create_buffer_for_peer( &mut self, envelope: TypedEnvelope, @@ -1254,6 +1305,30 @@ impl BufferStore { }) } + pub async fn handle_close_buffer( + this: Model, + envelope: TypedEnvelope, + mut cx: AsyncAppContext, + ) -> Result<()> { + let peer_id = envelope.sender_id; + let buffer_id = BufferId::new(envelope.payload.buffer_id)?; + this.update(&mut cx, |this, _| { + if let Some(shared) = this.shared_buffers.get_mut(&peer_id) { + if shared.remove(&buffer_id) { + if shared.is_empty() { + this.shared_buffers.remove(&peer_id); + } + return; + } + }; + debug_panic!( + "peer_id {} closed buffer_id {} which was either not open or already closed", + peer_id, + buffer_id + ) + }) + } + pub async fn handle_buffer_saved( this: Model, envelope: TypedEnvelope, @@ -1326,6 +1401,85 @@ impl BufferStore { receiver.next().await; } } + + pub fn create_buffer_for_peer( + &mut self, + buffer: &Model, + peer_id: proto::PeerId, + project_id: u64, + client: AnyProtoClient, + cx: &mut ModelContext, + ) -> Task> { + let buffer_id = buffer.read(cx).remote_id(); + if !self + .shared_buffers + .entry(peer_id) + .or_default() + .insert(buffer_id) + { + return Task::ready(Ok(())); + } + + cx.spawn(|this, mut cx| async move { + let Some(buffer) = this.update(&mut cx, |this, _| this.get(buffer_id))? else { + return anyhow::Ok(()); + }; + + let operations = buffer.update(&mut cx, |b, cx| b.serialize_ops(None, cx))?; + let operations = operations.await; + let state = buffer.update(&mut cx, |buffer, cx| buffer.to_proto(cx))?; + + let initial_state = proto::CreateBufferForPeer { + project_id, + peer_id: Some(peer_id), + variant: Some(proto::create_buffer_for_peer::Variant::State(state)), + }; + + if client.send(initial_state).log_err().is_some() { + let client = client.clone(); + cx.background_executor() + .spawn(async move { + let mut chunks = split_operations(operations).peekable(); + while let Some(chunk) = chunks.next() { + let is_last = chunks.peek().is_none(); + client.send(proto::CreateBufferForPeer { + project_id, + peer_id: Some(peer_id), + variant: Some(proto::create_buffer_for_peer::Variant::Chunk( + proto::BufferChunk { + buffer_id: buffer_id.into(), + operations: chunk, + is_last, + }, + )), + })?; + } + anyhow::Ok(()) + }) + .await + .log_err(); + } + Ok(()) + }) + } + + pub fn forget_shared_buffers(&mut self) { + self.shared_buffers.clear(); + } + + pub fn forget_shared_buffers_for(&mut self, peer_id: &proto::PeerId) { + self.shared_buffers.remove(peer_id); + } + + pub fn update_peer_id(&mut self, old_peer_id: &proto::PeerId, new_peer_id: proto::PeerId) { + if let Some(buffers) = self.shared_buffers.remove(old_peer_id) { + self.shared_buffers.insert(new_peer_id, buffers); + } + } + + pub fn shared_buffers(&self) -> &HashMap> { + &self.shared_buffers + } } impl OpenBuffer { diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index 7b2eab5c47..7ef29926fd 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -16,7 +16,7 @@ mod project_tests; pub mod search_history; mod yarn; -use anyhow::{anyhow, bail, Context as _, Result}; +use anyhow::{anyhow, Context as _, Result}; use async_trait::async_trait; use buffer_store::{BufferStore, BufferStoreEvent}; use client::{ @@ -208,7 +208,6 @@ pub struct Project { worktree_store: Model, buffer_store: Model, _subscriptions: Vec, - shared_buffers: HashMap>, #[allow(clippy::type_complexity)] loading_worktrees: HashMap, Shared, Arc>>>>, @@ -807,7 +806,6 @@ impl Project { collaborators: Default::default(), worktree_store, buffer_store, - shared_buffers: Default::default(), loading_worktrees: Default::default(), buffer_snapshots: Default::default(), join_project_response_message_id: 0, @@ -979,7 +977,6 @@ impl Project { buffer_ordered_messages_tx: tx, buffer_store: buffer_store.clone(), worktree_store, - shared_buffers: Default::default(), loading_worktrees: Default::default(), active_entry: None, collaborators: Default::default(), @@ -1729,7 +1726,8 @@ impl Project { message: proto::ResharedProject, cx: &mut ModelContext, ) -> Result<()> { - self.shared_buffers.clear(); + self.buffer_store + .update(cx, |buffer_store, _| buffer_store.forget_shared_buffers()); self.set_collaborators_from_proto(message.collaborators, cx)?; self.metadata_changed(cx); cx.emit(Event::Reshared); @@ -1799,13 +1797,14 @@ impl Project { if let ProjectClientState::Shared { remote_id, .. } = self.client_state { self.client_state = ProjectClientState::Local; self.collaborators.clear(); - self.shared_buffers.clear(); self.client_subscriptions.clear(); self.worktree_store.update(cx, |store, cx| { store.set_shared(false, cx); }); - self.buffer_store - .update(cx, |buffer_store, cx| buffer_store.set_remote_id(None, cx)); + self.buffer_store.update(cx, |buffer_store, cx| { + buffer_store.forget_shared_buffers(); + buffer_store.set_remote_id(None, cx) + }); self.client .send(proto::UnshareProject { project_id: remote_id, @@ -2421,6 +2420,16 @@ impl Project { BufferStoreEvent::MessageToReplicas(message) => { self.client.send_dynamic(message.as_ref().clone()).log_err(); } + BufferStoreEvent::BufferDropped(buffer_id) => { + if let Some(ref ssh_session) = self.ssh_session { + ssh_session + .send(proto::CloseBuffer { + project_id: 0, + buffer_id: buffer_id.to_proto(), + }) + .log_err(); + } + } } } @@ -7317,7 +7326,7 @@ impl Project { query: Some(query.to_proto()), limit: limit as _, }); - let guard = self.retain_remotely_created_buffers(); + let guard = self.retain_remotely_created_buffers(cx); cx.spawn(move |this, mut cx| async move { let response = request.await?; @@ -8543,7 +8552,9 @@ impl Project { let collaborator = Collaborator::from_proto(collaborator)?; this.update(&mut cx, |this, cx| { - this.shared_buffers.remove(&collaborator.peer_id); + this.buffer_store.update(cx, |buffer_store, _| { + buffer_store.forget_shared_buffers_for(&collaborator.peer_id); + }); cx.emit(Event::CollaboratorJoined(collaborator.peer_id)); this.collaborators .insert(collaborator.peer_id, collaborator); @@ -8574,16 +8585,10 @@ impl Project { let is_host = collaborator.replica_id == 0; this.collaborators.insert(new_peer_id, collaborator); - let buffers = this.shared_buffers.remove(&old_peer_id); - log::info!( - "peer {} became {}. moving buffers {:?}", - old_peer_id, - new_peer_id, - &buffers - ); - if let Some(buffers) = buffers { - this.shared_buffers.insert(new_peer_id, buffers); - } + log::info!("peer {} became {}", old_peer_id, new_peer_id,); + this.buffer_store.update(cx, |buffer_store, _| { + buffer_store.update_peer_id(&old_peer_id, new_peer_id) + }); if is_host { this.buffer_store @@ -8618,11 +8623,11 @@ impl Project { .ok_or_else(|| anyhow!("unknown peer {:?}", peer_id))? .replica_id; this.buffer_store.update(cx, |buffer_store, cx| { + buffer_store.forget_shared_buffers_for(&peer_id); for buffer in buffer_store.buffers() { buffer.update(cx, |buffer, cx| buffer.remove_peer(replica_id, cx)); } }); - this.shared_buffers.remove(&peer_id); cx.emit(Event::CollaboratorLeft(peer_id)); cx.notify(); @@ -8835,8 +8840,17 @@ impl Project { BufferStore::handle_update_buffer(buffer_store, envelope, cx).await } - fn retain_remotely_created_buffers(&mut self) -> RemotelyCreatedBufferGuard { - self.remotely_created_buffers.lock().retain_count += 1; + fn retain_remotely_created_buffers( + &mut self, + cx: &mut ModelContext, + ) -> RemotelyCreatedBufferGuard { + { + let mut remotely_created_buffers = self.remotely_created_buffers.lock(); + if remotely_created_buffers.retain_count == 0 { + remotely_created_buffers.buffers = self.buffer_store.read(cx).buffers().collect(); + } + remotely_created_buffers.retain_count += 1; + } RemotelyCreatedBufferGuard { remote_buffers: Arc::downgrade(&self.remotely_created_buffers), } @@ -8888,86 +8902,11 @@ impl Project { envelope: TypedEnvelope, mut cx: AsyncAppContext, ) -> Result { - let project_id = envelope.payload.project_id; - let mut response = proto::SynchronizeBuffersResponse { - buffers: Default::default(), - }; - - this.update(&mut cx, |this, cx| { - let Some(guest_id) = envelope.original_sender_id else { - error!("missing original_sender_id on SynchronizeBuffers request"); - bail!("missing original_sender_id on SynchronizeBuffers request"); - }; - - this.shared_buffers.entry(guest_id).or_default().clear(); - for buffer in envelope.payload.buffers { - let buffer_id = BufferId::new(buffer.id)?; - let remote_version = language::proto::deserialize_version(&buffer.version); - if let Some(buffer) = this.buffer_for_id(buffer_id, cx) { - this.shared_buffers - .entry(guest_id) - .or_default() - .insert(buffer_id); - - let buffer = buffer.read(cx); - response.buffers.push(proto::BufferVersion { - id: buffer_id.into(), - version: language::proto::serialize_version(&buffer.version), - }); - - let operations = buffer.serialize_ops(Some(remote_version), cx); - let client = this.client.clone(); - if let Some(file) = buffer.file() { - client - .send(proto::UpdateBufferFile { - project_id, - buffer_id: buffer_id.into(), - file: Some(file.to_proto(cx)), - }) - .log_err(); - } - - client - .send(proto::UpdateDiffBase { - project_id, - buffer_id: buffer_id.into(), - diff_base: buffer.diff_base().map(ToString::to_string), - }) - .log_err(); - - client - .send(proto::BufferReloaded { - project_id, - buffer_id: buffer_id.into(), - version: language::proto::serialize_version(buffer.saved_version()), - mtime: buffer.saved_mtime().map(|time| time.into()), - line_ending: language::proto::serialize_line_ending( - buffer.line_ending(), - ) as i32, - }) - .log_err(); - - cx.background_executor() - .spawn( - async move { - let operations = operations.await; - for chunk in split_operations(operations) { - client - .request(proto::UpdateBuffer { - project_id, - buffer_id: buffer_id.into(), - operations: chunk, - }) - .await?; - } - anyhow::Ok(()) - } - .log_err(), - ) - .detach(); - } - } - Ok(()) + let response = this.update(&mut cx, |this, cx| { + let client = this.client.clone(); + this.buffer_store.update(cx, |this, cx| { + this.handle_synchronize_buffers(envelope, cx, client) + }) })??; Ok(response) @@ -9795,35 +9734,20 @@ impl Project { peer_id: proto::PeerId, cx: &mut AppContext, ) -> BufferId { - let buffer_id = buffer.read(cx).remote_id(); - if !self - .shared_buffers - .entry(peer_id) - .or_default() - .insert(buffer_id) - { - return buffer_id; + if let Some(project_id) = self.remote_id() { + self.buffer_store + .update(cx, |buffer_store, cx| { + buffer_store.create_buffer_for_peer( + buffer, + peer_id, + project_id, + self.client.clone().into(), + cx, + ) + }) + .detach_and_log_err(cx); } - let ProjectClientState::Shared { remote_id } = self.client_state else { - return buffer_id; - }; - let buffer_store = self.buffer_store.clone(); - let client = self.client().clone(); - - cx.spawn(|mut cx| async move { - BufferStore::create_buffer_for_peer( - buffer_store, - peer_id, - buffer_id, - remote_id, - client.clone().into(), - &mut cx, - ) - .await?; - anyhow::Ok(()) - }) - .detach_and_log_err(cx); - buffer_id + buffer.read(cx).remote_id() } fn wait_for_remote_buffer( diff --git a/crates/proto/proto/zed.proto b/crates/proto/proto/zed.proto index b702a67be8..6f5321a5bd 100644 --- a/crates/proto/proto/zed.proto +++ b/crates/proto/proto/zed.proto @@ -278,7 +278,9 @@ message Envelope { LspExtSwitchSourceHeaderResponse lsp_ext_switch_source_header_response = 242; FindSearchCandidates find_search_candidates = 243; - FindSearchCandidatesResponse find_search_candidates_response = 244; // current max + FindSearchCandidatesResponse find_search_candidates_response = 244; + + CloseBuffer close_buffer = 245; // current max } reserved 158 to 161; @@ -870,6 +872,11 @@ message SaveBuffer { optional ProjectPath new_path = 4; } +message CloseBuffer { + uint64 project_id = 1; + uint64 buffer_id = 2; +} + message ProjectPath { uint64 worktree_id = 1; string path = 2; diff --git a/crates/proto/src/proto.rs b/crates/proto/src/proto.rs index 38d1dfa8c7..7ce54f4a35 100644 --- a/crates/proto/src/proto.rs +++ b/crates/proto/src/proto.rs @@ -411,7 +411,8 @@ messages!( (AddWorktree, Foreground), (AddWorktreeResponse, Foreground), (FindSearchCandidates, Background), - (FindSearchCandidatesResponse, Background) + (FindSearchCandidatesResponse, Background), + (CloseBuffer, Foreground) ); request_messages!( diff --git a/crates/remote_server/src/headless_project.rs b/crates/remote_server/src/headless_project.rs index 459e413742..f31e9c4d61 100644 --- a/crates/remote_server/src/headless_project.rs +++ b/crates/remote_server/src/headless_project.rs @@ -55,6 +55,7 @@ impl HeadlessProject { session.add_request_handler(buffer_store.downgrade(), BufferStore::handle_blame_buffer); session.add_request_handler(buffer_store.downgrade(), BufferStore::handle_update_buffer); session.add_request_handler(buffer_store.downgrade(), BufferStore::handle_save_buffer); + session.add_message_handler(buffer_store.downgrade(), BufferStore::handle_close_buffer); session.add_request_handler( worktree_store.downgrade(), @@ -143,19 +144,11 @@ impl HeadlessProject { let buffer = buffer.await?; let buffer_id = buffer.read_with(&cx, |b, _| b.remote_id())?; - - cx.spawn(|mut cx| async move { - BufferStore::create_buffer_for_peer( - buffer_store, - PEER_ID, - buffer_id, - PROJECT_ID, - session, - &mut cx, - ) - .await - }) - .detach(); + buffer_store.update(&mut cx, |buffer_store, cx| { + buffer_store + .create_buffer_for_peer(&buffer, PEER_ID, PROJECT_ID, session, cx) + .detach_and_log_err(cx); + })?; Ok(proto::OpenBufferResponse { buffer_id: buffer_id.to_proto(), @@ -190,16 +183,17 @@ impl HeadlessProject { while let Some(buffer) = results.next().await { let buffer_id = buffer.update(&mut cx, |this, _| this.remote_id())?; response.buffer_ids.push(buffer_id.to_proto()); - - BufferStore::create_buffer_for_peer( - buffer_store.clone(), - PEER_ID, - buffer_id, - PROJECT_ID, - client.clone(), - &mut cx, - ) - .await?; + buffer_store + .update(&mut cx, |buffer_store, cx| { + buffer_store.create_buffer_for_peer( + &buffer, + PEER_ID, + PROJECT_ID, + client.clone(), + cx, + ) + })? + .await?; } Ok(response) diff --git a/crates/remote_server/src/remote_editing_tests.rs b/crates/remote_server/src/remote_editing_tests.rs index 56f64bb50e..de035c1527 100644 --- a/crates/remote_server/src/remote_editing_tests.rs +++ b/crates/remote_server/src/remote_editing_tests.rs @@ -4,7 +4,7 @@ use clock::FakeSystemClock; use fs::{FakeFs, Fs}; use gpui::{Context, Model, TestAppContext}; use http_client::FakeHttpClient; -use language::LanguageRegistry; +use language::{Buffer, LanguageRegistry}; use node_runtime::FakeNodeRuntime; use project::{ search::{SearchQuery, SearchResult}, @@ -121,7 +121,7 @@ async fn test_remote_editing(cx: &mut TestAppContext, server_cx: &mut TestAppCon #[gpui::test] async fn test_remote_project_search(cx: &mut TestAppContext, server_cx: &mut TestAppContext) { - let (project, _, _) = init_test(cx, server_cx).await; + let (project, headless, _) = init_test(cx, server_cx).await; project .update(cx, |project, cx| { @@ -132,33 +132,55 @@ async fn test_remote_project_search(cx: &mut TestAppContext, server_cx: &mut Tes cx.run_until_parked(); - let mut receiver = project.update(cx, |project, cx| { - project.search( - SearchQuery::text( - "project", - false, - true, - false, - Default::default(), - Default::default(), + async fn do_search(project: &Model, mut cx: TestAppContext) -> Model { + let mut receiver = project.update(&mut cx, |project, cx| { + project.search( + SearchQuery::text( + "project", + false, + true, + false, + Default::default(), + Default::default(), + ) + .unwrap(), + cx, ) - .unwrap(), - cx, - ) + }); + + let first_response = receiver.next().await.unwrap(); + let SearchResult::Buffer { buffer, .. } = first_response else { + panic!("incorrect result"); + }; + buffer.update(&mut cx, |buffer, cx| { + assert_eq!( + buffer.file().unwrap().full_path(cx).to_string_lossy(), + "project1/README.md" + ) + }); + + assert!(receiver.next().await.is_none()); + buffer + } + + let buffer = do_search(&project, cx.clone()).await; + + // test that the headless server is tracking which buffers we have open correctly. + cx.run_until_parked(); + headless.update(server_cx, |headless, cx| { + assert!(!headless.buffer_store.read(cx).shared_buffers().is_empty()) + }); + do_search(&project, cx.clone()).await; + + cx.update(|_| { + drop(buffer); + }); + cx.run_until_parked(); + headless.update(server_cx, |headless, cx| { + assert!(headless.buffer_store.read(cx).shared_buffers().is_empty()) }); - let first_response = receiver.next().await.unwrap(); - let SearchResult::Buffer { buffer, .. } = first_response else { - panic!("incorrect result"); - }; - buffer.update(cx, |buffer, cx| { - assert_eq!( - buffer.file().unwrap().full_path(cx).to_string_lossy(), - "project1/README.md" - ) - }); - - assert!(receiver.next().await.is_none()); + do_search(&project, cx.clone()).await; } fn init_logger() {