diff --git a/Cargo.lock b/Cargo.lock index b495bbd7c6..71fe0871c4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3568,7 +3568,6 @@ version = "0.1.0" dependencies = [ "aho-corasick", "anyhow", - "async-broadcast", "async-trait", "client", "clock", diff --git a/crates/project/Cargo.toml b/crates/project/Cargo.toml index 5e58f4530a..dea5a10279 100644 --- a/crates/project/Cargo.toml +++ b/crates/project/Cargo.toml @@ -28,7 +28,6 @@ sum_tree = { path = "../sum_tree" } util = { path = "../util" } aho-corasick = "0.7" anyhow = "1.0.38" -async-broadcast = "0.3.4" async-trait = "0.1" futures = "0.3" ignore = "0.4" diff --git a/crates/project/src/fs.rs b/crates/project/src/fs.rs index d436e5e9fa..ec7925685d 100644 --- a/crates/project/src/fs.rs +++ b/crates/project/src/fs.rs @@ -225,6 +225,7 @@ struct FakeFsEntry { struct FakeFsState { entries: std::collections::BTreeMap, next_inode: u64, + event_txs: Vec>>, } #[cfg(any(test, feature = "test-support"))] @@ -241,6 +242,26 @@ impl FakeFsState { Err(anyhow!("invalid path {:?}", path)) } } + + async fn emit_event(&mut self, paths: I) + where + I: IntoIterator, + T: Into, + { + let events = paths + .into_iter() + .map(|path| fsevent::Event { + event_id: 0, + flags: fsevent::StreamFlags::empty(), + path: path.into(), + }) + .collect::>(); + + self.event_txs.retain(|tx| { + let _ = tx.try_send(events.clone()); + !tx.is_closed() + }); + } } #[cfg(any(test, feature = "test-support"))] @@ -248,10 +269,6 @@ pub struct FakeFs { // Use an unfair lock to ensure tests are deterministic. state: futures::lock::Mutex, executor: std::sync::Weak, - events: ( - async_broadcast::Sender>, - async_broadcast::Receiver>, - ), } #[cfg(any(test, feature = "test-support"))] @@ -275,8 +292,8 @@ impl FakeFs { state: futures::lock::Mutex::new(FakeFsState { entries, next_inode: 1, + event_txs: Default::default(), }), - events: async_broadcast::broadcast(16), }) } @@ -299,9 +316,7 @@ impl FakeFs { content: None, }, ); - - drop(state); - self.emit_event(&[path]).await; + state.emit_event(&[path]).await; } pub async fn insert_file(&self, path: impl AsRef, content: String) { @@ -323,9 +338,7 @@ impl FakeFs { content: Some(content), }, ); - - drop(state); - self.emit_event(&[path]).await; + state.emit_event(&[path]).await; } #[must_use] @@ -370,23 +383,6 @@ impl FakeFs { .simulate_random_delay() .await; } - - async fn emit_event(&self, paths: I) - where - I: IntoIterator, - T: Into, - { - let events = paths - .into_iter() - .map(|path| fsevent::Event { - event_id: 0, - flags: fsevent::StreamFlags::empty(), - path: path.into(), - }) - .collect::>(); - - let _ = self.events.0.broadcast(events).await; - } } #[cfg(any(test, feature = "test-support"))] @@ -424,8 +420,7 @@ impl Fs for FakeFs { )); } } - drop(state); - self.emit_event(&created_dir_paths).await; + state.emit_event(&created_dir_paths).await; Ok(()) } @@ -466,8 +461,7 @@ impl Fs for FakeFs { }; state.entries.insert(path.to_path_buf(), entry); } - drop(state); - self.emit_event(&[path]).await; + state.emit_event(&[path]).await; Ok(()) } @@ -503,8 +497,7 @@ impl Fs for FakeFs { state.entries.insert(new_path, entry); } - drop(state); - self.emit_event(&[source, target]).await; + state.emit_event(&[source, target]).await; Ok(()) } @@ -529,8 +522,7 @@ impl Fs for FakeFs { } state.entries.retain(|path, _| !path.starts_with(path)); - drop(state); - self.emit_event(&[path]).await; + state.emit_event(&[path]).await; } else if !options.ignore_if_not_exists { return Err(anyhow!("{path:?} does not exist")); } @@ -548,8 +540,7 @@ impl Fs for FakeFs { } state.entries.remove(&path); - drop(state); - self.emit_event(&[path]).await; + state.emit_event(&[path]).await; } else if !options.ignore_if_not_exists { return Err(anyhow!("{path:?} does not exist")); } @@ -584,8 +575,7 @@ impl Fs for FakeFs { } else { entry.content = Some(text.chunks().collect()); entry.metadata.mtime = SystemTime::now(); - drop(state); - self.emit_event(&[path]).await; + state.emit_event(&[path]).await; Ok(()) } } else { @@ -601,8 +591,7 @@ impl Fs for FakeFs { content: Some(text.chunks().collect()), }; state.entries.insert(path.to_path_buf(), entry); - drop(state); - self.emit_event(&[path]).await; + state.emit_event(&[path]).await; Ok(()) } } @@ -653,8 +642,10 @@ impl Fs for FakeFs { path: &Path, _: Duration, ) -> Pin>>> { + let mut state = self.state.lock().await; self.simulate_random_delay().await; - let rx = self.events.1.clone(); + let (tx, rx) = smol::channel::unbounded(); + state.event_txs.push(tx); let path = path.to_path_buf(); Box::pin(futures::StreamExt::filter(rx, move |events| { let result = events.iter().any(|event| event.path.starts_with(&path));