diff --git a/crates/collab/migrations.sqlite/20221109000000_test_schema.sql b/crates/collab/migrations.sqlite/20221109000000_test_schema.sql index 1a09dff780..cffb549a89 100644 --- a/crates/collab/migrations.sqlite/20221109000000_test_schema.sql +++ b/crates/collab/migrations.sqlite/20221109000000_test_schema.sql @@ -47,9 +47,55 @@ CREATE TABLE "projects" ( "host_connection_id" INTEGER NOT NULL ); +CREATE TABLE "worktrees" ( + "id" INTEGER NOT NULL, + "project_id" INTEGER NOT NULL REFERENCES projects (id), + "root_name" VARCHAR NOT NULL, + "abs_path" VARCHAR NOT NULL, + "visible" BOOL NOT NULL, + "scan_id" INTEGER NOT NULL, + "is_complete" BOOL NOT NULL, + PRIMARY KEY(project_id, id) +); +CREATE INDEX "index_worktrees_on_project_id" ON "worktrees" ("project_id"); + +CREATE TABLE "worktree_entries" ( + "id" INTEGER NOT NULL, + "project_id" INTEGER NOT NULL REFERENCES projects (id), + "worktree_id" INTEGER NOT NULL REFERENCES worktrees (id), + "is_dir" BOOL NOT NULL, + "path" VARCHAR NOT NULL, + "inode" INTEGER NOT NULL, + "mtime_seconds" INTEGER NOT NULL, + "mtime_nanos" INTEGER NOT NULL, + "is_symlink" BOOL NOT NULL, + "is_ignored" BOOL NOT NULL, + PRIMARY KEY(project_id, worktree_id, id) +); +CREATE INDEX "index_worktree_entries_on_project_id_and_worktree_id" ON "worktree_entries" ("project_id", "worktree_id"); + +CREATE TABLE "worktree_diagnostic_summaries" ( + "path" VARCHAR NOT NULL, + "project_id" INTEGER NOT NULL REFERENCES projects (id), + "worktree_id" INTEGER NOT NULL REFERENCES worktrees (id), + "language_server_id" INTEGER NOT NULL, + "error_count" INTEGER NOT NULL, + "warning_count" INTEGER NOT NULL, + PRIMARY KEY(project_id, worktree_id, path) +); +CREATE INDEX "index_worktree_diagnostic_summaries_on_project_id_and_worktree_id" ON "worktree_diagnostic_summaries" ("project_id", "worktree_id"); + +CREATE TABLE "language_servers" ( + "id" INTEGER NOT NULL, + "project_id" INTEGER NOT NULL REFERENCES projects (id), + "name" VARCHAR NOT NULL, + PRIMARY KEY(project_id, id) +); +CREATE INDEX "index_language_servers_on_project_id" ON "language_servers" ("project_id"); + CREATE TABLE "project_collaborators" ( "id" INTEGER PRIMARY KEY, - "project_id" INTEGER NOT NULL REFERENCES projects (id) ON DELETE CASCADE, + "project_id" INTEGER NOT NULL REFERENCES projects (id), "connection_id" INTEGER NOT NULL, "user_id" INTEGER NOT NULL, "replica_id" INTEGER NOT NULL, @@ -58,17 +104,6 @@ CREATE TABLE "project_collaborators" ( CREATE INDEX "index_project_collaborators_on_project_id" ON "project_collaborators" ("project_id"); CREATE UNIQUE INDEX "index_project_collaborators_on_project_id_and_replica_id" ON "project_collaborators" ("project_id", "replica_id"); -CREATE TABLE "worktrees" ( - "id" INTEGER NOT NULL, - "project_id" INTEGER NOT NULL REFERENCES projects (id) ON DELETE CASCADE, - "root_name" VARCHAR NOT NULL, - "visible" BOOL NOT NULL, - "scan_id" INTEGER NOT NULL, - "is_complete" BOOL NOT NULL, - PRIMARY KEY(project_id, id) -); -CREATE INDEX "index_worktrees_on_project_id" ON "worktrees" ("project_id"); - CREATE TABLE "room_participants" ( "id" INTEGER PRIMARY KEY, "room_id" INTEGER NOT NULL REFERENCES rooms (id), diff --git a/crates/collab/migrations/20221111092550_reconnection_support.sql b/crates/collab/migrations/20221111092550_reconnection_support.sql index 617e282a0a..a5b49ad763 100644 --- a/crates/collab/migrations/20221111092550_reconnection_support.sql +++ b/crates/collab/migrations/20221111092550_reconnection_support.sql @@ -20,14 +20,52 @@ CREATE TABLE "project_collaborators" ( CREATE INDEX "index_project_collaborators_on_project_id" ON "project_collaborators" ("project_id"); CREATE UNIQUE INDEX "index_project_collaborators_on_project_id_and_replica_id" ON "project_collaborators" ("project_id", "replica_id"); -CREATE TABLE IF NOT EXISTS "worktrees" ( +CREATE TABLE "worktrees" ( "id" INTEGER NOT NULL, - "project_id" INTEGER NOT NULL REFERENCES projects (id) ON DELETE CASCADE, + "project_id" INTEGER NOT NULL REFERENCES projects (id), "root_name" VARCHAR NOT NULL, + "abs_path" VARCHAR NOT NULL, + "visible" BOOL NOT NULL, + "scan_id" INTEGER NOT NULL, + "is_complete" BOOL NOT NULL, PRIMARY KEY(project_id, id) ); CREATE INDEX "index_worktrees_on_project_id" ON "worktrees" ("project_id"); +CREATE TABLE "worktree_entries" ( + "id" INTEGER NOT NULL, + "project_id" INTEGER NOT NULL REFERENCES projects (id), + "worktree_id" INTEGER NOT NULL REFERENCES worktrees (id), + "is_dir" BOOL NOT NULL, + "path" VARCHAR NOT NULL, + "inode" INTEGER NOT NULL, + "mtime_seconds" INTEGER NOT NULL, + "mtime_nanos" INTEGER NOT NULL, + "is_symlink" BOOL NOT NULL, + "is_ignored" BOOL NOT NULL, + PRIMARY KEY(project_id, worktree_id, id) +); +CREATE INDEX "index_worktree_entries_on_project_id_and_worktree_id" ON "worktree_entries" ("project_id", "worktree_id"); + +CREATE TABLE "worktree_diagnostic_summaries" ( + "path" VARCHAR NOT NULL, + "project_id" INTEGER NOT NULL REFERENCES projects (id), + "worktree_id" INTEGER NOT NULL REFERENCES worktrees (id), + "language_server_id" INTEGER NOT NULL, + "error_count" INTEGER NOT NULL, + "warning_count" INTEGER NOT NULL, + PRIMARY KEY(project_id, worktree_id, path) +); +CREATE INDEX "index_worktree_diagnostic_summaries_on_project_id_and_worktree_id" ON "worktree_diagnostic_summaries" ("project_id", "worktree_id"); + +CREATE TABLE "language_servers" ( + "id" INTEGER NOT NULL, + "project_id" INTEGER NOT NULL REFERENCES projects (id), + "name" VARCHAR NOT NULL, + PRIMARY KEY(project_id, id) +); +CREATE INDEX "index_language_servers_on_project_id" ON "language_servers" ("project_id"); + CREATE TABLE IF NOT EXISTS "room_participants" ( "id" SERIAL PRIMARY KEY, "room_id" INTEGER NOT NULL REFERENCES rooms (id), diff --git a/crates/collab/src/db.rs b/crates/collab/src/db.rs index 88b6f20953..6db4ad101b 100644 --- a/crates/collab/src/db.rs +++ b/crates/collab/src/db.rs @@ -10,11 +10,7 @@ use sqlx::{ types::Uuid, FromRow, }; -use std::{ - future::Future, - path::{Path, PathBuf}, - time::Duration, -}; +use std::{future::Future, path::Path, time::Duration}; use time::{OffsetDateTime, PrimitiveDateTime}; #[cfg(test)] @@ -1443,13 +1439,17 @@ where for worktree in worktrees { sqlx::query( " - INSERT INTO worktrees (id, project_id, root_name) - VALUES ($1, $2, $3) + INSERT INTO worktrees (project_id, id, root_name, abs_path, visible, scan_id, is_complete) + VALUES ($1, $2, $3, $4, $5, $6, $7) ", ) - .bind(worktree.id as i32) .bind(project_id) + .bind(worktree.id as i32) .bind(&worktree.root_name) + .bind(&*String::from_utf8_lossy(&worktree.abs_path)) + .bind(worktree.visible) + .bind(0) + .bind(false) .execute(&mut tx) .await?; } @@ -1502,32 +1502,36 @@ where for worktree in worktrees { sqlx::query( " - INSERT INTO worktrees (project_id, id, root_name) - VALUES ($1, $2, $3) + INSERT INTO worktrees (project_id, id, root_name, abs_path, visible, scan_id, is_complete) + VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT (project_id, id) DO UPDATE SET root_name = excluded.root_name ", ) .bind(project_id) .bind(worktree.id as i32) .bind(&worktree.root_name) + .bind(String::from_utf8_lossy(&worktree.abs_path).as_ref()) + .bind(worktree.visible) + .bind(0) + .bind(false) .execute(&mut tx) .await?; } - let mut params = "?,".repeat(worktrees.len()); + let mut params = "(?, ?),".repeat(worktrees.len()); if !worktrees.is_empty() { params.pop(); } let query = format!( " DELETE FROM worktrees - WHERE id NOT IN ({params}) + WHERE (project_id, id) NOT IN ({params}) ", ); let mut query = sqlx::query(&query); for worktree in worktrees { - query = query.bind(worktree.id as i32); + query = query.bind(project_id).bind(WorktreeId(worktree.id as i32)); } query.execute(&mut tx).await?; @@ -1556,7 +1560,7 @@ where &self, project_id: ProjectId, connection_id: ConnectionId, - ) -> Result<(Project, i32)> { + ) -> Result<(Project, ReplicaId)> { self.transact(|mut tx| async move { let (room_id, user_id) = sqlx::query_as::<_, (RoomId, UserId)>( " @@ -1574,7 +1578,7 @@ where " SELECT 1 FROM projects - WHERE project_id = $1 AND room_id = $2 + WHERE id = $1 AND room_id = $2 ", ) .bind(project_id) @@ -1582,9 +1586,9 @@ where .fetch_one(&mut tx) .await?; - let replica_ids = sqlx::query_scalar::<_, i32>( + let mut collaborators = sqlx::query_as::<_, ProjectCollaborator>( " - SELECT replica_id + SELECT * FROM project_collaborators WHERE project_id = $1 ", @@ -1592,11 +1596,21 @@ where .bind(project_id) .fetch_all(&mut tx) .await?; - let replica_ids = HashSet::from_iter(replica_ids); - let mut replica_id = 1; + let replica_ids = collaborators + .iter() + .map(|c| c.replica_id) + .collect::>(); + let mut replica_id = ReplicaId(1); while replica_ids.contains(&replica_id) { - replica_id += 1; + replica_id.0 += 1; } + let new_collaborator = ProjectCollaborator { + project_id, + connection_id: connection_id.0 as i32, + user_id, + replica_id, + is_host: false, + }; sqlx::query( " @@ -1610,51 +1624,140 @@ where VALUES ($1, $2, $3, $4, $5) ", ) - .bind(project_id) - .bind(connection_id.0 as i32) - .bind(user_id) - .bind(replica_id) - .bind(false) + .bind(new_collaborator.project_id) + .bind(new_collaborator.connection_id) + .bind(new_collaborator.user_id) + .bind(new_collaborator.replica_id) + .bind(new_collaborator.is_host) .execute(&mut tx) .await?; + collaborators.push(new_collaborator); + + let worktree_rows = sqlx::query_as::<_, WorktreeRow>( + " + SELECT * + FROM worktrees + WHERE project_id = $1 + ", + ) + .bind(project_id) + .fetch_all(&mut tx) + .await?; + let mut worktrees = worktree_rows + .into_iter() + .map(|worktree_row| { + ( + worktree_row.id, + Worktree { + id: worktree_row.id, + abs_path: worktree_row.abs_path, + root_name: worktree_row.root_name, + visible: worktree_row.visible, + entries: Default::default(), + diagnostic_summaries: Default::default(), + scan_id: worktree_row.scan_id as u64, + is_complete: worktree_row.is_complete, + }, + ) + }) + .collect::>(); + + let mut params = "(?, ?),".repeat(worktrees.len()); + if !worktrees.is_empty() { + params.pop(); + } + + // Populate worktree entries. + { + let query = format!( + " + SELECT * + FROM worktree_entries + WHERE (project_id, worktree_id) IN ({params}) + ", + ); + let mut entries = sqlx::query_as::<_, WorktreeEntry>(&query); + for worktree_id in worktrees.keys() { + entries = entries.bind(project_id).bind(*worktree_id); + } + let mut entries = entries.fetch(&mut tx); + while let Some(entry) = entries.next().await { + let entry = entry?; + if let Some(worktree) = worktrees.get_mut(&entry.worktree_id) { + worktree.entries.push(proto::Entry { + id: entry.id as u64, + is_dir: entry.is_dir, + path: entry.path.into_bytes(), + inode: entry.inode as u64, + mtime: Some(proto::Timestamp { + seconds: entry.mtime_seconds as u64, + nanos: entry.mtime_nanos as u32, + }), + is_symlink: entry.is_symlink, + is_ignored: entry.is_ignored, + }); + } + } + } + + // Populate worktree diagnostic summaries. + { + let query = format!( + " + SELECT * + FROM worktree_diagnostic_summaries + WHERE (project_id, worktree_id) IN ({params}) + ", + ); + let mut summaries = sqlx::query_as::<_, WorktreeDiagnosticSummary>(&query); + for worktree_id in worktrees.keys() { + summaries = summaries.bind(project_id).bind(*worktree_id); + } + let mut summaries = summaries.fetch(&mut tx); + while let Some(summary) = summaries.next().await { + let summary = summary?; + if let Some(worktree) = worktrees.get_mut(&summary.worktree_id) { + worktree + .diagnostic_summaries + .push(proto::DiagnosticSummary { + path: summary.path, + language_server_id: summary.language_server_id as u64, + error_count: summary.error_count as u32, + warning_count: summary.warning_count as u32, + }); + } + } + } + + // Populate language servers. + let language_servers = sqlx::query_as::<_, LanguageServer>( + " + SELECT * + FROM language_servers + WHERE project_id = $1 + ", + ) + .bind(project_id) + .fetch_all(&mut tx) + .await?; tx.commit().await?; - todo!() + Ok(( + Project { + collaborators, + worktrees, + language_servers: language_servers + .into_iter() + .map(|language_server| proto::LanguageServer { + id: language_server.id.to_proto(), + name: language_server.name, + }) + .collect(), + }, + replica_id as ReplicaId, + )) }) .await - // sqlx::query( - // " - // SELECT replica_id - // FROM project_collaborators - // WHERE project_id = $ - // ", - // ) - // .bind(project_id) - // .bind(connection_id.0 as i32) - // .bind(user_id) - // .bind(0) - // .bind(true) - // .execute(&mut tx) - // .await?; - // sqlx::query( - // " - // INSERT INTO project_collaborators ( - // project_id, - // connection_id, - // user_id, - // replica_id, - // is_host - // ) - // VALUES ($1, $2, $3, $4, $5) - // ", - // ) - // .bind(project_id) - // .bind(connection_id.0 as i32) - // .bind(user_id) - // .bind(0) - // .bind(true) - // .execute(&mut tx) - // .await?; } pub async fn unshare_project(&self, project_id: ProjectId) -> Result<()> { @@ -2089,32 +2192,72 @@ pub struct Room { id_type!(ProjectId); pub struct Project { - pub id: ProjectId, pub collaborators: Vec, - pub worktrees: BTreeMap, + pub worktrees: BTreeMap, pub language_servers: Vec, } +id_type!(ReplicaId); #[derive(Clone, Debug, Default, FromRow, PartialEq)] pub struct ProjectCollaborator { pub project_id: ProjectId, pub connection_id: i32, pub user_id: UserId, - pub replica_id: i32, + pub replica_id: ReplicaId, pub is_host: bool, } -#[derive(Default)] -pub struct Worktree { - pub abs_path: PathBuf, +id_type!(WorktreeId); +#[derive(Clone, Debug, Default, FromRow, PartialEq)] +struct WorktreeRow { + pub id: WorktreeId, + pub abs_path: String, pub root_name: String, pub visible: bool, - pub entries: BTreeMap, - pub diagnostic_summaries: BTreeMap, + pub scan_id: i64, + pub is_complete: bool, +} + +pub struct Worktree { + pub id: WorktreeId, + pub abs_path: String, + pub root_name: String, + pub visible: bool, + pub entries: Vec, + pub diagnostic_summaries: Vec, pub scan_id: u64, pub is_complete: bool, } +#[derive(Clone, Debug, Default, FromRow, PartialEq)] +struct WorktreeEntry { + id: i64, + worktree_id: WorktreeId, + is_dir: bool, + path: String, + inode: i64, + mtime_seconds: i64, + mtime_nanos: i32, + is_symlink: bool, + is_ignored: bool, +} + +#[derive(Clone, Debug, Default, FromRow, PartialEq)] +struct WorktreeDiagnosticSummary { + worktree_id: WorktreeId, + path: String, + language_server_id: i64, + error_count: i32, + warning_count: i32, +} + +id_type!(LanguageServerId); +#[derive(Clone, Debug, Default, FromRow, PartialEq)] +struct LanguageServer { + id: LanguageServerId, + name: String, +} + pub struct LeftProject { pub id: ProjectId, pub host_user_id: UserId, diff --git a/crates/collab/src/integration_tests.rs b/crates/collab/src/integration_tests.rs index b54f03ce53..1236af42cb 100644 --- a/crates/collab/src/integration_tests.rs +++ b/crates/collab/src/integration_tests.rs @@ -1,5 +1,5 @@ use crate::{ - db::{NewUserParams, ProjectId, SqliteTestDb as TestDb, UserId}, + db::{NewUserParams, SqliteTestDb as TestDb, UserId}, rpc::{Executor, Server}, AppState, }; @@ -2401,12 +2401,6 @@ async fn test_collaborating_with_diagnostics( // Wait for server to see the diagnostics update. deterministic.run_until_parked(); - { - let store = server.store.lock().await; - let project = store.project(ProjectId::from_proto(project_id)).unwrap(); - let worktree = project.worktrees.get(&worktree_id.to_proto()).unwrap(); - assert!(!worktree.diagnostic_summaries.is_empty()); - } // Ensure client B observes the new diagnostics. project_b.read_with(cx_b, |project, cx| { diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index 02d8f25f38..3c7d4ec61b 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -42,7 +42,6 @@ use std::{ marker::PhantomData, net::SocketAddr, ops::{Deref, DerefMut}, - os::unix::prelude::OsStrExt, rc::Rc, sync::{ atomic::{AtomicBool, Ordering::SeqCst}, @@ -930,16 +929,8 @@ impl Server { ) -> Result<()> { let project_id = ProjectId::from_proto(request.payload.project_id); let guest_user_id = request.sender_user_id; - let host_user_id; - let host_connection_id; - { - let state = self.store().await; - let project = state.project(project_id)?; - host_user_id = project.host.user_id; - host_connection_id = project.host_connection_id; - }; - tracing::info!(%project_id, %host_user_id, %host_connection_id, "join project"); + tracing::info!(%project_id, "join project"); let (project, replica_id) = self .app_state @@ -952,7 +943,7 @@ impl Server { .iter() .map(|collaborator| proto::Collaborator { peer_id: collaborator.connection_id as u32, - replica_id: collaborator.replica_id as u32, + replica_id: collaborator.replica_id.0 as u32, user_id: collaborator.user_id.to_proto(), }) .collect::>(); @@ -960,10 +951,10 @@ impl Server { .worktrees .iter() .map(|(id, worktree)| proto::WorktreeMetadata { - id: *id, + id: id.to_proto(), root_name: worktree.root_name.clone(), visible: worktree.visible, - abs_path: worktree.abs_path.as_os_str().as_bytes().to_vec(), + abs_path: worktree.abs_path.as_bytes().to_vec(), }) .collect::>(); @@ -977,7 +968,7 @@ impl Server { project_id: project_id.to_proto(), collaborator: Some(proto::Collaborator { peer_id: request.sender_connection_id.0, - replica_id: replica_id as u32, + replica_id: replica_id.0 as u32, user_id: guest_user_id.to_proto(), }), }, @@ -989,12 +980,12 @@ impl Server { // First, we send the metadata associated with each worktree. response.send(proto::JoinProjectResponse { worktrees: worktrees.clone(), - replica_id: replica_id as u32, + replica_id: replica_id.0 as u32, collaborators: collaborators.clone(), language_servers: project.language_servers.clone(), })?; - for (worktree_id, worktree) in &project.worktrees { + for (worktree_id, worktree) in project.worktrees { #[cfg(any(test, feature = "test-support"))] const MAX_CHUNK_SIZE: usize = 2; #[cfg(not(any(test, feature = "test-support")))] @@ -1003,10 +994,10 @@ impl Server { // Stream this worktree's entries. let message = proto::UpdateWorktree { project_id: project_id.to_proto(), - worktree_id: *worktree_id, - abs_path: worktree.abs_path.as_os_str().as_bytes().to_vec(), - root_name: worktree.root_name.clone(), - updated_entries: worktree.entries.values().cloned().collect(), + worktree_id: worktree_id.to_proto(), + abs_path: worktree.abs_path.as_bytes().to_vec(), + root_name: worktree.root_name, + updated_entries: worktree.entries, removed_entries: Default::default(), scan_id: worktree.scan_id, is_last_update: worktree.is_complete, @@ -1017,13 +1008,13 @@ impl Server { } // Stream this worktree's diagnostics. - for summary in worktree.diagnostic_summaries.values() { + for summary in worktree.diagnostic_summaries { self.peer.send( request.sender_connection_id, proto::UpdateDiagnosticSummary { project_id: project_id.to_proto(), - worktree_id: *worktree_id, - summary: Some(summary.clone()), + worktree_id: worktree.id.to_proto(), + summary: Some(summary), }, )?; } diff --git a/crates/collab/src/rpc/store.rs b/crates/collab/src/rpc/store.rs index 4be9354788..a93182d50b 100644 --- a/crates/collab/src/rpc/store.rs +++ b/crates/collab/src/rpc/store.rs @@ -294,49 +294,6 @@ impl Store { Err(anyhow!("no such project"))? } - pub fn join_project( - &mut self, - requester_connection_id: ConnectionId, - project_id: ProjectId, - ) -> Result<(&Project, ReplicaId)> { - let connection = self - .connections - .get_mut(&requester_connection_id) - .ok_or_else(|| anyhow!("no such connection"))?; - let user = self - .connected_users - .get(&connection.user_id) - .ok_or_else(|| anyhow!("no such connection"))?; - let active_call = user.active_call.ok_or_else(|| anyhow!("no such project"))?; - anyhow::ensure!( - active_call.connection_id == Some(requester_connection_id), - "no such project" - ); - - let project = self - .projects - .get_mut(&project_id) - .ok_or_else(|| anyhow!("no such project"))?; - anyhow::ensure!(project.room_id == active_call.room_id, "no such project"); - - connection.projects.insert(project_id); - let mut replica_id = 1; - while project.active_replica_ids.contains(&replica_id) { - replica_id += 1; - } - project.active_replica_ids.insert(replica_id); - project.guests.insert( - requester_connection_id, - Collaborator { - replica_id, - user_id: connection.user_id, - admin: connection.admin, - }, - ); - - Ok((project, replica_id)) - } - pub fn leave_project( &mut self, project_id: ProjectId, @@ -409,12 +366,6 @@ impl Store { .connection_ids()) } - pub fn project(&self, project_id: ProjectId) -> Result<&Project> { - self.projects - .get(&project_id) - .ok_or_else(|| anyhow!("no such project")) - } - pub fn read_project( &self, project_id: ProjectId, diff --git a/crates/rpc/proto/zed.proto b/crates/rpc/proto/zed.proto index e688cad1f8..8aed5ef5cf 100644 --- a/crates/rpc/proto/zed.proto +++ b/crates/rpc/proto/zed.proto @@ -282,13 +282,6 @@ message UpdateWorktree { bytes abs_path = 8; } -message UpdateWorktreeExtensions { - uint64 project_id = 1; - uint64 worktree_id = 2; - repeated string extensions = 3; - repeated uint32 counts = 4; -} - message CreateProjectEntry { uint64 project_id = 1; uint64 worktree_id = 2;