From 0dca67fc33f32a0795aa468393226e68d5a904a9 Mon Sep 17 00:00:00 2001 From: Max Brunsfeld Date: Wed, 10 Jan 2024 13:04:26 -0800 Subject: [PATCH 1/3] Add --top flag to zed-local script, for making windows take up half the screen --- script/zed-local | 36 ++++++++++++++++++++++++------------ 1 file changed, 24 insertions(+), 12 deletions(-) diff --git a/script/zed-local b/script/zed-local index 8ba1561bbd..4519ede38c 100755 --- a/script/zed-local +++ b/script/zed-local @@ -4,20 +4,28 @@ const { spawn, execFileSync } = require("child_process"); const RESOLUTION_REGEX = /(\d+) x (\d+)/; const DIGIT_FLAG_REGEX = /^--?(\d+)$/; -const RELEASE_MODE = "--release"; - -const args = process.argv.slice(2); // Parse the number of Zed instances to spawn. let instanceCount = 1; -const digitMatch = args[0]?.match(DIGIT_FLAG_REGEX); -if (digitMatch) { - instanceCount = parseInt(digitMatch[1]); - args.shift(); -} -const isReleaseMode = args.some((arg) => arg === RELEASE_MODE); -if (instanceCount > 4) { - throw new Error("Cannot spawn more than 4 instances"); +let isReleaseMode = false; +let isTop = false; + +const args = process.argv.slice(2); +for (const arg of args) { + const digitMatch = arg.match(DIGIT_FLAG_REGEX); + if (digitMatch) { + instanceCount = parseInt(digitMatch[1]); + continue; + } + + if (arg == "--release") { + isReleaseMode = true; + continue; + } + + if (arg == "--top") { + isTop = true; + } } // Parse the resolution of the main screen @@ -34,7 +42,11 @@ if (!mainDisplayResolution) { throw new Error("Could not parse screen resolution"); } const screenWidth = parseInt(mainDisplayResolution[1]); -const screenHeight = parseInt(mainDisplayResolution[2]); +let screenHeight = parseInt(mainDisplayResolution[2]); + +if (isTop) { + screenHeight = Math.floor(screenHeight / 2); +} // Determine the window size for each instance let instanceWidth = screenWidth; From 2d1eb0c56c3038fcfa7fd1117312b889c9090184 Mon Sep 17 00:00:00 2001 From: Max Brunsfeld Date: Wed, 10 Jan 2024 13:11:59 -0800 Subject: [PATCH 2/3] Expose a single `updates` stream from live_kit_client::Room Co-authored-by: Julia --- crates/call/src/room.rs | 80 ++++++------------- crates/live_kit_client/examples/test_app.rs | 35 ++++---- crates/live_kit_client/src/live_kit_client.rs | 20 +++++ crates/live_kit_client/src/prod.rs | 72 +++++------------ crates/live_kit_client/src/test.rs | 61 ++++---------- 5 files changed, 95 insertions(+), 173 deletions(-) diff --git a/crates/call/src/room.rs b/crates/call/src/room.rs index 3d1f1e70c7..877afceff3 100644 --- a/crates/call/src/room.rs +++ b/crates/call/src/room.rs @@ -15,10 +15,7 @@ use gpui::{ AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, Task, WeakModel, }; use language::LanguageRegistry; -use live_kit_client::{ - LocalAudioTrack, LocalTrackPublication, LocalVideoTrack, RemoteAudioTrackUpdate, - RemoteVideoTrackUpdate, -}; +use live_kit_client::{LocalAudioTrack, LocalTrackPublication, LocalVideoTrack, RoomUpdate}; use postage::{sink::Sink, stream::Stream, watch}; use project::Project; use settings::Settings as _; @@ -131,11 +128,11 @@ impl Room { } }); - let _maintain_video_tracks = cx.spawn({ + let _handle_updates = cx.spawn({ let room = room.clone(); move |this, mut cx| async move { - let mut track_video_changes = room.remote_video_track_updates(); - while let Some(track_change) = track_video_changes.next().await { + let mut updates = room.updates(); + while let Some(update) = updates.next().await { let this = if let Some(this) = this.upgrade() { this } else { @@ -143,26 +140,7 @@ impl Room { }; this.update(&mut cx, |this, cx| { - this.remote_video_track_updated(track_change, cx).log_err() - }) - .ok(); - } - } - }); - - let _maintain_audio_tracks = cx.spawn({ - let room = room.clone(); - |this, mut cx| async move { - let mut track_audio_changes = room.remote_audio_track_updates(); - while let Some(track_change) = track_audio_changes.next().await { - let this = if let Some(this) = this.upgrade() { - this - } else { - break; - }; - - this.update(&mut cx, |this, cx| { - this.remote_audio_track_updated(track_change, cx).log_err() + this.live_kit_room_updated(update, cx).log_err() }) .ok(); } @@ -195,7 +173,7 @@ impl Room { deafened: false, speaking: false, _maintain_room, - _maintain_tracks: [_maintain_video_tracks, _maintain_audio_tracks], + _handle_updates, }) } else { None @@ -877,8 +855,8 @@ impl Room { .remote_audio_track_publications(&user.id.to_string()); for track in video_tracks { - this.remote_video_track_updated( - RemoteVideoTrackUpdate::Subscribed(track), + this.live_kit_room_updated( + RoomUpdate::SubscribedToRemoteVideoTrack(track), cx, ) .log_err(); @@ -887,8 +865,8 @@ impl Room { for (track, publication) in audio_tracks.iter().zip(publications.iter()) { - this.remote_audio_track_updated( - RemoteAudioTrackUpdate::Subscribed( + this.live_kit_room_updated( + RoomUpdate::SubscribedToRemoteAudioTrack( track.clone(), publication.clone(), ), @@ -979,13 +957,13 @@ impl Room { } } - fn remote_video_track_updated( + fn live_kit_room_updated( &mut self, - change: RemoteVideoTrackUpdate, + update: RoomUpdate, cx: &mut ModelContext, ) -> Result<()> { - match change { - RemoteVideoTrackUpdate::Subscribed(track) => { + match update { + RoomUpdate::SubscribedToRemoteVideoTrack(track) => { let user_id = track.publisher_id().parse()?; let track_id = track.sid().to_string(); let participant = self @@ -997,7 +975,8 @@ impl Room { participant_id: participant.peer_id, }); } - RemoteVideoTrackUpdate::Unsubscribed { + + RoomUpdate::UnsubscribedFromRemoteVideoTrack { publisher_id, track_id, } => { @@ -1011,19 +990,8 @@ impl Room { participant_id: participant.peer_id, }); } - } - cx.notify(); - Ok(()) - } - - fn remote_audio_track_updated( - &mut self, - change: RemoteAudioTrackUpdate, - cx: &mut ModelContext, - ) -> Result<()> { - match change { - RemoteAudioTrackUpdate::ActiveSpeakersChanged { speakers } => { + RoomUpdate::ActiveSpeakersChanged { speakers } => { let mut speaker_ids = speakers .into_iter() .filter_map(|speaker_sid| speaker_sid.parse().ok()) @@ -1045,9 +1013,9 @@ impl Room { } } } - cx.notify(); } - RemoteAudioTrackUpdate::MuteChanged { track_id, muted } => { + + RoomUpdate::RemoteAudioTrackMuteChanged { track_id, muted } => { let mut found = false; for participant in &mut self.remote_participants.values_mut() { for track in participant.audio_tracks.values() { @@ -1061,10 +1029,9 @@ impl Room { break; } } - - cx.notify(); } - RemoteAudioTrackUpdate::Subscribed(track, publication) => { + + RoomUpdate::SubscribedToRemoteAudioTrack(track, publication) => { let user_id = track.publisher_id().parse()?; let track_id = track.sid().to_string(); let participant = self @@ -1078,7 +1045,8 @@ impl Room { participant_id: participant.peer_id, }); } - RemoteAudioTrackUpdate::Unsubscribed { + + RoomUpdate::UnsubscribedFromRemoteAudioTrack { publisher_id, track_id, } => { @@ -1597,7 +1565,7 @@ struct LiveKitRoom { speaking: bool, next_publish_id: usize, _maintain_room: Task<()>, - _maintain_tracks: [Task<()>; 2], + _handle_updates: Task<()>, } impl LiveKitRoom { diff --git a/crates/live_kit_client/examples/test_app.rs b/crates/live_kit_client/examples/test_app.rs index 68a8a84209..9fc8aafd30 100644 --- a/crates/live_kit_client/examples/test_app.rs +++ b/crates/live_kit_client/examples/test_app.rs @@ -2,9 +2,7 @@ use std::{sync::Arc, time::Duration}; use futures::StreamExt; use gpui::{actions, KeyBinding, Menu, MenuItem}; -use live_kit_client::{ - LocalAudioTrack, LocalVideoTrack, RemoteAudioTrackUpdate, RemoteVideoTrackUpdate, Room, -}; +use live_kit_client::{LocalAudioTrack, LocalVideoTrack, Room, RoomUpdate}; use live_kit_server::token::{self, VideoGrant}; use log::LevelFilter; use simplelog::SimpleLogger; @@ -60,12 +58,12 @@ fn main() { let room_b = Room::new(); room_b.connect(&live_kit_url, &user2_token).await.unwrap(); - let mut audio_track_updates = room_b.remote_audio_track_updates(); + let mut room_updates = room_b.updates(); let audio_track = LocalAudioTrack::create(); let audio_track_publication = room_a.publish_audio_track(audio_track).await.unwrap(); - if let RemoteAudioTrackUpdate::Subscribed(track, _) = - audio_track_updates.next().await.unwrap() + if let RoomUpdate::SubscribedToRemoteAudioTrack(track, _) = + room_updates.next().await.unwrap() { let remote_tracks = room_b.remote_audio_tracks("test-participant-1"); assert_eq!(remote_tracks.len(), 1); @@ -78,8 +76,8 @@ fn main() { audio_track_publication.set_mute(true).await.unwrap(); println!("waiting for mute changed!"); - if let RemoteAudioTrackUpdate::MuteChanged { track_id, muted } = - audio_track_updates.next().await.unwrap() + if let RoomUpdate::RemoteAudioTrackMuteChanged { track_id, muted } = + room_updates.next().await.unwrap() { let remote_tracks = room_b.remote_audio_tracks("test-participant-1"); assert_eq!(remote_tracks[0].sid(), track_id); @@ -90,8 +88,8 @@ fn main() { audio_track_publication.set_mute(false).await.unwrap(); - if let RemoteAudioTrackUpdate::MuteChanged { track_id, muted } = - audio_track_updates.next().await.unwrap() + if let RoomUpdate::RemoteAudioTrackMuteChanged { track_id, muted } = + room_updates.next().await.unwrap() { let remote_tracks = room_b.remote_audio_tracks("test-participant-1"); assert_eq!(remote_tracks[0].sid(), track_id); @@ -110,13 +108,13 @@ fn main() { room_a.unpublish_track(audio_track_publication); // Clear out any active speakers changed messages - let mut next = audio_track_updates.next().await.unwrap(); - while let RemoteAudioTrackUpdate::ActiveSpeakersChanged { speakers } = next { + let mut next = room_updates.next().await.unwrap(); + while let RoomUpdate::ActiveSpeakersChanged { speakers } = next { println!("Speakers changed: {:?}", speakers); - next = audio_track_updates.next().await.unwrap(); + next = room_updates.next().await.unwrap(); } - if let RemoteAudioTrackUpdate::Unsubscribed { + if let RoomUpdate::UnsubscribedFromRemoteAudioTrack { publisher_id, track_id, } = next @@ -128,7 +126,6 @@ fn main() { panic!("unexpected message"); } - let mut video_track_updates = room_b.remote_video_track_updates(); let displays = room_a.display_sources().await.unwrap(); let display = displays.into_iter().next().unwrap(); @@ -136,8 +133,8 @@ fn main() { let local_video_track_publication = room_a.publish_video_track(local_video_track).await.unwrap(); - if let RemoteVideoTrackUpdate::Subscribed(track) = - video_track_updates.next().await.unwrap() + if let RoomUpdate::SubscribedToRemoteVideoTrack(track) = + room_updates.next().await.unwrap() { let remote_video_tracks = room_b.remote_video_tracks("test-participant-1"); assert_eq!(remote_video_tracks.len(), 1); @@ -152,10 +149,10 @@ fn main() { .pop() .unwrap(); room_a.unpublish_track(local_video_track_publication); - if let RemoteVideoTrackUpdate::Unsubscribed { + if let RoomUpdate::UnsubscribedFromRemoteVideoTrack { publisher_id, track_id, - } = video_track_updates.next().await.unwrap() + } = room_updates.next().await.unwrap() { assert_eq!(publisher_id, "test-participant-1"); assert_eq!(remote_video_track.sid(), track_id); diff --git a/crates/live_kit_client/src/live_kit_client.rs b/crates/live_kit_client/src/live_kit_client.rs index 47cc3873ff..7052b107bc 100644 --- a/crates/live_kit_client/src/live_kit_client.rs +++ b/crates/live_kit_client/src/live_kit_client.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + #[cfg(not(any(test, feature = "test-support")))] pub mod prod; @@ -9,3 +11,21 @@ pub mod test; #[cfg(any(test, feature = "test-support"))] pub use test::*; + +pub type Sid = String; + +#[derive(Clone, Eq, PartialEq)] +pub enum ConnectionState { + Disconnected, + Connected { url: String, token: String }, +} + +#[derive(Clone)] +pub enum RoomUpdate { + ActiveSpeakersChanged { speakers: Vec }, + RemoteAudioTrackMuteChanged { track_id: Sid, muted: bool }, + SubscribedToRemoteVideoTrack(Arc), + SubscribedToRemoteAudioTrack(Arc, Arc), + UnsubscribedFromRemoteVideoTrack { publisher_id: Sid, track_id: Sid }, + UnsubscribedFromRemoteAudioTrack { publisher_id: Sid, track_id: Sid }, +} diff --git a/crates/live_kit_client/src/prod.rs b/crates/live_kit_client/src/prod.rs index 5d8ef9bf13..b9f5aa6aa8 100644 --- a/crates/live_kit_client/src/prod.rs +++ b/crates/live_kit_client/src/prod.rs @@ -1,3 +1,4 @@ +use crate::{ConnectionState, RoomUpdate, Sid}; use anyhow::{anyhow, Context, Result}; use core_foundation::{ array::{CFArray, CFArrayRef}, @@ -155,22 +156,13 @@ extern "C" { fn LKRemoteTrackPublicationGetSid(publication: swift::RemoteTrackPublication) -> CFStringRef; } -pub type Sid = String; - -#[derive(Clone, Eq, PartialEq)] -pub enum ConnectionState { - Disconnected, - Connected { url: String, token: String }, -} - pub struct Room { native_room: swift::Room, connection: Mutex<( watch::Sender, watch::Receiver, )>, - remote_audio_track_subscribers: Mutex>>, - remote_video_track_subscribers: Mutex>>, + update_subscribers: Mutex>>, _delegate: RoomDelegate, } @@ -181,8 +173,7 @@ impl Room { Self { native_room: unsafe { LKRoomCreate(delegate.native_delegate) }, connection: Mutex::new(watch::channel_with(ConnectionState::Disconnected)), - remote_audio_track_subscribers: Default::default(), - remote_video_track_subscribers: Default::default(), + update_subscribers: Default::default(), _delegate: delegate, } }) @@ -397,15 +388,9 @@ impl Room { } } - pub fn remote_audio_track_updates(&self) -> mpsc::UnboundedReceiver { + pub fn updates(&self) -> mpsc::UnboundedReceiver { let (tx, rx) = mpsc::unbounded(); - self.remote_audio_track_subscribers.lock().push(tx); - rx - } - - pub fn remote_video_track_updates(&self) -> mpsc::UnboundedReceiver { - let (tx, rx) = mpsc::unbounded(); - self.remote_video_track_subscribers.lock().push(tx); + self.update_subscribers.lock().push(tx); rx } @@ -416,8 +401,8 @@ impl Room { ) { let track = Arc::new(track); let publication = Arc::new(publication); - self.remote_audio_track_subscribers.lock().retain(|tx| { - tx.unbounded_send(RemoteAudioTrackUpdate::Subscribed( + self.update_subscribers.lock().retain(|tx| { + tx.unbounded_send(RoomUpdate::SubscribedToRemoteAudioTrack( track.clone(), publication.clone(), )) @@ -426,8 +411,8 @@ impl Room { } fn did_unsubscribe_from_remote_audio_track(&self, publisher_id: String, track_id: String) { - self.remote_audio_track_subscribers.lock().retain(|tx| { - tx.unbounded_send(RemoteAudioTrackUpdate::Unsubscribed { + self.update_subscribers.lock().retain(|tx| { + tx.unbounded_send(RoomUpdate::UnsubscribedFromRemoteAudioTrack { publisher_id: publisher_id.clone(), track_id: track_id.clone(), }) @@ -436,8 +421,8 @@ impl Room { } fn mute_changed_from_remote_audio_track(&self, track_id: String, muted: bool) { - self.remote_audio_track_subscribers.lock().retain(|tx| { - tx.unbounded_send(RemoteAudioTrackUpdate::MuteChanged { + self.update_subscribers.lock().retain(|tx| { + tx.unbounded_send(RoomUpdate::RemoteAudioTrackMuteChanged { track_id: track_id.clone(), muted, }) @@ -445,29 +430,26 @@ impl Room { }); } - // A vec of publisher IDs fn active_speakers_changed(&self, speakers: Vec) { - self.remote_audio_track_subscribers - .lock() - .retain(move |tx| { - tx.unbounded_send(RemoteAudioTrackUpdate::ActiveSpeakersChanged { - speakers: speakers.clone(), - }) - .is_ok() - }); + self.update_subscribers.lock().retain(move |tx| { + tx.unbounded_send(RoomUpdate::ActiveSpeakersChanged { + speakers: speakers.clone(), + }) + .is_ok() + }); } fn did_subscribe_to_remote_video_track(&self, track: RemoteVideoTrack) { let track = Arc::new(track); - self.remote_video_track_subscribers.lock().retain(|tx| { - tx.unbounded_send(RemoteVideoTrackUpdate::Subscribed(track.clone())) + self.update_subscribers.lock().retain(|tx| { + tx.unbounded_send(RoomUpdate::SubscribedToRemoteVideoTrack(track.clone())) .is_ok() }); } fn did_unsubscribe_from_remote_video_track(&self, publisher_id: String, track_id: String) { - self.remote_video_track_subscribers.lock().retain(|tx| { - tx.unbounded_send(RemoteVideoTrackUpdate::Unsubscribed { + self.update_subscribers.lock().retain(|tx| { + tx.unbounded_send(RoomUpdate::UnsubscribedFromRemoteVideoTrack { publisher_id: publisher_id.clone(), track_id: track_id.clone(), }) @@ -889,18 +871,6 @@ impl Drop for RemoteVideoTrack { } } -pub enum RemoteVideoTrackUpdate { - Subscribed(Arc), - Unsubscribed { publisher_id: Sid, track_id: Sid }, -} - -pub enum RemoteAudioTrackUpdate { - ActiveSpeakersChanged { speakers: Vec }, - MuteChanged { track_id: Sid, muted: bool }, - Subscribed(Arc, Arc), - Unsubscribed { publisher_id: Sid, track_id: Sid }, -} - pub struct MacOSDisplay(swift::MacOSDisplay); impl MacOSDisplay { diff --git a/crates/live_kit_client/src/test.rs b/crates/live_kit_client/src/test.rs index 4575fdd2c1..9c1a5ec59a 100644 --- a/crates/live_kit_client/src/test.rs +++ b/crates/live_kit_client/src/test.rs @@ -1,3 +1,4 @@ +use crate::{ConnectionState, RoomUpdate, Sid}; use anyhow::{anyhow, Context, Result}; use async_trait::async_trait; use collections::{BTreeMap, HashMap}; @@ -104,9 +105,8 @@ impl TestServer { client_room .0 .lock() - .video_track_updates - .0 - .try_broadcast(RemoteVideoTrackUpdate::Subscribed(track.clone())) + .updates_tx + .try_broadcast(RoomUpdate::SubscribedToRemoteVideoTrack(track.clone())) .unwrap(); } room.client_rooms.insert(identity, client_room); @@ -211,9 +211,8 @@ impl TestServer { let _ = client_room .0 .lock() - .video_track_updates - .0 - .try_broadcast(RemoteVideoTrackUpdate::Subscribed(track.clone())) + .updates_tx + .try_broadcast(RoomUpdate::SubscribedToRemoteVideoTrack(track.clone())) .unwrap(); } } @@ -261,9 +260,8 @@ impl TestServer { let _ = client_room .0 .lock() - .audio_track_updates - .0 - .try_broadcast(RemoteAudioTrackUpdate::Subscribed( + .updates_tx + .try_broadcast(RoomUpdate::SubscribedToRemoteAudioTrack( track.clone(), publication.clone(), )) @@ -369,39 +367,26 @@ impl live_kit_server::api::Client for TestApiClient { } } -pub type Sid = String; - struct RoomState { connection: ( watch::Sender, watch::Receiver, ), display_sources: Vec, - audio_track_updates: ( - async_broadcast::Sender, - async_broadcast::Receiver, - ), - video_track_updates: ( - async_broadcast::Sender, - async_broadcast::Receiver, - ), -} - -#[derive(Clone, Eq, PartialEq)] -pub enum ConnectionState { - Disconnected, - Connected { url: String, token: String }, + updates_tx: async_broadcast::Sender, + updates_rx: async_broadcast::Receiver, } pub struct Room(Mutex); impl Room { pub fn new() -> Arc { + let (updates_tx, updates_rx) = async_broadcast::broadcast(128); Arc::new(Self(Mutex::new(RoomState { connection: watch::channel_with(ConnectionState::Disconnected), display_sources: Default::default(), - video_track_updates: async_broadcast::broadcast(128), - audio_track_updates: async_broadcast::broadcast(128), + updates_tx, + updates_rx, }))) } @@ -505,12 +490,8 @@ impl Room { .collect() } - pub fn remote_audio_track_updates(&self) -> impl Stream { - self.0.lock().audio_track_updates.1.clone() - } - - pub fn remote_video_track_updates(&self) -> impl Stream { - self.0.lock().video_track_updates.1.clone() + pub fn updates(&self) -> impl Stream { + self.0.lock().updates_rx.clone() } pub fn set_display_sources(&self, sources: Vec) { @@ -646,20 +627,6 @@ impl RemoteAudioTrack { } } -#[derive(Clone)] -pub enum RemoteVideoTrackUpdate { - Subscribed(Arc), - Unsubscribed { publisher_id: Sid, track_id: Sid }, -} - -#[derive(Clone)] -pub enum RemoteAudioTrackUpdate { - ActiveSpeakersChanged { speakers: Vec }, - MuteChanged { track_id: Sid, muted: bool }, - Subscribed(Arc, Arc), - Unsubscribed { publisher_id: Sid, track_id: Sid }, -} - #[derive(Clone)] pub struct MacOSDisplay { frames: ( From 75fdaeb56f7e71fffa79b8652a0f80cf0440ffe3 Mon Sep 17 00:00:00 2001 From: Max Brunsfeld Date: Wed, 10 Jan 2024 16:08:39 -0800 Subject: [PATCH 3/3] Detect when a track is unpublished due to reconnecting to livekit Co-authored-by: Julia --- crates/call/src/room.rs | 22 ++++++ .../Sources/LiveKitBridge/LiveKitBridge.swift | 50 ++++++++++++- crates/live_kit_client/src/live_kit_client.rs | 4 ++ crates/live_kit_client/src/prod.rs | 71 +++++++++++++++++++ crates/live_kit_client/src/test.rs | 65 +++++++++++++---- 5 files changed, 195 insertions(+), 17 deletions(-) diff --git a/crates/call/src/room.rs b/crates/call/src/room.rs index 877afceff3..04e883e686 100644 --- a/crates/call/src/room.rs +++ b/crates/call/src/room.rs @@ -1060,6 +1060,28 @@ impl Room { participant_id: participant.peer_id, }); } + + RoomUpdate::LocalAudioTrackUnpublished { publication } => { + log::info!("unpublished audio track {}", publication.sid()); + if let Some(room) = &mut self.live_kit { + room.microphone_track = LocalTrack::None; + } + } + + RoomUpdate::LocalVideoTrackUnpublished { publication } => { + log::info!("unpublished video track {}", publication.sid()); + if let Some(room) = &mut self.live_kit { + room.screen_track = LocalTrack::None; + } + } + + RoomUpdate::LocalAudioTrackPublished { publication } => { + log::info!("published audio track {}", publication.sid()); + } + + RoomUpdate::LocalVideoTrackPublished { publication } => { + log::info!("published video track {}", publication.sid()); + } } cx.notify(); diff --git a/crates/live_kit_client/LiveKitBridge/Sources/LiveKitBridge/LiveKitBridge.swift b/crates/live_kit_client/LiveKitBridge/Sources/LiveKitBridge/LiveKitBridge.swift index 5f22acf581..db5da8e0e9 100644 --- a/crates/live_kit_client/LiveKitBridge/Sources/LiveKitBridge/LiveKitBridge.swift +++ b/crates/live_kit_client/LiveKitBridge/Sources/LiveKitBridge/LiveKitBridge.swift @@ -12,6 +12,8 @@ class LKRoomDelegate: RoomDelegate { var onActiveSpeakersChanged: @convention(c) (UnsafeRawPointer, CFArray) -> Void var onDidSubscribeToRemoteVideoTrack: @convention(c) (UnsafeRawPointer, CFString, CFString, UnsafeRawPointer) -> Void var onDidUnsubscribeFromRemoteVideoTrack: @convention(c) (UnsafeRawPointer, CFString, CFString) -> Void + var onDidPublishOrUnpublishLocalAudioTrack: @convention(c) (UnsafeRawPointer, UnsafeRawPointer, Bool) -> Void + var onDidPublishOrUnpublishLocalVideoTrack: @convention(c) (UnsafeRawPointer, UnsafeRawPointer, Bool) -> Void init( data: UnsafeRawPointer, @@ -21,7 +23,10 @@ class LKRoomDelegate: RoomDelegate { onMuteChangedFromRemoteAudioTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, Bool) -> Void, onActiveSpeakersChanged: @convention(c) (UnsafeRawPointer, CFArray) -> Void, onDidSubscribeToRemoteVideoTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, CFString, UnsafeRawPointer) -> Void, - onDidUnsubscribeFromRemoteVideoTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, CFString) -> Void) + onDidUnsubscribeFromRemoteVideoTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, CFString) -> Void, + onDidPublishOrUnpublishLocalAudioTrack: @escaping @convention(c) (UnsafeRawPointer, UnsafeRawPointer, Bool) -> Void, + onDidPublishOrUnpublishLocalVideoTrack: @escaping @convention(c) (UnsafeRawPointer, UnsafeRawPointer, Bool) -> Void + ) { self.data = data self.onDidDisconnect = onDidDisconnect @@ -31,6 +36,8 @@ class LKRoomDelegate: RoomDelegate { self.onDidUnsubscribeFromRemoteVideoTrack = onDidUnsubscribeFromRemoteVideoTrack self.onMuteChangedFromRemoteAudioTrack = onMuteChangedFromRemoteAudioTrack self.onActiveSpeakersChanged = onActiveSpeakersChanged + self.onDidPublishOrUnpublishLocalAudioTrack = onDidPublishOrUnpublishLocalAudioTrack + self.onDidPublishOrUnpublishLocalVideoTrack = onDidPublishOrUnpublishLocalVideoTrack } func room(_ room: Room, didUpdate connectionState: ConnectionState, oldValue: ConnectionState) { @@ -65,6 +72,22 @@ class LKRoomDelegate: RoomDelegate { self.onDidUnsubscribeFromRemoteAudioTrack(self.data, participant.identity as CFString, track.sid! as CFString) } } + + func room(_ room: Room, localParticipant: LocalParticipant, didPublish publication: LocalTrackPublication) { + if publication.kind == .video { + self.onDidPublishOrUnpublishLocalVideoTrack(self.data, Unmanaged.passUnretained(publication).toOpaque(), true) + } else if publication.kind == .audio { + self.onDidPublishOrUnpublishLocalAudioTrack(self.data, Unmanaged.passUnretained(publication).toOpaque(), true) + } + } + + func room(_ room: Room, localParticipant: LocalParticipant, didUnpublish publication: LocalTrackPublication) { + if publication.kind == .video { + self.onDidPublishOrUnpublishLocalVideoTrack(self.data, Unmanaged.passUnretained(publication).toOpaque(), false) + } else if publication.kind == .audio { + self.onDidPublishOrUnpublishLocalAudioTrack(self.data, Unmanaged.passUnretained(publication).toOpaque(), false) + } + } } class LKVideoRenderer: NSObject, VideoRenderer { @@ -109,7 +132,9 @@ public func LKRoomDelegateCreate( onMuteChangedFromRemoteAudioTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, Bool) -> Void, onActiveSpeakerChanged: @escaping @convention(c) (UnsafeRawPointer, CFArray) -> Void, onDidSubscribeToRemoteVideoTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, CFString, UnsafeRawPointer) -> Void, - onDidUnsubscribeFromRemoteVideoTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, CFString) -> Void + onDidUnsubscribeFromRemoteVideoTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, CFString) -> Void, + onDidPublishOrUnpublishLocalAudioTrack: @escaping @convention(c) (UnsafeRawPointer, UnsafeRawPointer, Bool) -> Void, + onDidPublishOrUnpublishLocalVideoTrack: @escaping @convention(c) (UnsafeRawPointer, UnsafeRawPointer, Bool) -> Void ) -> UnsafeMutableRawPointer { let delegate = LKRoomDelegate( data: data, @@ -119,7 +144,9 @@ public func LKRoomDelegateCreate( onMuteChangedFromRemoteAudioTrack: onMuteChangedFromRemoteAudioTrack, onActiveSpeakersChanged: onActiveSpeakerChanged, onDidSubscribeToRemoteVideoTrack: onDidSubscribeToRemoteVideoTrack, - onDidUnsubscribeFromRemoteVideoTrack: onDidUnsubscribeFromRemoteVideoTrack + onDidUnsubscribeFromRemoteVideoTrack: onDidUnsubscribeFromRemoteVideoTrack, + onDidPublishOrUnpublishLocalAudioTrack: onDidPublishOrUnpublishLocalAudioTrack, + onDidPublishOrUnpublishLocalVideoTrack: onDidPublishOrUnpublishLocalVideoTrack ) return Unmanaged.passRetained(delegate).toOpaque() } @@ -292,6 +319,14 @@ public func LKLocalTrackPublicationSetMute( } } +@_cdecl("LKLocalTrackPublicationIsMuted") +public func LKLocalTrackPublicationIsMuted( + publication: UnsafeRawPointer +) -> Bool { + let publication = Unmanaged.fromOpaque(publication).takeUnretainedValue() + return publication.muted +} + @_cdecl("LKRemoteTrackPublicationSetEnabled") public func LKRemoteTrackPublicationSetEnabled( publication: UnsafeRawPointer, @@ -325,3 +360,12 @@ public func LKRemoteTrackPublicationGetSid( return publication.sid as CFString } + +@_cdecl("LKLocalTrackPublicationGetSid") +public func LKLocalTrackPublicationGetSid( + publication: UnsafeRawPointer +) -> CFString { + let publication = Unmanaged.fromOpaque(publication).takeUnretainedValue() + + return publication.sid as CFString +} diff --git a/crates/live_kit_client/src/live_kit_client.rs b/crates/live_kit_client/src/live_kit_client.rs index 7052b107bc..abec27462e 100644 --- a/crates/live_kit_client/src/live_kit_client.rs +++ b/crates/live_kit_client/src/live_kit_client.rs @@ -28,4 +28,8 @@ pub enum RoomUpdate { SubscribedToRemoteAudioTrack(Arc, Arc), UnsubscribedFromRemoteVideoTrack { publisher_id: Sid, track_id: Sid }, UnsubscribedFromRemoteAudioTrack { publisher_id: Sid, track_id: Sid }, + LocalAudioTrackPublished { publication: LocalTrackPublication }, + LocalAudioTrackUnpublished { publication: LocalTrackPublication }, + LocalVideoTrackPublished { publication: LocalTrackPublication }, + LocalVideoTrackUnpublished { publication: LocalTrackPublication }, } diff --git a/crates/live_kit_client/src/prod.rs b/crates/live_kit_client/src/prod.rs index b9f5aa6aa8..0827c0cbb4 100644 --- a/crates/live_kit_client/src/prod.rs +++ b/crates/live_kit_client/src/prod.rs @@ -77,6 +77,16 @@ extern "C" { publisher_id: CFStringRef, track_id: CFStringRef, ), + on_did_publish_or_unpublish_local_audio_track: extern "C" fn( + callback_data: *mut c_void, + publication: swift::LocalTrackPublication, + is_published: bool, + ), + on_did_publish_or_unpublish_local_video_track: extern "C" fn( + callback_data: *mut c_void, + publication: swift::LocalTrackPublication, + is_published: bool, + ), ) -> swift::RoomDelegate; fn LKRoomCreate(delegate: swift::RoomDelegate) -> swift::Room; @@ -152,7 +162,9 @@ extern "C" { callback_data: *mut c_void, ); + fn LKLocalTrackPublicationIsMuted(publication: swift::LocalTrackPublication) -> bool; fn LKRemoteTrackPublicationIsMuted(publication: swift::RemoteTrackPublication) -> bool; + fn LKLocalTrackPublicationGetSid(publication: swift::LocalTrackPublication) -> CFStringRef; fn LKRemoteTrackPublicationGetSid(publication: swift::RemoteTrackPublication) -> CFStringRef; } @@ -511,6 +523,8 @@ impl RoomDelegate { Self::on_active_speakers_changed, Self::on_did_subscribe_to_remote_video_track, Self::on_did_unsubscribe_from_remote_video_track, + Self::on_did_publish_or_unpublish_local_audio_track, + Self::on_did_publish_or_unpublish_local_video_track, ) }; Self { @@ -624,6 +638,46 @@ impl RoomDelegate { } let _ = Weak::into_raw(room); } + + extern "C" fn on_did_publish_or_unpublish_local_audio_track( + room: *mut c_void, + publication: swift::LocalTrackPublication, + is_published: bool, + ) { + let room = unsafe { Weak::from_raw(room as *mut Room) }; + if let Some(room) = room.upgrade() { + let publication = LocalTrackPublication::new(publication); + let update = if is_published { + RoomUpdate::LocalAudioTrackPublished { publication } + } else { + RoomUpdate::LocalAudioTrackUnpublished { publication } + }; + room.update_subscribers + .lock() + .retain(|tx| tx.unbounded_send(update.clone()).is_ok()); + } + let _ = Weak::into_raw(room); + } + + extern "C" fn on_did_publish_or_unpublish_local_video_track( + room: *mut c_void, + publication: swift::LocalTrackPublication, + is_published: bool, + ) { + let room = unsafe { Weak::from_raw(room as *mut Room) }; + if let Some(room) = room.upgrade() { + let publication = LocalTrackPublication::new(publication); + let update = if is_published { + RoomUpdate::LocalVideoTrackPublished { publication } + } else { + RoomUpdate::LocalVideoTrackUnpublished { publication } + }; + room.update_subscribers + .lock() + .retain(|tx| tx.unbounded_send(update.clone()).is_ok()); + } + let _ = Weak::into_raw(room); + } } impl Drop for RoomDelegate { @@ -673,6 +727,10 @@ impl LocalTrackPublication { Self(native_track_publication) } + pub fn sid(&self) -> String { + unsafe { CFString::wrap_under_get_rule(LKLocalTrackPublicationGetSid(self.0)).to_string() } + } + pub fn set_mute(&self, muted: bool) -> impl Future> { let (tx, rx) = futures::channel::oneshot::channel(); @@ -697,6 +755,19 @@ impl LocalTrackPublication { async move { rx.await.unwrap() } } + + pub fn is_muted(&self) -> bool { + unsafe { LKLocalTrackPublicationIsMuted(self.0) } + } +} + +impl Clone for LocalTrackPublication { + fn clone(&self) -> Self { + unsafe { + CFRetain(self.0 .0); + } + Self(self.0) + } } impl Drop for LocalTrackPublication { diff --git a/crates/live_kit_client/src/test.rs b/crates/live_kit_client/src/test.rs index 9c1a5ec59a..0716042ff1 100644 --- a/crates/live_kit_client/src/test.rs +++ b/crates/live_kit_client/src/test.rs @@ -8,7 +8,14 @@ use live_kit_server::{proto, token}; use media::core_video::CVImageBuffer; use parking_lot::Mutex; use postage::watch; -use std::{future::Future, mem, sync::Arc}; +use std::{ + future::Future, + mem, + sync::{ + atomic::{AtomicBool, Ordering::SeqCst}, + Arc, + }, +}; static SERVERS: Mutex>> = Mutex::new(BTreeMap::new()); @@ -176,7 +183,11 @@ impl TestServer { } } - async fn publish_video_track(&self, token: String, local_track: LocalVideoTrack) -> Result<()> { + async fn publish_video_track( + &self, + token: String, + local_track: LocalVideoTrack, + ) -> Result { self.executor.simulate_random_delay().await; let claims = live_kit_server::token::validate(&token, &self.secret_key)?; let identity = claims.sub.unwrap().to_string(); @@ -198,8 +209,9 @@ impl TestServer { return Err(anyhow!("user is not allowed to publish")); } + let sid = nanoid::nanoid!(17); let track = Arc::new(RemoteVideoTrack { - sid: nanoid::nanoid!(17), + sid: sid.clone(), publisher_id: identity.clone(), frames_rx: local_track.frames_rx.clone(), }); @@ -217,14 +229,14 @@ impl TestServer { } } - Ok(()) + Ok(sid) } async fn publish_audio_track( &self, token: String, _local_track: &LocalAudioTrack, - ) -> Result<()> { + ) -> Result { self.executor.simulate_random_delay().await; let claims = live_kit_server::token::validate(&token, &self.secret_key)?; let identity = claims.sub.unwrap().to_string(); @@ -246,8 +258,9 @@ impl TestServer { return Err(anyhow!("user is not allowed to publish")); } + let sid = nanoid::nanoid!(17); let track = Arc::new(RemoteAudioTrack { - sid: nanoid::nanoid!(17), + sid: sid.clone(), publisher_id: identity.clone(), }); @@ -269,7 +282,7 @@ impl TestServer { } } - Ok(()) + Ok(sid) } fn video_tracks(&self, token: String) -> Result>> { @@ -425,10 +438,14 @@ impl Room { let this = self.clone(); let track = track.clone(); async move { - this.test_server() + let sid = this + .test_server() .publish_video_track(this.token(), track) .await?; - Ok(LocalTrackPublication) + Ok(LocalTrackPublication { + muted: Default::default(), + sid, + }) } } pub fn publish_audio_track( @@ -438,10 +455,14 @@ impl Room { let this = self.clone(); let track = track.clone(); async move { - this.test_server() + let sid = this + .test_server() .publish_audio_track(this.token(), &track) .await?; - Ok(LocalTrackPublication) + Ok(LocalTrackPublication { + muted: Default::default(), + sid, + }) } } @@ -536,11 +557,27 @@ impl Drop for Room { } } -pub struct LocalTrackPublication; +#[derive(Clone)] +pub struct LocalTrackPublication { + sid: String, + muted: Arc, +} impl LocalTrackPublication { - pub fn set_mute(&self, _mute: bool) -> impl Future> { - async { Ok(()) } + pub fn set_mute(&self, mute: bool) -> impl Future> { + let muted = self.muted.clone(); + async move { + muted.store(mute, SeqCst); + Ok(()) + } + } + + pub fn is_muted(&self) -> bool { + self.muted.load(SeqCst) + } + + pub fn sid(&self) -> String { + self.sid.clone() } }