Merge pull request #2011 from zed-industries/project-reconnection

Retain connection to remote projects when temporarily disconnected
Nathan Sobo 2023-01-06 18:01:08 -07:00 committed by GitHub
commit 3cffee4065
39 changed files with 3712 additions and 2106 deletions
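
The Room changes below keep tracking a room's shared and joined projects across a transient disconnect and retry rejoining a bounded number of times before giving up and leaving the room. The following is a rough, self-contained sketch of that control flow; RoomStatus, reconnect, and MAX_ATTEMPTS are illustrative stand-ins rather than the actual call::Room API shown in the diff.

// Sketch only: a bounded rejoin loop over stand-in types.
#[derive(Debug, PartialEq)]
enum RoomStatus {
    Online,
    Rejoining,
    Offline,
}

struct Room {
    status: RoomStatus,
    // Projects this client shares with, or has joined from, other
    // participants; both sets must be restored after a reconnect.
    shared_project_ids: Vec<u64>,
    joined_project_ids: Vec<u64>,
}

impl Room {
    // Attempt to re-establish the room after a transient disconnect.
    fn reconnect(&mut self, mut try_rejoin: impl FnMut(&Room) -> bool) -> bool {
        const MAX_ATTEMPTS: usize = 3; // hypothetical; the diff tracks `remaining_attempts`
        self.status = RoomStatus::Rejoining;
        for _ in 0..MAX_ATTEMPTS {
            if try_rejoin(self) {
                // The server accepted the rejoin request and re-bound our
                // shared and joined projects to the new connection.
                self.status = RoomStatus::Online;
                return true;
            }
        }
        // All attempts failed: give up and drop the tracked projects.
        self.status = RoomStatus::Offline;
        self.shared_project_ids.clear();
        self.joined_project_ids.clear();
        false
    }
}

fn main() {
    let mut room = Room {
        status: RoomStatus::Online,
        shared_project_ids: vec![1],
        joined_project_ids: vec![2],
    };
    // Fail twice, then succeed on the third attempt.
    let mut attempts = 0;
    let reconnected = room.reconnect(|_| {
        attempts += 1;
        attempts == 3
    });
    assert!(reconnected);
    assert_eq!(room.status, RoomStatus::Online);
}

In the real implementation the failure path additionally unshares local projects and marks joined projects as disconnected from their host, as the leave path in the diff below shows.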

Cargo.lock (generated)
View File

@ -820,8 +820,10 @@ dependencies = [
"async-broadcast",
"client",
"collections",
"fs",
"futures 0.3.25",
"gpui",
"language",
"live_kit_client",
"log",
"media",
@ -4450,6 +4452,7 @@ dependencies = [
"aho-corasick",
"anyhow",
"async-trait",
"backtrace",
"client",
"clock",
"collections",

View File

@ -23,6 +23,8 @@ collections = { path = "../collections" }
gpui = { path = "../gpui" }
log = "0.4"
live_kit_client = { path = "../live_kit_client" }
fs = { path = "../fs" }
language = { path = "../language" }
media = { path = "../media" }
project = { path = "../project" }
util = { path = "../util" }
@ -34,6 +36,8 @@ postage = { version = "0.4.1", features = ["futures-traits"] }
[dev-dependencies]
client = { path = "../client", features = ["test-support"] }
fs = { path = "../fs", features = ["test-support"] }
language = { path = "../language", features = ["test-support"] }
collections = { path = "../collections", features = ["test-support"] }
gpui = { path = "../gpui", features = ["test-support"] }
live_kit_client = { path = "../live_kit_client", features = ["test-support"] }

View File

@ -7,11 +7,13 @@ use client::{
proto::{self, PeerId},
Client, TypedEnvelope, User, UserStore,
};
use collections::{BTreeMap, HashSet};
use collections::{BTreeMap, HashMap, HashSet};
use fs::Fs;
use futures::{FutureExt, StreamExt};
use gpui::{
AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task, WeakModelHandle,
};
use language::LanguageRegistry;
use live_kit_client::{LocalTrackPublication, LocalVideoTrack, RemoteVideoTrackUpdate};
use postage::stream::Stream;
use project::Project;
@ -43,6 +45,8 @@ pub struct Room {
id: u64,
live_kit: Option<LiveKitRoom>,
status: RoomStatus,
shared_projects: HashSet<WeakModelHandle<Project>>,
joined_projects: HashSet<WeakModelHandle<Project>>,
local_participant: LocalParticipant,
remote_participants: BTreeMap<u64, RemoteParticipant>,
pending_participants: Vec<Arc<User>>,
@ -62,7 +66,7 @@ impl Entity for Room {
fn release(&mut self, _: &mut MutableAppContext) {
if self.status.is_online() {
log::info!("room was released, sending leave message");
self.client.send(proto::LeaveRoom {}).log_err();
let _ = self.client.send(proto::LeaveRoom {});
}
}
}
@ -132,6 +136,8 @@ impl Room {
id,
live_kit: live_kit_room,
status: RoomStatus::Online,
shared_projects: Default::default(),
joined_projects: Default::default(),
participant_user_ids: Default::default(),
local_participant: Default::default(),
remote_participants: Default::default(),
@ -234,6 +240,22 @@ impl Room {
cx.notify();
cx.emit(Event::Left);
log::info!("leaving room");
for project in self.shared_projects.drain() {
if let Some(project) = project.upgrade(cx) {
project.update(cx, |project, cx| {
project.unshare(cx).log_err();
});
}
}
for project in self.joined_projects.drain() {
if let Some(project) = project.upgrade(cx) {
project.update(cx, |project, cx| {
project.disconnected_from_host(cx);
});
}
}
self.status = RoomStatus::Offline;
self.remote_participants.clear();
self.pending_participants.clear();
@ -257,16 +279,15 @@ impl Room {
.next()
.await
.map_or(false, |s| s.is_connected());
// Even if we're initially connected, any future change of the status means we momentarily disconnected.
if !is_connected || client_status.next().await.is_some() {
log::info!("detected client disconnection");
let room_id = this
.upgrade(&cx)
this.upgrade(&cx)
.ok_or_else(|| anyhow!("room was dropped"))?
.update(&mut cx, |this, cx| {
this.status = RoomStatus::Rejoining;
cx.notify();
this.id
});
// Wait for client to re-establish a connection to the server.
@ -279,32 +300,22 @@ impl Room {
"waiting for client status change, remaining attempts {}",
remaining_attempts
);
if let Some(status) = client_status.next().await {
let Some(status) = client_status.next().await else { break };
if status.is_connected() {
log::info!("client reconnected, attempting to rejoin room");
let rejoin_room = async {
let response =
client.request(proto::JoinRoom { id: room_id }).await?;
let room_proto =
response.room.ok_or_else(|| anyhow!("invalid room"))?;
this.upgrade(&cx)
.ok_or_else(|| anyhow!("room was dropped"))?
.update(&mut cx, |this, cx| {
this.status = RoomStatus::Online;
this.apply_room_update(room_proto, cx)
})?;
anyhow::Ok(())
};
if rejoin_room.await.log_err().is_some() {
let Some(this) = this.upgrade(&cx) else { break };
if this
.update(&mut cx, |this, cx| this.rejoin(cx))
.await
.log_err()
.is_some()
{
return true;
} else {
remaining_attempts -= 1;
}
}
} else {
return false;
}
}
false
}
@ -340,6 +351,82 @@ impl Room {
}
}
fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
let mut projects = HashMap::default();
let mut reshared_projects = Vec::new();
let mut rejoined_projects = Vec::new();
self.shared_projects.retain(|project| {
if let Some(handle) = project.upgrade(cx) {
let project = handle.read(cx);
if let Some(project_id) = project.remote_id() {
projects.insert(project_id, handle.clone());
reshared_projects.push(proto::UpdateProject {
project_id,
worktrees: project.worktree_metadata_protos(cx),
});
return true;
}
}
false
});
self.joined_projects.retain(|project| {
if let Some(handle) = project.upgrade(cx) {
let project = handle.read(cx);
if let Some(project_id) = project.remote_id() {
projects.insert(project_id, handle.clone());
rejoined_projects.push(proto::RejoinProject {
id: project_id,
worktrees: project
.worktrees(cx)
.map(|worktree| {
let worktree = worktree.read(cx);
proto::RejoinWorktree {
id: worktree.id().to_proto(),
scan_id: worktree.completed_scan_id() as u64,
}
})
.collect(),
});
}
return true;
}
false
});
let response = self.client.request(proto::RejoinRoom {
id: self.id,
reshared_projects,
rejoined_projects,
});
cx.spawn(|this, mut cx| async move {
let response = response.await?;
let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
this.update(&mut cx, |this, cx| {
this.status = RoomStatus::Online;
this.apply_room_update(room_proto, cx)?;
for reshared_project in response.reshared_projects {
if let Some(project) = projects.get(&reshared_project.id) {
project.update(cx, |project, cx| {
project.reshared(reshared_project, cx).log_err();
});
}
}
for rejoined_project in response.rejoined_projects {
if let Some(project) = projects.get(&rejoined_project.id) {
project.update(cx, |project, cx| {
project.rejoined(rejoined_project, cx).log_err();
});
}
}
anyhow::Ok(())
})
})
}
pub fn id(&self) -> u64 {
self.id
}
@ -454,6 +541,20 @@ impl Room {
}
for unshared_project_id in old_projects.difference(&new_projects) {
this.joined_projects.retain(|project| {
if let Some(project) = project.upgrade(cx) {
project.update(cx, |project, cx| {
if project.remote_id() == Some(*unshared_project_id) {
project.disconnected_from_host(cx);
false
} else {
true
}
})
} else {
false
}
});
cx.emit(Event::RemoteProjectUnshared {
project_id: *unshared_project_id,
});
@ -630,6 +731,32 @@ impl Room {
})
}
pub fn join_project(
&mut self,
id: u64,
language_registry: Arc<LanguageRegistry>,
fs: Arc<dyn Fs>,
cx: &mut ModelContext<Self>,
) -> Task<Result<ModelHandle<Project>>> {
let client = self.client.clone();
let user_store = self.user_store.clone();
cx.spawn(|this, mut cx| async move {
let project =
Project::remote(id, client, user_store, language_registry, fs, cx.clone()).await?;
this.update(&mut cx, |this, cx| {
this.joined_projects.retain(|project| {
if let Some(project) = project.upgrade(cx) {
!project.read(cx).is_read_only()
} else {
false
}
});
this.joined_projects.insert(project.downgrade());
});
Ok(project)
})
}
pub(crate) fn share_project(
&mut self,
project: ModelHandle<Project>,
@ -641,31 +768,18 @@ impl Room {
let request = self.client.request(proto::ShareProject {
room_id: self.id(),
worktrees: project
.read(cx)
.worktrees(cx)
.map(|worktree| {
let worktree = worktree.read(cx);
proto::WorktreeMetadata {
id: worktree.id().to_proto(),
root_name: worktree.root_name().into(),
visible: worktree.is_visible(),
abs_path: worktree.abs_path().to_string_lossy().into(),
}
})
.collect(),
worktrees: project.read(cx).worktree_metadata_protos(cx),
});
cx.spawn(|this, mut cx| async move {
let response = request.await?;
project.update(&mut cx, |project, cx| {
project
.shared(response.project_id, cx)
.detach_and_log_err(cx)
});
project.shared(response.project_id, cx)
})?;
// If the user's location is in this project, it changes from UnsharedProject to SharedProject.
this.update(&mut cx, |this, cx| {
this.shared_projects.insert(project.downgrade());
let active_project = this.local_participant.active_project.as_ref();
if active_project.map_or(false, |location| *location == project) {
this.set_location(Some(&project), cx)

View File

@ -1235,6 +1235,7 @@ impl Client {
subscriber
} else {
log::info!("unhandled message {}", type_name);
self.peer.respond_with_unhandled_message(message).log_err();
return;
};
@ -1278,6 +1279,7 @@ impl Client {
.detach();
} else {
log::info!("unhandled message {}", type_name);
self.peer.respond_with_unhandled_message(message).log_err();
}
}

View File

@ -57,7 +57,7 @@ CREATE TABLE "worktrees" (
"abs_path" VARCHAR NOT NULL,
"visible" BOOL NOT NULL,
"scan_id" INTEGER NOT NULL,
"is_complete" BOOL NOT NULL,
"completed_scan_id" INTEGER NOT NULL,
PRIMARY KEY(project_id, id)
);
CREATE INDEX "index_worktrees_on_project_id" ON "worktrees" ("project_id");
@ -65,6 +65,7 @@ CREATE INDEX "index_worktrees_on_project_id" ON "worktrees" ("project_id");
CREATE TABLE "worktree_entries" (
"project_id" INTEGER NOT NULL,
"worktree_id" INTEGER NOT NULL,
"scan_id" INTEGER NOT NULL,
"id" INTEGER NOT NULL,
"is_dir" BOOL NOT NULL,
"path" VARCHAR NOT NULL,
@ -73,6 +74,7 @@ CREATE TABLE "worktree_entries" (
"mtime_nanos" INTEGER NOT NULL,
"is_symlink" BOOL NOT NULL,
"is_ignored" BOOL NOT NULL,
"is_deleted" BOOL NOT NULL,
PRIMARY KEY(project_id, worktree_id, id),
FOREIGN KEY(project_id, worktree_id) REFERENCES worktrees (project_id, id) ON DELETE CASCADE
);

View File

@ -0,0 +1,3 @@
ALTER TABLE "worktree_entries"
ADD COLUMN "scan_id" INT8,
ADD COLUMN "is_deleted" BOOL;

View File

@ -0,0 +1,3 @@
ALTER TABLE worktrees
DROP COLUMN is_complete,
ADD COLUMN completed_scan_id INT8;
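
These two migrations add per-entry scan_id and is_deleted columns and replace the worktree's is_complete flag with completed_scan_id. Together they let a rejoining guest report the last scan it fully observed and receive only the entries created, modified, or soft-deleted since then, which is what the rejoin_room query later in this diff computes. A minimal sketch of that filtering; EntryRow and diff_since are hypothetical names standing in for the sea-orm model and query.

struct EntryRow {
    id: u64,
    scan_id: u64,     // the scan that last touched this row
    is_deleted: bool, // soft delete instead of a hard DELETE
}

struct WorktreeDiff {
    updated: Vec<u64>,
    removed: Vec<u64>,
}

fn diff_since(rows: &[EntryRow], last_seen_scan_id: u64) -> WorktreeDiff {
    let mut diff = WorktreeDiff {
        updated: Vec::new(),
        removed: Vec::new(),
    };
    for row in rows {
        // Rows untouched since the guest's last observed scan are skipped.
        if row.scan_id <= last_seen_scan_id {
            continue;
        }
        if row.is_deleted {
            diff.removed.push(row.id);
        } else {
            diff.updated.push(row.id);
        }
    }
    diff
}

fn main() {
    let rows = [
        EntryRow { id: 1, scan_id: 4, is_deleted: false }, // unchanged since scan 5
        EntryRow { id: 2, scan_id: 6, is_deleted: false }, // modified afterwards
        EntryRow { id: 3, scan_id: 7, is_deleted: true },  // deleted afterwards
    ];
    let diff = diff_since(&rows, 5);
    assert_eq!(diff.updated, vec![2]);
    assert_eq!(diff.removed, vec![3]);
}

Soft deletion is what makes the removed_entries half of this possible: a hard DELETE would leave no trace for a guest to learn that an entry disappeared while it was away.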

View File

@ -123,34 +123,6 @@ impl Database {
.await
}
pub async fn delete_stale_projects(
&self,
environment: &str,
new_server_id: ServerId,
) -> Result<()> {
self.transaction(|tx| async move {
let stale_server_epochs = self
.stale_server_ids(environment, new_server_id, &tx)
.await?;
project_collaborator::Entity::delete_many()
.filter(
project_collaborator::Column::ConnectionServerId
.is_in(stale_server_epochs.iter().copied()),
)
.exec(&*tx)
.await?;
project::Entity::delete_many()
.filter(
project::Column::HostConnectionServerId
.is_in(stale_server_epochs.iter().copied()),
)
.exec(&*tx)
.await?;
Ok(())
})
.await
}
pub async fn stale_room_ids(
&self,
environment: &str,
@ -235,8 +207,8 @@ impl Database {
pub async fn delete_stale_servers(
&self,
new_server_id: ServerId,
environment: &str,
new_server_id: ServerId,
) -> Result<()> {
self.transaction(|tx| async move {
server::Entity::delete_many()
@ -1319,15 +1291,7 @@ impl Database {
Condition::all()
.add(room_participant::Column::RoomId.eq(room_id))
.add(room_participant::Column::UserId.eq(user_id))
.add(
Condition::any()
.add(room_participant::Column::AnsweringConnectionId.is_null())
.add(room_participant::Column::AnsweringConnectionLost.eq(true))
.add(
room_participant::Column::AnsweringConnectionServerId
.ne(connection.owner_id as i32),
),
),
.add(room_participant::Column::AnsweringConnectionId.is_null()),
)
.set(room_participant::ActiveModel {
answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
@ -1349,6 +1313,245 @@ impl Database {
.await
}
pub async fn rejoin_room(
&self,
rejoin_room: proto::RejoinRoom,
user_id: UserId,
connection: ConnectionId,
) -> Result<RoomGuard<RejoinedRoom>> {
self.room_transaction(|tx| async {
let tx = tx;
let room_id = RoomId::from_proto(rejoin_room.id);
let participant_update = room_participant::Entity::update_many()
.filter(
Condition::all()
.add(room_participant::Column::RoomId.eq(room_id))
.add(room_participant::Column::UserId.eq(user_id))
.add(room_participant::Column::AnsweringConnectionId.is_not_null())
.add(
Condition::any()
.add(room_participant::Column::AnsweringConnectionLost.eq(true))
.add(
room_participant::Column::AnsweringConnectionServerId
.ne(connection.owner_id as i32),
),
),
)
.set(room_participant::ActiveModel {
answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
answering_connection_server_id: ActiveValue::set(Some(ServerId(
connection.owner_id as i32,
))),
answering_connection_lost: ActiveValue::set(false),
..Default::default()
})
.exec(&*tx)
.await?;
if participant_update.rows_affected == 0 {
return Err(anyhow!("room does not exist or was already joined"))?;
}
let mut reshared_projects = Vec::new();
for reshared_project in &rejoin_room.reshared_projects {
let project_id = ProjectId::from_proto(reshared_project.project_id);
let project = project::Entity::find_by_id(project_id)
.one(&*tx)
.await?
.ok_or_else(|| anyhow!("project does not exist"))?;
if project.host_user_id != user_id {
return Err(anyhow!("no such project"))?;
}
let mut collaborators = project
.find_related(project_collaborator::Entity)
.all(&*tx)
.await?;
let host_ix = collaborators
.iter()
.position(|collaborator| {
collaborator.user_id == user_id && collaborator.is_host
})
.ok_or_else(|| anyhow!("host not found among collaborators"))?;
let host = collaborators.swap_remove(host_ix);
let old_connection_id = host.connection();
project::Entity::update(project::ActiveModel {
host_connection_id: ActiveValue::set(Some(connection.id as i32)),
host_connection_server_id: ActiveValue::set(Some(ServerId(
connection.owner_id as i32,
))),
..project.into_active_model()
})
.exec(&*tx)
.await?;
project_collaborator::Entity::update(project_collaborator::ActiveModel {
connection_id: ActiveValue::set(connection.id as i32),
connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
..host.into_active_model()
})
.exec(&*tx)
.await?;
self.update_project_worktrees(project_id, &reshared_project.worktrees, &tx)
.await?;
reshared_projects.push(ResharedProject {
id: project_id,
old_connection_id,
collaborators: collaborators
.iter()
.map(|collaborator| ProjectCollaborator {
connection_id: collaborator.connection(),
user_id: collaborator.user_id,
replica_id: collaborator.replica_id,
is_host: collaborator.is_host,
})
.collect(),
worktrees: reshared_project.worktrees.clone(),
});
}
project::Entity::delete_many()
.filter(
Condition::all()
.add(project::Column::RoomId.eq(room_id))
.add(project::Column::HostUserId.eq(user_id))
.add(
project::Column::Id
.is_not_in(reshared_projects.iter().map(|project| project.id)),
),
)
.exec(&*tx)
.await?;
let mut rejoined_projects = Vec::new();
for rejoined_project in &rejoin_room.rejoined_projects {
let project_id = ProjectId::from_proto(rejoined_project.id);
let Some(project) = project::Entity::find_by_id(project_id)
.one(&*tx)
.await? else { continue };
let mut worktrees = Vec::new();
let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
for db_worktree in db_worktrees {
let mut worktree = RejoinedWorktree {
id: db_worktree.id as u64,
abs_path: db_worktree.abs_path,
root_name: db_worktree.root_name,
visible: db_worktree.visible,
updated_entries: Default::default(),
removed_entries: Default::default(),
diagnostic_summaries: Default::default(),
scan_id: db_worktree.scan_id as u64,
completed_scan_id: db_worktree.completed_scan_id as u64,
};
let rejoined_worktree = rejoined_project
.worktrees
.iter()
.find(|worktree| worktree.id == db_worktree.id as u64);
let entry_filter = if let Some(rejoined_worktree) = rejoined_worktree {
worktree_entry::Column::ScanId.gt(rejoined_worktree.scan_id)
} else {
worktree_entry::Column::IsDeleted.eq(false)
};
let mut db_entries = worktree_entry::Entity::find()
.filter(
Condition::all()
.add(worktree_entry::Column::WorktreeId.eq(worktree.id))
.add(entry_filter),
)
.stream(&*tx)
.await?;
while let Some(db_entry) = db_entries.next().await {
let db_entry = db_entry?;
if db_entry.is_deleted {
worktree.removed_entries.push(db_entry.id as u64);
} else {
worktree.updated_entries.push(proto::Entry {
id: db_entry.id as u64,
is_dir: db_entry.is_dir,
path: db_entry.path,
inode: db_entry.inode as u64,
mtime: Some(proto::Timestamp {
seconds: db_entry.mtime_seconds as u64,
nanos: db_entry.mtime_nanos as u32,
}),
is_symlink: db_entry.is_symlink,
is_ignored: db_entry.is_ignored,
});
}
}
worktrees.push(worktree);
}
let language_servers = project
.find_related(language_server::Entity)
.all(&*tx)
.await?
.into_iter()
.map(|language_server| proto::LanguageServer {
id: language_server.id as u64,
name: language_server.name,
})
.collect::<Vec<_>>();
let mut collaborators = project
.find_related(project_collaborator::Entity)
.all(&*tx)
.await?;
let self_collaborator = if let Some(self_collaborator_ix) = collaborators
.iter()
.position(|collaborator| collaborator.user_id == user_id)
{
collaborators.swap_remove(self_collaborator_ix)
} else {
continue;
};
let old_connection_id = self_collaborator.connection();
project_collaborator::Entity::update(project_collaborator::ActiveModel {
connection_id: ActiveValue::set(connection.id as i32),
connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
..self_collaborator.into_active_model()
})
.exec(&*tx)
.await?;
let collaborators = collaborators
.into_iter()
.map(|collaborator| ProjectCollaborator {
connection_id: collaborator.connection(),
user_id: collaborator.user_id,
replica_id: collaborator.replica_id,
is_host: collaborator.is_host,
})
.collect::<Vec<_>>();
rejoined_projects.push(RejoinedProject {
id: project_id,
old_connection_id,
collaborators,
worktrees,
language_servers,
});
}
let room = self.get_room(room_id, &tx).await?;
Ok((
room_id,
RejoinedRoom {
room,
rejoined_projects,
reshared_projects,
},
))
})
.await
}
pub async fn leave_room(
&self,
connection: ConnectionId,
@ -1445,10 +1648,7 @@ impl Database {
host_connection_id: Default::default(),
});
let collaborator_connection_id = ConnectionId {
owner_id: collaborator.connection_server_id.0 as u32,
id: collaborator.connection_id as u32,
};
let collaborator_connection_id = collaborator.connection();
if collaborator_connection_id != connection {
left_project.connection_ids.push(collaborator_connection_id);
}
@ -1572,11 +1772,8 @@ impl Database {
.await
}
pub async fn connection_lost(
&self,
connection: ConnectionId,
) -> Result<RoomGuard<Vec<LeftProject>>> {
self.room_transaction(|tx| async move {
pub async fn connection_lost(&self, connection: ConnectionId) -> Result<()> {
self.transaction(|tx| async move {
let participant = room_participant::Entity::find()
.filter(
Condition::all()
@ -1592,7 +1789,6 @@ impl Database {
.one(&*tx)
.await?
.ok_or_else(|| anyhow!("not a participant in any room"))?;
let room_id = participant.room_id;
room_participant::Entity::update(room_participant::ActiveModel {
answering_connection_lost: ActiveValue::set(true),
@ -1601,66 +1797,7 @@ impl Database {
.exec(&*tx)
.await?;
let collaborator_on_projects = project_collaborator::Entity::find()
.find_also_related(project::Entity)
.filter(
Condition::all()
.add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
.add(
project_collaborator::Column::ConnectionServerId
.eq(connection.owner_id as i32),
),
)
.all(&*tx)
.await?;
project_collaborator::Entity::delete_many()
.filter(
Condition::all()
.add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
.add(
project_collaborator::Column::ConnectionServerId
.eq(connection.owner_id as i32),
),
)
.exec(&*tx)
.await?;
let mut left_projects = Vec::new();
for (_, project) in collaborator_on_projects {
if let Some(project) = project {
let collaborators = project
.find_related(project_collaborator::Entity)
.all(&*tx)
.await?;
let connection_ids = collaborators
.into_iter()
.map(|collaborator| ConnectionId {
id: collaborator.connection_id as u32,
owner_id: collaborator.connection_server_id.0 as u32,
})
.collect();
left_projects.push(LeftProject {
id: project.id,
host_user_id: project.host_user_id,
host_connection_id: project.host_connection()?,
connection_ids,
});
}
}
project::Entity::delete_many()
.filter(
Condition::all()
.add(project::Column::HostConnectionId.eq(connection.id as i32))
.add(
project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
),
)
.exec(&*tx)
.await?;
Ok((room_id, left_projects))
Ok(())
})
.await
}
@ -1860,7 +1997,7 @@ impl Database {
root_name: ActiveValue::set(worktree.root_name.clone()),
visible: ActiveValue::set(worktree.visible),
scan_id: ActiveValue::set(0),
is_complete: ActiveValue::set(false),
completed_scan_id: ActiveValue::set(0),
}
}))
.exec(&*tx)
@ -1930,17 +2067,31 @@ impl Database {
.await?
.ok_or_else(|| anyhow!("no such project"))?;
self.update_project_worktrees(project.id, worktrees, &tx)
.await?;
let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
let room = self.get_room(project.room_id, &tx).await?;
Ok((project.room_id, (room, guest_connection_ids)))
})
.await
}
async fn update_project_worktrees(
&self,
project_id: ProjectId,
worktrees: &[proto::WorktreeMetadata],
tx: &DatabaseTransaction,
) -> Result<()> {
if !worktrees.is_empty() {
worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
worktree::ActiveModel {
worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
id: ActiveValue::set(worktree.id as i64),
project_id: ActiveValue::set(project.id),
project_id: ActiveValue::set(project_id),
abs_path: ActiveValue::set(worktree.abs_path.clone()),
root_name: ActiveValue::set(worktree.root_name.clone()),
visible: ActiveValue::set(worktree.visible),
scan_id: ActiveValue::set(0),
is_complete: ActiveValue::set(false),
}
completed_scan_id: ActiveValue::set(0),
}))
.on_conflict(
OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
@ -1952,20 +2103,13 @@ impl Database {
}
worktree::Entity::delete_many()
.filter(
worktree::Column::ProjectId.eq(project.id).and(
worktree::Column::Id
.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
),
)
.filter(worktree::Column::ProjectId.eq(project_id).and(
worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
))
.exec(&*tx)
.await?;
let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
let room = self.get_room(project.room_id, &tx).await?;
Ok((project.room_id, (room, guest_connection_ids)))
})
.await
Ok(())
}
pub async fn update_worktree(
@ -1997,7 +2141,11 @@ impl Database {
project_id: ActiveValue::set(project_id),
root_name: ActiveValue::set(update.root_name.clone()),
scan_id: ActiveValue::set(update.scan_id as i64),
is_complete: ActiveValue::set(update.is_last_update),
completed_scan_id: if update.is_last_update {
ActiveValue::set(update.scan_id as i64)
} else {
ActiveValue::default()
},
abs_path: ActiveValue::set(update.abs_path.clone()),
..Default::default()
})
@ -2018,6 +2166,8 @@ impl Database {
mtime_nanos: ActiveValue::set(mtime.nanos as i32),
is_symlink: ActiveValue::set(entry.is_symlink),
is_ignored: ActiveValue::set(entry.is_ignored),
is_deleted: ActiveValue::set(false),
scan_id: ActiveValue::set(update.scan_id as i64),
}
}))
.on_conflict(
@ -2034,6 +2184,7 @@ impl Database {
worktree_entry::Column::MtimeNanos,
worktree_entry::Column::IsSymlink,
worktree_entry::Column::IsIgnored,
worktree_entry::Column::ScanId,
])
.to_owned(),
)
@ -2042,7 +2193,7 @@ impl Database {
}
if !update.removed_entries.is_empty() {
worktree_entry::Entity::delete_many()
worktree_entry::Entity::update_many()
.filter(
worktree_entry::Column::ProjectId
.eq(project_id)
@ -2052,6 +2203,11 @@ impl Database {
.is_in(update.removed_entries.iter().map(|id| *id as i64)),
),
)
.set(worktree_entry::ActiveModel {
is_deleted: ActiveValue::Set(true),
scan_id: ActiveValue::Set(update.scan_id as i64),
..Default::default()
})
.exec(&*tx)
.await?;
}
@ -2230,7 +2386,7 @@ impl Database {
entries: Default::default(),
diagnostic_summaries: Default::default(),
scan_id: db_worktree.scan_id as u64,
is_complete: db_worktree.is_complete,
completed_scan_id: db_worktree.completed_scan_id as u64,
},
)
})
@ -2239,7 +2395,11 @@ impl Database {
// Populate worktree entries.
{
let mut db_entries = worktree_entry::Entity::find()
.filter(worktree_entry::Column::ProjectId.eq(project_id))
.filter(
Condition::all()
.add(worktree_entry::Column::ProjectId.eq(project_id))
.add(worktree_entry::Column::IsDeleted.eq(false)),
)
.stream(&*tx)
.await?;
while let Some(db_entry) = db_entries.next().await {
@ -2290,7 +2450,15 @@ impl Database {
let room_id = project.room_id;
let project = Project {
collaborators,
collaborators: collaborators
.into_iter()
.map(|collaborator| ProjectCollaborator {
connection_id: collaborator.connection(),
user_id: collaborator.user_id,
replica_id: collaborator.replica_id,
is_host: collaborator.is_host,
})
.collect(),
worktrees,
language_servers: language_servers
.into_iter()
@ -2337,10 +2505,7 @@ impl Database {
.await?;
let connection_ids = collaborators
.into_iter()
.map(|collaborator| ConnectionId {
owner_id: collaborator.connection_server_id.0 as u32,
id: collaborator.connection_id as u32,
})
.map(|collaborator| collaborator.connection())
.collect();
let left_project = LeftProject {
@ -2357,8 +2522,8 @@ impl Database {
pub async fn project_collaborators(
&self,
project_id: ProjectId,
connection: ConnectionId,
) -> Result<RoomGuard<Vec<project_collaborator::Model>>> {
connection_id: ConnectionId,
) -> Result<RoomGuard<Vec<ProjectCollaborator>>> {
self.room_transaction(|tx| async move {
let project = project::Entity::find_by_id(project_id)
.one(&*tx)
@ -2367,15 +2532,20 @@ impl Database {
let collaborators = project_collaborator::Entity::find()
.filter(project_collaborator::Column::ProjectId.eq(project_id))
.all(&*tx)
.await?;
.await?
.into_iter()
.map(|collaborator| ProjectCollaborator {
connection_id: collaborator.connection(),
user_id: collaborator.user_id,
replica_id: collaborator.replica_id,
is_host: collaborator.is_host,
})
.collect::<Vec<_>>();
if collaborators.iter().any(|collaborator| {
let collaborator_connection = ConnectionId {
owner_id: collaborator.connection_server_id.0 as u32,
id: collaborator.connection_id as u32,
};
collaborator_connection == connection
}) {
if collaborators
.iter()
.any(|collaborator| collaborator.connection_id == connection_id)
{
Ok((project.room_id, collaborators))
} else {
Err(anyhow!("no such project"))?
@ -2394,18 +2564,15 @@ impl Database {
.one(&*tx)
.await?
.ok_or_else(|| anyhow!("no such project"))?;
let mut participants = project_collaborator::Entity::find()
let mut collaborators = project_collaborator::Entity::find()
.filter(project_collaborator::Column::ProjectId.eq(project_id))
.stream(&*tx)
.await?;
let mut connection_ids = HashSet::default();
while let Some(participant) = participants.next().await {
let participant = participant?;
connection_ids.insert(ConnectionId {
owner_id: participant.connection_server_id.0 as u32,
id: participant.connection_id as u32,
});
while let Some(collaborator) = collaborators.next().await {
let collaborator = collaborator?;
connection_ids.insert(collaborator.connection());
}
if connection_ids.contains(&connection_id) {
@ -2422,7 +2589,7 @@ impl Database {
project_id: ProjectId,
tx: &DatabaseTransaction,
) -> Result<Vec<ConnectionId>> {
let mut participants = project_collaborator::Entity::find()
let mut collaborators = project_collaborator::Entity::find()
.filter(
project_collaborator::Column::ProjectId
.eq(project_id)
@ -2432,12 +2599,9 @@ impl Database {
.await?;
let mut guest_connection_ids = Vec::new();
while let Some(participant) = participants.next().await {
let participant = participant?;
guest_connection_ids.push(ConnectionId {
owner_id: participant.connection_server_id.0 as u32,
id: participant.connection_id as u32,
});
while let Some(collaborator) = collaborators.next().await {
let collaborator = collaborator?;
guest_connection_ids.push(collaborator.connection());
}
Ok(guest_connection_ids)
}
@ -2849,6 +3013,40 @@ id_type!(ServerId);
id_type!(SignupId);
id_type!(UserId);
pub struct RejoinedRoom {
pub room: proto::Room,
pub rejoined_projects: Vec<RejoinedProject>,
pub reshared_projects: Vec<ResharedProject>,
}
pub struct ResharedProject {
pub id: ProjectId,
pub old_connection_id: ConnectionId,
pub collaborators: Vec<ProjectCollaborator>,
pub worktrees: Vec<proto::WorktreeMetadata>,
}
pub struct RejoinedProject {
pub id: ProjectId,
pub old_connection_id: ConnectionId,
pub collaborators: Vec<ProjectCollaborator>,
pub worktrees: Vec<RejoinedWorktree>,
pub language_servers: Vec<proto::LanguageServer>,
}
#[derive(Debug)]
pub struct RejoinedWorktree {
pub id: u64,
pub abs_path: String,
pub root_name: String,
pub visible: bool,
pub updated_entries: Vec<proto::Entry>,
pub removed_entries: Vec<u64>,
pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
pub scan_id: u64,
pub completed_scan_id: u64,
}
pub struct LeftRoom {
pub room: proto::Room,
pub left_projects: HashMap<ProjectId, LeftProject>,
@ -2862,11 +3060,29 @@ pub struct RefreshedRoom {
}
pub struct Project {
pub collaborators: Vec<project_collaborator::Model>,
pub collaborators: Vec<ProjectCollaborator>,
pub worktrees: BTreeMap<u64, Worktree>,
pub language_servers: Vec<proto::LanguageServer>,
}
pub struct ProjectCollaborator {
pub connection_id: ConnectionId,
pub user_id: UserId,
pub replica_id: ReplicaId,
pub is_host: bool,
}
impl ProjectCollaborator {
pub fn to_proto(&self) -> proto::Collaborator {
proto::Collaborator {
peer_id: Some(self.connection_id.into()),
replica_id: self.replica_id.0 as u32,
user_id: self.user_id.to_proto(),
}
}
}
#[derive(Debug)]
pub struct LeftProject {
pub id: ProjectId,
pub host_user_id: UserId,
@ -2882,7 +3098,7 @@ pub struct Worktree {
pub entries: Vec<proto::Entry>,
pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
pub scan_id: u64,
pub is_complete: bool,
pub completed_scan_id: u64,
}
#[cfg(test)]

View File

@ -1,4 +1,5 @@
use super::{ProjectCollaboratorId, ProjectId, ReplicaId, ServerId, UserId};
use rpc::ConnectionId;
use sea_orm::entity::prelude::*;
#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel)]
@ -14,6 +15,15 @@ pub struct Model {
pub is_host: bool,
}
impl Model {
pub fn connection(&self) -> ConnectionId {
ConnectionId {
owner_id: self.connection_server_id.0 as u32,
id: self.connection_id as u32,
}
}
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {
#[sea_orm(

View File

@ -11,8 +11,10 @@ pub struct Model {
pub abs_path: String,
pub root_name: String,
pub visible: bool,
/// The last scan for which we've observed entries. It may be in progress.
pub scan_id: i64,
pub is_complete: bool,
/// The last scan that fully completed.
pub completed_scan_id: i64,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
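
A small sketch of how these two ids are consumed elsewhere in this diff: an UpdateWorktree message is the final one for a worktree exactly when the scan whose entries have been observed has also completed. WorktreeRow is a stand-in for the sea-orm model above.

struct WorktreeRow {
    scan_id: u64,
    completed_scan_id: u64,
}

fn is_last_update(worktree: &WorktreeRow) -> bool {
    // completed_scan_id lags behind scan_id while a scan is in progress;
    // they only match once the scan has finished.
    worktree.scan_id == worktree.completed_scan_id
}

fn main() {
    assert!(!is_last_update(&WorktreeRow { scan_id: 8, completed_scan_id: 7 }));
    assert!(is_last_update(&WorktreeRow { scan_id: 8, completed_scan_id: 8 }));
}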

View File

@ -17,6 +17,8 @@ pub struct Model {
pub mtime_nanos: i32,
pub is_symlink: bool,
pub is_ignored: bool,
pub is_deleted: bool,
pub scan_id: i64,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]

View File

@ -33,4 +33,12 @@ impl Executor {
}
}
}
pub fn record_backtrace(&self) {
match self {
Executor::Production => {}
#[cfg(test)]
Executor::Deterministic(background) => background.record_backtrace(),
}
}
}

View File

@ -3,10 +3,11 @@ pub mod auth;
pub mod db;
pub mod env;
pub mod executor;
#[cfg(test)]
mod integration_tests;
pub mod rpc;
#[cfg(test)]
mod tests;
use axum::{http::StatusCode, response::IntoResponse};
use db::Database;
use serde::Deserialize;

View File

@ -95,6 +95,7 @@ struct Session {
peer: Arc<Peer>,
connection_pool: Arc<parking_lot::Mutex<ConnectionPool>>,
live_kit_client: Option<Arc<dyn live_kit_server::api::Client>>,
executor: Executor,
}
impl Session {
@ -184,6 +185,7 @@ impl Server {
.add_request_handler(ping)
.add_request_handler(create_room)
.add_request_handler(join_room)
.add_request_handler(rejoin_room)
.add_message_handler(leave_room)
.add_request_handler(call)
.add_request_handler(cancel_call)
@ -215,6 +217,7 @@ impl Server {
.add_request_handler(forward_project_request::<proto::PrepareRename>)
.add_request_handler(forward_project_request::<proto::PerformRename>)
.add_request_handler(forward_project_request::<proto::ReloadBuffers>)
.add_request_handler(forward_project_request::<proto::SynchronizeBuffers>)
.add_request_handler(forward_project_request::<proto::FormatBuffers>)
.add_request_handler(forward_project_request::<proto::CreateProjectEntry>)
.add_request_handler(forward_project_request::<proto::RenameProjectEntry>)
@ -249,16 +252,6 @@ impl Server {
let live_kit_client = self.app_state.live_kit_client.clone();
let span = info_span!("start server");
let span_enter = span.enter();
tracing::info!("begin deleting stale projects");
app_state
.db
.delete_stale_projects(&app_state.config.zed_environment, server_id)
.await?;
tracing::info!("finish deleting stale projects");
drop(span_enter);
self.executor.spawn_detached(
async move {
tracing::info!("waiting for cleanup timeout");
@ -354,7 +347,7 @@ impl Server {
app_state
.db
.delete_stale_servers(server_id, &app_state.config.zed_environment)
.delete_stale_servers(&app_state.config.zed_environment, server_id)
.await
.trace_err();
}
@ -529,7 +522,8 @@ impl Server {
db: Arc::new(tokio::sync::Mutex::new(DbHandle(this.app_state.db.clone()))),
peer: this.peer.clone(),
connection_pool: this.connection_pool.clone(),
live_kit_client: this.app_state.live_kit_client.clone()
live_kit_client: this.app_state.live_kit_client.clone(),
executor: executor.clone(),
};
update_user_contacts(user_id, &session).await?;
@ -586,7 +580,7 @@ impl Server {
drop(foreground_message_handlers);
tracing::info!(%user_id, %login, %connection_id, %address, "signing out");
if let Err(error) = sign_out(session, teardown, executor).await {
if let Err(error) = connection_lost(session, teardown, executor).await {
tracing::error!(%user_id, %login, %connection_id, %address, ?error, "error signing out");
}
@ -678,15 +672,17 @@ impl<'a> Drop for ConnectionPoolGuard<'a> {
}
fn broadcast<F>(
sender_id: ConnectionId,
sender_id: Option<ConnectionId>,
receiver_ids: impl IntoIterator<Item = ConnectionId>,
mut f: F,
) where
F: FnMut(ConnectionId) -> anyhow::Result<()>,
{
for receiver_id in receiver_ids {
if receiver_id != sender_id {
f(receiver_id).trace_err();
if Some(receiver_id) != sender_id {
if let Err(error) = f(receiver_id) {
tracing::error!("failed to send to {:?} {}", receiver_id, error);
}
}
}
}
@ -787,7 +783,7 @@ pub async fn handle_metrics(Extension(server): Extension<Arc<Server>>) -> Result
}
#[instrument(err, skip(executor))]
async fn sign_out(
async fn connection_lost(
session: Session,
mut teardown: watch::Receiver<()>,
executor: Executor,
@ -798,17 +794,12 @@ async fn sign_out(
.await
.remove_connection(session.connection_id)?;
if let Some(mut left_projects) = session
session
.db()
.await
.connection_lost(session.connection_id)
.await
.trace_err()
{
for left_project in mem::take(&mut *left_projects) {
project_left(&left_project, &session);
}
}
.trace_err();
futures::select_biased! {
_ = executor.sleep(RECONNECT_TIMEOUT).fuse() => {
@ -941,6 +932,164 @@ async fn join_room(
Ok(())
}
async fn rejoin_room(
request: proto::RejoinRoom,
response: Response<proto::RejoinRoom>,
session: Session,
) -> Result<()> {
{
let mut rejoined_room = session
.db()
.await
.rejoin_room(request, session.user_id, session.connection_id)
.await?;
response.send(proto::RejoinRoomResponse {
room: Some(rejoined_room.room.clone()),
reshared_projects: rejoined_room
.reshared_projects
.iter()
.map(|project| proto::ResharedProject {
id: project.id.to_proto(),
collaborators: project
.collaborators
.iter()
.map(|collaborator| collaborator.to_proto())
.collect(),
})
.collect(),
rejoined_projects: rejoined_room
.rejoined_projects
.iter()
.map(|rejoined_project| proto::RejoinedProject {
id: rejoined_project.id.to_proto(),
worktrees: rejoined_project
.worktrees
.iter()
.map(|worktree| proto::WorktreeMetadata {
id: worktree.id,
root_name: worktree.root_name.clone(),
visible: worktree.visible,
abs_path: worktree.abs_path.clone(),
})
.collect(),
collaborators: rejoined_project
.collaborators
.iter()
.map(|collaborator| collaborator.to_proto())
.collect(),
language_servers: rejoined_project.language_servers.clone(),
})
.collect(),
})?;
room_updated(&rejoined_room.room, &session.peer);
for project in &rejoined_room.reshared_projects {
for collaborator in &project.collaborators {
session
.peer
.send(
collaborator.connection_id,
proto::UpdateProjectCollaborator {
project_id: project.id.to_proto(),
old_peer_id: Some(project.old_connection_id.into()),
new_peer_id: Some(session.connection_id.into()),
},
)
.trace_err();
}
broadcast(
Some(session.connection_id),
project
.collaborators
.iter()
.map(|collaborator| collaborator.connection_id),
|connection_id| {
session.peer.forward_send(
session.connection_id,
connection_id,
proto::UpdateProject {
project_id: project.id.to_proto(),
worktrees: project.worktrees.clone(),
},
)
},
);
}
for project in &rejoined_room.rejoined_projects {
for collaborator in &project.collaborators {
session
.peer
.send(
collaborator.connection_id,
proto::UpdateProjectCollaborator {
project_id: project.id.to_proto(),
old_peer_id: Some(project.old_connection_id.into()),
new_peer_id: Some(session.connection_id.into()),
},
)
.trace_err();
}
}
for project in &mut rejoined_room.rejoined_projects {
for worktree in mem::take(&mut project.worktrees) {
#[cfg(any(test, feature = "test-support"))]
const MAX_CHUNK_SIZE: usize = 2;
#[cfg(not(any(test, feature = "test-support")))]
const MAX_CHUNK_SIZE: usize = 256;
// Stream this worktree's entries.
let message = proto::UpdateWorktree {
project_id: project.id.to_proto(),
worktree_id: worktree.id,
abs_path: worktree.abs_path.clone(),
root_name: worktree.root_name,
updated_entries: worktree.updated_entries,
removed_entries: worktree.removed_entries,
scan_id: worktree.scan_id,
is_last_update: worktree.completed_scan_id == worktree.scan_id,
};
for update in proto::split_worktree_update(message, MAX_CHUNK_SIZE) {
session.peer.send(session.connection_id, update.clone())?;
}
// Stream this worktree's diagnostics.
for summary in worktree.diagnostic_summaries {
session.peer.send(
session.connection_id,
proto::UpdateDiagnosticSummary {
project_id: project.id.to_proto(),
worktree_id: worktree.id,
summary: Some(summary),
},
)?;
}
}
for language_server in &project.language_servers {
session.peer.send(
session.connection_id,
proto::UpdateLanguageServer {
project_id: project.id.to_proto(),
language_server_id: language_server.id,
variant: Some(
proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(
proto::LspDiskBasedDiagnosticsUpdated {},
),
),
},
)?;
}
}
}
update_user_contacts(session.user_id, &session).await?;
Ok(())
}
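
When streaming a rejoined worktree back to the guest, the handler above splits one logical update into chunks of at most MAX_CHUNK_SIZE entries so a large worktree never produces a single oversized message. The following is a self-contained sketch of that kind of chunking; Update and split_update are illustrative stand-ins and may not match proto::split_worktree_update's exact splitting rules.

#[derive(Debug)]
struct Update {
    updated_entries: Vec<u64>,
    removed_entries: Vec<u64>,
    is_last_update: bool,
}

fn split_update(mut update: Update, max_chunk_size: usize) -> Vec<Update> {
    let mut chunks = Vec::new();
    loop {
        let n_updated = update.updated_entries.len().min(max_chunk_size);
        let updated: Vec<u64> = update.updated_entries.drain(..n_updated).collect();
        let n_removed = update.removed_entries.len().min(max_chunk_size);
        let removed: Vec<u64> = update.removed_entries.drain(..n_removed).collect();
        let done = update.updated_entries.is_empty() && update.removed_entries.is_empty();
        chunks.push(Update {
            updated_entries: updated,
            removed_entries: removed,
            // Only the final chunk may carry the original completion flag.
            is_last_update: done && update.is_last_update,
        });
        if done {
            return chunks;
        }
    }
}

fn main() {
    let update = Update {
        updated_entries: vec![1, 2, 3],
        removed_entries: vec![4],
        is_last_update: true,
    };
    let chunks = split_update(update, 2);
    assert_eq!(chunks.len(), 2);
    assert!(!chunks[0].is_last_update);
    assert!(chunks[1].is_last_update);
}

Only the last chunk reports is_last_update, which is how the guest knows the resync for that worktree has finished.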
async fn leave_room(_message: proto::LeaveRoom, session: Session) -> Result<()> {
leave_room_for_session(&session).await
}
@ -1132,7 +1281,7 @@ async fn unshare_project(message: proto::UnshareProject, session: Session) -> Re
.await?;
broadcast(
session.connection_id,
Some(session.connection_id),
guest_connection_ids.iter().copied(),
|conn_id| session.peer.send(conn_id, message.clone()),
);
@ -1160,18 +1309,8 @@ async fn join_project(
let collaborators = project
.collaborators
.iter()
.map(|collaborator| {
let peer_id = proto::PeerId {
owner_id: collaborator.connection_server_id.0 as u32,
id: collaborator.connection_id as u32,
};
proto::Collaborator {
peer_id: Some(peer_id),
replica_id: collaborator.replica_id.0 as u32,
user_id: collaborator.user_id.to_proto(),
}
})
.filter(|collaborator| collaborator.peer_id != Some(session.connection_id.into()))
.filter(|collaborator| collaborator.connection_id != session.connection_id)
.map(|collaborator| collaborator.to_proto())
.collect::<Vec<_>>();
let worktrees = project
.worktrees
@ -1224,7 +1363,7 @@ async fn join_project(
updated_entries: worktree.entries,
removed_entries: Default::default(),
scan_id: worktree.scan_id,
is_last_update: worktree.is_complete,
is_last_update: worktree.scan_id == worktree.completed_scan_id,
};
for update in proto::split_worktree_update(message, MAX_CHUNK_SIZE) {
session.peer.send(session.connection_id, update.clone())?;
@ -1293,7 +1432,7 @@ async fn update_project(
.update_project(project_id, session.connection_id, &request.worktrees)
.await?;
broadcast(
session.connection_id,
Some(session.connection_id),
guest_connection_ids.iter().copied(),
|connection_id| {
session
@ -1319,7 +1458,7 @@ async fn update_worktree(
.await?;
broadcast(
session.connection_id,
Some(session.connection_id),
guest_connection_ids.iter().copied(),
|connection_id| {
session
@ -1342,7 +1481,7 @@ async fn update_diagnostic_summary(
.await?;
broadcast(
session.connection_id,
Some(session.connection_id),
guest_connection_ids.iter().copied(),
|connection_id| {
session
@ -1365,7 +1504,7 @@ async fn start_language_server(
.await?;
broadcast(
session.connection_id,
Some(session.connection_id),
guest_connection_ids.iter().copied(),
|connection_id| {
session
@ -1380,6 +1519,7 @@ async fn update_language_server(
request: proto::UpdateLanguageServer,
session: Session,
) -> Result<()> {
session.executor.record_backtrace();
let project_id = ProjectId::from_proto(request.project_id);
let project_connection_ids = session
.db()
@ -1387,7 +1527,7 @@ async fn update_language_server(
.project_connection_ids(project_id, session.connection_id)
.await?;
broadcast(
session.connection_id,
Some(session.connection_id),
project_connection_ids.iter().copied(),
|connection_id| {
session
@ -1406,6 +1546,7 @@ async fn forward_project_request<T>(
where
T: EntityMessage + RequestMessage,
{
session.executor.record_backtrace();
let project_id = ProjectId::from_proto(request.remote_entity_id());
let host_connection_id = {
let collaborators = session
@ -1413,14 +1554,11 @@ where
.await
.project_collaborators(project_id, session.connection_id)
.await?;
let host = collaborators
collaborators
.iter()
.find(|collaborator| collaborator.is_host)
.ok_or_else(|| anyhow!("host not found"))?;
ConnectionId {
owner_id: host.connection_server_id.0 as u32,
id: host.connection_id as u32,
}
.ok_or_else(|| anyhow!("host not found"))?
.connection_id
};
let payload = session
@ -1444,14 +1582,11 @@ async fn save_buffer(
.await
.project_collaborators(project_id, session.connection_id)
.await?;
let host = collaborators
collaborators
.iter()
.find(|collaborator| collaborator.is_host)
.ok_or_else(|| anyhow!("host not found"))?;
ConnectionId {
owner_id: host.connection_server_id.0 as u32,
id: host.connection_id as u32,
}
.ok_or_else(|| anyhow!("host not found"))?
.connection_id
};
let response_payload = session
.peer
@ -1463,22 +1598,19 @@ async fn save_buffer(
.await
.project_collaborators(project_id, session.connection_id)
.await?;
collaborators.retain(|collaborator| {
let collaborator_connection = ConnectionId {
owner_id: collaborator.connection_server_id.0 as u32,
id: collaborator.connection_id as u32,
};
collaborator_connection != session.connection_id
});
let project_connection_ids = collaborators.iter().map(|collaborator| ConnectionId {
owner_id: collaborator.connection_server_id.0 as u32,
id: collaborator.connection_id as u32,
});
broadcast(host_connection_id, project_connection_ids, |conn_id| {
collaborators.retain(|collaborator| collaborator.connection_id != session.connection_id);
let project_connection_ids = collaborators
.iter()
.map(|collaborator| collaborator.connection_id);
broadcast(
Some(host_connection_id),
project_connection_ids,
|conn_id| {
session
.peer
.forward_send(host_connection_id, conn_id, response_payload.clone())
});
},
);
response.send(response_payload)?;
Ok(())
}
@ -1487,6 +1619,7 @@ async fn create_buffer_for_peer(
request: proto::CreateBufferForPeer,
session: Session,
) -> Result<()> {
session.executor.record_backtrace();
let peer_id = request.peer_id.ok_or_else(|| anyhow!("invalid peer id"))?;
session
.peer
@ -1499,6 +1632,7 @@ async fn update_buffer(
response: Response<proto::UpdateBuffer>,
session: Session,
) -> Result<()> {
session.executor.record_backtrace();
let project_id = ProjectId::from_proto(request.project_id);
let project_connection_ids = session
.db()
@ -1506,8 +1640,10 @@ async fn update_buffer(
.project_connection_ids(project_id, session.connection_id)
.await?;
session.executor.record_backtrace();
broadcast(
session.connection_id,
Some(session.connection_id),
project_connection_ids.iter().copied(),
|connection_id| {
session
@ -1528,7 +1664,7 @@ async fn update_buffer_file(request: proto::UpdateBufferFile, session: Session)
.await?;
broadcast(
session.connection_id,
Some(session.connection_id),
project_connection_ids.iter().copied(),
|connection_id| {
session
@ -1547,7 +1683,7 @@ async fn buffer_reloaded(request: proto::BufferReloaded, session: Session) -> Re
.project_connection_ids(project_id, session.connection_id)
.await?;
broadcast(
session.connection_id,
Some(session.connection_id),
project_connection_ids.iter().copied(),
|connection_id| {
session
@ -1566,7 +1702,7 @@ async fn buffer_saved(request: proto::BufferSaved, session: Session) -> Result<(
.project_connection_ids(project_id, session.connection_id)
.await?;
broadcast(
session.connection_id,
Some(session.connection_id),
project_connection_ids.iter().copied(),
|connection_id| {
session
@ -1858,7 +1994,7 @@ async fn update_diff_base(request: proto::UpdateDiffBase, session: Session) -> R
.project_connection_ids(project_id, session.connection_id)
.await?;
broadcast(
session.connection_id,
Some(session.connection_id),
project_connection_ids.iter().copied(),
|connection_id| {
session
@ -1968,21 +2104,20 @@ fn contact_for_user(
}
fn room_updated(room: &proto::Room, peer: &Peer) {
for participant in &room.participants {
if let Some(peer_id) = participant
.peer_id
.ok_or_else(|| anyhow!("invalid participant peer id"))
.trace_err()
{
broadcast(
None,
room.participants
.iter()
.filter_map(|participant| Some(participant.peer_id?.into())),
|peer_id| {
peer.send(
peer_id.into(),
proto::RoomUpdated {
room: Some(room.clone()),
},
)
.trace_err();
}
}
},
);
}
async fn update_user_contacts(user_id: UserId, session: &Session) -> Result<()> {
@ -2103,16 +2238,6 @@ fn project_left(project: &db::LeftProject, session: &Session) {
.trace_err();
}
}
session
.peer
.send(
session.connection_id,
proto::UnshareProject {
project_id: project.id.to_proto(),
},
)
.trace_err();
}
pub trait ResultExt {

crates/collab/src/tests.rs (new file, 466 lines)
View File

@ -0,0 +1,466 @@
use crate::{
db::{NewUserParams, TestDb, UserId},
executor::Executor,
rpc::{Server, CLEANUP_TIMEOUT},
AppState,
};
use anyhow::anyhow;
use call::ActiveCall;
use client::{
self, proto::PeerId, test::FakeHttpClient, Client, Connection, Credentials,
EstablishConnectionError, UserStore,
};
use collections::{HashMap, HashSet};
use fs::{FakeFs, HomeDir};
use futures::{channel::oneshot, StreamExt as _};
use gpui::{
executor::Deterministic, test::EmptyView, ModelHandle, Task, TestAppContext, ViewHandle,
};
use language::LanguageRegistry;
use parking_lot::Mutex;
use project::{Project, WorktreeId};
use settings::Settings;
use std::{
env,
ops::Deref,
path::{Path, PathBuf},
sync::{
atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
Arc,
},
};
use theme::ThemeRegistry;
use workspace::Workspace;
mod integration_tests;
mod randomized_integration_tests;
struct TestServer {
app_state: Arc<AppState>,
server: Arc<Server>,
connection_killers: Arc<Mutex<HashMap<PeerId, Arc<AtomicBool>>>>,
forbid_connections: Arc<AtomicBool>,
_test_db: TestDb,
test_live_kit_server: Arc<live_kit_client::TestServer>,
}
impl TestServer {
async fn start(deterministic: &Arc<Deterministic>) -> Self {
static NEXT_LIVE_KIT_SERVER_ID: AtomicUsize = AtomicUsize::new(0);
let use_postgres = env::var("USE_POSTGRES").ok();
let use_postgres = use_postgres.as_deref();
let test_db = if use_postgres == Some("true") || use_postgres == Some("1") {
TestDb::postgres(deterministic.build_background())
} else {
TestDb::sqlite(deterministic.build_background())
};
let live_kit_server_id = NEXT_LIVE_KIT_SERVER_ID.fetch_add(1, SeqCst);
let live_kit_server = live_kit_client::TestServer::create(
format!("http://livekit.{}.test", live_kit_server_id),
format!("devkey-{}", live_kit_server_id),
format!("secret-{}", live_kit_server_id),
deterministic.build_background(),
)
.unwrap();
let app_state = Self::build_app_state(&test_db, &live_kit_server).await;
let epoch = app_state
.db
.create_server(&app_state.config.zed_environment)
.await
.unwrap();
let server = Server::new(
epoch,
app_state.clone(),
Executor::Deterministic(deterministic.build_background()),
);
server.start().await.unwrap();
// Advance clock to ensure the server's cleanup task is finished.
deterministic.advance_clock(CLEANUP_TIMEOUT);
Self {
app_state,
server,
connection_killers: Default::default(),
forbid_connections: Default::default(),
_test_db: test_db,
test_live_kit_server: live_kit_server,
}
}
async fn reset(&self) {
self.app_state.db.reset();
let epoch = self
.app_state
.db
.create_server(&self.app_state.config.zed_environment)
.await
.unwrap();
self.server.reset(epoch);
}
async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
cx.update(|cx| {
cx.set_global(HomeDir(Path::new("/tmp/").to_path_buf()));
let mut settings = Settings::test(cx);
settings.projects_online_by_default = false;
cx.set_global(settings);
});
let http = FakeHttpClient::with_404_response();
let user_id = if let Ok(Some(user)) = self
.app_state
.db
.get_user_by_github_account(name, None)
.await
{
user.id
} else {
self.app_state
.db
.create_user(
&format!("{name}@example.com"),
false,
NewUserParams {
github_login: name.into(),
github_user_id: 0,
invite_count: 0,
},
)
.await
.expect("creating user failed")
.user_id
};
let client_name = name.to_string();
let mut client = cx.read(|cx| Client::new(http.clone(), cx));
let server = self.server.clone();
let db = self.app_state.db.clone();
let connection_killers = self.connection_killers.clone();
let forbid_connections = self.forbid_connections.clone();
Arc::get_mut(&mut client)
.unwrap()
.set_id(user_id.0 as usize)
.override_authenticate(move |cx| {
cx.spawn(|_| async move {
let access_token = "the-token".to_string();
Ok(Credentials {
user_id: user_id.0 as u64,
access_token,
})
})
})
.override_establish_connection(move |credentials, cx| {
assert_eq!(credentials.user_id, user_id.0 as u64);
assert_eq!(credentials.access_token, "the-token");
let server = server.clone();
let db = db.clone();
let connection_killers = connection_killers.clone();
let forbid_connections = forbid_connections.clone();
let client_name = client_name.clone();
cx.spawn(move |cx| async move {
if forbid_connections.load(SeqCst) {
Err(EstablishConnectionError::other(anyhow!(
"server is forbidding connections"
)))
} else {
let (client_conn, server_conn, killed) =
Connection::in_memory(cx.background());
let (connection_id_tx, connection_id_rx) = oneshot::channel();
let user = db
.get_user_by_id(user_id)
.await
.expect("retrieving user failed")
.unwrap();
cx.background()
.spawn(server.handle_connection(
server_conn,
client_name,
user,
Some(connection_id_tx),
Executor::Deterministic(cx.background()),
))
.detach();
let connection_id = connection_id_rx.await.unwrap();
connection_killers
.lock()
.insert(connection_id.into(), killed);
Ok(client_conn)
}
})
});
let fs = FakeFs::new(cx.background());
let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
let app_state = Arc::new(workspace::AppState {
client: client.clone(),
user_store: user_store.clone(),
languages: Arc::new(LanguageRegistry::new(Task::ready(()))),
themes: ThemeRegistry::new((), cx.font_cache()),
fs: fs.clone(),
build_window_options: Default::default,
initialize_workspace: |_, _, _| unimplemented!(),
dock_default_item_factory: |_, _| unimplemented!(),
});
Project::init(&client);
cx.update(|cx| {
workspace::init(app_state.clone(), cx);
call::init(client.clone(), user_store.clone(), cx);
});
client
.authenticate_and_connect(false, &cx.to_async())
.await
.unwrap();
let client = TestClient {
client,
username: name.to_string(),
local_projects: Default::default(),
remote_projects: Default::default(),
next_root_dir_id: 0,
user_store,
fs,
language_registry: Arc::new(LanguageRegistry::test()),
buffers: Default::default(),
};
client.wait_for_current_user(cx).await;
client
}
fn disconnect_client(&self, peer_id: PeerId) {
self.connection_killers
.lock()
.remove(&peer_id)
.unwrap()
.store(true, SeqCst);
}
fn forbid_connections(&self) {
self.forbid_connections.store(true, SeqCst);
}
fn allow_connections(&self) {
self.forbid_connections.store(false, SeqCst);
}
async fn make_contacts(&self, clients: &mut [(&TestClient, &mut TestAppContext)]) {
for ix in 1..clients.len() {
let (left, right) = clients.split_at_mut(ix);
let (client_a, cx_a) = left.last_mut().unwrap();
for (client_b, cx_b) in right {
client_a
.user_store
.update(*cx_a, |store, cx| {
store.request_contact(client_b.user_id().unwrap(), cx)
})
.await
.unwrap();
cx_a.foreground().run_until_parked();
client_b
.user_store
.update(*cx_b, |store, cx| {
store.respond_to_contact_request(client_a.user_id().unwrap(), true, cx)
})
.await
.unwrap();
}
}
}
async fn create_room(&self, clients: &mut [(&TestClient, &mut TestAppContext)]) {
self.make_contacts(clients).await;
let (left, right) = clients.split_at_mut(1);
let (_client_a, cx_a) = &mut left[0];
let active_call_a = cx_a.read(ActiveCall::global);
for (client_b, cx_b) in right {
let user_id_b = client_b.current_user_id(*cx_b).to_proto();
active_call_a
.update(*cx_a, |call, cx| call.invite(user_id_b, None, cx))
.await
.unwrap();
cx_b.foreground().run_until_parked();
let active_call_b = cx_b.read(ActiveCall::global);
active_call_b
.update(*cx_b, |call, cx| call.accept_incoming(cx))
.await
.unwrap();
}
}
async fn build_app_state(
test_db: &TestDb,
fake_server: &live_kit_client::TestServer,
) -> Arc<AppState> {
Arc::new(AppState {
db: test_db.db().clone(),
live_kit_client: Some(Arc::new(fake_server.create_api_client())),
config: Default::default(),
})
}
}
impl Deref for TestServer {
type Target = Server;
fn deref(&self) -> &Self::Target {
&self.server
}
}
impl Drop for TestServer {
fn drop(&mut self) {
self.server.teardown();
self.test_live_kit_server.teardown().unwrap();
}
}
struct TestClient {
client: Arc<Client>,
username: String,
local_projects: Vec<ModelHandle<Project>>,
remote_projects: Vec<ModelHandle<Project>>,
next_root_dir_id: usize,
pub user_store: ModelHandle<UserStore>,
language_registry: Arc<LanguageRegistry>,
fs: Arc<FakeFs>,
buffers: HashMap<ModelHandle<Project>, HashSet<ModelHandle<language::Buffer>>>,
}
impl Deref for TestClient {
type Target = Arc<Client>;
fn deref(&self) -> &Self::Target {
&self.client
}
}
struct ContactsSummary {
pub current: Vec<String>,
pub outgoing_requests: Vec<String>,
pub incoming_requests: Vec<String>,
}
impl TestClient {
pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
UserId::from_proto(
self.user_store
.read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
)
}
async fn wait_for_current_user(&self, cx: &TestAppContext) {
let mut authed_user = self
.user_store
.read_with(cx, |user_store, _| user_store.watch_current_user());
while authed_user.next().await.unwrap().is_none() {}
}
async fn clear_contacts(&self, cx: &mut TestAppContext) {
self.user_store
.update(cx, |store, _| store.clear_contacts())
.await;
}
fn summarize_contacts(&self, cx: &TestAppContext) -> ContactsSummary {
self.user_store.read_with(cx, |store, _| ContactsSummary {
current: store
.contacts()
.iter()
.map(|contact| contact.user.github_login.clone())
.collect(),
outgoing_requests: store
.outgoing_contact_requests()
.iter()
.map(|user| user.github_login.clone())
.collect(),
incoming_requests: store
.incoming_contact_requests()
.iter()
.map(|user| user.github_login.clone())
.collect(),
})
}
async fn build_local_project(
&self,
root_path: impl AsRef<Path>,
cx: &mut TestAppContext,
) -> (ModelHandle<Project>, WorktreeId) {
let project = cx.update(|cx| {
Project::local(
self.client.clone(),
self.user_store.clone(),
self.language_registry.clone(),
self.fs.clone(),
cx,
)
});
let (worktree, _) = project
.update(cx, |p, cx| {
p.find_or_create_local_worktree(root_path, true, cx)
})
.await
.unwrap();
worktree
.read_with(cx, |tree, _| tree.as_local().unwrap().scan_complete())
.await;
(project, worktree.read_with(cx, |tree, _| tree.id()))
}
async fn build_remote_project(
&self,
host_project_id: u64,
guest_cx: &mut TestAppContext,
) -> ModelHandle<Project> {
let active_call = guest_cx.read(ActiveCall::global);
let room = active_call.read_with(guest_cx, |call, _| call.room().unwrap().clone());
room.update(guest_cx, |room, cx| {
room.join_project(
host_project_id,
self.language_registry.clone(),
self.fs.clone(),
cx,
)
})
.await
.unwrap()
}
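    // Opens a Workspace for the given project inside a throwaway test window.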
fn build_workspace(
&self,
project: &ModelHandle<Project>,
cx: &mut TestAppContext,
) -> ViewHandle<Workspace> {
let (_, root_view) = cx.add_window(|_| EmptyView);
cx.add_view(&root_view, |cx| {
Workspace::new(
Default::default(),
0,
project.clone(),
|_, _| unimplemented!(),
cx,
)
})
}
fn create_new_root_dir(&mut self) -> PathBuf {
format!(
"/{}-root-{}",
self.username,
util::post_inc(&mut self.next_root_dir_id)
)
.into()
}
}
impl Drop for TestClient {
fn drop(&mut self) {
self.client.teardown();
}
}

File diff suppressed because it is too large

View File

@ -7,10 +7,10 @@ mod incoming_call_notification;
mod notifications;
mod project_shared_notification;
use anyhow::anyhow;
use call::ActiveCall;
pub use collab_titlebar_item::{CollabTitlebarItem, ToggleCollaborationMenu};
use gpui::MutableAppContext;
use project::Project;
use std::sync::Arc;
use workspace::{AppState, JoinProject, ToggleFollow, Workspace};
@ -39,14 +39,19 @@ pub fn init(app_state: Arc<AppState>, cx: &mut MutableAppContext) {
let workspace = if let Some(existing_workspace) = existing_workspace {
existing_workspace
} else {
let project = Project::remote(
let active_call = cx.read(ActiveCall::global);
let room = active_call
.read_with(&cx, |call, _| call.room().cloned())
.ok_or_else(|| anyhow!("not in a call"))?;
let project = room
.update(&mut cx, |room, cx| {
room.join_project(
project_id,
app_state.client.clone(),
app_state.user_store.clone(),
app_state.languages.clone(),
app_state.fs.clone(),
cx.clone(),
cx,
)
})
.await?;
let (_, workspace) = cx.add_window((app_state.build_window_options)(), |cx| {

View File

@ -327,8 +327,6 @@ mod tests {
.path();
corrupted_backup_dir.push(DB_FILE_NAME);
dbg!(&corrupted_backup_dir);
let backup = Connection::open_file(&corrupted_backup_dir.to_string_lossy());
assert!(backup.select_row::<usize>("SELECT * FROM test").unwrap()()
.unwrap()

View File

@ -3612,7 +3612,9 @@ impl Editor {
}
pub fn undo(&mut self, _: &Undo, cx: &mut ViewContext<Self>) {
dbg!("undo");
if let Some(tx_id) = self.buffer.update(cx, |buffer, cx| buffer.undo(cx)) {
dbg!(tx_id);
if let Some((selections, _)) = self.selection_history.transaction(tx_id).cloned() {
self.change_selections(None, cx, |s| {
s.select_anchors(selections.to_vec());

View File

@ -29,7 +29,11 @@ use workspace::{
#[gpui::test]
fn test_edit_events(cx: &mut MutableAppContext) {
cx.set_global(Settings::test(cx));
let buffer = cx.add_model(|cx| language::Buffer::new(0, "123456", cx));
let buffer = cx.add_model(|cx| {
let mut buffer = language::Buffer::new(0, "123456", cx);
buffer.set_group_interval(Duration::from_secs(1));
buffer
});
let events = Rc::new(RefCell::new(Vec::new()));
let (_, editor1) = cx.add_window(Default::default(), {
@ -3502,6 +3506,8 @@ async fn test_surround_with_pair(cx: &mut gpui::TestAppContext) {
]
);
view.undo(&Undo, cx);
view.undo(&Undo, cx);
view.undo(&Undo, cx);
assert_eq!(
view.text(cx),

View File

@ -1114,7 +1114,7 @@ fn path_for_buffer<'a>(
cx: &'a AppContext,
) -> Option<Cow<'a, Path>> {
let file = buffer.read(cx).as_singleton()?.read(cx).file()?;
path_for_file(file, height, include_filename, cx)
path_for_file(file.as_ref(), height, include_filename, cx)
}
fn path_for_file<'a>(

View File

@ -1311,7 +1311,7 @@ impl MultiBuffer {
.and_then(|(buffer, offset)| buffer.read(cx).language_at(offset))
}
pub fn files<'a>(&'a self, cx: &'a AppContext) -> SmallVec<[&'a dyn File; 2]> {
pub fn files<'a>(&'a self, cx: &'a AppContext) -> SmallVec<[&'a Arc<dyn File>; 2]> {
let buffers = self.buffers.borrow();
buffers
.values()
@ -3651,7 +3651,7 @@ mod tests {
let state = host_buffer.read(cx).to_proto();
let ops = cx
.background()
.block(host_buffer.read(cx).serialize_ops(cx));
.block(host_buffer.read(cx).serialize_ops(None, cx));
let mut buffer = Buffer::from_proto(1, state, None).unwrap();
buffer
.apply_ops(

View File

@ -389,6 +389,7 @@ pub struct FakeFs {
struct FakeFsState {
root: Arc<Mutex<FakeFsEntry>>,
next_inode: u64,
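    // Deterministic timestamp handed out to newly created entries; bumped by a nanosecond on each use.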
next_mtime: SystemTime,
event_txs: Vec<smol::channel::Sender<Vec<fsevent::Event>>>,
}
@ -517,10 +518,11 @@ impl FakeFs {
state: Mutex::new(FakeFsState {
root: Arc::new(Mutex::new(FakeFsEntry::Dir {
inode: 0,
mtime: SystemTime::now(),
mtime: SystemTime::UNIX_EPOCH,
entries: Default::default(),
git_repo_state: None,
})),
next_mtime: SystemTime::UNIX_EPOCH,
next_inode: 1,
event_txs: Default::default(),
}),
@ -531,10 +533,12 @@ impl FakeFs {
let mut state = self.state.lock().await;
let path = path.as_ref();
let inode = state.next_inode;
let mtime = state.next_mtime;
state.next_inode += 1;
state.next_mtime += Duration::from_nanos(1);
let file = Arc::new(Mutex::new(FakeFsEntry::File {
inode,
mtime: SystemTime::now(),
mtime,
content,
}));
state
@ -631,6 +635,21 @@ impl FakeFs {
}
}
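    // Walks the fake filesystem breadth-first and returns every path, files and directories alike.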
pub async fn paths(&self) -> Vec<PathBuf> {
let mut result = Vec::new();
let mut queue = collections::VecDeque::new();
queue.push_back((PathBuf::from("/"), self.state.lock().await.root.clone()));
while let Some((path, entry)) = queue.pop_front() {
if let FakeFsEntry::Dir { entries, .. } = &*entry.lock().await {
for (name, entry) in entries {
queue.push_back((path.join(name), entry.clone()));
}
}
result.push(path);
}
result
}
pub async fn directories(&self) -> Vec<PathBuf> {
let mut result = Vec::new();
let mut queue = collections::VecDeque::new();
@ -726,6 +745,8 @@ impl Fs for FakeFs {
}
let inode = state.next_inode;
let mtime = state.next_mtime;
state.next_mtime += Duration::from_nanos(1);
state.next_inode += 1;
state
.write_path(&cur_path, |entry| {
@ -733,7 +754,7 @@ impl Fs for FakeFs {
created_dirs.push(cur_path.clone());
Arc::new(Mutex::new(FakeFsEntry::Dir {
inode,
mtime: SystemTime::now(),
mtime,
entries: Default::default(),
git_repo_state: None,
}))
@ -751,10 +772,12 @@ impl Fs for FakeFs {
self.simulate_random_delay().await;
let mut state = self.state.lock().await;
let inode = state.next_inode;
let mtime = state.next_mtime;
state.next_mtime += Duration::from_nanos(1);
state.next_inode += 1;
let file = Arc::new(Mutex::new(FakeFsEntry::File {
inode,
mtime: SystemTime::now(),
mtime,
content: String::new(),
}));
state
@ -816,6 +839,9 @@ impl Fs for FakeFs {
let source = normalize_path(source);
let target = normalize_path(target);
let mut state = self.state.lock().await;
let mtime = state.next_mtime;
let inode = util::post_inc(&mut state.next_inode);
state.next_mtime += Duration::from_nanos(1);
let source_entry = state.read_path(&source).await?;
let content = source_entry.lock().await.file_content(&source)?.clone();
let entry = state
@ -831,8 +857,8 @@ impl Fs for FakeFs {
}
btree_map::Entry::Vacant(e) => Ok(Some(
e.insert(Arc::new(Mutex::new(FakeFsEntry::File {
inode: 0,
mtime: SystemTime::now(),
inode,
mtime,
content: String::new(),
})))
.clone(),

View File

@ -74,11 +74,18 @@ struct DeterministicState {
pending_timers: Vec<(usize, std::time::Instant, postage::barrier::Sender)>,
waiting_backtrace: Option<backtrace::Backtrace>,
next_runnable_id: usize,
poll_history: Vec<usize>,
poll_history: Vec<ExecutorEvent>,
previous_poll_history: Option<Vec<ExecutorEvent>>,
enable_runnable_backtraces: bool,
runnable_backtraces: collections::HashMap<usize, backtrace::Backtrace>,
}
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum ExecutorEvent {
PollRunnable { id: usize },
EnqueuRunnable { id: usize },
}
#[cfg(any(test, feature = "test-support"))]
struct ForegroundRunnable {
id: usize,
@ -130,6 +137,7 @@ impl Deterministic {
waiting_backtrace: None,
next_runnable_id: 0,
poll_history: Default::default(),
previous_poll_history: Default::default(),
enable_runnable_backtraces: false,
runnable_backtraces: Default::default(),
})),
@ -137,10 +145,14 @@ impl Deterministic {
})
}
pub fn runnable_history(&self) -> Vec<usize> {
pub fn execution_history(&self) -> Vec<ExecutorEvent> {
self.state.lock().poll_history.clone()
}
pub fn set_previous_execution_history(&self, history: Option<Vec<ExecutorEvent>>) {
self.state.lock().previous_poll_history = history;
}
pub fn enable_runnable_backtrace(&self) {
self.state.lock().enable_runnable_backtraces = true;
}
@ -185,6 +197,7 @@ impl Deterministic {
let unparker = self.parker.lock().unparker();
let (runnable, task) = async_task::spawn_local(future, move |runnable| {
let mut state = state.lock();
state.push_to_history(ExecutorEvent::EnqueuRunnable { id });
state
.scheduled_from_foreground
.entry(cx_id)
@ -212,6 +225,9 @@ impl Deterministic {
let unparker = self.parker.lock().unparker();
let (runnable, task) = async_task::spawn(future, move |runnable| {
let mut state = state.lock();
state
.poll_history
.push(ExecutorEvent::EnqueuRunnable { id });
state
.scheduled_from_background
.push(BackgroundRunnable { id, runnable });
@ -314,7 +330,9 @@ impl Deterministic {
let background_len = state.scheduled_from_background.len();
let ix = state.rng.gen_range(0..background_len);
let background_runnable = state.scheduled_from_background.remove(ix);
state.poll_history.push(background_runnable.id);
state.push_to_history(ExecutorEvent::PollRunnable {
id: background_runnable.id,
});
drop(state);
background_runnable.runnable.run();
} else if !state.scheduled_from_foreground.is_empty() {
@ -332,7 +350,9 @@ impl Deterministic {
if scheduled_from_cx.is_empty() {
state.scheduled_from_foreground.remove(&cx_id_to_run);
}
state.poll_history.push(foreground_runnable.id);
state.push_to_history(ExecutorEvent::PollRunnable {
id: foreground_runnable.id,
});
drop(state);
@ -366,7 +386,9 @@ impl Deterministic {
let ix = state.rng.gen_range(0..=runnable_count);
if ix < state.scheduled_from_background.len() {
let background_runnable = state.scheduled_from_background.remove(ix);
state.poll_history.push(background_runnable.id);
state.push_to_history(ExecutorEvent::PollRunnable {
id: background_runnable.id,
});
drop(state);
background_runnable.runnable.run();
} else {
@ -465,6 +487,25 @@ impl Deterministic {
}
}
}
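    // Captures a backtrace for the runnable currently being polled so non-determinism reports can point at it.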
pub fn record_backtrace(&self) {
let mut state = self.state.lock();
if state.enable_runnable_backtraces {
let current_id = state
.poll_history
.iter()
.rev()
.find_map(|event| match event {
ExecutorEvent::PollRunnable { id } => Some(*id),
_ => None,
});
if let Some(id) = current_id {
state
.runnable_backtraces
.insert(id, backtrace::Backtrace::new_unresolved());
}
}
}
}
impl Drop for Timer {
@ -506,6 +547,40 @@ impl Future for Timer {
#[cfg(any(test, feature = "test-support"))]
impl DeterministicState {
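    // Records an executor event and, if a previous run's history was provided, panics with both runnables' backtraces at the first point where the two histories diverge.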
fn push_to_history(&mut self, event: ExecutorEvent) {
use std::fmt::Write as _;
self.poll_history.push(event);
if let Some(prev_history) = &self.previous_poll_history {
let ix = self.poll_history.len() - 1;
let prev_event = prev_history[ix];
if event != prev_event {
let mut message = String::new();
writeln!(
&mut message,
"current runnable backtrace:\n{:?}",
self.runnable_backtraces.get_mut(&event.id()).map(|trace| {
trace.resolve();
util::CwdBacktrace(trace)
})
)
.unwrap();
writeln!(
&mut message,
"previous runnable backtrace:\n{:?}",
self.runnable_backtraces
.get_mut(&prev_event.id())
.map(|trace| {
trace.resolve();
util::CwdBacktrace(trace)
})
)
.unwrap();
panic!("detected non-determinism after {ix}. {message}");
}
}
}
fn will_park(&mut self) {
if self.forbid_parking {
let mut backtrace_message = String::new();
@ -526,6 +601,16 @@ impl DeterministicState {
}
}
#[cfg(any(test, feature = "test-support"))]
impl ExecutorEvent {
pub fn id(&self) -> usize {
match self {
ExecutorEvent::PollRunnable { id } => *id,
ExecutorEvent::EnqueuRunnable { id } => *id,
}
}
}
impl Foreground {
pub fn platform(dispatcher: Arc<dyn platform::Dispatcher>) -> Result<Self> {
if dispatcher.is_main_thread() {
@ -755,6 +840,16 @@ impl Background {
}
}
}
#[cfg(any(test, feature = "test-support"))]
pub fn record_backtrace(&self) {
match self {
Self::Deterministic { executor, .. } => executor.record_backtrace(),
_ => {
panic!("this method can only be called on a deterministic executor")
}
}
}
}
impl Default for Background {

View File

@ -1,7 +1,10 @@
use crate::{
elements::Empty, executor, platform, util::CwdBacktrace, Element, ElementBox, Entity,
FontCache, Handle, LeakDetector, MutableAppContext, Platform, RenderContext, Subscription,
TestAppContext, View,
elements::Empty,
executor::{self, ExecutorEvent},
platform,
util::CwdBacktrace,
Element, ElementBox, Entity, FontCache, Handle, LeakDetector, MutableAppContext, Platform,
RenderContext, Subscription, TestAppContext, View,
};
use futures::StreamExt;
use parking_lot::Mutex;
@ -62,7 +65,7 @@ pub fn run_test(
let platform = Arc::new(platform::test::platform());
let font_system = platform.fonts();
let font_cache = Arc::new(FontCache::new(font_system));
let mut prev_runnable_history: Option<Vec<usize>> = None;
let mut prev_runnable_history: Option<Vec<ExecutorEvent>> = None;
for _ in 0..num_iterations {
let seed = atomic_seed.load(SeqCst);
@ -73,6 +76,7 @@ pub fn run_test(
let deterministic = executor::Deterministic::new(seed);
if detect_nondeterminism {
deterministic.set_previous_execution_history(prev_runnable_history.clone());
deterministic.enable_runnable_backtrace();
}
@ -98,7 +102,7 @@ pub fn run_test(
leak_detector.lock().detect();
if detect_nondeterminism {
let curr_runnable_history = deterministic.runnable_history();
let curr_runnable_history = deterministic.execution_history();
if let Some(prev_runnable_history) = prev_runnable_history {
let mut prev_entries = prev_runnable_history.iter().fuse();
let mut curr_entries = curr_runnable_history.iter().fuse();
@ -138,7 +142,7 @@ pub fn run_test(
let last_common_backtrace = common_history_prefix
.last()
.map(|runnable_id| deterministic.runnable_backtrace(*runnable_id));
.map(|event| deterministic.runnable_backtrace(event.id()));
writeln!(
&mut error,

View File

@ -398,7 +398,11 @@ impl Buffer {
}
}
pub fn serialize_ops(&self, cx: &AppContext) -> Task<Vec<proto::Operation>> {
pub fn serialize_ops(
&self,
since: Option<clock::Global>,
cx: &AppContext,
) -> Task<Vec<proto::Operation>> {
let mut operations = Vec::new();
operations.extend(self.deferred_ops.iter().map(proto::serialize_operation));
operations.extend(self.remote_selections.iter().map(|(_, set)| {
@ -422,9 +426,11 @@ impl Buffer {
let text_operations = self.text.operations().clone();
cx.background().spawn(async move {
let since = since.unwrap_or_default();
operations.extend(
text_operations
.iter()
.filter(|(_, op)| !since.observed(op.local_timestamp()))
.map(|(_, op)| proto::serialize_operation(&Operation::Buffer(op.clone()))),
);
operations.sort_unstable_by_key(proto::lamport_timestamp_for_operation);
@ -508,8 +514,8 @@ impl Buffer {
self.text.snapshot()
}
pub fn file(&self) -> Option<&dyn File> {
self.file.as_deref()
pub fn file(&self) -> Option<&Arc<dyn File>> {
self.file.as_ref()
}
pub fn save(
@ -2367,8 +2373,8 @@ impl BufferSnapshot {
self.selections_update_count
}
pub fn file(&self) -> Option<&dyn File> {
self.file.as_deref()
pub fn file(&self) -> Option<&Arc<dyn File>> {
self.file.as_ref()
}
pub fn resolve_file_path(&self, cx: &AppContext, include_root: bool) -> Option<PathBuf> {

View File

@ -289,6 +289,9 @@ async fn test_reparse(cx: &mut gpui::TestAppContext) {
);
buffer.update(cx, |buf, cx| {
buf.undo(cx);
buf.undo(cx);
buf.undo(cx);
buf.undo(cx);
assert_eq!(buf.text(), "fn a() {}");
assert!(buf.is_parsing());
@ -304,6 +307,9 @@ async fn test_reparse(cx: &mut gpui::TestAppContext) {
);
buffer.update(cx, |buf, cx| {
buf.redo(cx);
buf.redo(cx);
buf.redo(cx);
buf.redo(cx);
assert_eq!(buf.text(), "fn a(b: C) { d.e::<G>(f); }");
assert!(buf.is_parsing());
@ -1022,8 +1028,11 @@ fn test_autoindent_block_mode(cx: &mut MutableAppContext) {
.unindent()
);
// Grouping is disabled in tests, so we need 2 undos
buffer.undo(cx); // Undo the auto-indent
buffer.undo(cx); // Undo the original edit
// Insert the block at a deeper indent level. The entire block is outdented.
buffer.undo(cx);
buffer.edit([(Point::new(2, 0)..Point::new(2, 0), " ")], None, cx);
buffer.edit(
[(Point::new(2, 8)..Point::new(2, 8), inserted_text)],
@ -1275,7 +1284,9 @@ fn test_serialization(cx: &mut gpui::MutableAppContext) {
assert_eq!(buffer1.read(cx).text(), "abcDF");
let state = buffer1.read(cx).to_proto();
let ops = cx.background().block(buffer1.read(cx).serialize_ops(cx));
let ops = cx
.background()
.block(buffer1.read(cx).serialize_ops(None, cx));
let buffer2 = cx.add_model(|cx| {
let mut buffer = Buffer::from_proto(1, state, None).unwrap();
buffer
@ -1316,7 +1327,7 @@ fn test_random_collaboration(cx: &mut MutableAppContext, mut rng: StdRng) {
let state = base_buffer.read(cx).to_proto();
let ops = cx
.background()
.block(base_buffer.read(cx).serialize_ops(cx));
.block(base_buffer.read(cx).serialize_ops(None, cx));
let mut buffer = Buffer::from_proto(i as ReplicaId, state, None).unwrap();
buffer
.apply_ops(
@ -1413,7 +1424,9 @@ fn test_random_collaboration(cx: &mut MutableAppContext, mut rng: StdRng) {
}
50..=59 if replica_ids.len() < max_peers => {
let old_buffer_state = buffer.read(cx).to_proto();
let old_buffer_ops = cx.background().block(buffer.read(cx).serialize_ops(cx));
let old_buffer_ops = cx
.background()
.block(buffer.read(cx).serialize_ops(None, cx));
let new_replica_id = (0..=replica_ids.len() as ReplicaId)
.filter(|replica_id| *replica_id != buffer.read(cx).replica_id())
.choose(&mut rng)

View File

@ -40,7 +40,7 @@ pub struct LanguageServer {
name: String,
capabilities: ServerCapabilities,
notification_handlers: Arc<Mutex<HashMap<&'static str, NotificationHandler>>>,
response_handlers: Arc<Mutex<HashMap<usize, ResponseHandler>>>,
response_handlers: Arc<Mutex<Option<HashMap<usize, ResponseHandler>>>>,
executor: Arc<executor::Background>,
#[allow(clippy::type_complexity)]
io_tasks: Mutex<Option<(Task<Option<()>>, Task<Option<()>>)>>,
@ -170,12 +170,18 @@ impl LanguageServer {
let (outbound_tx, outbound_rx) = channel::unbounded::<Vec<u8>>();
let notification_handlers =
Arc::new(Mutex::new(HashMap::<_, NotificationHandler>::default()));
let response_handlers = Arc::new(Mutex::new(HashMap::<_, ResponseHandler>::default()));
let response_handlers =
Arc::new(Mutex::new(Some(HashMap::<_, ResponseHandler>::default())));
let input_task = cx.spawn(|cx| {
let notification_handlers = notification_handlers.clone();
let response_handlers = response_handlers.clone();
async move {
let _clear_response_handlers = ClearResponseHandlers(response_handlers.clone());
let _clear_response_handlers = util::defer({
let response_handlers = response_handlers.clone();
move || {
response_handlers.lock().take();
}
});
let mut buffer = Vec::new();
loop {
buffer.clear();
@ -200,7 +206,11 @@ impl LanguageServer {
} else if let Ok(AnyResponse { id, error, result }) =
serde_json::from_slice(&buffer)
{
if let Some(handler) = response_handlers.lock().remove(&id) {
if let Some(handler) = response_handlers
.lock()
.as_mut()
.and_then(|handlers| handlers.remove(&id))
{
if let Some(error) = error {
handler(Err(error));
} else if let Some(result) = result {
@ -226,7 +236,12 @@ impl LanguageServer {
let output_task = cx.background().spawn({
let response_handlers = response_handlers.clone();
async move {
let _clear_response_handlers = ClearResponseHandlers(response_handlers);
let _clear_response_handlers = util::defer({
let response_handlers = response_handlers.clone();
move || {
response_handlers.lock().take();
}
});
let mut content_len_buffer = Vec::new();
while let Ok(message) = outbound_rx.recv().await {
log::trace!("outgoing message:{}", String::from_utf8_lossy(&message));
@ -366,7 +381,7 @@ impl LanguageServer {
async move {
log::debug!("language server shutdown started");
shutdown_request.await?;
response_handlers.lock().clear();
response_handlers.lock().take();
exit?;
output_done.recv().await;
log::debug!("language server shutdown finished");
@ -521,7 +536,7 @@ impl LanguageServer {
fn request_internal<T: request::Request>(
next_id: &AtomicUsize,
response_handlers: &Mutex<HashMap<usize, ResponseHandler>>,
response_handlers: &Mutex<Option<HashMap<usize, ResponseHandler>>>,
outbound_tx: &channel::Sender<Vec<u8>>,
params: T::Params,
) -> impl 'static + Future<Output = Result<T::Result>>
@ -537,25 +552,31 @@ impl LanguageServer {
})
.unwrap();
let send = outbound_tx
.try_send(message)
.context("failed to write to language server's stdin");
let (tx, rx) = oneshot::channel();
response_handlers.lock().insert(
let handle_response = response_handlers
.lock()
.as_mut()
.ok_or_else(|| anyhow!("server shut down"))
.map(|handlers| {
handlers.insert(
id,
Box::new(move |result| {
let response = match result {
Ok(response) => {
serde_json::from_str(response).context("failed to deserialize response")
}
Ok(response) => serde_json::from_str(response)
.context("failed to deserialize response"),
Err(error) => Err(anyhow!("{}", error.message)),
};
let _ = tx.send(response);
}),
);
});
let send = outbound_tx
.try_send(message)
.context("failed to write to language server's stdin");
async move {
handle_response?;
send?;
rx.await?
}
@ -762,14 +783,6 @@ impl FakeLanguageServer {
}
}
struct ClearResponseHandlers(Arc<Mutex<HashMap<usize, ResponseHandler>>>);
impl Drop for ClearResponseHandlers {
fn drop(&mut self) {
self.0.lock().clear();
}
}
#[cfg(test)]
mod tests {
use super::*;

View File

@ -37,6 +37,7 @@ util = { path = "../util" }
aho-corasick = "0.7"
anyhow = "1.0.57"
async-trait = "0.1"
backtrace = "0.3"
futures = "0.3"
ignore = "0.4"
lazy_static = "1.4.0"

View File

@ -524,7 +524,7 @@ async fn location_links_from_proto(
Some(origin) => {
let buffer = project
.update(&mut cx, |this, cx| {
this.wait_for_buffer(origin.buffer_id, cx)
this.wait_for_remote_buffer(origin.buffer_id, cx)
})
.await?;
let start = origin
@ -549,7 +549,7 @@ async fn location_links_from_proto(
let target = link.target.ok_or_else(|| anyhow!("missing target"))?;
let buffer = project
.update(&mut cx, |this, cx| {
this.wait_for_buffer(target.buffer_id, cx)
this.wait_for_remote_buffer(target.buffer_id, cx)
})
.await?;
let start = target
@ -814,7 +814,7 @@ impl LspCommand for GetReferences {
for location in message.locations {
let target_buffer = project
.update(&mut cx, |this, cx| {
this.wait_for_buffer(location.buffer_id, cx)
this.wait_for_remote_buffer(location.buffer_id, cx)
})
.await?;
let start = location

File diff suppressed because it is too large

View File

@ -36,7 +36,7 @@ use std::{
any::Any,
cmp::{self, Ordering},
convert::TryFrom,
ffi::{OsStr, OsString},
ffi::OsStr,
fmt,
future::Future,
mem,
@ -94,7 +94,7 @@ pub struct Snapshot {
entries_by_path: SumTree<Entry>,
entries_by_id: SumTree<PathEntry>,
scan_id: usize,
is_complete: bool,
completed_scan_id: usize,
}
#[derive(Clone)]
@ -125,7 +125,6 @@ pub struct LocalSnapshot {
removed_entry_ids: HashMap<u64, ProjectEntryId>,
next_entry_id: Arc<AtomicUsize>,
snapshot: Snapshot,
extension_counts: HashMap<OsString, usize>,
}
impl Clone for LocalSnapshot {
@ -136,7 +135,6 @@ impl Clone for LocalSnapshot {
removed_entry_ids: self.removed_entry_ids.clone(),
next_entry_id: self.next_entry_id.clone(),
snapshot: self.snapshot.clone(),
extension_counts: self.extension_counts.clone(),
}
}
}
@ -168,6 +166,7 @@ enum ScanState {
struct ShareState {
project_id: u64,
snapshots_tx: watch::Sender<LocalSnapshot>,
resume_updates: watch::Sender<()>,
_maintain_remote_snapshot: Task<Option<()>>,
}
@ -231,7 +230,7 @@ impl Worktree {
entries_by_path: Default::default(),
entries_by_id: Default::default(),
scan_id: 0,
is_complete: false,
completed_scan_id: 0,
};
let (updates_tx, mut updates_rx) = mpsc::unbounded();
@ -345,6 +344,13 @@ impl Worktree {
}
}
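    // The id of the most recent scan that ran to completion on this worktree.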
pub fn completed_scan_id(&self) -> usize {
match self {
Worktree::Local(worktree) => worktree.snapshot.completed_scan_id,
Worktree::Remote(worktree) => worktree.snapshot.completed_scan_id,
}
}
pub fn is_visible(&self) -> bool {
match self {
Worktree::Local(worktree) => worktree.visible,
@ -425,9 +431,8 @@ impl LocalWorktree {
entries_by_path: Default::default(),
entries_by_id: Default::default(),
scan_id: 0,
is_complete: true,
completed_scan_id: 0,
},
extension_counts: Default::default(),
};
if let Some(metadata) = metadata {
let entry = Entry::new(
@ -957,8 +962,9 @@ impl LocalWorktree {
if let Some(old_path) = old_path {
snapshot.remove_path(&old_path);
}
snapshot.scan_started();
inserted_entry = snapshot.insert_entry(entry, fs.as_ref());
snapshot.scan_id += 1;
snapshot.scan_completed();
}
this.poll_snapshot(true, cx);
Ok(inserted_entry)
@ -969,10 +975,12 @@ impl LocalWorktree {
pub fn share(&mut self, project_id: u64, cx: &mut ModelContext<Worktree>) -> Task<Result<()>> {
let (share_tx, share_rx) = oneshot::channel();
if self.share.is_some() {
let _ = share_tx.send(Ok(()));
if let Some(share) = self.share.as_mut() {
let _ = share_tx.send(());
*share.resume_updates.borrow_mut() = ();
} else {
let (snapshots_tx, mut snapshots_rx) = watch::channel_with(self.snapshot());
let (resume_updates_tx, mut resume_updates_rx) = watch::channel();
let worktree_id = cx.model_id() as u64;
for (path, summary) in self.diagnostic_summaries.iter() {
@ -985,47 +993,49 @@ impl LocalWorktree {
}
}
let maintain_remote_snapshot = cx.background().spawn({
let rpc = self.client.clone();
let _maintain_remote_snapshot = cx.background().spawn({
let client = self.client.clone();
async move {
let mut prev_snapshot = match snapshots_rx.recv().await {
Some(snapshot) => {
let update = proto::UpdateWorktree {
project_id,
worktree_id,
abs_path: snapshot.abs_path().to_string_lossy().into(),
root_name: snapshot.root_name().to_string(),
updated_entries: snapshot
.entries_by_path
.iter()
.map(Into::into)
.collect(),
removed_entries: Default::default(),
scan_id: snapshot.scan_id as u64,
is_last_update: true,
let mut share_tx = Some(share_tx);
let mut prev_snapshot = LocalSnapshot {
ignores_by_parent_abs_path: Default::default(),
git_repositories: Default::default(),
removed_entry_ids: Default::default(),
next_entry_id: Default::default(),
snapshot: Snapshot {
id: WorktreeId(worktree_id as usize),
abs_path: Path::new("").into(),
root_name: Default::default(),
root_char_bag: Default::default(),
entries_by_path: Default::default(),
entries_by_id: Default::default(),
scan_id: 0,
completed_scan_id: 0,
},
};
if let Err(error) = send_worktree_update(&rpc, update).await {
let _ = share_tx.send(Err(error));
return Err(anyhow!("failed to send initial update worktree"));
} else {
let _ = share_tx.send(Ok(()));
snapshot
}
}
None => {
share_tx
.send(Err(anyhow!("worktree dropped before share completed")))
.ok();
return Err(anyhow!("failed to send initial update worktree"));
}
};
while let Some(snapshot) = snapshots_rx.recv().await {
send_worktree_update(
&rpc,
snapshot.build_update(&prev_snapshot, project_id, worktree_id, true),
)
.await?;
#[cfg(any(test, feature = "test-support"))]
const MAX_CHUNK_SIZE: usize = 2;
#[cfg(not(any(test, feature = "test-support")))]
const MAX_CHUNK_SIZE: usize = 256;
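                    // Send the update in bounded chunks; if a chunk fails to send, wait for the share to be resumed and retry instead of dropping it.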
let update =
snapshot.build_update(&prev_snapshot, project_id, worktree_id, true);
for update in proto::split_worktree_update(update, MAX_CHUNK_SIZE) {
let _ = resume_updates_rx.try_recv();
while let Err(error) = client.request(update.clone()).await {
log::error!("failed to send worktree update: {}", error);
log::info!("waiting to resume updates");
if resume_updates_rx.next().await.is_none() {
return Ok(());
}
}
}
if let Some(share_tx) = share_tx.take() {
let _ = share_tx.send(());
}
prev_snapshot = snapshot;
}
@ -1037,15 +1047,13 @@ impl LocalWorktree {
self.share = Some(ShareState {
project_id,
snapshots_tx,
_maintain_remote_snapshot: maintain_remote_snapshot,
resume_updates: resume_updates_tx,
_maintain_remote_snapshot,
});
}
cx.foreground().spawn(async move {
share_rx
.await
.unwrap_or_else(|_| Err(anyhow!("share ended")))
})
cx.foreground()
.spawn(async move { share_rx.await.map_err(|_| anyhow!("share ended")) })
}
pub fn unshare(&mut self) {
@ -1083,7 +1091,7 @@ impl RemoteWorktree {
}
fn observed_snapshot(&self, scan_id: usize) -> bool {
self.scan_id > scan_id || (self.scan_id == scan_id && self.is_complete)
self.completed_scan_id >= scan_id
}
fn wait_for_snapshot(&mut self, scan_id: usize) -> impl Future<Output = Result<()>> {
@ -1246,7 +1254,9 @@ impl Snapshot {
self.entries_by_path.edit(entries_by_path_edits, &());
self.entries_by_id.edit(entries_by_id_edits, &());
self.scan_id = update.scan_id as usize;
self.is_complete = update.is_last_update;
if update.is_last_update {
self.completed_scan_id = update.scan_id as usize;
}
Ok(())
}
@ -1335,6 +1345,14 @@ impl Snapshot {
&self.root_name
}
pub fn scan_started(&mut self) {
self.scan_id += 1;
}
pub fn scan_completed(&mut self) {
self.completed_scan_id = self.scan_id;
}
pub fn scan_id(&self) -> usize {
self.scan_id
}
@ -1363,10 +1381,6 @@ impl Snapshot {
}
impl LocalSnapshot {
pub fn extension_counts(&self) -> &HashMap<OsString, usize> {
&self.extension_counts
}
// Gives the most specific git repository for a given path
pub(crate) fn repo_for(&self, path: &Path) -> Option<GitRepositoryEntry> {
self.git_repositories
@ -1462,7 +1476,7 @@ impl LocalSnapshot {
updated_entries,
removed_entries,
scan_id: self.scan_id as u64,
is_last_update: true,
is_last_update: self.completed_scan_id == self.scan_id,
}
}
@ -1496,9 +1510,9 @@ impl LocalSnapshot {
}
}
self.entries_by_path.insert_or_replace(entry.clone(), &());
let scan_id = self.scan_id;
let removed_entry = self.entries_by_id.insert_or_replace(
self.entries_by_path.insert_or_replace(entry.clone(), &());
self.entries_by_id.insert_or_replace(
PathEntry {
id: entry.id,
path: entry.path.clone(),
@ -1508,11 +1522,6 @@ impl LocalSnapshot {
&(),
);
if let Some(removed_entry) = removed_entry {
self.dec_extension_count(&removed_entry.path, removed_entry.is_ignored);
}
self.inc_extension_count(&entry.path, entry.is_ignored);
entry
}
@ -1573,7 +1582,6 @@ impl LocalSnapshot {
for mut entry in entries {
self.reuse_entry_id(&mut entry);
self.inc_extension_count(&entry.path, entry.is_ignored);
entries_by_id_edits.push(Edit::Insert(PathEntry {
id: entry.id,
path: entry.path.clone(),
@ -1584,33 +1592,7 @@ impl LocalSnapshot {
}
self.entries_by_path.edit(entries_by_path_edits, &());
let removed_entries = self.entries_by_id.edit(entries_by_id_edits, &());
for removed_entry in removed_entries {
self.dec_extension_count(&removed_entry.path, removed_entry.is_ignored);
}
}
fn inc_extension_count(&mut self, path: &Path, ignored: bool) {
if !ignored {
if let Some(extension) = path.extension() {
if let Some(count) = self.extension_counts.get_mut(extension) {
*count += 1;
} else {
self.extension_counts.insert(extension.into(), 1);
}
}
}
}
fn dec_extension_count(&mut self, path: &Path, ignored: bool) {
if !ignored {
if let Some(extension) = path.extension() {
if let Some(count) = self.extension_counts.get_mut(extension) {
*count -= 1;
}
}
}
self.entries_by_id.edit(entries_by_id_edits, &());
}
fn reuse_entry_id(&mut self, entry: &mut Entry) {
@ -1640,7 +1622,6 @@ impl LocalSnapshot {
.or_insert(entry.id);
*removed_entry_id = cmp::max(*removed_entry_id, entry.id);
entries_by_id_edits.push(Edit::Remove(entry.id));
self.dec_extension_count(&entry.path, entry.is_ignored);
}
self.entries_by_id.edit(entries_by_id_edits, &());
@ -2010,7 +1991,7 @@ impl File {
})
}
pub fn from_dyn(file: Option<&dyn language::File>) -> Option<&Self> {
pub fn from_dyn(file: Option<&Arc<dyn language::File>>) -> Option<&Self> {
file.and_then(|f| f.as_any().downcast_ref())
}
@ -2277,7 +2258,8 @@ impl BackgroundScanner {
let is_dir;
let next_entry_id;
{
let snapshot = self.snapshot.lock();
let mut snapshot = self.snapshot.lock();
snapshot.scan_started();
root_char_bag = snapshot.root_char_bag;
root_abs_path = snapshot.abs_path.clone();
root_inode = snapshot.root_entry().map(|e| e.inode);
@ -2343,6 +2325,8 @@ impl BackgroundScanner {
}
})
.await;
self.snapshot.lock().scan_completed();
}
Ok(())
@ -2470,7 +2454,8 @@ impl BackgroundScanner {
let root_abs_path;
let next_entry_id;
{
let snapshot = self.snapshot.lock();
let mut snapshot = self.snapshot.lock();
snapshot.scan_started();
root_char_bag = snapshot.root_char_bag;
root_abs_path = snapshot.abs_path.clone();
next_entry_id = snapshot.next_entry_id.clone();
@ -2495,7 +2480,6 @@ impl BackgroundScanner {
let (scan_queue_tx, scan_queue_rx) = channel::unbounded();
{
let mut snapshot = self.snapshot.lock();
snapshot.scan_id += 1;
for event in &events {
if let Ok(path) = event.path.strip_prefix(&root_canonical_path) {
snapshot.remove_path(path);
@ -2582,6 +2566,7 @@ impl BackgroundScanner {
self.update_ignore_statuses().await;
self.update_git_repositories();
self.snapshot.lock().scan_completed();
true
}
@ -2976,19 +2961,6 @@ impl<'a> TryFrom<(&'a CharBag, proto::Entry)> for Entry {
}
}
async fn send_worktree_update(client: &Arc<Client>, update: proto::UpdateWorktree) -> Result<()> {
#[cfg(any(test, feature = "test-support"))]
const MAX_CHUNK_SIZE: usize = 2;
#[cfg(not(any(test, feature = "test-support")))]
const MAX_CHUNK_SIZE: usize = 256;
for update in proto::split_worktree_update(update, MAX_CHUNK_SIZE) {
client.request(update).await?;
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
@ -3479,9 +3451,8 @@ mod tests {
root_name: Default::default(),
root_char_bag: Default::default(),
scan_id: 0,
is_complete: true,
completed_scan_id: 0,
},
extension_counts: Default::default(),
};
initial_snapshot.insert_entry(
Entry::new(
@ -3763,15 +3734,6 @@ mod tests {
.entry_for_path(ignore_parent_path.join(&*GITIGNORE))
.is_some());
}
// Ensure extension counts are correct.
let mut expected_extension_counts = HashMap::default();
for extension in self.entries(false).filter_map(|e| e.path.extension()) {
*expected_extension_counts
.entry(extension.into())
.or_insert(0) += 1;
}
assert_eq!(self.extension_counts, expected_extension_counts);
}
fn to_vec(&self, include_ignored: bool) -> Vec<(&Path, u64, bool)> {

View File

@ -21,6 +21,8 @@ message Envelope {
CreateRoomResponse create_room_response = 10;
JoinRoom join_room = 11;
JoinRoomResponse join_room_response = 12;
RejoinRoom rejoin_room = 108;
RejoinRoomResponse rejoin_room_response = 109;
LeaveRoom leave_room = 13;
Call call = 14;
IncomingCall incoming_call = 15;
@ -37,6 +39,7 @@ message Envelope {
JoinProjectResponse join_project_response = 25;
LeaveProject leave_project = 26;
AddProjectCollaborator add_project_collaborator = 27;
UpdateProjectCollaborator update_project_collaborator = 110;
RemoveProjectCollaborator remove_project_collaborator = 28;
GetDefinition get_definition = 29;
@ -76,6 +79,8 @@ message Envelope {
BufferReloaded buffer_reloaded = 61;
ReloadBuffers reload_buffers = 62;
ReloadBuffersResponse reload_buffers_response = 63;
SynchronizeBuffers synchronize_buffers = 200;
SynchronizeBuffersResponse synchronize_buffers_response = 201;
FormatBuffers format_buffers = 64;
FormatBuffersResponse format_buffers_response = 65;
GetCompletions get_completions = 66;
@ -161,6 +166,40 @@ message JoinRoomResponse {
optional LiveKitConnectionInfo live_kit_connection_info = 2;
}
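// Sent when a client reconnects to a room: carries the projects it was sharing and, for projects it had joined, the worktree scan ids it last observed.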
message RejoinRoom {
uint64 id = 1;
repeated UpdateProject reshared_projects = 2;
repeated RejoinProject rejoined_projects = 3;
}
message RejoinProject {
uint64 id = 1;
repeated RejoinWorktree worktrees = 2;
}
message RejoinWorktree {
uint64 id = 1;
uint64 scan_id = 2;
}
message RejoinRoomResponse {
Room room = 1;
repeated ResharedProject reshared_projects = 2;
repeated RejoinedProject rejoined_projects = 3;
}
message ResharedProject {
uint64 id = 1;
repeated Collaborator collaborators = 2;
}
message RejoinedProject {
uint64 id = 1;
repeated WorktreeMetadata worktrees = 2;
repeated Collaborator collaborators = 3;
repeated LanguageServer language_servers = 4;
}
message LeaveRoom {}
message Room {
@ -322,6 +361,12 @@ message AddProjectCollaborator {
Collaborator collaborator = 2;
}
message UpdateProjectCollaborator {
uint64 project_id = 1;
PeerId old_peer_id = 2;
PeerId new_peer_id = 3;
}
message RemoveProjectCollaborator {
uint64 project_id = 1;
PeerId peer_id = 2;
@ -494,6 +539,20 @@ message ReloadBuffersResponse {
ProjectTransaction transaction = 1;
}
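// Exchanged after reconnecting: each side reports the versions of the buffers it has open so the peers can converge on any missed operations.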
message SynchronizeBuffers {
uint64 project_id = 1;
repeated BufferVersion buffers = 2;
}
message SynchronizeBuffersResponse {
repeated BufferVersion buffers = 1;
}
message BufferVersion {
uint64 id = 1;
repeated VectorClockEntry version = 2;
}
enum FormatTrigger {
Save = 0;
Manual = 1;

View File

@ -494,6 +494,27 @@ impl Peer {
Ok(())
}
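    // Replies to the sender with a generic error when a request arrives that no handler claims, so the caller isn't left waiting.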
pub fn respond_with_unhandled_message(
&self,
envelope: Box<dyn AnyTypedEnvelope>,
) -> Result<()> {
let connection = self.connection_state(envelope.sender_id())?;
let response = proto::Error {
message: format!("message {} was not handled", envelope.payload_type_name()),
};
let message_id = connection
.next_message_id
.fetch_add(1, atomic::Ordering::SeqCst);
connection
.outgoing_tx
.unbounded_send(proto::Message::Envelope(response.into_envelope(
message_id,
Some(envelope.message_id()),
None,
)))?;
Ok(())
}
fn connection_state(&self, connection_id: ConnectionId) -> Result<ConnectionState> {
let connections = self.connections.read();
let connection = connections

View File

@ -42,6 +42,8 @@ pub trait AnyTypedEnvelope: 'static + Send + Sync {
fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync>;
fn is_background(&self) -> bool;
fn original_sender_id(&self) -> Option<PeerId>;
fn sender_id(&self) -> ConnectionId;
fn message_id(&self) -> u32;
}
pub enum MessagePriority {
@ -73,6 +75,14 @@ impl<T: EnvelopedMessage> AnyTypedEnvelope for TypedEnvelope<T> {
fn original_sender_id(&self) -> Option<PeerId> {
self.original_sender_id
}
fn sender_id(&self) -> ConnectionId {
self.sender_id
}
fn message_id(&self) -> u32 {
self.message_id
}
}
impl PeerId {
@ -188,6 +198,8 @@ messages!(
(PrepareRename, Background),
(PrepareRenameResponse, Background),
(ProjectEntryResponse, Foreground),
(RejoinRoom, Foreground),
(RejoinRoomResponse, Foreground),
(RemoveContact, Foreground),
(ReloadBuffers, Foreground),
(ReloadBuffersResponse, Foreground),
@ -205,6 +217,8 @@ messages!(
(ShareProjectResponse, Foreground),
(ShowContacts, Foreground),
(StartLanguageServer, Foreground),
(SynchronizeBuffers, Foreground),
(SynchronizeBuffersResponse, Foreground),
(Test, Foreground),
(Unfollow, Foreground),
(UnshareProject, Foreground),
@ -217,6 +231,7 @@ messages!(
(UpdateLanguageServer, Foreground),
(UpdateParticipantLocation, Foreground),
(UpdateProject, Foreground),
(UpdateProjectCollaborator, Foreground),
(UpdateWorktree, Foreground),
(UpdateDiffBase, Background),
(GetPrivateUserInfo, Foreground),
@ -254,6 +269,7 @@ request_messages!(
(JoinChannel, JoinChannelResponse),
(JoinProject, JoinProjectResponse),
(JoinRoom, JoinRoomResponse),
(RejoinRoom, RejoinRoomResponse),
(IncomingCall, Ack),
(OpenBufferById, OpenBufferResponse),
(OpenBufferByPath, OpenBufferResponse),
@ -270,6 +286,7 @@ request_messages!(
(SearchProject, SearchProjectResponse),
(SendChannelMessage, SendChannelMessageResponse),
(ShareProject, ShareProjectResponse),
(SynchronizeBuffers, SynchronizeBuffersResponse),
(Test, Test),
(UpdateBuffer, Ack),
(UpdateParticipantLocation, Ack),
@ -311,6 +328,7 @@ entity_messages!(
SaveBuffer,
SearchProject,
StartLanguageServer,
SynchronizeBuffers,
Unfollow,
UnshareProject,
UpdateBuffer,
@ -319,6 +337,7 @@ entity_messages!(
UpdateFollowers,
UpdateLanguageServer,
UpdateProject,
UpdateProjectCollaborator,
UpdateWorktree,
UpdateDiffBase
);

View File

@ -45,7 +45,7 @@ fn test_random_edits(mut rng: StdRng) {
let mut buffer = Buffer::new(0, 0, reference_string.clone());
LineEnding::normalize(&mut reference_string);
buffer.history.group_interval = Duration::from_millis(rng.gen_range(0..=200));
buffer.set_group_interval(Duration::from_millis(rng.gen_range(0..=200)));
let mut buffer_versions = Vec::new();
log::info!(
"buffer text {:?}, version: {:?}",
@ -488,7 +488,7 @@ fn test_anchors_at_start_and_end() {
fn test_undo_redo() {
let mut buffer = Buffer::new(0, 0, "1234".into());
// Set group interval to zero so as to not group edits in the undo stack.
buffer.history.group_interval = Duration::from_secs(0);
buffer.set_group_interval(Duration::from_secs(0));
buffer.edit([(1..1, "abx")]);
buffer.edit([(3..4, "yzef")]);
@ -524,6 +524,7 @@ fn test_undo_redo() {
fn test_history() {
let mut now = Instant::now();
let mut buffer = Buffer::new(0, 0, "123456".into());
buffer.set_group_interval(Duration::from_millis(300));
let transaction_1 = buffer.start_transaction_at(now).unwrap();
buffer.edit([(2..4, "cd")]);
@ -535,7 +536,7 @@ fn test_history() {
buffer.end_transaction_at(now).unwrap();
assert_eq!(buffer.text(), "12cde6");
now += buffer.history.group_interval + Duration::from_millis(1);
now += buffer.transaction_group_interval() + Duration::from_millis(1);
buffer.start_transaction_at(now);
buffer.edit([(0..1, "a")]);
buffer.edit([(1..1, "b")]);

View File

@ -115,6 +115,10 @@ impl History {
undo_stack: Vec::new(),
redo_stack: Vec::new(),
transaction_depth: 0,
// Don't group transactions in tests unless we opt in, because it's a footgun.
#[cfg(any(test, feature = "test-support"))]
group_interval: Duration::ZERO,
#[cfg(not(any(test, feature = "test-support")))]
group_interval: Duration::from_millis(300),
}
}

View File

@ -216,7 +216,7 @@ impl WorkspaceDb {
let mut result = Vec::new();
let mut delete_tasks = Vec::new();
for (id, location) in self.recent_workspaces()? {
if location.paths().iter().all(|path| dbg!(path).exists()) {
if location.paths().iter().all(|path| path.exists()) {
result.push((id, location));
} else {
delete_tasks.push(self.delete_stale_workspace(id));