From 4cfd345f9d91ebd8e76a668f3494ecf2e45c4b9d Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Tue, 1 Mar 2022 15:48:15 +0100 Subject: [PATCH] Use `async_broadcast` to emit fake FS events Co-Authored-By: Nathan Sobo --- Cargo.lock | 1 + crates/project/Cargo.toml | 1 + crates/project/src/fs.rs | 77 ++++++++++++++++++++++----------------- 3 files changed, 45 insertions(+), 34 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 71fe0871c4..b495bbd7c6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3568,6 +3568,7 @@ version = "0.1.0" dependencies = [ "aho-corasick", "anyhow", + "async-broadcast", "async-trait", "client", "clock", diff --git a/crates/project/Cargo.toml b/crates/project/Cargo.toml index dea5a10279..5e58f4530a 100644 --- a/crates/project/Cargo.toml +++ b/crates/project/Cargo.toml @@ -28,6 +28,7 @@ sum_tree = { path = "../sum_tree" } util = { path = "../util" } aho-corasick = "0.7" anyhow = "1.0.38" +async-broadcast = "0.3.4" async-trait = "0.1" futures = "0.3" ignore = "0.4" diff --git a/crates/project/src/fs.rs b/crates/project/src/fs.rs index ec7925685d..d436e5e9fa 100644 --- a/crates/project/src/fs.rs +++ b/crates/project/src/fs.rs @@ -225,7 +225,6 @@ struct FakeFsEntry { struct FakeFsState { entries: std::collections::BTreeMap, next_inode: u64, - event_txs: Vec>>, } #[cfg(any(test, feature = "test-support"))] @@ -242,26 +241,6 @@ impl FakeFsState { Err(anyhow!("invalid path {:?}", path)) } } - - async fn emit_event(&mut self, paths: I) - where - I: IntoIterator, - T: Into, - { - let events = paths - .into_iter() - .map(|path| fsevent::Event { - event_id: 0, - flags: fsevent::StreamFlags::empty(), - path: path.into(), - }) - .collect::>(); - - self.event_txs.retain(|tx| { - let _ = tx.try_send(events.clone()); - !tx.is_closed() - }); - } } #[cfg(any(test, feature = "test-support"))] @@ -269,6 +248,10 @@ pub struct FakeFs { // Use an unfair lock to ensure tests are deterministic. state: futures::lock::Mutex, executor: std::sync::Weak, + events: ( + async_broadcast::Sender>, + async_broadcast::Receiver>, + ), } #[cfg(any(test, feature = "test-support"))] @@ -292,8 +275,8 @@ impl FakeFs { state: futures::lock::Mutex::new(FakeFsState { entries, next_inode: 1, - event_txs: Default::default(), }), + events: async_broadcast::broadcast(16), }) } @@ -316,7 +299,9 @@ impl FakeFs { content: None, }, ); - state.emit_event(&[path]).await; + + drop(state); + self.emit_event(&[path]).await; } pub async fn insert_file(&self, path: impl AsRef, content: String) { @@ -338,7 +323,9 @@ impl FakeFs { content: Some(content), }, ); - state.emit_event(&[path]).await; + + drop(state); + self.emit_event(&[path]).await; } #[must_use] @@ -383,6 +370,23 @@ impl FakeFs { .simulate_random_delay() .await; } + + async fn emit_event(&self, paths: I) + where + I: IntoIterator, + T: Into, + { + let events = paths + .into_iter() + .map(|path| fsevent::Event { + event_id: 0, + flags: fsevent::StreamFlags::empty(), + path: path.into(), + }) + .collect::>(); + + let _ = self.events.0.broadcast(events).await; + } } #[cfg(any(test, feature = "test-support"))] @@ -420,7 +424,8 @@ impl Fs for FakeFs { )); } } - state.emit_event(&created_dir_paths).await; + drop(state); + self.emit_event(&created_dir_paths).await; Ok(()) } @@ -461,7 +466,8 @@ impl Fs for FakeFs { }; state.entries.insert(path.to_path_buf(), entry); } - state.emit_event(&[path]).await; + drop(state); + self.emit_event(&[path]).await; Ok(()) } @@ -497,7 +503,8 @@ impl Fs for FakeFs { state.entries.insert(new_path, entry); } - state.emit_event(&[source, target]).await; + drop(state); + self.emit_event(&[source, target]).await; Ok(()) } @@ -522,7 +529,8 @@ impl Fs for FakeFs { } state.entries.retain(|path, _| !path.starts_with(path)); - state.emit_event(&[path]).await; + drop(state); + self.emit_event(&[path]).await; } else if !options.ignore_if_not_exists { return Err(anyhow!("{path:?} does not exist")); } @@ -540,7 +548,8 @@ impl Fs for FakeFs { } state.entries.remove(&path); - state.emit_event(&[path]).await; + drop(state); + self.emit_event(&[path]).await; } else if !options.ignore_if_not_exists { return Err(anyhow!("{path:?} does not exist")); } @@ -575,7 +584,8 @@ impl Fs for FakeFs { } else { entry.content = Some(text.chunks().collect()); entry.metadata.mtime = SystemTime::now(); - state.emit_event(&[path]).await; + drop(state); + self.emit_event(&[path]).await; Ok(()) } } else { @@ -591,7 +601,8 @@ impl Fs for FakeFs { content: Some(text.chunks().collect()), }; state.entries.insert(path.to_path_buf(), entry); - state.emit_event(&[path]).await; + drop(state); + self.emit_event(&[path]).await; Ok(()) } } @@ -642,10 +653,8 @@ impl Fs for FakeFs { path: &Path, _: Duration, ) -> Pin>>> { - let mut state = self.state.lock().await; self.simulate_random_delay().await; - let (tx, rx) = smol::channel::unbounded(); - state.event_txs.push(tx); + let rx = self.events.1.clone(); let path = path.to_path_buf(); Box::pin(futures::StreamExt::filter(rx, move |events| { let result = events.iter().any(|event| event.path.starts_with(&path));