diff --git a/Cargo.lock b/Cargo.lock index 6a548969d4..5b49bcd0d8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1126,6 +1126,32 @@ dependencies = [ "util", ] +[[package]] +name = "call2" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-broadcast", + "audio2", + "client2", + "collections", + "fs", + "futures 0.3.28", + "gpui2", + "language2", + "live_kit_client", + "log", + "media", + "postage", + "project2", + "schemars", + "serde", + "serde_derive", + "serde_json", + "settings2", + "util", +] + [[package]] name = "cap-fs-ext" version = "0.24.4" diff --git a/Cargo.toml b/Cargo.toml index 4687a99c88..6bab473f2c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,7 @@ members = [ "crates/auto_update", "crates/breadcrumbs", "crates/call", + "crates/call2", "crates/channel", "crates/cli", "crates/client", diff --git a/crates/audio2/src/audio2.rs b/crates/audio2/src/audio2.rs index 831ed59515..d04587d74e 100644 --- a/crates/audio2/src/audio2.rs +++ b/crates/audio2/src/audio2.rs @@ -78,12 +78,17 @@ impl Audio { Self { tx } } - pub fn play_sound(&self, sound: Sound, cx: &mut AppContext) { + pub fn play_sound(sound: Sound, cx: &mut AppContext) { + if !cx.has_global::() { + return; + } + let Some(source) = SoundRegistry::global(cx).get(sound.file()).log_err() else { return; }; - self.tx + let this = cx.global::(); + this.tx .unbounded_send(Box::new(move |state| { if let Some(output_handle) = state.ensure_output_exists() { output_handle.play_raw(source).log_err(); @@ -92,8 +97,14 @@ impl Audio { .ok(); } - pub fn end_call(&self) { - self.tx + pub fn end_call(cx: &AppContext) { + if !cx.has_global::() { + return; + } + + let this = cx.global::(); + + this.tx .unbounded_send(Box::new(move |state| state.take())) .ok(); } diff --git a/crates/call2/Cargo.toml b/crates/call2/Cargo.toml new file mode 100644 index 0000000000..efc7ab326e --- /dev/null +++ b/crates/call2/Cargo.toml @@ -0,0 +1,52 @@ +[package] +name = "call2" +version = "0.1.0" +edition = "2021" +publish = false + +[lib] +path = "src/call2.rs" +doctest = false + +[features] +test-support = [ + "client2/test-support", + "collections/test-support", + "gpui2/test-support", + "live_kit_client/test-support", + "project2/test-support", + "util/test-support" +] + +[dependencies] +audio2 = { path = "../audio2" } +client2 = { path = "../client2" } +collections = { path = "../collections" } +gpui2 = { path = "../gpui2" } +log.workspace = true +live_kit_client = { path = "../live_kit_client" } +fs = { path = "../fs" } +language2 = { path = "../language2" } +media = { path = "../media" } +project2 = { path = "../project2" } +settings2 = { path = "../settings2" } +util = { path = "../util" } + +anyhow.workspace = true +async-broadcast = "0.4" +futures.workspace = true +postage.workspace = true +schemars.workspace = true +serde.workspace = true +serde_json.workspace = true +serde_derive.workspace = true + +[dev-dependencies] +client2 = { path = "../client2", features = ["test-support"] } +fs = { path = "../fs", features = ["test-support"] } +language2 = { path = "../language2", features = ["test-support"] } +collections = { path = "../collections", features = ["test-support"] } +gpui2 = { path = "../gpui2", features = ["test-support"] } +live_kit_client = { path = "../live_kit_client", features = ["test-support"] } +project2 = { path = "../project2", features = ["test-support"] } +util = { path = "../util", features = ["test-support"] } diff --git a/crates/call2/src/call2.rs b/crates/call2/src/call2.rs new file mode 100644 index 0000000000..b1e6802089 --- /dev/null +++ b/crates/call2/src/call2.rs @@ -0,0 +1,463 @@ +pub mod call_settings; +pub mod participant; +pub mod room; + +use anyhow::{anyhow, Result}; +use audio2::Audio; +use call_settings::CallSettings; +use client2::{ + proto, ClickhouseEvent, Client, TelemetrySettings, TypedEnvelope, User, UserStore, + ZED_ALWAYS_ACTIVE, +}; +use collections::HashSet; +use futures::{future::Shared, FutureExt}; +use gpui2::{ + AppContext, AsyncAppContext, Context, EventEmitter, Handle, ModelContext, Subscription, Task, + WeakHandle, +}; +use postage::watch; +use project2::Project; +use std::sync::Arc; + +pub use participant::ParticipantLocation; +pub use room::Room; + +pub fn init(client: Arc, user_store: Handle, cx: &mut AppContext) { + settings2::register::(cx); + + let active_call = cx.entity(|cx| ActiveCall::new(client, user_store, cx)); + cx.set_global(active_call); +} + +#[derive(Clone)] +pub struct IncomingCall { + pub room_id: u64, + pub calling_user: Arc, + pub participants: Vec>, + pub initial_project: Option, +} + +/// Singleton global maintaining the user's participation in a room across workspaces. +pub struct ActiveCall { + room: Option<(Handle, Vec)>, + pending_room_creation: Option, Arc>>>>, + location: Option>, + pending_invites: HashSet, + incoming_call: ( + watch::Sender>, + watch::Receiver>, + ), + client: Arc, + user_store: Handle, + _subscriptions: Vec, +} + +impl EventEmitter for ActiveCall { + type Event = room::Event; +} + +impl ActiveCall { + fn new( + client: Arc, + user_store: Handle, + cx: &mut ModelContext, + ) -> Self { + Self { + room: None, + pending_room_creation: None, + location: None, + pending_invites: Default::default(), + incoming_call: watch::channel(), + + _subscriptions: vec![ + client.add_request_handler(cx.weak_handle(), Self::handle_incoming_call), + client.add_message_handler(cx.weak_handle(), Self::handle_call_canceled), + ], + client, + user_store, + } + } + + pub fn channel_id(&self, cx: &AppContext) -> Option { + self.room()?.read(cx).channel_id() + } + + async fn handle_incoming_call( + this: Handle, + envelope: TypedEnvelope, + _: Arc, + mut cx: AsyncAppContext, + ) -> Result { + let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?; + let call = IncomingCall { + room_id: envelope.payload.room_id, + participants: user_store + .update(&mut cx, |user_store, cx| { + user_store.get_users(envelope.payload.participant_user_ids, cx) + })? + .await?, + calling_user: user_store + .update(&mut cx, |user_store, cx| { + user_store.get_user(envelope.payload.calling_user_id, cx) + })? + .await?, + initial_project: envelope.payload.initial_project, + }; + this.update(&mut cx, |this, _| { + *this.incoming_call.0.borrow_mut() = Some(call); + }); + + Ok(proto::Ack {}) + } + + async fn handle_call_canceled( + this: Handle, + envelope: TypedEnvelope, + _: Arc, + mut cx: AsyncAppContext, + ) -> Result<()> { + this.update(&mut cx, |this, _| { + let mut incoming_call = this.incoming_call.0.borrow_mut(); + if incoming_call + .as_ref() + .map_or(false, |call| call.room_id == envelope.payload.room_id) + { + incoming_call.take(); + } + }); + Ok(()) + } + + pub fn global(cx: &AppContext) -> Handle { + cx.global::>().clone() + } + + pub fn invite( + &mut self, + called_user_id: u64, + initial_project: Option>, + cx: &mut ModelContext, + ) -> Task> { + if !self.pending_invites.insert(called_user_id) { + return Task::ready(Err(anyhow!("user was already invited"))); + } + cx.notify(); + + let room = if let Some(room) = self.room().cloned() { + Some(Task::ready(Ok(room)).shared()) + } else { + self.pending_room_creation.clone() + }; + + let invite = if let Some(room) = room { + cx.spawn(|_, mut cx| async move { + let room = room.await.map_err(|err| anyhow!("{:?}", err))?; + + let initial_project_id = if let Some(initial_project) = initial_project { + Some( + room.update(&mut cx, |room, cx| room.share_project(initial_project, cx))? + .await?, + ) + } else { + None + }; + + room.update(&mut cx, move |room, cx| { + room.call(called_user_id, initial_project_id, cx) + })? + .await?; + + anyhow::Ok(()) + }) + } else { + let client = self.client.clone(); + let user_store = self.user_store.clone(); + let room = cx + .spawn(|this, mut cx| async move { + let create_room = async { + let room = cx + .update(|cx| { + Room::create( + called_user_id, + initial_project, + client, + user_store, + cx, + ) + })? + .await?; + + this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))? + .await?; + + anyhow::Ok(room) + }; + + let room = create_room.await; + this.update(&mut cx, |this, _| this.pending_room_creation = None)?; + room.map_err(Arc::new) + }) + .shared(); + self.pending_room_creation = Some(room.clone()); + cx.executor().spawn(async move { + room.await.map_err(|err| anyhow!("{:?}", err))?; + anyhow::Ok(()) + }) + }; + + cx.spawn(|this, mut cx| async move { + let result = invite.await; + if result.is_ok() { + this.update(&mut cx, |this, cx| this.report_call_event("invite", cx)); + } else { + // TODO: Resport collaboration error + } + + this.update(&mut cx, |this, cx| { + this.pending_invites.remove(&called_user_id); + cx.notify(); + }); + result + }) + } + + pub fn cancel_invite( + &mut self, + called_user_id: u64, + cx: &mut ModelContext, + ) -> Task> { + let room_id = if let Some(room) = self.room() { + room.read(cx).id() + } else { + return Task::ready(Err(anyhow!("no active call"))); + }; + + let client = self.client.clone(); + cx.executor().spawn(async move { + client + .request(proto::CancelCall { + room_id, + called_user_id, + }) + .await?; + anyhow::Ok(()) + }) + } + + pub fn incoming(&self) -> watch::Receiver> { + self.incoming_call.1.clone() + } + + pub fn accept_incoming(&mut self, cx: &mut ModelContext) -> Task> { + if self.room.is_some() { + return Task::ready(Err(anyhow!("cannot join while on another call"))); + } + + let call = if let Some(call) = self.incoming_call.1.borrow().clone() { + call + } else { + return Task::ready(Err(anyhow!("no incoming call"))); + }; + + let join = Room::join(&call, self.client.clone(), self.user_store.clone(), cx); + + cx.spawn(|this, mut cx| async move { + let room = join.await?; + this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))? + .await?; + this.update(&mut cx, |this, cx| { + this.report_call_event("accept incoming", cx) + }); + Ok(()) + }) + } + + pub fn decline_incoming(&mut self, cx: &mut ModelContext) -> Result<()> { + let call = self + .incoming_call + .0 + .borrow_mut() + .take() + .ok_or_else(|| anyhow!("no incoming call"))?; + report_call_event_for_room("decline incoming", call.room_id, None, &self.client, cx); + self.client.send(proto::DeclineCall { + room_id: call.room_id, + })?; + Ok(()) + } + + pub fn join_channel( + &mut self, + channel_id: u64, + cx: &mut ModelContext, + ) -> Task>> { + if let Some(room) = self.room().cloned() { + if room.read(cx).channel_id() == Some(channel_id) { + return Task::ready(Ok(room)); + } else { + room.update(cx, |room, cx| room.clear_state(cx)); + } + } + + let join = Room::join_channel(channel_id, self.client.clone(), self.user_store.clone(), cx); + + cx.spawn(|this, mut cx| async move { + let room = join.await?; + this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))? + .await?; + this.update(&mut cx, |this, cx| { + this.report_call_event("join channel", cx) + }); + Ok(room) + }) + } + + pub fn hang_up(&mut self, cx: &mut ModelContext) -> Task> { + cx.notify(); + self.report_call_event("hang up", cx); + + Audio::end_call(cx); + if let Some((room, _)) = self.room.take() { + room.update(cx, |room, cx| room.leave(cx)) + } else { + Task::ready(Ok(())) + } + } + + pub fn share_project( + &mut self, + project: Handle, + cx: &mut ModelContext, + ) -> Task> { + if let Some((room, _)) = self.room.as_ref() { + self.report_call_event("share project", cx); + room.update(cx, |room, cx| room.share_project(project, cx)) + } else { + Task::ready(Err(anyhow!("no active call"))) + } + } + + pub fn unshare_project( + &mut self, + project: Handle, + cx: &mut ModelContext, + ) -> Result<()> { + if let Some((room, _)) = self.room.as_ref() { + self.report_call_event("unshare project", cx); + room.update(cx, |room, cx| room.unshare_project(project, cx)) + } else { + Err(anyhow!("no active call")) + } + } + + pub fn location(&self) -> Option<&WeakHandle> { + self.location.as_ref() + } + + pub fn set_location( + &mut self, + project: Option<&Handle>, + cx: &mut ModelContext, + ) -> Task> { + if project.is_some() || !*ZED_ALWAYS_ACTIVE { + self.location = project.map(|project| project.downgrade()); + if let Some((room, _)) = self.room.as_ref() { + return room.update(cx, |room, cx| room.set_location(project, cx)); + } + } + Task::ready(Ok(())) + } + + fn set_room( + &mut self, + room: Option>, + cx: &mut ModelContext, + ) -> Task> { + if room.as_ref() != self.room.as_ref().map(|room| &room.0) { + cx.notify(); + if let Some(room) = room { + if room.read(cx).status().is_offline() { + self.room = None; + Task::ready(Ok(())) + } else { + let subscriptions = vec![ + cx.observe(&room, |this, room, cx| { + if room.read(cx).status().is_offline() { + this.set_room(None, cx).detach_and_log_err(cx); + } + + cx.notify(); + }), + cx.subscribe(&room, |_, _, event, cx| cx.emit(event.clone())), + ]; + self.room = Some((room.clone(), subscriptions)); + let location = self + .location + .as_ref() + .and_then(|location| location.upgrade()); + room.update(cx, |room, cx| room.set_location(location.as_ref(), cx)) + } + } else { + self.room = None; + Task::ready(Ok(())) + } + } else { + Task::ready(Ok(())) + } + } + + pub fn room(&self) -> Option<&Handle> { + self.room.as_ref().map(|(room, _)| room) + } + + pub fn client(&self) -> Arc { + self.client.clone() + } + + pub fn pending_invites(&self) -> &HashSet { + &self.pending_invites + } + + pub fn report_call_event(&self, operation: &'static str, cx: &AppContext) { + if let Some(room) = self.room() { + let room = room.read(cx); + report_call_event_for_room(operation, room.id(), room.channel_id(), &self.client, cx); + } + } +} + +pub fn report_call_event_for_room( + operation: &'static str, + room_id: u64, + channel_id: Option, + client: &Arc, + cx: &AppContext, +) { + let telemetry = client.telemetry(); + let telemetry_settings = *settings2::get::(cx); + let event = ClickhouseEvent::Call { + operation, + room_id: Some(room_id), + channel_id, + }; + telemetry.report_clickhouse_event(event, telemetry_settings); +} + +pub fn report_call_event_for_channel( + operation: &'static str, + channel_id: u64, + client: &Arc, + cx: &AppContext, +) { + let room = ActiveCall::global(cx).read(cx).room(); + + let telemetry = client.telemetry(); + let telemetry_settings = *settings2::get::(cx); + + let event = ClickhouseEvent::Call { + operation, + room_id: room.map(|r| r.read(cx).id()), + channel_id: Some(channel_id), + }; + telemetry.report_clickhouse_event(event, telemetry_settings); +} diff --git a/crates/call2/src/call_settings.rs b/crates/call2/src/call_settings.rs new file mode 100644 index 0000000000..4cec8c50a0 --- /dev/null +++ b/crates/call2/src/call_settings.rs @@ -0,0 +1,27 @@ +use schemars::JsonSchema; +use serde_derive::{Deserialize, Serialize}; +use settings2::Setting; + +#[derive(Deserialize, Debug)] +pub struct CallSettings { + pub mute_on_join: bool, +} + +#[derive(Clone, Default, Serialize, Deserialize, JsonSchema, Debug)] +pub struct CallSettingsContent { + pub mute_on_join: Option, +} + +impl Setting for CallSettings { + const KEY: Option<&'static str> = Some("calls"); + + type FileContent = CallSettingsContent; + + fn load( + default_value: &Self::FileContent, + user_values: &[&Self::FileContent], + _: &gpui2::AppContext, + ) -> anyhow::Result { + Self::load_via_json_merge(default_value, user_values) + } +} diff --git a/crates/call2/src/participant.rs b/crates/call2/src/participant.rs new file mode 100644 index 0000000000..5bc290b182 --- /dev/null +++ b/crates/call2/src/participant.rs @@ -0,0 +1,69 @@ +use anyhow::{anyhow, Result}; +use client2::ParticipantIndex; +use client2::{proto, User}; +use collections::HashMap; +use gpui2::WeakHandle; +pub use live_kit_client::Frame; +use live_kit_client::RemoteAudioTrack; +use project2::Project; +use std::{fmt, sync::Arc}; + +#[derive(Copy, Clone, Debug, Eq, PartialEq)] +pub enum ParticipantLocation { + SharedProject { project_id: u64 }, + UnsharedProject, + External, +} + +impl ParticipantLocation { + pub fn from_proto(location: Option) -> Result { + match location.and_then(|l| l.variant) { + Some(proto::participant_location::Variant::SharedProject(project)) => { + Ok(Self::SharedProject { + project_id: project.id, + }) + } + Some(proto::participant_location::Variant::UnsharedProject(_)) => { + Ok(Self::UnsharedProject) + } + Some(proto::participant_location::Variant::External(_)) => Ok(Self::External), + None => Err(anyhow!("participant location was not provided")), + } + } +} + +#[derive(Clone, Default)] +pub struct LocalParticipant { + pub projects: Vec, + pub active_project: Option>, +} + +#[derive(Clone, Debug)] +pub struct RemoteParticipant { + pub user: Arc, + pub peer_id: proto::PeerId, + pub projects: Vec, + pub location: ParticipantLocation, + pub participant_index: ParticipantIndex, + pub muted: bool, + pub speaking: bool, + pub video_tracks: HashMap>, + pub audio_tracks: HashMap>, +} + +#[derive(Clone)] +pub struct RemoteVideoTrack { + pub(crate) live_kit_track: Arc, +} + +impl fmt::Debug for RemoteVideoTrack { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("RemoteVideoTrack").finish() + } +} + +impl RemoteVideoTrack { + pub fn frames(&self) -> async_broadcast::Receiver { + self.live_kit_track.frames() + } +} diff --git a/crates/call2/src/room.rs b/crates/call2/src/room.rs new file mode 100644 index 0000000000..6dea8adf7f --- /dev/null +++ b/crates/call2/src/room.rs @@ -0,0 +1,1601 @@ +use crate::{ + call_settings::CallSettings, + participant::{LocalParticipant, ParticipantLocation, RemoteParticipant, RemoteVideoTrack}, + IncomingCall, +}; +use anyhow::{anyhow, Result}; +use audio2::{Audio, Sound}; +use client2::{ + proto::{self, PeerId}, + Client, ParticipantIndex, TypedEnvelope, User, UserStore, +}; +use collections::{BTreeMap, HashMap, HashSet}; +use fs::Fs; +use futures::{FutureExt, StreamExt}; +use gpui2::{ + AppContext, AsyncAppContext, Context, EventEmitter, Handle, ModelContext, Task, WeakHandle, +}; +use language2::LanguageRegistry; +use live_kit_client::{ + LocalAudioTrack, LocalTrackPublication, LocalVideoTrack, RemoteAudioTrackUpdate, + RemoteVideoTrackUpdate, +}; +use postage::{sink::Sink, stream::Stream, watch}; +use project2::Project; +use std::{future::Future, mem, sync::Arc, time::Duration}; +use util::{post_inc, ResultExt, TryFutureExt}; + +pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30); + +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum Event { + ParticipantLocationChanged { + participant_id: proto::PeerId, + }, + RemoteVideoTracksChanged { + participant_id: proto::PeerId, + }, + RemoteAudioTracksChanged { + participant_id: proto::PeerId, + }, + RemoteProjectShared { + owner: Arc, + project_id: u64, + worktree_root_names: Vec, + }, + RemoteProjectUnshared { + project_id: u64, + }, + RemoteProjectJoined { + project_id: u64, + }, + RemoteProjectInvitationDiscarded { + project_id: u64, + }, + Left, +} + +pub struct Room { + id: u64, + channel_id: Option, + live_kit: Option, + status: RoomStatus, + shared_projects: HashSet>, + joined_projects: HashSet>, + local_participant: LocalParticipant, + remote_participants: BTreeMap, + pending_participants: Vec>, + participant_user_ids: HashSet, + pending_call_count: usize, + leave_when_empty: bool, + client: Arc, + user_store: Handle, + follows_by_leader_id_project_id: HashMap<(PeerId, u64), Vec>, + client_subscriptions: Vec, + subscriptions: Vec, + room_update_completed_tx: watch::Sender>, + room_update_completed_rx: watch::Receiver>, + pending_room_update: Option>, + maintain_connection: Option>>, +} + +impl EventEmitter for Room { + type Event = Event; +} + +impl Room { + pub fn channel_id(&self) -> Option { + self.channel_id + } + + pub fn is_sharing_project(&self) -> bool { + !self.shared_projects.is_empty() + } + + #[cfg(any(test, feature = "test-support"))] + pub fn is_connected(&self) -> bool { + if let Some(live_kit) = self.live_kit.as_ref() { + matches!( + *live_kit.room.status().borrow(), + live_kit_client::ConnectionState::Connected { .. } + ) + } else { + false + } + } + + fn new( + id: u64, + channel_id: Option, + live_kit_connection_info: Option, + client: Arc, + user_store: Handle, + cx: &mut ModelContext, + ) -> Self { + let live_kit_room = if let Some(connection_info) = live_kit_connection_info { + let room = live_kit_client::Room::new(); + let mut status = room.status(); + // Consume the initial status of the room. + let _ = status.try_recv(); + let _maintain_room = cx.spawn(|this, mut cx| async move { + while let Some(status) = status.next().await { + let this = if let Some(this) = this.upgrade() { + this + } else { + break; + }; + + if status == live_kit_client::ConnectionState::Disconnected { + this.update(&mut cx, |this, cx| this.leave(cx).log_err()) + .ok(); + break; + } + } + }); + + let mut track_video_changes = room.remote_video_track_updates(); + let _maintain_video_tracks = cx.spawn(|this, mut cx| async move { + while let Some(track_change) = track_video_changes.next().await { + let this = if let Some(this) = this.upgrade() { + this + } else { + break; + }; + + this.update(&mut cx, |this, cx| { + this.remote_video_track_updated(track_change, cx).log_err() + }) + .ok(); + } + }); + + let mut track_audio_changes = room.remote_audio_track_updates(); + let _maintain_audio_tracks = cx.spawn(|this, mut cx| async move { + while let Some(track_change) = track_audio_changes.next().await { + let this = if let Some(this) = this.upgrade() { + this + } else { + break; + }; + + this.update(&mut cx, |this, cx| { + this.remote_audio_track_updated(track_change, cx).log_err() + }) + .ok(); + } + }); + + let connect = room.connect(&connection_info.server_url, &connection_info.token); + cx.spawn(|this, mut cx| async move { + connect.await?; + + if !cx.update(|cx| Self::mute_on_join(cx))? { + this.update(&mut cx, |this, cx| this.share_microphone(cx))? + .await?; + } + + anyhow::Ok(()) + }) + .detach_and_log_err(cx); + + Some(LiveKitRoom { + room, + screen_track: LocalTrack::None, + microphone_track: LocalTrack::None, + next_publish_id: 0, + muted_by_user: false, + deafened: false, + speaking: false, + _maintain_room, + _maintain_tracks: [_maintain_video_tracks, _maintain_audio_tracks], + }) + } else { + None + }; + + let maintain_connection = + cx.spawn(|this, cx| Self::maintain_connection(this, client.clone(), cx).log_err()); + + Audio::play_sound(Sound::Joined, cx); + + let (room_update_completed_tx, room_update_completed_rx) = watch::channel(); + + Self { + id, + channel_id, + live_kit: live_kit_room, + status: RoomStatus::Online, + shared_projects: Default::default(), + joined_projects: Default::default(), + participant_user_ids: Default::default(), + local_participant: Default::default(), + remote_participants: Default::default(), + pending_participants: Default::default(), + pending_call_count: 0, + client_subscriptions: vec![ + client.add_message_handler(cx.weak_handle(), Self::handle_room_updated) + ], + subscriptions: vec![ + cx.on_release(Self::released), + cx.on_app_quit(Self::app_will_quit), + ], + leave_when_empty: false, + pending_room_update: None, + client, + user_store, + follows_by_leader_id_project_id: Default::default(), + maintain_connection: Some(maintain_connection), + room_update_completed_tx, + room_update_completed_rx, + } + } + + pub(crate) fn create( + called_user_id: u64, + initial_project: Option>, + client: Arc, + user_store: Handle, + cx: &mut AppContext, + ) -> Task>> { + cx.spawn(|mut cx| async move { + let response = client.request(proto::CreateRoom {}).await?; + let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?; + let room = cx.entity(|cx| { + Self::new( + room_proto.id, + None, + response.live_kit_connection_info, + client, + user_store, + cx, + ) + })?; + + let initial_project_id = if let Some(initial_project) = initial_project { + let initial_project_id = room + .update(&mut cx, |room, cx| { + room.share_project(initial_project.clone(), cx) + })? + .await?; + Some(initial_project_id) + } else { + None + }; + + match room + .update(&mut cx, |room, cx| { + room.leave_when_empty = true; + room.call(called_user_id, initial_project_id, cx) + })? + .await + { + Ok(()) => Ok(room), + Err(error) => Err(anyhow!("room creation failed: {:?}", error)), + } + }) + } + + pub(crate) fn join_channel( + channel_id: u64, + client: Arc, + user_store: Handle, + cx: &mut AppContext, + ) -> Task>> { + cx.spawn(|cx| async move { + Self::from_join_response( + client.request(proto::JoinChannel { channel_id }).await?, + client, + user_store, + cx, + ) + }) + } + + pub(crate) fn join( + call: &IncomingCall, + client: Arc, + user_store: Handle, + cx: &mut AppContext, + ) -> Task>> { + let id = call.room_id; + cx.spawn(|cx| async move { + Self::from_join_response( + client.request(proto::JoinRoom { id }).await?, + client, + user_store, + cx, + ) + }) + } + + fn released(&mut self, cx: &mut AppContext) { + if self.status.is_online() { + self.leave_internal(cx).detach_and_log_err(cx); + } + } + + fn app_will_quit(&mut self, cx: &mut ModelContext) -> impl Future { + let task = if self.status.is_online() { + let leave = self.leave_internal(cx); + Some(cx.executor().spawn(async move { + leave.await.log_err(); + })) + } else { + None + }; + + async move { + if let Some(task) = task { + task.await; + } + } + } + + pub fn mute_on_join(cx: &AppContext) -> bool { + settings2::get::(cx).mute_on_join || client2::IMPERSONATE_LOGIN.is_some() + } + + fn from_join_response( + response: proto::JoinRoomResponse, + client: Arc, + user_store: Handle, + mut cx: AsyncAppContext, + ) -> Result> { + let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?; + let room = cx.entity(|cx| { + Self::new( + room_proto.id, + response.channel_id, + response.live_kit_connection_info, + client, + user_store, + cx, + ) + })?; + room.update(&mut cx, |room, cx| { + room.leave_when_empty = room.channel_id.is_none(); + room.apply_room_update(room_proto, cx)?; + anyhow::Ok(()) + })?; + Ok(room) + } + + fn should_leave(&self) -> bool { + self.leave_when_empty + && self.pending_room_update.is_none() + && self.pending_participants.is_empty() + && self.remote_participants.is_empty() + && self.pending_call_count == 0 + } + + pub(crate) fn leave(&mut self, cx: &mut ModelContext) -> Task> { + cx.notify(); + cx.emit(Event::Left); + self.leave_internal(cx) + } + + fn leave_internal(&mut self, cx: &mut AppContext) -> Task> { + if self.status.is_offline() { + return Task::ready(Err(anyhow!("room is offline"))); + } + + log::info!("leaving room"); + Audio::play_sound(Sound::Leave, cx); + + self.clear_state(cx); + + let leave_room = self.client.request(proto::LeaveRoom {}); + cx.executor().spawn(async move { + leave_room.await?; + anyhow::Ok(()) + }) + } + + pub(crate) fn clear_state(&mut self, cx: &mut AppContext) { + for project in self.shared_projects.drain() { + if let Some(project) = project.upgrade() { + project.update(cx, |project, cx| { + project.unshare(cx).log_err(); + }); + } + } + for project in self.joined_projects.drain() { + if let Some(project) = project.upgrade() { + project.update(cx, |project, cx| { + project.disconnected_from_host(cx); + project.close(cx); + }); + } + } + + self.status = RoomStatus::Offline; + self.remote_participants.clear(); + self.pending_participants.clear(); + self.participant_user_ids.clear(); + self.client_subscriptions.clear(); + self.live_kit.take(); + self.pending_room_update.take(); + self.maintain_connection.take(); + } + + async fn maintain_connection( + this: WeakHandle, + client: Arc, + mut cx: AsyncAppContext, + ) -> Result<()> { + let mut client_status = client.status(); + loop { + let _ = client_status.try_recv(); + let is_connected = client_status.borrow().is_connected(); + // Even if we're initially connected, any future change of the status means we momentarily disconnected. + if !is_connected || client_status.next().await.is_some() { + log::info!("detected client disconnection"); + + this.upgrade() + .ok_or_else(|| anyhow!("room was dropped"))? + .update(&mut cx, |this, cx| { + this.status = RoomStatus::Rejoining; + cx.notify(); + })?; + + // Wait for client to re-establish a connection to the server. + { + let mut reconnection_timeout = cx.executor().timer(RECONNECT_TIMEOUT).fuse(); + let client_reconnection = async { + let mut remaining_attempts = 3; + while remaining_attempts > 0 { + if client_status.borrow().is_connected() { + log::info!("client reconnected, attempting to rejoin room"); + + let Some(this) = this.upgrade() else { break }; + match this.update(&mut cx, |this, cx| this.rejoin(cx)) { + Ok(task) => { + if task.await.log_err().is_some() { + return true; + } else { + remaining_attempts -= 1; + } + } + Err(_app_dropped) => return false, + } + } else if client_status.borrow().is_signed_out() { + return false; + } + + log::info!( + "waiting for client status change, remaining attempts {}", + remaining_attempts + ); + client_status.next().await; + } + false + } + .fuse(); + futures::pin_mut!(client_reconnection); + + futures::select_biased! { + reconnected = client_reconnection => { + if reconnected { + log::info!("successfully reconnected to room"); + // If we successfully joined the room, go back around the loop + // waiting for future connection status changes. + continue; + } + } + _ = reconnection_timeout => { + log::info!("room reconnection timeout expired"); + } + } + } + + break; + } + } + + // The client failed to re-establish a connection to the server + // or an error occurred while trying to re-join the room. Either way + // we leave the room and return an error. + if let Some(this) = this.upgrade() { + log::info!("reconnection failed, leaving room"); + let _ = this.update(&mut cx, |this, cx| this.leave(cx))?; + } + Err(anyhow!( + "can't reconnect to room: client failed to re-establish connection" + )) + } + + fn rejoin(&mut self, cx: &mut ModelContext) -> Task> { + let mut projects = HashMap::default(); + let mut reshared_projects = Vec::new(); + let mut rejoined_projects = Vec::new(); + self.shared_projects.retain(|project| { + if let Some(handle) = project.upgrade() { + let project = handle.read(cx); + if let Some(project_id) = project.remote_id() { + projects.insert(project_id, handle.clone()); + reshared_projects.push(proto::UpdateProject { + project_id, + worktrees: project.worktree_metadata_protos(cx), + }); + return true; + } + } + false + }); + self.joined_projects.retain(|project| { + if let Some(handle) = project.upgrade() { + let project = handle.read(cx); + if let Some(project_id) = project.remote_id() { + projects.insert(project_id, handle.clone()); + rejoined_projects.push(proto::RejoinProject { + id: project_id, + worktrees: project + .worktrees() + .map(|worktree| { + let worktree = worktree.read(cx); + proto::RejoinWorktree { + id: worktree.id().to_proto(), + scan_id: worktree.completed_scan_id() as u64, + } + }) + .collect(), + }); + } + return true; + } + false + }); + + let response = self.client.request_envelope(proto::RejoinRoom { + id: self.id, + reshared_projects, + rejoined_projects, + }); + + cx.spawn(|this, mut cx| async move { + let response = response.await?; + let message_id = response.message_id; + let response = response.payload; + let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?; + this.update(&mut cx, |this, cx| { + this.status = RoomStatus::Online; + this.apply_room_update(room_proto, cx)?; + + for reshared_project in response.reshared_projects { + if let Some(project) = projects.get(&reshared_project.id) { + project.update(cx, |project, cx| { + project.reshared(reshared_project, cx).log_err(); + }); + } + } + + for rejoined_project in response.rejoined_projects { + if let Some(project) = projects.get(&rejoined_project.id) { + project.update(cx, |project, cx| { + project.rejoined(rejoined_project, message_id, cx).log_err(); + }); + } + } + + anyhow::Ok(()) + })? + }) + } + + pub fn id(&self) -> u64 { + self.id + } + + pub fn status(&self) -> RoomStatus { + self.status + } + + pub fn local_participant(&self) -> &LocalParticipant { + &self.local_participant + } + + pub fn remote_participants(&self) -> &BTreeMap { + &self.remote_participants + } + + pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> { + self.remote_participants + .values() + .find(|p| p.peer_id == peer_id) + } + + pub fn pending_participants(&self) -> &[Arc] { + &self.pending_participants + } + + pub fn contains_participant(&self, user_id: u64) -> bool { + self.participant_user_ids.contains(&user_id) + } + + pub fn followers_for(&self, leader_id: PeerId, project_id: u64) -> &[PeerId] { + self.follows_by_leader_id_project_id + .get(&(leader_id, project_id)) + .map_or(&[], |v| v.as_slice()) + } + + /// Returns the most 'active' projects, defined as most people in the project + pub fn most_active_project(&self, cx: &AppContext) -> Option<(u64, u64)> { + let mut project_hosts_and_guest_counts = HashMap::, u32)>::default(); + for participant in self.remote_participants.values() { + match participant.location { + ParticipantLocation::SharedProject { project_id } => { + project_hosts_and_guest_counts + .entry(project_id) + .or_default() + .1 += 1; + } + ParticipantLocation::External | ParticipantLocation::UnsharedProject => {} + } + for project in &participant.projects { + project_hosts_and_guest_counts + .entry(project.id) + .or_default() + .0 = Some(participant.user.id); + } + } + + if let Some(user) = self.user_store.read(cx).current_user() { + for project in &self.local_participant.projects { + project_hosts_and_guest_counts + .entry(project.id) + .or_default() + .0 = Some(user.id); + } + } + + project_hosts_and_guest_counts + .into_iter() + .filter_map(|(id, (host, guest_count))| Some((id, host?, guest_count))) + .max_by_key(|(_, _, guest_count)| *guest_count) + .map(|(id, host, _)| (id, host)) + } + + async fn handle_room_updated( + this: Handle, + envelope: TypedEnvelope, + _: Arc, + mut cx: AsyncAppContext, + ) -> Result<()> { + let room = envelope + .payload + .room + .ok_or_else(|| anyhow!("invalid room"))?; + this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))? + } + + fn apply_room_update( + &mut self, + mut room: proto::Room, + cx: &mut ModelContext, + ) -> Result<()> { + // Filter ourselves out from the room's participants. + let local_participant_ix = room + .participants + .iter() + .position(|participant| Some(participant.user_id) == self.client.user_id()); + let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix)); + + let pending_participant_user_ids = room + .pending_participants + .iter() + .map(|p| p.user_id) + .collect::>(); + + let remote_participant_user_ids = room + .participants + .iter() + .map(|p| p.user_id) + .collect::>(); + + let (remote_participants, pending_participants) = + self.user_store.update(cx, move |user_store, cx| { + ( + user_store.get_users(remote_participant_user_ids, cx), + user_store.get_users(pending_participant_user_ids, cx), + ) + }); + + self.pending_room_update = Some(cx.spawn(|this, mut cx| async move { + let (remote_participants, pending_participants) = + futures::join!(remote_participants, pending_participants); + + this.update(&mut cx, |this, cx| { + this.participant_user_ids.clear(); + + if let Some(participant) = local_participant { + this.local_participant.projects = participant.projects; + } else { + this.local_participant.projects.clear(); + } + + if let Some(participants) = remote_participants.log_err() { + for (participant, user) in room.participants.into_iter().zip(participants) { + let Some(peer_id) = participant.peer_id else { + continue; + }; + let participant_index = ParticipantIndex(participant.participant_index); + this.participant_user_ids.insert(participant.user_id); + + let old_projects = this + .remote_participants + .get(&participant.user_id) + .into_iter() + .flat_map(|existing| &existing.projects) + .map(|project| project.id) + .collect::>(); + let new_projects = participant + .projects + .iter() + .map(|project| project.id) + .collect::>(); + + for project in &participant.projects { + if !old_projects.contains(&project.id) { + cx.emit(Event::RemoteProjectShared { + owner: user.clone(), + project_id: project.id, + worktree_root_names: project.worktree_root_names.clone(), + }); + } + } + + for unshared_project_id in old_projects.difference(&new_projects) { + this.joined_projects.retain(|project| { + if let Some(project) = project.upgrade() { + project.update(cx, |project, cx| { + if project.remote_id() == Some(*unshared_project_id) { + project.disconnected_from_host(cx); + false + } else { + true + } + }) + } else { + false + } + }); + cx.emit(Event::RemoteProjectUnshared { + project_id: *unshared_project_id, + }); + } + + let location = ParticipantLocation::from_proto(participant.location) + .unwrap_or(ParticipantLocation::External); + if let Some(remote_participant) = + this.remote_participants.get_mut(&participant.user_id) + { + remote_participant.peer_id = peer_id; + remote_participant.projects = participant.projects; + remote_participant.participant_index = participant_index; + if location != remote_participant.location { + remote_participant.location = location; + cx.emit(Event::ParticipantLocationChanged { + participant_id: peer_id, + }); + } + } else { + this.remote_participants.insert( + participant.user_id, + RemoteParticipant { + user: user.clone(), + participant_index, + peer_id, + projects: participant.projects, + location, + muted: true, + speaking: false, + video_tracks: Default::default(), + audio_tracks: Default::default(), + }, + ); + + Audio::play_sound(Sound::Joined, cx); + + if let Some(live_kit) = this.live_kit.as_ref() { + let video_tracks = + live_kit.room.remote_video_tracks(&user.id.to_string()); + let audio_tracks = + live_kit.room.remote_audio_tracks(&user.id.to_string()); + let publications = live_kit + .room + .remote_audio_track_publications(&user.id.to_string()); + + for track in video_tracks { + this.remote_video_track_updated( + RemoteVideoTrackUpdate::Subscribed(track), + cx, + ) + .log_err(); + } + + for (track, publication) in + audio_tracks.iter().zip(publications.iter()) + { + this.remote_audio_track_updated( + RemoteAudioTrackUpdate::Subscribed( + track.clone(), + publication.clone(), + ), + cx, + ) + .log_err(); + } + } + } + } + + this.remote_participants.retain(|user_id, participant| { + if this.participant_user_ids.contains(user_id) { + true + } else { + for project in &participant.projects { + cx.emit(Event::RemoteProjectUnshared { + project_id: project.id, + }); + } + false + } + }); + } + + if let Some(pending_participants) = pending_participants.log_err() { + this.pending_participants = pending_participants; + for participant in &this.pending_participants { + this.participant_user_ids.insert(participant.id); + } + } + + this.follows_by_leader_id_project_id.clear(); + for follower in room.followers { + let project_id = follower.project_id; + let (leader, follower) = match (follower.leader_id, follower.follower_id) { + (Some(leader), Some(follower)) => (leader, follower), + + _ => { + log::error!("Follower message {follower:?} missing some state"); + continue; + } + }; + + let list = this + .follows_by_leader_id_project_id + .entry((leader, project_id)) + .or_insert(Vec::new()); + if !list.contains(&follower) { + list.push(follower); + } + } + + this.pending_room_update.take(); + if this.should_leave() { + log::info!("room is empty, leaving"); + let _ = this.leave(cx); + } + + this.user_store.update(cx, |user_store, cx| { + let participant_indices_by_user_id = this + .remote_participants + .iter() + .map(|(user_id, participant)| (*user_id, participant.participant_index)) + .collect(); + user_store.set_participant_indices(participant_indices_by_user_id, cx); + }); + + this.check_invariants(); + this.room_update_completed_tx.try_send(Some(())).ok(); + cx.notify(); + }) + .ok(); + })); + + cx.notify(); + Ok(()) + } + + pub fn room_update_completed(&mut self) -> impl Future { + let mut done_rx = self.room_update_completed_rx.clone(); + async move { + while let Some(result) = done_rx.next().await { + if result.is_some() { + break; + } + } + } + } + + fn remote_video_track_updated( + &mut self, + change: RemoteVideoTrackUpdate, + cx: &mut ModelContext, + ) -> Result<()> { + match change { + RemoteVideoTrackUpdate::Subscribed(track) => { + let user_id = track.publisher_id().parse()?; + let track_id = track.sid().to_string(); + let participant = self + .remote_participants + .get_mut(&user_id) + .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?; + participant.video_tracks.insert( + track_id.clone(), + Arc::new(RemoteVideoTrack { + live_kit_track: track, + }), + ); + cx.emit(Event::RemoteVideoTracksChanged { + participant_id: participant.peer_id, + }); + } + RemoteVideoTrackUpdate::Unsubscribed { + publisher_id, + track_id, + } => { + let user_id = publisher_id.parse()?; + let participant = self + .remote_participants + .get_mut(&user_id) + .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?; + participant.video_tracks.remove(&track_id); + cx.emit(Event::RemoteVideoTracksChanged { + participant_id: participant.peer_id, + }); + } + } + + cx.notify(); + Ok(()) + } + + fn remote_audio_track_updated( + &mut self, + change: RemoteAudioTrackUpdate, + cx: &mut ModelContext, + ) -> Result<()> { + match change { + RemoteAudioTrackUpdate::ActiveSpeakersChanged { speakers } => { + let mut speaker_ids = speakers + .into_iter() + .filter_map(|speaker_sid| speaker_sid.parse().ok()) + .collect::>(); + speaker_ids.sort_unstable(); + for (sid, participant) in &mut self.remote_participants { + if let Ok(_) = speaker_ids.binary_search(sid) { + participant.speaking = true; + } else { + participant.speaking = false; + } + } + if let Some(id) = self.client.user_id() { + if let Some(room) = &mut self.live_kit { + if let Ok(_) = speaker_ids.binary_search(&id) { + room.speaking = true; + } else { + room.speaking = false; + } + } + } + cx.notify(); + } + RemoteAudioTrackUpdate::MuteChanged { track_id, muted } => { + let mut found = false; + for participant in &mut self.remote_participants.values_mut() { + for track in participant.audio_tracks.values() { + if track.sid() == track_id { + found = true; + break; + } + } + if found { + participant.muted = muted; + break; + } + } + + cx.notify(); + } + RemoteAudioTrackUpdate::Subscribed(track, publication) => { + let user_id = track.publisher_id().parse()?; + let track_id = track.sid().to_string(); + let participant = self + .remote_participants + .get_mut(&user_id) + .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?; + + participant.audio_tracks.insert(track_id.clone(), track); + participant.muted = publication.is_muted(); + + cx.emit(Event::RemoteAudioTracksChanged { + participant_id: participant.peer_id, + }); + } + RemoteAudioTrackUpdate::Unsubscribed { + publisher_id, + track_id, + } => { + let user_id = publisher_id.parse()?; + let participant = self + .remote_participants + .get_mut(&user_id) + .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?; + participant.audio_tracks.remove(&track_id); + cx.emit(Event::RemoteAudioTracksChanged { + participant_id: participant.peer_id, + }); + } + } + + cx.notify(); + Ok(()) + } + + fn check_invariants(&self) { + #[cfg(any(test, feature = "test-support"))] + { + for participant in self.remote_participants.values() { + assert!(self.participant_user_ids.contains(&participant.user.id)); + assert_ne!(participant.user.id, self.client.user_id().unwrap()); + } + + for participant in &self.pending_participants { + assert!(self.participant_user_ids.contains(&participant.id)); + assert_ne!(participant.id, self.client.user_id().unwrap()); + } + + assert_eq!( + self.participant_user_ids.len(), + self.remote_participants.len() + self.pending_participants.len() + ); + } + } + + pub(crate) fn call( + &mut self, + called_user_id: u64, + initial_project_id: Option, + cx: &mut ModelContext, + ) -> Task> { + if self.status.is_offline() { + return Task::ready(Err(anyhow!("room is offline"))); + } + + cx.notify(); + let client = self.client.clone(); + let room_id = self.id; + self.pending_call_count += 1; + cx.spawn(|this, mut cx| async move { + let result = client + .request(proto::Call { + room_id, + called_user_id, + initial_project_id, + }) + .await; + this.update(&mut cx, |this, cx| { + this.pending_call_count -= 1; + if this.should_leave() { + this.leave(cx).detach_and_log_err(cx); + } + })?; + result?; + Ok(()) + }) + } + + pub fn join_project( + &mut self, + id: u64, + language_registry: Arc, + fs: Arc, + cx: &mut ModelContext, + ) -> Task>> { + let client = self.client.clone(); + let user_store = self.user_store.clone(); + cx.emit(Event::RemoteProjectJoined { project_id: id }); + cx.spawn(|this, mut cx| async move { + let project = + Project::remote(id, client, user_store, language_registry, fs, cx.clone()).await?; + + this.update(&mut cx, |this, cx| { + this.joined_projects.retain(|project| { + if let Some(project) = project.upgrade() { + !project.read(cx).is_read_only() + } else { + false + } + }); + this.joined_projects.insert(project.downgrade()); + })?; + Ok(project) + }) + } + + pub(crate) fn share_project( + &mut self, + project: Handle, + cx: &mut ModelContext, + ) -> Task> { + if let Some(project_id) = project.read(cx).remote_id() { + return Task::ready(Ok(project_id)); + } + + let request = self.client.request(proto::ShareProject { + room_id: self.id(), + worktrees: project.read(cx).worktree_metadata_protos(cx), + }); + cx.spawn(|this, mut cx| async move { + let response = request.await?; + + project.update(&mut cx, |project, cx| { + project.shared(response.project_id, cx) + })?; + + // If the user's location is in this project, it changes from UnsharedProject to SharedProject. + this.update(&mut cx, |this, cx| { + this.shared_projects.insert(project.downgrade()); + let active_project = this.local_participant.active_project.as_ref(); + if active_project.map_or(false, |location| *location == project) { + this.set_location(Some(&project), cx) + } else { + Task::ready(Ok(())) + } + })? + .await?; + + Ok(response.project_id) + }) + } + + pub(crate) fn unshare_project( + &mut self, + project: Handle, + cx: &mut ModelContext, + ) -> Result<()> { + let project_id = match project.read(cx).remote_id() { + Some(project_id) => project_id, + None => return Ok(()), + }; + + self.client.send(proto::UnshareProject { project_id })?; + project.update(cx, |this, cx| this.unshare(cx)) + } + + pub(crate) fn set_location( + &mut self, + project: Option<&Handle>, + cx: &mut ModelContext, + ) -> Task> { + if self.status.is_offline() { + return Task::ready(Err(anyhow!("room is offline"))); + } + + let client = self.client.clone(); + let room_id = self.id; + let location = if let Some(project) = project { + self.local_participant.active_project = Some(project.downgrade()); + if let Some(project_id) = project.read(cx).remote_id() { + proto::participant_location::Variant::SharedProject( + proto::participant_location::SharedProject { id: project_id }, + ) + } else { + proto::participant_location::Variant::UnsharedProject( + proto::participant_location::UnsharedProject {}, + ) + } + } else { + self.local_participant.active_project = None; + proto::participant_location::Variant::External(proto::participant_location::External {}) + }; + + cx.notify(); + cx.executor().spawn_on_main(|| async move { + client + .request(proto::UpdateParticipantLocation { + room_id, + location: Some(proto::ParticipantLocation { + variant: Some(location), + }), + }) + .await?; + Ok(()) + }) + } + + pub fn is_screen_sharing(&self) -> bool { + self.live_kit.as_ref().map_or(false, |live_kit| { + !matches!(live_kit.screen_track, LocalTrack::None) + }) + } + + pub fn is_sharing_mic(&self) -> bool { + self.live_kit.as_ref().map_or(false, |live_kit| { + !matches!(live_kit.microphone_track, LocalTrack::None) + }) + } + + pub fn is_muted(&self, cx: &AppContext) -> bool { + self.live_kit + .as_ref() + .and_then(|live_kit| match &live_kit.microphone_track { + LocalTrack::None => Some(Self::mute_on_join(cx)), + LocalTrack::Pending { muted, .. } => Some(*muted), + LocalTrack::Published { muted, .. } => Some(*muted), + }) + .unwrap_or(false) + } + + pub fn is_speaking(&self) -> bool { + self.live_kit + .as_ref() + .map_or(false, |live_kit| live_kit.speaking) + } + + pub fn is_deafened(&self) -> Option { + self.live_kit.as_ref().map(|live_kit| live_kit.deafened) + } + + #[track_caller] + pub fn share_microphone(&mut self, cx: &mut ModelContext) -> Task> { + if self.status.is_offline() { + return Task::ready(Err(anyhow!("room is offline"))); + } else if self.is_sharing_mic() { + return Task::ready(Err(anyhow!("microphone was already shared"))); + } + + let publish_id = if let Some(live_kit) = self.live_kit.as_mut() { + let publish_id = post_inc(&mut live_kit.next_publish_id); + live_kit.microphone_track = LocalTrack::Pending { + publish_id, + muted: false, + }; + cx.notify(); + publish_id + } else { + return Task::ready(Err(anyhow!("live-kit was not initialized"))); + }; + + cx.spawn(|this, mut cx| async move { + let publish_track = async { + let track = LocalAudioTrack::create(); + this.upgrade() + .ok_or_else(|| anyhow!("room was dropped"))? + .update(&mut cx, |this, _| { + this.live_kit + .as_ref() + .map(|live_kit| live_kit.room.publish_audio_track(&track)) + })? + .ok_or_else(|| anyhow!("live-kit was not initialized"))? + .await + }; + + let publication = publish_track.await; + this.upgrade() + .ok_or_else(|| anyhow!("room was dropped"))? + .update(&mut cx, |this, cx| { + let live_kit = this + .live_kit + .as_mut() + .ok_or_else(|| anyhow!("live-kit was not initialized"))?; + + let (canceled, muted) = if let LocalTrack::Pending { + publish_id: cur_publish_id, + muted, + } = &live_kit.microphone_track + { + (*cur_publish_id != publish_id, *muted) + } else { + (true, false) + }; + + match publication { + Ok(publication) => { + if canceled { + live_kit.room.unpublish_track(publication); + } else { + if muted { + cx.executor().spawn(publication.set_mute(muted)).detach(); + } + live_kit.microphone_track = LocalTrack::Published { + track_publication: publication, + muted, + }; + cx.notify(); + } + Ok(()) + } + Err(error) => { + if canceled { + Ok(()) + } else { + live_kit.microphone_track = LocalTrack::None; + cx.notify(); + Err(error) + } + } + } + })? + }) + } + + pub fn share_screen(&mut self, cx: &mut ModelContext) -> Task> { + if self.status.is_offline() { + return Task::ready(Err(anyhow!("room is offline"))); + } else if self.is_screen_sharing() { + return Task::ready(Err(anyhow!("screen was already shared"))); + } + + let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() { + let publish_id = post_inc(&mut live_kit.next_publish_id); + live_kit.screen_track = LocalTrack::Pending { + publish_id, + muted: false, + }; + cx.notify(); + (live_kit.room.display_sources(), publish_id) + } else { + return Task::ready(Err(anyhow!("live-kit was not initialized"))); + }; + + cx.spawn(|this, mut cx| async move { + let publish_track = async { + let displays = displays.await?; + let display = displays + .first() + .ok_or_else(|| anyhow!("no display found"))?; + let track = LocalVideoTrack::screen_share_for_display(&display); + this.upgrade() + .ok_or_else(|| anyhow!("room was dropped"))? + .update(&mut cx, |this, _| { + this.live_kit + .as_ref() + .map(|live_kit| live_kit.room.publish_video_track(&track)) + })? + .ok_or_else(|| anyhow!("live-kit was not initialized"))? + .await + }; + + let publication = publish_track.await; + this.upgrade() + .ok_or_else(|| anyhow!("room was dropped"))? + .update(&mut cx, |this, cx| { + let live_kit = this + .live_kit + .as_mut() + .ok_or_else(|| anyhow!("live-kit was not initialized"))?; + + let (canceled, muted) = if let LocalTrack::Pending { + publish_id: cur_publish_id, + muted, + } = &live_kit.screen_track + { + (*cur_publish_id != publish_id, *muted) + } else { + (true, false) + }; + + match publication { + Ok(publication) => { + if canceled { + live_kit.room.unpublish_track(publication); + } else { + if muted { + cx.executor().spawn(publication.set_mute(muted)).detach(); + } + live_kit.screen_track = LocalTrack::Published { + track_publication: publication, + muted, + }; + cx.notify(); + } + + Audio::play_sound(Sound::StartScreenshare, cx); + + Ok(()) + } + Err(error) => { + if canceled { + Ok(()) + } else { + live_kit.screen_track = LocalTrack::None; + cx.notify(); + Err(error) + } + } + } + })? + }) + } + + pub fn toggle_mute(&mut self, cx: &mut ModelContext) -> Result>> { + let should_mute = !self.is_muted(cx); + if let Some(live_kit) = self.live_kit.as_mut() { + if matches!(live_kit.microphone_track, LocalTrack::None) { + return Ok(self.share_microphone(cx)); + } + + let (ret_task, old_muted) = live_kit.set_mute(should_mute, cx)?; + live_kit.muted_by_user = should_mute; + + if old_muted == true && live_kit.deafened == true { + if let Some(task) = self.toggle_deafen(cx).ok() { + task.detach(); + } + } + + Ok(ret_task) + } else { + Err(anyhow!("LiveKit not started")) + } + } + + pub fn toggle_deafen(&mut self, cx: &mut ModelContext) -> Result>> { + if let Some(live_kit) = self.live_kit.as_mut() { + (*live_kit).deafened = !live_kit.deafened; + + let mut tasks = Vec::with_capacity(self.remote_participants.len()); + // Context notification is sent within set_mute itself. + let mut mute_task = None; + // When deafening, mute user's mic as well. + // When undeafening, unmute user's mic unless it was manually muted prior to deafening. + if live_kit.deafened || !live_kit.muted_by_user { + mute_task = Some(live_kit.set_mute(live_kit.deafened, cx)?.0); + }; + for participant in self.remote_participants.values() { + for track in live_kit + .room + .remote_audio_track_publications(&participant.user.id.to_string()) + { + tasks.push( + cx.executor() + .spawn_on_main(|| track.set_enabled(!live_kit.deafened)), + ); + } + } + + Ok(cx.executor().spawn_on_main(|| async { + if let Some(mute_task) = mute_task { + mute_task.await?; + } + for task in tasks { + task.await?; + } + Ok(()) + })) + } else { + Err(anyhow!("LiveKit not started")) + } + } + + pub fn unshare_screen(&mut self, cx: &mut ModelContext) -> Result<()> { + if self.status.is_offline() { + return Err(anyhow!("room is offline")); + } + + let live_kit = self + .live_kit + .as_mut() + .ok_or_else(|| anyhow!("live-kit was not initialized"))?; + match mem::take(&mut live_kit.screen_track) { + LocalTrack::None => Err(anyhow!("screen was not shared")), + LocalTrack::Pending { .. } => { + cx.notify(); + Ok(()) + } + LocalTrack::Published { + track_publication, .. + } => { + live_kit.room.unpublish_track(track_publication); + cx.notify(); + + Audio::play_sound(Sound::StopScreenshare, cx); + Ok(()) + } + } + } + + #[cfg(any(test, feature = "test-support"))] + pub fn set_display_sources(&self, sources: Vec) { + self.live_kit + .as_ref() + .unwrap() + .room + .set_display_sources(sources); + } +} + +struct LiveKitRoom { + room: Arc, + screen_track: LocalTrack, + microphone_track: LocalTrack, + /// Tracks whether we're currently in a muted state due to auto-mute from deafening or manual mute performed by user. + muted_by_user: bool, + deafened: bool, + speaking: bool, + next_publish_id: usize, + _maintain_room: Task<()>, + _maintain_tracks: [Task<()>; 2], +} + +impl LiveKitRoom { + fn set_mute( + self: &mut LiveKitRoom, + should_mute: bool, + cx: &mut ModelContext, + ) -> Result<(Task>, bool)> { + if !should_mute { + // clear user muting state. + self.muted_by_user = false; + } + + let (result, old_muted) = match &mut self.microphone_track { + LocalTrack::None => Err(anyhow!("microphone was not shared")), + LocalTrack::Pending { muted, .. } => { + let old_muted = *muted; + *muted = should_mute; + cx.notify(); + Ok((Task::Ready(Some(Ok(()))), old_muted)) + } + LocalTrack::Published { + track_publication, + muted, + } => { + let old_muted = *muted; + *muted = should_mute; + cx.notify(); + Ok(( + cx.executor().spawn(track_publication.set_mute(*muted)), + old_muted, + )) + } + }?; + + if old_muted != should_mute { + if should_mute { + Audio::play_sound(Sound::Mute, cx); + } else { + Audio::play_sound(Sound::Unmute, cx); + } + } + + Ok((result, old_muted)) + } +} + +enum LocalTrack { + None, + Pending { + publish_id: usize, + muted: bool, + }, + Published { + track_publication: LocalTrackPublication, + muted: bool, + }, +} + +impl Default for LocalTrack { + fn default() -> Self { + Self::None + } +} + +#[derive(Copy, Clone, PartialEq, Eq)] +pub enum RoomStatus { + Online, + Rejoining, + Offline, +} + +impl RoomStatus { + pub fn is_offline(&self) -> bool { + matches!(self, RoomStatus::Offline) + } + + pub fn is_online(&self) -> bool { + matches!(self, RoomStatus::Online) + } +} diff --git a/crates/gpui2/src/app/entity_map.rs b/crates/gpui2/src/app/entity_map.rs index 8f254bd176..cf39a2b81f 100644 --- a/crates/gpui2/src/app/entity_map.rs +++ b/crates/gpui2/src/app/entity_map.rs @@ -323,6 +323,12 @@ impl PartialEq for Handle { impl Eq for Handle {} +impl PartialEq> for Handle { + fn eq(&self, other: &WeakHandle) -> bool { + self.entity_id() == other.entity_id() + } +} + #[derive(Clone)] pub struct AnyWeakHandle { pub(crate) entity_id: EntityId, @@ -444,3 +450,9 @@ impl PartialEq for WeakHandle { } impl Eq for WeakHandle {} + +impl PartialEq> for WeakHandle { + fn eq(&self, other: &Handle) -> bool { + self.entity_id() == other.entity_id() + } +} diff --git a/crates/zed2/src/main.rs b/crates/zed2/src/main.rs index fff3b5eae1..d7c01ed1dd 100644 --- a/crates/zed2/src/main.rs +++ b/crates/zed2/src/main.rs @@ -111,26 +111,26 @@ fn main() { handle_settings_file_changes(user_settings_file_rx, cx); // handle_keymap_file_changes(user_keymap_file_rx, cx); - // let client = client2::Client::new(http.clone(), cx); - let languages = LanguageRegistry::new(login_shell_env_loaded); + let client = client2::Client::new(http.clone(), cx); + let mut languages = LanguageRegistry::new(login_shell_env_loaded); let copilot_language_server_id = languages.next_language_server_id(); - // languages.set_executor(cx.background().clone()); - // languages.set_language_server_download_dir(paths::LANGUAGES_DIR.clone()); - // let languages = Arc::new(languages); + languages.set_executor(cx.executor().clone()); + languages.set_language_server_download_dir(paths::LANGUAGES_DIR.clone()); + let languages = Arc::new(languages); let node_runtime = RealNodeRuntime::new(http.clone()); - // languages::init(languages.clone(), node_runtime.clone(), cx); + languages2::init(languages.clone(), node_runtime.clone(), cx); // let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http.clone(), cx)); // let workspace_store = cx.add_model(|cx| WorkspaceStore::new(client.clone(), cx)); - // cx.set_global(client.clone()); + cx.set_global(client.clone()); theme2::init(cx); // context_menu::init(cx); - // project::Project::init(&client, cx); - // client::init(&client, cx); + project2::Project::init(&client, cx); + client2::init(&client, cx); // command_palette::init(cx); - // language::init(cx); + language2::init(cx); // editor::init(cx); // go_to_line::init(cx); // file_finder::init(cx); @@ -193,7 +193,7 @@ fn main() { // theme_selector::init(cx); // activity_indicator::init(cx); // language_tools::init(cx); - // call::init(app_state.client.clone(), app_state.user_store.clone(), cx); + call2::init(app_state.client.clone(), app_state.user_store.clone(), cx); // collab_ui::init(&app_state, cx); // feedback::init(cx); // welcome::init(cx);