From 2a90347b82a89ba24e2262607583c1a4fa29ab19 Mon Sep 17 00:00:00 2001 From: Max Brunsfeld Date: Mon, 12 Jul 2021 17:26:14 -0700 Subject: [PATCH] Store an Fs on Workspace, pass it to each Worktree Push test-only worktree behavior down into the Fs, via a `watch` method which provides fs events. Co-Authored-By: Nathan Sobo --- server/src/tests.rs | 19 +-- zed/src/editor/buffer.rs | 8 +- zed/src/file_finder.rs | 31 +---- zed/src/lib.rs | 1 + zed/src/main.rs | 4 +- zed/src/test.rs | 5 +- zed/src/workspace.rs | 124 ++++++------------ zed/src/worktree.rs | 265 +++++++++++++++++++++------------------ 8 files changed, 213 insertions(+), 244 deletions(-) diff --git a/server/src/tests.rs b/server/src/tests.rs index 0ece0a5e28..e65e471c8b 100644 --- a/server/src/tests.rs +++ b/server/src/tests.rs @@ -19,7 +19,7 @@ use zed::{ rpc::Client, settings, test::{temp_tree, Channel}, - worktree::{Fs, InMemoryFs, Worktree}, + worktree::{FakeFs, Fs, RealFs, Worktree}, }; use zed_rpc::{ForegroundRouter, Peer, Router}; @@ -39,7 +39,8 @@ async fn test_share_worktree(mut cx_a: TestAppContext, mut cx_b: TestAppContext) "a.txt": "a-contents", "b.txt": "b-contents", })); - let worktree_a = cx_a.add_model(|cx| Worktree::local(dir.path(), lang_registry.clone(), cx)); + let worktree_a = cx_a + .add_model(|cx| Worktree::local(dir.path(), lang_registry.clone(), Arc::new(RealFs), cx)); worktree_a .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete()) .await; @@ -133,7 +134,8 @@ async fn test_propagate_saves_and_fs_changes_in_shared_worktree( "file1": "", "file2": "" })); - let worktree_a = cx_a.add_model(|cx| Worktree::local(dir.path(), lang_registry.clone(), cx)); + let worktree_a = cx_a + .add_model(|cx| Worktree::local(dir.path(), lang_registry.clone(), Arc::new(RealFs), cx)); worktree_a .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete()) .await; @@ -243,12 +245,12 @@ async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: Tes let client_b = server.create_client(&mut cx_b, "user_b").await; // Share a local worktree as client A - let fs = Arc::new(InMemoryFs::new()); + let fs = Arc::new(FakeFs::new()); fs.save(Path::new("/a.txt"), &"a-contents".into()) .await .unwrap(); let worktree_a = - cx_a.add_model(|cx| Worktree::test(Path::new("/"), lang_registry.clone(), fs.clone(), cx)); + cx_a.add_model(|cx| Worktree::local(Path::new("/"), lang_registry.clone(), fs.clone(), cx)); worktree_a .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete()) .await; @@ -314,12 +316,12 @@ async fn test_editing_while_guest_opens_buffer(mut cx_a: TestAppContext, mut cx_ let client_b = server.create_client(&mut cx_b, "user_b").await; // Share a local worktree as client A - let fs = Arc::new(InMemoryFs::new()); + let fs = Arc::new(FakeFs::new()); fs.save(Path::new("/a.txt"), &"a-contents".into()) .await .unwrap(); let worktree_a = - cx_a.add_model(|cx| Worktree::test(Path::new("/"), lang_registry.clone(), fs.clone(), cx)); + cx_a.add_model(|cx| Worktree::local(Path::new("/"), lang_registry.clone(), fs.clone(), cx)); worktree_a .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete()) .await; @@ -371,7 +373,8 @@ async fn test_peer_disconnection(mut cx_a: TestAppContext, cx_b: TestAppContext) "a.txt": "a-contents", "b.txt": "b-contents", })); - let worktree_a = cx_a.add_model(|cx| Worktree::local(dir.path(), lang_registry.clone(), cx)); + let worktree_a = cx_a + .add_model(|cx| Worktree::local(dir.path(), lang_registry.clone(), Arc::new(RealFs), cx)); worktree_a .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete()) .await; diff --git a/zed/src/editor/buffer.rs b/zed/src/editor/buffer.rs index 304d9e3f40..fc3524b5ea 100644 --- a/zed/src/editor/buffer.rs +++ b/zed/src/editor/buffer.rs @@ -2734,7 +2734,7 @@ mod tests { use crate::{ test::{build_app_state, temp_tree}, util::RandomCharIter, - worktree::{Worktree, WorktreeHandle}, + worktree::{RealFs, Worktree, WorktreeHandle}, }; use gpui::ModelHandle; use rand::prelude::*; @@ -3209,7 +3209,8 @@ mod tests { "file2": "def", "file3": "ghi", })); - let tree = cx.add_model(|cx| Worktree::local(dir.path(), Default::default(), cx)); + let tree = cx + .add_model(|cx| Worktree::local(dir.path(), Default::default(), Arc::new(RealFs), cx)); tree.flush_fs_events(&cx).await; cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete()) .await; @@ -3321,7 +3322,8 @@ mod tests { async fn test_file_changes_on_disk(mut cx: gpui::TestAppContext) { let initial_contents = "aaa\nbbbbb\nc\n"; let dir = temp_tree(json!({ "the-file": initial_contents })); - let tree = cx.add_model(|cx| Worktree::local(dir.path(), Default::default(), cx)); + let tree = cx + .add_model(|cx| Worktree::local(dir.path(), Default::default(), Arc::new(RealFs), cx)); cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete()) .await; diff --git a/zed/src/file_finder.rs b/zed/src/file_finder.rs index 9ea1a9b1a2..7ce8e183ca 100644 --- a/zed/src/file_finder.rs +++ b/zed/src/file_finder.rs @@ -479,12 +479,7 @@ mod tests { let app_state = cx.read(build_app_state); let (window_id, workspace) = cx.add_window(|cx| { - let mut workspace = Workspace::new( - app_state.settings.clone(), - app_state.languages.clone(), - app_state.rpc.clone(), - cx, - ); + let mut workspace = Workspace::new(&app_state, cx); workspace.add_worktree(tmp_dir.path(), cx); workspace }); @@ -551,12 +546,7 @@ mod tests { })); let app_state = cx.read(build_app_state); let (_, workspace) = cx.add_window(|cx| { - let mut workspace = Workspace::new( - app_state.settings.clone(), - app_state.languages.clone(), - app_state.rpc.clone(), - cx, - ); + let mut workspace = Workspace::new(&app_state, cx); workspace.add_worktree(tmp_dir.path(), cx); workspace }); @@ -614,12 +604,7 @@ mod tests { let app_state = cx.read(build_app_state); let (_, workspace) = cx.add_window(|cx| { - let mut workspace = Workspace::new( - app_state.settings.clone(), - app_state.languages.clone(), - app_state.rpc.clone(), - cx, - ); + let mut workspace = Workspace::new(&app_state, cx); workspace.add_worktree(&file_path, cx); workspace }); @@ -663,15 +648,7 @@ mod tests { })); let app_state = cx.read(build_app_state); - - let (_, workspace) = cx.add_window(|cx| { - Workspace::new( - app_state.settings.clone(), - app_state.languages.clone(), - app_state.rpc.clone(), - cx, - ) - }); + let (_, workspace) = cx.add_window(|cx| Workspace::new(&app_state, cx)); workspace .update(&mut cx, |workspace, cx| { diff --git a/zed/src/lib.rs b/zed/src/lib.rs index 4b1c3571d6..f666d42297 100644 --- a/zed/src/lib.rs +++ b/zed/src/lib.rs @@ -21,6 +21,7 @@ pub struct AppState { pub languages: std::sync::Arc, pub rpc_router: std::sync::Arc, pub rpc: rpc::Client, + pub fs: std::sync::Arc, } pub fn init(cx: &mut gpui::MutableAppContext) { diff --git a/zed/src/main.rs b/zed/src/main.rs index ea11ff0c69..a3bb0224e5 100644 --- a/zed/src/main.rs +++ b/zed/src/main.rs @@ -8,7 +8,8 @@ use std::{fs, path::PathBuf, sync::Arc}; use zed::{ self, assets, editor, file_finder, language, menus, rpc, settings, workspace::{self, OpenParams}, - worktree, AppState, + worktree::{self, RealFs}, + AppState, }; use zed_rpc::ForegroundRouter; @@ -26,6 +27,7 @@ fn main() { settings, rpc_router: Arc::new(ForegroundRouter::new()), rpc: rpc::Client::new(languages), + fs: Arc::new(RealFs), }; app.run(move |cx| { diff --git a/zed/src/test.rs b/zed/src/test.rs index 2fb5b6dd2e..595c2d5bd6 100644 --- a/zed/src/test.rs +++ b/zed/src/test.rs @@ -1,4 +1,6 @@ -use crate::{language::LanguageRegistry, rpc, settings, time::ReplicaId, AppState}; +use crate::{ + language::LanguageRegistry, rpc, settings, time::ReplicaId, worktree::RealFs, AppState, +}; use gpui::AppContext; use std::{ path::{Path, PathBuf}, @@ -152,5 +154,6 @@ pub fn build_app_state(cx: &AppContext) -> Arc { languages: languages.clone(), rpc_router: Arc::new(ForegroundRouter::new()), rpc: rpc::Client::new(languages), + fs: Arc::new(RealFs), }) } diff --git a/zed/src/workspace.rs b/zed/src/workspace.rs index 0de8d3e5bf..3b71e07cc2 100644 --- a/zed/src/workspace.rs +++ b/zed/src/workspace.rs @@ -6,7 +6,7 @@ use crate::{ language::LanguageRegistry, rpc, settings::Settings, - worktree::{File, Worktree}, + worktree::{File, Fs, Worktree}, AppState, }; use anyhow::{anyhow, Result}; @@ -90,12 +90,7 @@ fn open_paths(params: &OpenParams, cx: &mut MutableAppContext) { // Add a new workspace if necessary cx.add_window(|cx| { - let mut view = Workspace::new( - params.app_state.settings.clone(), - params.app_state.languages.clone(), - params.app_state.rpc.clone(), - cx, - ); + let mut view = Workspace::new(¶ms.app_state, cx); let open_paths = view.open_paths(¶ms.paths, cx); cx.foreground().spawn(open_paths).detach(); view @@ -104,12 +99,7 @@ fn open_paths(params: &OpenParams, cx: &mut MutableAppContext) { fn open_new(app_state: &Arc, cx: &mut MutableAppContext) { cx.add_window(|cx| { - let mut view = Workspace::new( - app_state.settings.clone(), - app_state.languages.clone(), - app_state.rpc.clone(), - cx, - ); + let mut view = Workspace::new(app_state.as_ref(), cx); view.open_new_file(&app_state, cx); view }); @@ -117,12 +107,7 @@ fn open_new(app_state: &Arc, cx: &mut MutableAppContext) { fn join_worktree(app_state: &Arc, cx: &mut MutableAppContext) { cx.add_window(|cx| { - let mut view = Workspace::new( - app_state.settings.clone(), - app_state.languages.clone(), - app_state.rpc.clone(), - cx, - ); + let mut view = Workspace::new(app_state.as_ref(), cx); view.join_worktree(&app_state, cx); view }); @@ -328,6 +313,7 @@ pub struct Workspace { pub settings: watch::Receiver, languages: Arc, rpc: rpc::Client, + fs: Arc, modal: Option, center: PaneGroup, panes: Vec>, @@ -341,13 +327,8 @@ pub struct Workspace { } impl Workspace { - pub fn new( - settings: watch::Receiver, - languages: Arc, - rpc: rpc::Client, - cx: &mut ViewContext, - ) -> Self { - let pane = cx.add_view(|_| Pane::new(settings.clone())); + pub fn new(app_state: &AppState, cx: &mut ViewContext) -> Self { + let pane = cx.add_view(|_| Pane::new(app_state.settings.clone())); let pane_id = pane.id(); cx.subscribe_to_view(&pane, move |me, _, event, cx| { me.handle_pane_event(pane_id, event, cx) @@ -359,9 +340,10 @@ impl Workspace { center: PaneGroup::new(pane.id()), panes: vec![pane.clone()], active_pane: pane.clone(), - settings, - languages: languages, - rpc, + settings: app_state.settings.clone(), + languages: app_state.languages.clone(), + rpc: app_state.rpc.clone(), + fs: app_state.fs.clone(), worktrees: Default::default(), items: Default::default(), loading_items: Default::default(), @@ -411,18 +393,20 @@ impl Workspace { .map(|path| self.entry_id_for_path(&path, cx)) .collect::>(); - let bg = cx.background_executor().clone(); + let fs = self.fs.clone(); let tasks = abs_paths .iter() .cloned() .zip(entries.into_iter()) .map(|(abs_path, entry_id)| { - let is_file = bg.spawn(async move { abs_path.is_file() }); - cx.spawn(|this, mut cx| async move { - if is_file.await { - return this.update(&mut cx, |this, cx| this.open_entry(entry_id, cx)); - } else { - None + cx.spawn(|this, mut cx| { + let fs = fs.clone(); + async move { + if fs.is_file(&abs_path).await { + return this.update(&mut cx, |this, cx| this.open_entry(entry_id, cx)); + } else { + None + } } }) }) @@ -476,7 +460,8 @@ impl Workspace { path: &Path, cx: &mut ViewContext, ) -> ModelHandle { - let worktree = cx.add_model(|cx| Worktree::local(path, self.languages.clone(), cx)); + let worktree = + cx.add_model(|cx| Worktree::local(path, self.languages.clone(), self.fs.clone(), cx)); cx.observe_model(&worktree, |_, _, cx| cx.notify()); self.worktrees.insert(worktree.clone()); cx.notify(); @@ -912,7 +897,7 @@ mod tests { use crate::{ editor::Editor, test::{build_app_state, temp_tree}, - worktree::WorktreeHandle, + worktree::{FakeFs, WorktreeHandle}, }; use serde_json::json; use std::{collections::HashSet, fs}; @@ -990,12 +975,7 @@ mod tests { let app_state = cx.read(build_app_state); let (_, workspace) = cx.add_window(|cx| { - let mut workspace = Workspace::new( - app_state.settings.clone(), - app_state.languages.clone(), - app_state.rpc.clone(), - cx, - ); + let mut workspace = Workspace::new(&app_state, cx); workspace.add_worktree(dir.path(), cx); workspace }); @@ -1088,22 +1068,18 @@ mod tests { #[gpui::test] async fn test_open_paths(mut cx: gpui::TestAppContext) { - let dir1 = temp_tree(json!({ - "a.txt": "", - })); - let dir2 = temp_tree(json!({ - "b.txt": "", - })); + let fs = FakeFs::new(); + fs.insert_dir("/dir1").await.unwrap(); + fs.insert_dir("/dir2").await.unwrap(); + fs.insert_file("/dir1/a.txt", "".into()).await.unwrap(); + fs.insert_file("/dir2/b.txt", "".into()).await.unwrap(); + + let mut app_state = cx.read(build_app_state); + Arc::get_mut(&mut app_state).unwrap().fs = Arc::new(fs); - let app_state = cx.read(build_app_state); let (_, workspace) = cx.add_window(|cx| { - let mut workspace = Workspace::new( - app_state.settings.clone(), - app_state.languages.clone(), - app_state.rpc.clone(), - cx, - ); - workspace.add_worktree(dir1.path(), cx); + let mut workspace = Workspace::new(&app_state, cx); + workspace.add_worktree("/dir1".as_ref(), cx); workspace }); cx.read(|cx| workspace.read(cx).worktree_scans_complete(cx)) @@ -1111,9 +1087,7 @@ mod tests { // Open a file within an existing worktree. cx.update(|cx| { - workspace.update(cx, |view, cx| { - view.open_paths(&[dir1.path().join("a.txt")], cx) - }) + workspace.update(cx, |view, cx| view.open_paths(&["/dir1/a.txt".into()], cx)) }) .await; cx.read(|cx| { @@ -1131,9 +1105,7 @@ mod tests { // Open a file outside of any existing worktree. cx.update(|cx| { - workspace.update(cx, |view, cx| { - view.open_paths(&[dir2.path().join("b.txt")], cx) - }) + workspace.update(cx, |view, cx| view.open_paths(&["/dir2/b.txt".into()], cx)) }) .await; cx.read(|cx| { @@ -1145,8 +1117,9 @@ mod tests { .collect::>(); assert_eq!( worktree_roots, - vec![dir1.path(), &dir2.path().join("b.txt")] + vec!["/dir1", "/dir2/b.txt"] .into_iter() + .map(Path::new) .collect(), ); assert_eq!( @@ -1170,12 +1143,7 @@ mod tests { let app_state = cx.read(build_app_state); let (window_id, workspace) = cx.add_window(|cx| { - let mut workspace = Workspace::new( - app_state.settings.clone(), - app_state.languages.clone(), - app_state.rpc.clone(), - cx, - ); + let mut workspace = Workspace::new(&app_state, cx); workspace.add_worktree(dir.path(), cx); workspace }); @@ -1218,12 +1186,7 @@ mod tests { let dir = TempDir::new("test-new-file").unwrap(); let app_state = cx.read(build_app_state); let (_, workspace) = cx.add_window(|cx| { - let mut workspace = Workspace::new( - app_state.settings.clone(), - app_state.languages.clone(), - app_state.rpc.clone(), - cx, - ); + let mut workspace = Workspace::new(&app_state, cx); workspace.add_worktree(dir.path(), cx); workspace }); @@ -1343,12 +1306,7 @@ mod tests { let app_state = cx.read(build_app_state); let (window_id, workspace) = cx.add_window(|cx| { - let mut workspace = Workspace::new( - app_state.settings.clone(), - app_state.languages.clone(), - app_state.rpc.clone(), - cx, - ); + let mut workspace = Workspace::new(&app_state, cx); workspace.add_worktree(dir.path(), cx); workspace }); diff --git a/zed/src/worktree.rs b/zed/src/worktree.rs index 1cc226b78b..744449a6cb 100644 --- a/zed/src/worktree.rs +++ b/zed/src/worktree.rs @@ -14,6 +14,7 @@ use crate::{ use ::ignore::gitignore::Gitignore; use anyhow::{anyhow, Context, Result}; use atomic::Ordering::SeqCst; +use fsevent::EventStream; use futures::{Stream, StreamExt}; pub use fuzzy::{match_paths, PathMatch}; use gpui::{ @@ -84,12 +85,19 @@ pub trait Fs: Send + Sync { async fn load(&self, path: &Path) -> Result; async fn save(&self, path: &Path, text: &Rope) -> Result<()>; async fn canonicalize(&self, path: &Path) -> Result; + async fn is_file(&self, path: &Path) -> bool; + fn watch( + &self, + path: &Path, + latency: Duration, + ) -> Pin>>>; + fn is_fake(&self) -> bool; } -struct ProductionFs; +pub struct RealFs; #[async_trait::async_trait] -impl Fs for ProductionFs { +impl Fs for RealFs { async fn entry( &self, root_char_bag: CharBag, @@ -188,10 +196,34 @@ impl Fs for ProductionFs { async fn canonicalize(&self, path: &Path) -> Result { Ok(smol::fs::canonicalize(path).await?) } + + async fn is_file(&self, path: &Path) -> bool { + smol::fs::metadata(path) + .await + .map_or(false, |metadata| metadata.is_file()) + } + + fn watch( + &self, + path: &Path, + latency: Duration, + ) -> Pin>>> { + let (mut tx, rx) = postage::mpsc::channel(64); + let (stream, handle) = EventStream::new(&[path], latency); + std::mem::forget(handle); + std::thread::spawn(move || { + stream.run(move |events| smol::block_on(tx.send(events)).is_ok()); + }); + Box::pin(rx) + } + + fn is_fake(&self) -> bool { + false + } } #[derive(Clone, Debug)] -struct InMemoryEntry { +struct FakeFsEntry { inode: u64, mtime: SystemTime, is_dir: bool, @@ -200,14 +232,14 @@ struct InMemoryEntry { } #[cfg(any(test, feature = "test-support"))] -struct InMemoryFsState { - entries: std::collections::BTreeMap, +struct FakeFsState { + entries: std::collections::BTreeMap, next_inode: u64, - events_tx: postage::broadcast::Sender, + events_tx: postage::broadcast::Sender>, } #[cfg(any(test, feature = "test-support"))] -impl InMemoryFsState { +impl FakeFsState { fn validate_path(&self, path: &Path) -> Result<()> { if path.is_absolute() && path @@ -221,31 +253,33 @@ impl InMemoryFsState { } } - async fn emit_event(&mut self, path: &Path) { - let _ = self - .events_tx - .send(fsevent::Event { + async fn emit_event(&mut self, paths: &[&Path]) { + let events = paths + .iter() + .map(|path| fsevent::Event { event_id: 0, flags: fsevent::StreamFlags::empty(), path: path.to_path_buf(), }) - .await; + .collect(); + + let _ = self.events_tx.send(events).await; } } #[cfg(any(test, feature = "test-support"))] -pub struct InMemoryFs { - state: smol::lock::RwLock, +pub struct FakeFs { + state: smol::lock::RwLock, } #[cfg(any(test, feature = "test-support"))] -impl InMemoryFs { +impl FakeFs { pub fn new() -> Self { let (events_tx, _) = postage::broadcast::channel(2048); let mut entries = std::collections::BTreeMap::new(); entries.insert( Path::new("/").to_path_buf(), - InMemoryEntry { + FakeFsEntry { inode: 0, mtime: SystemTime::now(), is_dir: true, @@ -254,7 +288,7 @@ impl InMemoryFs { }, ); Self { - state: smol::lock::RwLock::new(InMemoryFsState { + state: smol::lock::RwLock::new(FakeFsState { entries, next_inode: 1, events_tx, @@ -262,15 +296,16 @@ impl InMemoryFs { } } - pub async fn insert_dir(&self, path: &Path) -> Result<()> { + pub async fn insert_dir(&self, path: impl AsRef) -> Result<()> { let mut state = self.state.write().await; + let path = path.as_ref(); state.validate_path(path)?; let inode = state.next_inode; state.next_inode += 1; state.entries.insert( path.to_path_buf(), - InMemoryEntry { + FakeFsEntry { inode, mtime: SystemTime::now(), is_dir: true, @@ -278,7 +313,28 @@ impl InMemoryFs { content: None, }, ); - state.emit_event(path).await; + state.emit_event(&[path]).await; + Ok(()) + } + + pub async fn insert_file(&self, path: impl AsRef, content: String) -> Result<()> { + let mut state = self.state.write().await; + let path = path.as_ref(); + state.validate_path(path)?; + + let inode = state.next_inode; + state.next_inode += 1; + state.entries.insert( + path.to_path_buf(), + FakeFsEntry { + inode, + mtime: SystemTime::now(), + is_dir: false, + is_symlink: false, + content: Some(content), + }, + ); + state.emit_event(&[path]).await; Ok(()) } @@ -286,7 +342,7 @@ impl InMemoryFs { let mut state = self.state.write().await; state.validate_path(path)?; state.entries.retain(|path, _| !path.starts_with(path)); - state.emit_event(&path).await; + state.emit_event(&[path]).await; Ok(()) } @@ -312,21 +368,15 @@ impl InMemoryFs { state.entries.insert(new_path, entry); } - state.emit_event(source).await; - state.emit_event(target).await; - + state.emit_event(&[source, target]).await; Ok(()) } } - - pub async fn events(&self) -> postage::broadcast::Receiver { - self.state.read().await.events_tx.subscribe() - } } #[cfg(any(test, feature = "test-support"))] #[async_trait::async_trait] -impl Fs for InMemoryFs { +impl Fs for FakeFs { async fn entry( &self, root_char_bag: CharBag, @@ -405,13 +455,13 @@ impl Fs for InMemoryFs { } else { entry.content = Some(text.chunks().collect()); entry.mtime = SystemTime::now(); - state.emit_event(path).await; + state.emit_event(&[path]).await; Ok(()) } } else { let inode = state.next_inode; state.next_inode += 1; - let entry = InMemoryEntry { + let entry = FakeFsEntry { inode, mtime: SystemTime::now(), is_dir: false, @@ -419,7 +469,7 @@ impl Fs for InMemoryFs { content: Some(text.chunks().collect()), }; state.entries.insert(path.to_path_buf(), entry); - state.emit_event(path).await; + state.emit_event(&[path]).await; Ok(()) } } @@ -427,6 +477,29 @@ impl Fs for InMemoryFs { async fn canonicalize(&self, path: &Path) -> Result { Ok(path.to_path_buf()) } + + async fn is_file(&self, path: &Path) -> bool { + let state = self.state.read().await; + state.entries.get(path).map_or(false, |entry| !entry.is_dir) + } + + fn watch( + &self, + path: &Path, + _: Duration, + ) -> Pin>>> { + let state = smol::block_on(self.state.read()); + let rx = state.events_tx.subscribe(); + let path = path.to_path_buf(); + Box::pin(futures::StreamExt::filter(rx, move |events| { + let result = events.iter().any(|event| event.path.starts_with(&path)); + async move { result } + })) + } + + fn is_fake(&self) -> bool { + true + } } #[derive(Clone, Debug)] @@ -470,49 +543,22 @@ impl Worktree { pub fn local( path: impl Into>, languages: Arc, + fs: Arc, cx: &mut ModelContext, ) -> Self { - let fs = Arc::new(ProductionFs); - let (mut tree, scan_states_tx) = - LocalWorktree::new(path, languages, fs.clone(), Duration::from_millis(100), cx); - let (event_stream, event_stream_handle) = fsevent::EventStream::new( - &[tree.snapshot.abs_path.as_ref()], - Duration::from_millis(100), - ); + let (mut tree, scan_states_tx) = LocalWorktree::new(path, languages, fs.clone(), cx); + + let events = fs.watch(tree.snapshot.abs_path.as_ref(), Duration::from_millis(100)); let background_snapshot = tree.background_snapshot.clone(); - std::thread::spawn(move || { + tree._background_scanner_task = Some(cx.background().spawn(async move { let scanner = BackgroundScanner::new( background_snapshot, scan_states_tx, fs, Arc::new(executor::Background::new()), ); - scanner.run(event_stream); - }); - tree._event_stream_handle = Some(event_stream_handle); - Worktree::Local(tree) - } - - #[cfg(any(test, feature = "test-support"))] - pub fn test( - path: impl Into>, - languages: Arc, - fs: Arc, - cx: &mut ModelContext, - ) -> Self { - let (tree, scan_states_tx) = - LocalWorktree::new(path, languages, fs.clone(), Duration::ZERO, cx); - let background_snapshot = tree.background_snapshot.clone(); - let fs = fs.clone(); - let background = cx.background().clone(); - cx.background() - .spawn(async move { - let events_rx = fs.events().await; - let scanner = - BackgroundScanner::new(background_snapshot, scan_states_tx, fs, background); - scanner.run_test(events_rx).await; - }) - .detach(); + scanner.run(events).await; + })); Worktree::Local(tree) } @@ -831,15 +877,15 @@ impl Worktree { fn poll_snapshot(&mut self, cx: &mut ModelContext) { match self { Self::Local(worktree) => { - let poll_interval = worktree.poll_interval; + let is_fake_fs = worktree.fs.is_fake(); worktree.snapshot = worktree.background_snapshot.lock().clone(); if worktree.is_scanning() { if !worktree.poll_scheduled { cx.spawn(|this, mut cx| async move { - if poll_interval.is_zero() { + if is_fake_fs { smol::future::yield_now().await; } else { - smol::Timer::after(poll_interval).await; + smol::Timer::after(Duration::from_millis(100)).await; } this.update(&mut cx, |this, cx| { this.as_local_mut().unwrap().poll_scheduled = false; @@ -961,7 +1007,7 @@ pub struct LocalWorktree { background_snapshot: Arc>, snapshots_to_send_tx: Option>, last_scan_state_rx: watch::Receiver, - _event_stream_handle: Option, + _background_scanner_task: Option>, poll_scheduled: bool, rpc: Option<(rpc::Client, u64)>, open_buffers: HashMap>, @@ -969,7 +1015,6 @@ pub struct LocalWorktree { peers: HashMap, languages: Arc, fs: Arc, - poll_interval: Duration, } impl LocalWorktree { @@ -977,7 +1022,6 @@ impl LocalWorktree { path: impl Into>, languages: Arc, fs: Arc, - poll_interval: Duration, cx: &mut ModelContext, ) -> (Self, Sender) { let abs_path = path.into(); @@ -1002,7 +1046,7 @@ impl LocalWorktree { background_snapshot: Arc::new(Mutex::new(snapshot)), snapshots_to_send_tx: None, last_scan_state_rx, - _event_stream_handle: None, + _background_scanner_task: None, poll_scheduled: false, open_buffers: Default::default(), shared_buffers: Default::default(), @@ -1010,7 +1054,6 @@ impl LocalWorktree { rpc: None, languages, fs, - poll_interval, }; cx.spawn_weak(|this, mut cx| async move { @@ -2158,40 +2201,7 @@ impl BackgroundScanner { self.snapshot.lock().clone() } - fn run(mut self, event_stream: fsevent::EventStream) { - if smol::block_on(self.notify.send(ScanState::Scanning)).is_err() { - return; - } - - if let Err(err) = smol::block_on(self.scan_dirs()) { - if smol::block_on(self.notify.send(ScanState::Err(Arc::new(err)))).is_err() { - return; - } - } - - if smol::block_on(self.notify.send(ScanState::Idle)).is_err() { - return; - } - - event_stream.run(move |events| { - if smol::block_on(self.notify.send(ScanState::Scanning)).is_err() { - return false; - } - - if !smol::block_on(self.process_events(events)) { - return false; - } - - if smol::block_on(self.notify.send(ScanState::Idle)).is_err() { - return false; - } - - true - }); - } - - #[cfg(any(test, feature = "test-support"))] - async fn run_test(mut self, mut events_rx: postage::broadcast::Receiver) { + async fn run(mut self, events_rx: impl Stream>) { if self.notify.send(ScanState::Scanning).await.is_err() { return; } @@ -2211,12 +2221,8 @@ impl BackgroundScanner { return; } - while let Some(event) = events_rx.recv().await { - let mut events = vec![event]; - while let Ok(event) = events_rx.try_recv() { - events.push(event); - } - + futures::pin_mut!(events_rx); + while let Some(events) = events_rx.next().await { if self.notify.send(ScanState::Scanning).await.is_err() { break; } @@ -2997,7 +3003,9 @@ mod tests { ) .unwrap(); - let tree = cx.add_model(|cx| Worktree::local(root_link_path, Default::default(), cx)); + let tree = cx.add_model(|cx| { + Worktree::local(root_link_path, Default::default(), Arc::new(RealFs), cx) + }); cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete()) .await; @@ -3039,7 +3047,14 @@ mod tests { let dir = temp_tree(json!({ "file1": "the old contents", })); - let tree = cx.add_model(|cx| Worktree::local(dir.path(), app_state.languages.clone(), cx)); + let tree = cx.add_model(|cx| { + Worktree::local( + dir.path(), + app_state.languages.clone(), + Arc::new(RealFs), + cx, + ) + }); let buffer = tree .update(&mut cx, |tree, cx| tree.open_buffer("file1", cx)) .await @@ -3062,8 +3077,14 @@ mod tests { })); let file_path = dir.path().join("file1"); - let tree = - cx.add_model(|cx| Worktree::local(file_path.clone(), app_state.languages.clone(), cx)); + let tree = cx.add_model(|cx| { + Worktree::local( + file_path.clone(), + app_state.languages.clone(), + Arc::new(RealFs), + cx, + ) + }); cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete()) .await; cx.read(|cx| assert_eq!(tree.read(cx).file_count(), 1)); @@ -3098,7 +3119,8 @@ mod tests { } })); - let tree = cx.add_model(|cx| Worktree::local(dir.path(), Default::default(), cx)); + let tree = cx + .add_model(|cx| Worktree::local(dir.path(), Default::default(), Arc::new(RealFs), cx)); let buffer_for_path = |path: &'static str, cx: &mut gpui::TestAppContext| { let buffer = tree.update(cx, |tree, cx| tree.open_buffer(path, cx)); @@ -3245,7 +3267,8 @@ mod tests { } })); - let tree = cx.add_model(|cx| Worktree::local(dir.path(), Default::default(), cx)); + let tree = cx + .add_model(|cx| Worktree::local(dir.path(), Default::default(), Arc::new(RealFs), cx)); cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete()) .await; tree.flush_fs_events(&cx).await; @@ -3313,7 +3336,7 @@ mod tests { next_entry_id: Default::default(), })), notify_tx, - Arc::new(ProductionFs), + Arc::new(RealFs), Arc::new(gpui::executor::Background::new()), ); smol::block_on(scanner.scan_dirs()).unwrap();