Finish port of livekit client and call2 (#3198)

This commit is contained in:
Mikayla Maki 2023-11-01 09:37:40 -07:00 committed by GitHub
commit bd61d71018
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 2935 additions and 497 deletions

35
Cargo.lock generated
View File

@ -1169,7 +1169,7 @@ dependencies = [
"futures 0.3.28",
"gpui2",
"language2",
"live_kit_client",
"live_kit_client2",
"log",
"media",
"postage",
@ -4589,6 +4589,39 @@ dependencies = [
"simplelog",
]
[[package]]
name = "live_kit_client2"
version = "0.1.0"
dependencies = [
"anyhow",
"async-broadcast",
"async-trait",
"block",
"byteorder",
"bytes 1.5.0",
"cocoa",
"collections",
"core-foundation",
"core-graphics",
"foreign-types",
"futures 0.3.28",
"gpui2",
"hmac 0.12.1",
"jwt",
"live_kit_server",
"log",
"media",
"nanoid",
"objc",
"parking_lot 0.11.2",
"postage",
"serde",
"serde_derive",
"serde_json",
"sha2 0.10.7",
"simplelog",
]
[[package]]
name = "live_kit_server"
version = "0.1.0"

View File

@ -13,7 +13,7 @@ test-support = [
"client2/test-support",
"collections/test-support",
"gpui2/test-support",
"live_kit_client/test-support",
"live_kit_client2/test-support",
"project2/test-support",
"util/test-support"
]
@ -24,7 +24,7 @@ client2 = { path = "../client2" }
collections = { path = "../collections" }
gpui2 = { path = "../gpui2" }
log.workspace = true
live_kit_client = { path = "../live_kit_client" }
live_kit_client2 = { path = "../live_kit_client2" }
fs2 = { path = "../fs2" }
language2 = { path = "../language2" }
media = { path = "../media" }
@ -47,6 +47,6 @@ fs2 = { path = "../fs2", features = ["test-support"] }
language2 = { path = "../language2", features = ["test-support"] }
collections = { path = "../collections", features = ["test-support"] }
gpui2 = { path = "../gpui2", features = ["test-support"] }
live_kit_client = { path = "../live_kit_client", features = ["test-support"] }
live_kit_client2 = { path = "../live_kit_client2", features = ["test-support"] }
project2 = { path = "../project2", features = ["test-support"] }
util = { path = "../util", features = ["test-support"] }

View File

@ -1,10 +1,12 @@
use anyhow::{anyhow, Result};
use client2::ParticipantIndex;
use client2::{proto, User};
use collections::HashMap;
use gpui2::WeakModel;
pub use live_kit_client::Frame;
pub use live_kit_client2::Frame;
use live_kit_client2::{RemoteAudioTrack, RemoteVideoTrack};
use project2::Project;
use std::{fmt, sync::Arc};
use std::sync::Arc;
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum ParticipantLocation {
@ -45,27 +47,6 @@ pub struct RemoteParticipant {
pub participant_index: ParticipantIndex,
pub muted: bool,
pub speaking: bool,
// pub video_tracks: HashMap<live_kit_client::Sid, Arc<RemoteVideoTrack>>,
// pub audio_tracks: HashMap<live_kit_client::Sid, Arc<RemoteAudioTrack>>,
}
#[derive(Clone)]
pub struct RemoteVideoTrack {
pub(crate) live_kit_track: Arc<live_kit_client::RemoteVideoTrack>,
}
unsafe impl Send for RemoteVideoTrack {}
// todo!("remove this sync because it's not legit")
unsafe impl Sync for RemoteVideoTrack {}
impl fmt::Debug for RemoteVideoTrack {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("RemoteVideoTrack").finish()
}
}
impl RemoteVideoTrack {
pub fn frames(&self) -> async_broadcast::Receiver<Frame> {
self.live_kit_track.frames()
}
pub video_tracks: HashMap<live_kit_client2::Sid, Arc<RemoteVideoTrack>>,
pub audio_tracks: HashMap<live_kit_client2::Sid, Arc<RemoteAudioTrack>>,
}

File diff suppressed because it is too large Load Diff

View File

@ -42,8 +42,8 @@
"repositoryURL": "https://github.com/apple/swift-protobuf.git",
"state": {
"branch": null,
"revision": "ce20dc083ee485524b802669890291c0d8090170",
"version": "1.22.1"
"revision": "0af9125c4eae12a4973fb66574c53a54962a9e1e",
"version": "1.21.0"
}
}
]

View File

@ -0,0 +1,2 @@
[live_kit_client_test]
rustflags = ["-C", "link-args=-ObjC"]

View File

@ -0,0 +1,71 @@
[package]
name = "live_kit_client2"
version = "0.1.0"
edition = "2021"
description = "Bindings to LiveKit Swift client SDK"
publish = false
[lib]
path = "src/live_kit_client2.rs"
doctest = false
[[example]]
name = "test_app"
[features]
test-support = [
"async-trait",
"collections/test-support",
"gpui2/test-support",
"live_kit_server",
"nanoid",
]
[dependencies]
collections = { path = "../collections", optional = true }
gpui2 = { path = "../gpui2", optional = true }
live_kit_server = { path = "../live_kit_server", optional = true }
media = { path = "../media" }
anyhow.workspace = true
async-broadcast = "0.4"
core-foundation = "0.9.3"
core-graphics = "0.22.3"
futures.workspace = true
log.workspace = true
parking_lot.workspace = true
postage.workspace = true
async-trait = { workspace = true, optional = true }
nanoid = { version ="0.4", optional = true}
[dev-dependencies]
collections = { path = "../collections", features = ["test-support"] }
gpui2 = { path = "../gpui2", features = ["test-support"] }
live_kit_server = { path = "../live_kit_server" }
media = { path = "../media" }
nanoid = "0.4"
anyhow.workspace = true
async-trait.workspace = true
block = "0.1"
bytes = "1.2"
byteorder = "1.4"
cocoa = "0.24"
core-foundation = "0.9.3"
core-graphics = "0.22.3"
foreign-types = "0.3"
futures.workspace = true
hmac = "0.12"
jwt = "0.16"
objc = "0.2"
parking_lot.workspace = true
serde.workspace = true
serde_derive.workspace = true
sha2 = "0.10"
simplelog = "0.9"
[build-dependencies]
serde.workspace = true
serde_derive.workspace = true
serde_json.workspace = true

View File

@ -0,0 +1,52 @@
{
"object": {
"pins": [
{
"package": "LiveKit",
"repositoryURL": "https://github.com/livekit/client-sdk-swift.git",
"state": {
"branch": null,
"revision": "7331b813a5ab8a95cfb81fb2b4ed10519428b9ff",
"version": "1.0.12"
}
},
{
"package": "Promises",
"repositoryURL": "https://github.com/google/promises.git",
"state": {
"branch": null,
"revision": "ec957ccddbcc710ccc64c9dcbd4c7006fcf8b73a",
"version": "2.2.0"
}
},
{
"package": "WebRTC",
"repositoryURL": "https://github.com/webrtc-sdk/Specs.git",
"state": {
"branch": null,
"revision": "2f6bab30c8df0fe59ab3e58bc99097f757f85f65",
"version": "104.5112.17"
}
},
{
"package": "swift-log",
"repositoryURL": "https://github.com/apple/swift-log.git",
"state": {
"branch": null,
"revision": "32e8d724467f8fe623624570367e3d50c5638e46",
"version": "1.5.2"
}
},
{
"package": "SwiftProtobuf",
"repositoryURL": "https://github.com/apple/swift-protobuf.git",
"state": {
"branch": null,
"revision": "ce20dc083ee485524b802669890291c0d8090170",
"version": "1.22.1"
}
}
]
},
"version": 1
}

View File

@ -0,0 +1,27 @@
// swift-tools-version: 5.5
import PackageDescription
let package = Package(
name: "LiveKitBridge",
platforms: [
.macOS(.v10_15)
],
products: [
// Products define the executables and libraries a package produces, and make them visible to other packages.
.library(
name: "LiveKitBridge",
type: .static,
targets: ["LiveKitBridge"]),
],
dependencies: [
.package(url: "https://github.com/livekit/client-sdk-swift.git", .exact("1.0.12")),
],
targets: [
// Targets are the basic building blocks of a package. A target can define a module or a test suite.
// Targets can depend on other targets in this package, and on products in packages this package depends on.
.target(
name: "LiveKitBridge",
dependencies: [.product(name: "LiveKit", package: "client-sdk-swift")]),
]
)

View File

@ -0,0 +1,3 @@
# LiveKitBridge
A description of this package.

View File

@ -0,0 +1,327 @@
import Foundation
import LiveKit
import WebRTC
import ScreenCaptureKit
class LKRoomDelegate: RoomDelegate {
var data: UnsafeRawPointer
var onDidDisconnect: @convention(c) (UnsafeRawPointer) -> Void
var onDidSubscribeToRemoteAudioTrack: @convention(c) (UnsafeRawPointer, CFString, CFString, UnsafeRawPointer, UnsafeRawPointer) -> Void
var onDidUnsubscribeFromRemoteAudioTrack: @convention(c) (UnsafeRawPointer, CFString, CFString) -> Void
var onMuteChangedFromRemoteAudioTrack: @convention(c) (UnsafeRawPointer, CFString, Bool) -> Void
var onActiveSpeakersChanged: @convention(c) (UnsafeRawPointer, CFArray) -> Void
var onDidSubscribeToRemoteVideoTrack: @convention(c) (UnsafeRawPointer, CFString, CFString, UnsafeRawPointer) -> Void
var onDidUnsubscribeFromRemoteVideoTrack: @convention(c) (UnsafeRawPointer, CFString, CFString) -> Void
init(
data: UnsafeRawPointer,
onDidDisconnect: @escaping @convention(c) (UnsafeRawPointer) -> Void,
onDidSubscribeToRemoteAudioTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, CFString, UnsafeRawPointer, UnsafeRawPointer) -> Void,
onDidUnsubscribeFromRemoteAudioTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, CFString) -> Void,
onMuteChangedFromRemoteAudioTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, Bool) -> Void,
onActiveSpeakersChanged: @convention(c) (UnsafeRawPointer, CFArray) -> Void,
onDidSubscribeToRemoteVideoTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, CFString, UnsafeRawPointer) -> Void,
onDidUnsubscribeFromRemoteVideoTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, CFString) -> Void)
{
self.data = data
self.onDidDisconnect = onDidDisconnect
self.onDidSubscribeToRemoteAudioTrack = onDidSubscribeToRemoteAudioTrack
self.onDidUnsubscribeFromRemoteAudioTrack = onDidUnsubscribeFromRemoteAudioTrack
self.onDidSubscribeToRemoteVideoTrack = onDidSubscribeToRemoteVideoTrack
self.onDidUnsubscribeFromRemoteVideoTrack = onDidUnsubscribeFromRemoteVideoTrack
self.onMuteChangedFromRemoteAudioTrack = onMuteChangedFromRemoteAudioTrack
self.onActiveSpeakersChanged = onActiveSpeakersChanged
}
func room(_ room: Room, didUpdate connectionState: ConnectionState, oldValue: ConnectionState) {
if connectionState.isDisconnected {
self.onDidDisconnect(self.data)
}
}
func room(_ room: Room, participant: RemoteParticipant, didSubscribe publication: RemoteTrackPublication, track: Track) {
if track.kind == .video {
self.onDidSubscribeToRemoteVideoTrack(self.data, participant.identity as CFString, track.sid! as CFString, Unmanaged.passUnretained(track).toOpaque())
} else if track.kind == .audio {
self.onDidSubscribeToRemoteAudioTrack(self.data, participant.identity as CFString, track.sid! as CFString, Unmanaged.passUnretained(track).toOpaque(), Unmanaged.passUnretained(publication).toOpaque())
}
}
func room(_ room: Room, participant: Participant, didUpdate publication: TrackPublication, muted: Bool) {
if publication.kind == .audio {
self.onMuteChangedFromRemoteAudioTrack(self.data, publication.sid as CFString, muted)
}
}
func room(_ room: Room, didUpdate speakers: [Participant]) {
guard let speaker_ids = speakers.compactMap({ $0.identity as CFString }) as CFArray? else { return }
self.onActiveSpeakersChanged(self.data, speaker_ids)
}
func room(_ room: Room, participant: RemoteParticipant, didUnsubscribe publication: RemoteTrackPublication, track: Track) {
if track.kind == .video {
self.onDidUnsubscribeFromRemoteVideoTrack(self.data, participant.identity as CFString, track.sid! as CFString)
} else if track.kind == .audio {
self.onDidUnsubscribeFromRemoteAudioTrack(self.data, participant.identity as CFString, track.sid! as CFString)
}
}
}
class LKVideoRenderer: NSObject, VideoRenderer {
var data: UnsafeRawPointer
var onFrame: @convention(c) (UnsafeRawPointer, CVPixelBuffer) -> Bool
var onDrop: @convention(c) (UnsafeRawPointer) -> Void
var adaptiveStreamIsEnabled: Bool = false
var adaptiveStreamSize: CGSize = .zero
weak var track: VideoTrack?
init(data: UnsafeRawPointer, onFrame: @escaping @convention(c) (UnsafeRawPointer, CVPixelBuffer) -> Bool, onDrop: @escaping @convention(c) (UnsafeRawPointer) -> Void) {
self.data = data
self.onFrame = onFrame
self.onDrop = onDrop
}
deinit {
self.onDrop(self.data)
}
func setSize(_ size: CGSize) {
}
func renderFrame(_ frame: RTCVideoFrame?) {
let buffer = frame?.buffer as? RTCCVPixelBuffer
if let pixelBuffer = buffer?.pixelBuffer {
if !self.onFrame(self.data, pixelBuffer) {
DispatchQueue.main.async {
self.track?.remove(videoRenderer: self)
}
}
}
}
}
@_cdecl("LKRoomDelegateCreate")
public func LKRoomDelegateCreate(
data: UnsafeRawPointer,
onDidDisconnect: @escaping @convention(c) (UnsafeRawPointer) -> Void,
onDidSubscribeToRemoteAudioTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, CFString, UnsafeRawPointer, UnsafeRawPointer) -> Void,
onDidUnsubscribeFromRemoteAudioTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, CFString) -> Void,
onMuteChangedFromRemoteAudioTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, Bool) -> Void,
onActiveSpeakerChanged: @escaping @convention(c) (UnsafeRawPointer, CFArray) -> Void,
onDidSubscribeToRemoteVideoTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, CFString, UnsafeRawPointer) -> Void,
onDidUnsubscribeFromRemoteVideoTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, CFString) -> Void
) -> UnsafeMutableRawPointer {
let delegate = LKRoomDelegate(
data: data,
onDidDisconnect: onDidDisconnect,
onDidSubscribeToRemoteAudioTrack: onDidSubscribeToRemoteAudioTrack,
onDidUnsubscribeFromRemoteAudioTrack: onDidUnsubscribeFromRemoteAudioTrack,
onMuteChangedFromRemoteAudioTrack: onMuteChangedFromRemoteAudioTrack,
onActiveSpeakersChanged: onActiveSpeakerChanged,
onDidSubscribeToRemoteVideoTrack: onDidSubscribeToRemoteVideoTrack,
onDidUnsubscribeFromRemoteVideoTrack: onDidUnsubscribeFromRemoteVideoTrack
)
return Unmanaged.passRetained(delegate).toOpaque()
}
@_cdecl("LKRoomCreate")
public func LKRoomCreate(delegate: UnsafeRawPointer) -> UnsafeMutableRawPointer {
let delegate = Unmanaged<LKRoomDelegate>.fromOpaque(delegate).takeUnretainedValue()
return Unmanaged.passRetained(Room(delegate: delegate)).toOpaque()
}
@_cdecl("LKRoomConnect")
public func LKRoomConnect(room: UnsafeRawPointer, url: CFString, token: CFString, callback: @escaping @convention(c) (UnsafeRawPointer, CFString?) -> Void, callback_data: UnsafeRawPointer) {
let room = Unmanaged<Room>.fromOpaque(room).takeUnretainedValue()
room.connect(url as String, token as String).then { _ in
callback(callback_data, UnsafeRawPointer(nil) as! CFString?)
}.catch { error in
callback(callback_data, error.localizedDescription as CFString)
}
}
@_cdecl("LKRoomDisconnect")
public func LKRoomDisconnect(room: UnsafeRawPointer) {
let room = Unmanaged<Room>.fromOpaque(room).takeUnretainedValue()
room.disconnect()
}
@_cdecl("LKRoomPublishVideoTrack")
public func LKRoomPublishVideoTrack(room: UnsafeRawPointer, track: UnsafeRawPointer, callback: @escaping @convention(c) (UnsafeRawPointer, UnsafeMutableRawPointer?, CFString?) -> Void, callback_data: UnsafeRawPointer) {
let room = Unmanaged<Room>.fromOpaque(room).takeUnretainedValue()
let track = Unmanaged<LocalVideoTrack>.fromOpaque(track).takeUnretainedValue()
room.localParticipant?.publishVideoTrack(track: track).then { publication in
callback(callback_data, Unmanaged.passRetained(publication).toOpaque(), nil)
}.catch { error in
callback(callback_data, nil, error.localizedDescription as CFString)
}
}
@_cdecl("LKRoomPublishAudioTrack")
public func LKRoomPublishAudioTrack(room: UnsafeRawPointer, track: UnsafeRawPointer, callback: @escaping @convention(c) (UnsafeRawPointer, UnsafeMutableRawPointer?, CFString?) -> Void, callback_data: UnsafeRawPointer) {
let room = Unmanaged<Room>.fromOpaque(room).takeUnretainedValue()
let track = Unmanaged<LocalAudioTrack>.fromOpaque(track).takeUnretainedValue()
room.localParticipant?.publishAudioTrack(track: track).then { publication in
callback(callback_data, Unmanaged.passRetained(publication).toOpaque(), nil)
}.catch { error in
callback(callback_data, nil, error.localizedDescription as CFString)
}
}
@_cdecl("LKRoomUnpublishTrack")
public func LKRoomUnpublishTrack(room: UnsafeRawPointer, publication: UnsafeRawPointer) {
let room = Unmanaged<Room>.fromOpaque(room).takeUnretainedValue()
let publication = Unmanaged<LocalTrackPublication>.fromOpaque(publication).takeUnretainedValue()
let _ = room.localParticipant?.unpublish(publication: publication)
}
@_cdecl("LKRoomAudioTracksForRemoteParticipant")
public func LKRoomAudioTracksForRemoteParticipant(room: UnsafeRawPointer, participantId: CFString) -> CFArray? {
let room = Unmanaged<Room>.fromOpaque(room).takeUnretainedValue()
for (_, participant) in room.remoteParticipants {
if participant.identity == participantId as String {
return participant.audioTracks.compactMap { $0.track as? RemoteAudioTrack } as CFArray?
}
}
return nil;
}
@_cdecl("LKRoomAudioTrackPublicationsForRemoteParticipant")
public func LKRoomAudioTrackPublicationsForRemoteParticipant(room: UnsafeRawPointer, participantId: CFString) -> CFArray? {
let room = Unmanaged<Room>.fromOpaque(room).takeUnretainedValue()
for (_, participant) in room.remoteParticipants {
if participant.identity == participantId as String {
return participant.audioTracks.compactMap { $0 as? RemoteTrackPublication } as CFArray?
}
}
return nil;
}
@_cdecl("LKRoomVideoTracksForRemoteParticipant")
public func LKRoomVideoTracksForRemoteParticipant(room: UnsafeRawPointer, participantId: CFString) -> CFArray? {
let room = Unmanaged<Room>.fromOpaque(room).takeUnretainedValue()
for (_, participant) in room.remoteParticipants {
if participant.identity == participantId as String {
return participant.videoTracks.compactMap { $0.track as? RemoteVideoTrack } as CFArray?
}
}
return nil;
}
@_cdecl("LKLocalAudioTrackCreateTrack")
public func LKLocalAudioTrackCreateTrack() -> UnsafeMutableRawPointer {
let track = LocalAudioTrack.createTrack(options: AudioCaptureOptions(
echoCancellation: true,
noiseSuppression: true
))
return Unmanaged.passRetained(track).toOpaque()
}
@_cdecl("LKCreateScreenShareTrackForDisplay")
public func LKCreateScreenShareTrackForDisplay(display: UnsafeMutableRawPointer) -> UnsafeMutableRawPointer {
let display = Unmanaged<MacOSDisplay>.fromOpaque(display).takeUnretainedValue()
let track = LocalVideoTrack.createMacOSScreenShareTrack(source: display, preferredMethod: .legacy)
return Unmanaged.passRetained(track).toOpaque()
}
@_cdecl("LKVideoRendererCreate")
public func LKVideoRendererCreate(data: UnsafeRawPointer, onFrame: @escaping @convention(c) (UnsafeRawPointer, CVPixelBuffer) -> Bool, onDrop: @escaping @convention(c) (UnsafeRawPointer) -> Void) -> UnsafeMutableRawPointer {
Unmanaged.passRetained(LKVideoRenderer(data: data, onFrame: onFrame, onDrop: onDrop)).toOpaque()
}
@_cdecl("LKVideoTrackAddRenderer")
public func LKVideoTrackAddRenderer(track: UnsafeRawPointer, renderer: UnsafeRawPointer) {
let track = Unmanaged<Track>.fromOpaque(track).takeUnretainedValue() as! VideoTrack
let renderer = Unmanaged<LKVideoRenderer>.fromOpaque(renderer).takeRetainedValue()
renderer.track = track
track.add(videoRenderer: renderer)
}
@_cdecl("LKRemoteVideoTrackGetSid")
public func LKRemoteVideoTrackGetSid(track: UnsafeRawPointer) -> CFString {
let track = Unmanaged<RemoteVideoTrack>.fromOpaque(track).takeUnretainedValue()
return track.sid! as CFString
}
@_cdecl("LKRemoteAudioTrackGetSid")
public func LKRemoteAudioTrackGetSid(track: UnsafeRawPointer) -> CFString {
let track = Unmanaged<RemoteAudioTrack>.fromOpaque(track).takeUnretainedValue()
return track.sid! as CFString
}
@_cdecl("LKDisplaySources")
public func LKDisplaySources(data: UnsafeRawPointer, callback: @escaping @convention(c) (UnsafeRawPointer, CFArray?, CFString?) -> Void) {
MacOSScreenCapturer.sources(for: .display, includeCurrentApplication: false, preferredMethod: .legacy).then { displaySources in
callback(data, displaySources as CFArray, nil)
}.catch { error in
callback(data, nil, error.localizedDescription as CFString)
}
}
@_cdecl("LKLocalTrackPublicationSetMute")
public func LKLocalTrackPublicationSetMute(
publication: UnsafeRawPointer,
muted: Bool,
on_complete: @escaping @convention(c) (UnsafeRawPointer, CFString?) -> Void,
callback_data: UnsafeRawPointer
) {
let publication = Unmanaged<LocalTrackPublication>.fromOpaque(publication).takeUnretainedValue()
if muted {
publication.mute().then {
on_complete(callback_data, nil)
}.catch { error in
on_complete(callback_data, error.localizedDescription as CFString)
}
} else {
publication.unmute().then {
on_complete(callback_data, nil)
}.catch { error in
on_complete(callback_data, error.localizedDescription as CFString)
}
}
}
@_cdecl("LKRemoteTrackPublicationSetEnabled")
public func LKRemoteTrackPublicationSetEnabled(
publication: UnsafeRawPointer,
enabled: Bool,
on_complete: @escaping @convention(c) (UnsafeRawPointer, CFString?) -> Void,
callback_data: UnsafeRawPointer
) {
let publication = Unmanaged<RemoteTrackPublication>.fromOpaque(publication).takeUnretainedValue()
publication.set(enabled: enabled).then {
on_complete(callback_data, nil)
}.catch { error in
on_complete(callback_data, error.localizedDescription as CFString)
}
}
@_cdecl("LKRemoteTrackPublicationIsMuted")
public func LKRemoteTrackPublicationIsMuted(
publication: UnsafeRawPointer
) -> Bool {
let publication = Unmanaged<RemoteTrackPublication>.fromOpaque(publication).takeUnretainedValue()
return publication.muted
}
@_cdecl("LKRemoteTrackPublicationGetSid")
public func LKRemoteTrackPublicationGetSid(
publication: UnsafeRawPointer
) -> CFString {
let publication = Unmanaged<RemoteTrackPublication>.fromOpaque(publication).takeUnretainedValue()
return publication.sid as CFString
}

View File

@ -0,0 +1,172 @@
use serde::Deserialize;
use std::{
env,
path::{Path, PathBuf},
process::Command,
};
const SWIFT_PACKAGE_NAME: &str = "LiveKitBridge";
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SwiftTargetInfo {
pub triple: String,
pub unversioned_triple: String,
pub module_triple: String,
pub swift_runtime_compatibility_version: String,
#[serde(rename = "librariesRequireRPath")]
pub libraries_require_rpath: bool,
}
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SwiftPaths {
pub runtime_library_paths: Vec<String>,
pub runtime_library_import_paths: Vec<String>,
pub runtime_resource_path: String,
}
#[derive(Debug, Deserialize)]
pub struct SwiftTarget {
pub target: SwiftTargetInfo,
pub paths: SwiftPaths,
}
const MACOS_TARGET_VERSION: &str = "10.15.7";
fn main() {
if cfg!(not(any(test, feature = "test-support"))) {
let swift_target = get_swift_target();
build_bridge(&swift_target);
link_swift_stdlib(&swift_target);
link_webrtc_framework(&swift_target);
// Register exported Objective-C selectors, protocols, etc when building example binaries.
println!("cargo:rustc-link-arg=-Wl,-ObjC");
}
}
fn build_bridge(swift_target: &SwiftTarget) {
println!("cargo:rerun-if-env-changed=MACOSX_DEPLOYMENT_TARGET");
println!("cargo:rerun-if-changed={}/Sources", SWIFT_PACKAGE_NAME);
println!(
"cargo:rerun-if-changed={}/Package.swift",
SWIFT_PACKAGE_NAME
);
println!(
"cargo:rerun-if-changed={}/Package.resolved",
SWIFT_PACKAGE_NAME
);
let swift_package_root = swift_package_root();
let swift_target_folder = swift_target_folder();
if !Command::new("swift")
.arg("build")
.arg("--disable-automatic-resolution")
.args(["--configuration", &env::var("PROFILE").unwrap()])
.args(["--triple", &swift_target.target.triple])
.args(["--build-path".into(), swift_target_folder])
.current_dir(&swift_package_root)
.status()
.unwrap()
.success()
{
panic!(
"Failed to compile swift package in {}",
swift_package_root.display()
);
}
println!(
"cargo:rustc-link-search=native={}",
swift_target.out_dir_path().display()
);
println!("cargo:rustc-link-lib=static={}", SWIFT_PACKAGE_NAME);
}
fn link_swift_stdlib(swift_target: &SwiftTarget) {
for path in &swift_target.paths.runtime_library_paths {
println!("cargo:rustc-link-search=native={}", path);
}
}
fn link_webrtc_framework(swift_target: &SwiftTarget) {
let swift_out_dir_path = swift_target.out_dir_path();
println!("cargo:rustc-link-lib=framework=WebRTC");
println!(
"cargo:rustc-link-search=framework={}",
swift_out_dir_path.display()
);
// Find WebRTC.framework as a sibling of the executable when running tests.
println!("cargo:rustc-link-arg=-Wl,-rpath,@executable_path");
// Find WebRTC.framework in parent directory of the executable when running examples.
println!("cargo:rustc-link-arg=-Wl,-rpath,@executable_path/..");
let source_path = swift_out_dir_path.join("WebRTC.framework");
let deps_dir_path =
PathBuf::from(env::var("OUT_DIR").unwrap()).join("../../../deps/WebRTC.framework");
let target_dir_path =
PathBuf::from(env::var("OUT_DIR").unwrap()).join("../../../WebRTC.framework");
copy_dir(&source_path, &deps_dir_path);
copy_dir(&source_path, &target_dir_path);
}
fn get_swift_target() -> SwiftTarget {
let mut arch = env::var("CARGO_CFG_TARGET_ARCH").unwrap();
if arch == "aarch64" {
arch = "arm64".into();
}
let target = format!("{}-apple-macosx{}", arch, MACOS_TARGET_VERSION);
let swift_target_info_str = Command::new("swift")
.args(["-target", &target, "-print-target-info"])
.output()
.unwrap()
.stdout;
serde_json::from_slice(&swift_target_info_str).unwrap()
}
fn swift_package_root() -> PathBuf {
env::current_dir().unwrap().join(SWIFT_PACKAGE_NAME)
}
fn swift_target_folder() -> PathBuf {
env::current_dir()
.unwrap()
.join(format!("../../target/{SWIFT_PACKAGE_NAME}"))
}
fn copy_dir(source: &Path, destination: &Path) {
assert!(
Command::new("rm")
.arg("-rf")
.arg(destination)
.status()
.unwrap()
.success(),
"could not remove {:?} before copying",
destination
);
assert!(
Command::new("cp")
.arg("-R")
.args([source, destination])
.status()
.unwrap()
.success(),
"could not copy {:?} to {:?}",
source,
destination
);
}
impl SwiftTarget {
fn out_dir_path(&self) -> PathBuf {
swift_target_folder()
.join(&self.target.unversioned_triple)
.join(env::var("PROFILE").unwrap())
}
}

View File

@ -0,0 +1,178 @@
use std::{sync::Arc, time::Duration};
use futures::StreamExt;
use gpui2::KeyBinding;
use live_kit_client2::{
LocalAudioTrack, LocalVideoTrack, RemoteAudioTrackUpdate, RemoteVideoTrackUpdate, Room,
};
use live_kit_server::token::{self, VideoGrant};
use log::LevelFilter;
use serde_derive::Deserialize;
use simplelog::SimpleLogger;
#[derive(Deserialize, Debug, Clone, Copy, PartialEq, Eq, Default)]
struct Quit;
fn main() {
SimpleLogger::init(LevelFilter::Info, Default::default()).expect("could not initialize logger");
gpui2::App::production(Arc::new(())).run(|cx| {
#[cfg(any(test, feature = "test-support"))]
println!("USING TEST LIVEKIT");
#[cfg(not(any(test, feature = "test-support")))]
println!("USING REAL LIVEKIT");
cx.activate(true);
cx.on_action(quit);
cx.bind_keys([KeyBinding::new("cmd-q", Quit, None)]);
// todo!()
// cx.set_menus(vec![Menu {
// name: "Zed",
// items: vec![MenuItem::Action {
// name: "Quit",
// action: Box::new(Quit),
// os_action: None,
// }],
// }]);
let live_kit_url = std::env::var("LIVE_KIT_URL").unwrap_or("http://localhost:7880".into());
let live_kit_key = std::env::var("LIVE_KIT_KEY").unwrap_or("devkey".into());
let live_kit_secret = std::env::var("LIVE_KIT_SECRET").unwrap_or("secret".into());
cx.spawn_on_main(|cx| async move {
let user_a_token = token::create(
&live_kit_key,
&live_kit_secret,
Some("test-participant-1"),
VideoGrant::to_join("test-room"),
)
.unwrap();
let room_a = Room::new();
room_a.connect(&live_kit_url, &user_a_token).await.unwrap();
let user2_token = token::create(
&live_kit_key,
&live_kit_secret,
Some("test-participant-2"),
VideoGrant::to_join("test-room"),
)
.unwrap();
let room_b = Room::new();
room_b.connect(&live_kit_url, &user2_token).await.unwrap();
let mut audio_track_updates = room_b.remote_audio_track_updates();
let audio_track = LocalAudioTrack::create();
let audio_track_publication = room_a.publish_audio_track(audio_track).await.unwrap();
if let RemoteAudioTrackUpdate::Subscribed(track, _) =
audio_track_updates.next().await.unwrap()
{
let remote_tracks = room_b.remote_audio_tracks("test-participant-1");
assert_eq!(remote_tracks.len(), 1);
assert_eq!(remote_tracks[0].publisher_id(), "test-participant-1");
assert_eq!(track.publisher_id(), "test-participant-1");
} else {
panic!("unexpected message");
}
audio_track_publication.set_mute(true).await.unwrap();
println!("waiting for mute changed!");
if let RemoteAudioTrackUpdate::MuteChanged { track_id, muted } =
audio_track_updates.next().await.unwrap()
{
let remote_tracks = room_b.remote_audio_tracks("test-participant-1");
assert_eq!(remote_tracks[0].sid(), track_id);
assert_eq!(muted, true);
} else {
panic!("unexpected message");
}
audio_track_publication.set_mute(false).await.unwrap();
if let RemoteAudioTrackUpdate::MuteChanged { track_id, muted } =
audio_track_updates.next().await.unwrap()
{
let remote_tracks = room_b.remote_audio_tracks("test-participant-1");
assert_eq!(remote_tracks[0].sid(), track_id);
assert_eq!(muted, false);
} else {
panic!("unexpected message");
}
println!("Pausing for 5 seconds to test audio, make some noise!");
let timer = cx.executor().timer(Duration::from_secs(5));
timer.await;
let remote_audio_track = room_b
.remote_audio_tracks("test-participant-1")
.pop()
.unwrap();
room_a.unpublish_track(audio_track_publication);
// Clear out any active speakers changed messages
let mut next = audio_track_updates.next().await.unwrap();
while let RemoteAudioTrackUpdate::ActiveSpeakersChanged { speakers } = next {
println!("Speakers changed: {:?}", speakers);
next = audio_track_updates.next().await.unwrap();
}
if let RemoteAudioTrackUpdate::Unsubscribed {
publisher_id,
track_id,
} = next
{
assert_eq!(publisher_id, "test-participant-1");
assert_eq!(remote_audio_track.sid(), track_id);
assert_eq!(room_b.remote_audio_tracks("test-participant-1").len(), 0);
} else {
panic!("unexpected message");
}
let mut video_track_updates = room_b.remote_video_track_updates();
let displays = room_a.display_sources().await.unwrap();
let display = displays.into_iter().next().unwrap();
let local_video_track = LocalVideoTrack::screen_share_for_display(&display);
let local_video_track_publication =
room_a.publish_video_track(local_video_track).await.unwrap();
if let RemoteVideoTrackUpdate::Subscribed(track) =
video_track_updates.next().await.unwrap()
{
let remote_video_tracks = room_b.remote_video_tracks("test-participant-1");
assert_eq!(remote_video_tracks.len(), 1);
assert_eq!(remote_video_tracks[0].publisher_id(), "test-participant-1");
assert_eq!(track.publisher_id(), "test-participant-1");
} else {
panic!("unexpected message");
}
let remote_video_track = room_b
.remote_video_tracks("test-participant-1")
.pop()
.unwrap();
room_a.unpublish_track(local_video_track_publication);
if let RemoteVideoTrackUpdate::Unsubscribed {
publisher_id,
track_id,
} = video_track_updates.next().await.unwrap()
{
assert_eq!(publisher_id, "test-participant-1");
assert_eq!(remote_video_track.sid(), track_id);
assert_eq!(room_b.remote_video_tracks("test-participant-1").len(), 0);
} else {
panic!("unexpected message");
}
cx.update(|cx| cx.quit()).ok();
})
.detach();
});
}
fn quit(_: &Quit, cx: &mut gpui2::AppContext) {
cx.quit();
}

View File

@ -0,0 +1,11 @@
#[cfg(not(any(test, feature = "test-support")))]
pub mod prod;
#[cfg(not(any(test, feature = "test-support")))]
pub use prod::*;
#[cfg(any(test, feature = "test-support"))]
pub mod test;
#[cfg(any(test, feature = "test-support"))]
pub use test::*;

View File

@ -0,0 +1,947 @@
use anyhow::{anyhow, Context, Result};
use core_foundation::{
array::{CFArray, CFArrayRef},
base::{CFRelease, CFRetain, TCFType},
string::{CFString, CFStringRef},
};
use futures::{
channel::{mpsc, oneshot},
Future,
};
pub use media::core_video::CVImageBuffer;
use media::core_video::CVImageBufferRef;
use parking_lot::Mutex;
use postage::watch;
use std::{
ffi::c_void,
sync::{Arc, Weak},
};
// SAFETY: Most live kit types are threadsafe:
// https://github.com/livekit/client-sdk-swift#thread-safety
macro_rules! pointer_type {
($pointer_name:ident) => {
#[repr(transparent)]
#[derive(Copy, Clone, Debug)]
pub struct $pointer_name(pub *const std::ffi::c_void);
unsafe impl Send for $pointer_name {}
};
}
mod swift {
pointer_type!(Room);
pointer_type!(LocalAudioTrack);
pointer_type!(RemoteAudioTrack);
pointer_type!(LocalVideoTrack);
pointer_type!(RemoteVideoTrack);
pointer_type!(LocalTrackPublication);
pointer_type!(RemoteTrackPublication);
pointer_type!(MacOSDisplay);
pointer_type!(RoomDelegate);
}
extern "C" {
fn LKRoomDelegateCreate(
callback_data: *mut c_void,
on_did_disconnect: extern "C" fn(callback_data: *mut c_void),
on_did_subscribe_to_remote_audio_track: extern "C" fn(
callback_data: *mut c_void,
publisher_id: CFStringRef,
track_id: CFStringRef,
remote_track: swift::RemoteAudioTrack,
remote_publication: swift::RemoteTrackPublication,
),
on_did_unsubscribe_from_remote_audio_track: extern "C" fn(
callback_data: *mut c_void,
publisher_id: CFStringRef,
track_id: CFStringRef,
),
on_mute_changed_from_remote_audio_track: extern "C" fn(
callback_data: *mut c_void,
track_id: CFStringRef,
muted: bool,
),
on_active_speakers_changed: extern "C" fn(
callback_data: *mut c_void,
participants: CFArrayRef,
),
on_did_subscribe_to_remote_video_track: extern "C" fn(
callback_data: *mut c_void,
publisher_id: CFStringRef,
track_id: CFStringRef,
remote_track: swift::RemoteVideoTrack,
),
on_did_unsubscribe_from_remote_video_track: extern "C" fn(
callback_data: *mut c_void,
publisher_id: CFStringRef,
track_id: CFStringRef,
),
) -> swift::RoomDelegate;
fn LKRoomCreate(delegate: swift::RoomDelegate) -> swift::Room;
fn LKRoomConnect(
room: swift::Room,
url: CFStringRef,
token: CFStringRef,
callback: extern "C" fn(*mut c_void, CFStringRef),
callback_data: *mut c_void,
);
fn LKRoomDisconnect(room: swift::Room);
fn LKRoomPublishVideoTrack(
room: swift::Room,
track: swift::LocalVideoTrack,
callback: extern "C" fn(*mut c_void, swift::LocalTrackPublication, CFStringRef),
callback_data: *mut c_void,
);
fn LKRoomPublishAudioTrack(
room: swift::Room,
track: swift::LocalAudioTrack,
callback: extern "C" fn(*mut c_void, swift::LocalTrackPublication, CFStringRef),
callback_data: *mut c_void,
);
fn LKRoomUnpublishTrack(room: swift::Room, publication: swift::LocalTrackPublication);
fn LKRoomAudioTracksForRemoteParticipant(
room: swift::Room,
participant_id: CFStringRef,
) -> CFArrayRef;
fn LKRoomAudioTrackPublicationsForRemoteParticipant(
room: swift::Room,
participant_id: CFStringRef,
) -> CFArrayRef;
fn LKRoomVideoTracksForRemoteParticipant(
room: swift::Room,
participant_id: CFStringRef,
) -> CFArrayRef;
fn LKVideoRendererCreate(
callback_data: *mut c_void,
on_frame: extern "C" fn(callback_data: *mut c_void, frame: CVImageBufferRef) -> bool,
on_drop: extern "C" fn(callback_data: *mut c_void),
) -> *const c_void;
fn LKRemoteAudioTrackGetSid(track: swift::RemoteAudioTrack) -> CFStringRef;
fn LKVideoTrackAddRenderer(track: swift::RemoteVideoTrack, renderer: *const c_void);
fn LKRemoteVideoTrackGetSid(track: swift::RemoteVideoTrack) -> CFStringRef;
fn LKDisplaySources(
callback_data: *mut c_void,
callback: extern "C" fn(
callback_data: *mut c_void,
sources: CFArrayRef,
error: CFStringRef,
),
);
fn LKCreateScreenShareTrackForDisplay(display: swift::MacOSDisplay) -> swift::LocalVideoTrack;
fn LKLocalAudioTrackCreateTrack() -> swift::LocalAudioTrack;
fn LKLocalTrackPublicationSetMute(
publication: swift::LocalTrackPublication,
muted: bool,
on_complete: extern "C" fn(callback_data: *mut c_void, error: CFStringRef),
callback_data: *mut c_void,
);
fn LKRemoteTrackPublicationSetEnabled(
publication: swift::RemoteTrackPublication,
enabled: bool,
on_complete: extern "C" fn(callback_data: *mut c_void, error: CFStringRef),
callback_data: *mut c_void,
);
fn LKRemoteTrackPublicationIsMuted(publication: swift::RemoteTrackPublication) -> bool;
fn LKRemoteTrackPublicationGetSid(publication: swift::RemoteTrackPublication) -> CFStringRef;
}
pub type Sid = String;
#[derive(Clone, Eq, PartialEq)]
pub enum ConnectionState {
Disconnected,
Connected { url: String, token: String },
}
pub struct Room {
native_room: Mutex<swift::Room>,
connection: Mutex<(
watch::Sender<ConnectionState>,
watch::Receiver<ConnectionState>,
)>,
remote_audio_track_subscribers: Mutex<Vec<mpsc::UnboundedSender<RemoteAudioTrackUpdate>>>,
remote_video_track_subscribers: Mutex<Vec<mpsc::UnboundedSender<RemoteVideoTrackUpdate>>>,
_delegate: Mutex<RoomDelegate>,
}
trait AssertSendSync: Send {}
impl AssertSendSync for Room {}
impl Room {
pub fn new() -> Arc<Self> {
Arc::new_cyclic(|weak_room| {
let delegate = RoomDelegate::new(weak_room.clone());
Self {
native_room: Mutex::new(unsafe { LKRoomCreate(delegate.native_delegate) }),
connection: Mutex::new(watch::channel_with(ConnectionState::Disconnected)),
remote_audio_track_subscribers: Default::default(),
remote_video_track_subscribers: Default::default(),
_delegate: Mutex::new(delegate),
}
})
}
pub fn status(&self) -> watch::Receiver<ConnectionState> {
self.connection.lock().1.clone()
}
pub fn connect(self: &Arc<Self>, url: &str, token: &str) -> impl Future<Output = Result<()>> {
let url = CFString::new(url);
let token = CFString::new(token);
let (did_connect, tx, rx) = Self::build_done_callback();
unsafe {
LKRoomConnect(
*self.native_room.lock(),
url.as_concrete_TypeRef(),
token.as_concrete_TypeRef(),
did_connect,
tx,
)
}
let this = self.clone();
let url = url.to_string();
let token = token.to_string();
async move {
rx.await.unwrap().context("error connecting to room")?;
*this.connection.lock().0.borrow_mut() = ConnectionState::Connected { url, token };
Ok(())
}
}
fn did_disconnect(&self) {
*self.connection.lock().0.borrow_mut() = ConnectionState::Disconnected;
}
pub fn display_sources(self: &Arc<Self>) -> impl Future<Output = Result<Vec<MacOSDisplay>>> {
extern "C" fn callback(tx: *mut c_void, sources: CFArrayRef, error: CFStringRef) {
unsafe {
let tx = Box::from_raw(tx as *mut oneshot::Sender<Result<Vec<MacOSDisplay>>>);
if sources.is_null() {
let _ = tx.send(Err(anyhow!("{}", CFString::wrap_under_get_rule(error))));
} else {
let sources = CFArray::wrap_under_get_rule(sources)
.into_iter()
.map(|source| MacOSDisplay::new(swift::MacOSDisplay(*source)))
.collect();
let _ = tx.send(Ok(sources));
}
}
}
let (tx, rx) = oneshot::channel();
unsafe {
LKDisplaySources(Box::into_raw(Box::new(tx)) as *mut _, callback);
}
async move { rx.await.unwrap() }
}
pub fn publish_video_track(
self: &Arc<Self>,
track: LocalVideoTrack,
) -> impl Future<Output = Result<LocalTrackPublication>> {
let (tx, rx) = oneshot::channel::<Result<LocalTrackPublication>>();
extern "C" fn callback(
tx: *mut c_void,
publication: swift::LocalTrackPublication,
error: CFStringRef,
) {
let tx =
unsafe { Box::from_raw(tx as *mut oneshot::Sender<Result<LocalTrackPublication>>) };
if error.is_null() {
let _ = tx.send(Ok(LocalTrackPublication::new(publication)));
} else {
let error = unsafe { CFString::wrap_under_get_rule(error).to_string() };
let _ = tx.send(Err(anyhow!(error)));
}
}
unsafe {
LKRoomPublishVideoTrack(
*self.native_room.lock(),
track.0,
callback,
Box::into_raw(Box::new(tx)) as *mut c_void,
);
}
async { rx.await.unwrap().context("error publishing video track") }
}
pub fn publish_audio_track(
self: &Arc<Self>,
track: LocalAudioTrack,
) -> impl Future<Output = Result<LocalTrackPublication>> {
let (tx, rx) = oneshot::channel::<Result<LocalTrackPublication>>();
extern "C" fn callback(
tx: *mut c_void,
publication: swift::LocalTrackPublication,
error: CFStringRef,
) {
let tx =
unsafe { Box::from_raw(tx as *mut oneshot::Sender<Result<LocalTrackPublication>>) };
if error.is_null() {
let _ = tx.send(Ok(LocalTrackPublication::new(publication)));
} else {
let error = unsafe { CFString::wrap_under_get_rule(error).to_string() };
let _ = tx.send(Err(anyhow!(error)));
}
}
unsafe {
LKRoomPublishAudioTrack(
*self.native_room.lock(),
track.0,
callback,
Box::into_raw(Box::new(tx)) as *mut c_void,
);
}
async { rx.await.unwrap().context("error publishing audio track") }
}
pub fn unpublish_track(&self, publication: LocalTrackPublication) {
unsafe {
LKRoomUnpublishTrack(*self.native_room.lock(), publication.0);
}
}
pub fn remote_video_tracks(&self, participant_id: &str) -> Vec<Arc<RemoteVideoTrack>> {
unsafe {
let tracks = LKRoomVideoTracksForRemoteParticipant(
*self.native_room.lock(),
CFString::new(participant_id).as_concrete_TypeRef(),
);
if tracks.is_null() {
Vec::new()
} else {
let tracks = CFArray::wrap_under_get_rule(tracks);
tracks
.into_iter()
.map(|native_track| {
let native_track = swift::RemoteVideoTrack(*native_track);
let id =
CFString::wrap_under_get_rule(LKRemoteVideoTrackGetSid(native_track))
.to_string();
Arc::new(RemoteVideoTrack::new(
native_track,
id,
participant_id.into(),
))
})
.collect()
}
}
}
pub fn remote_audio_tracks(&self, participant_id: &str) -> Vec<Arc<RemoteAudioTrack>> {
unsafe {
let tracks = LKRoomAudioTracksForRemoteParticipant(
*self.native_room.lock(),
CFString::new(participant_id).as_concrete_TypeRef(),
);
if tracks.is_null() {
Vec::new()
} else {
let tracks = CFArray::wrap_under_get_rule(tracks);
tracks
.into_iter()
.map(|native_track| {
let native_track = swift::RemoteAudioTrack(*native_track);
let id =
CFString::wrap_under_get_rule(LKRemoteAudioTrackGetSid(native_track))
.to_string();
Arc::new(RemoteAudioTrack::new(
native_track,
id,
participant_id.into(),
))
})
.collect()
}
}
}
pub fn remote_audio_track_publications(
&self,
participant_id: &str,
) -> Vec<Arc<RemoteTrackPublication>> {
unsafe {
let tracks = LKRoomAudioTrackPublicationsForRemoteParticipant(
*self.native_room.lock(),
CFString::new(participant_id).as_concrete_TypeRef(),
);
if tracks.is_null() {
Vec::new()
} else {
let tracks = CFArray::wrap_under_get_rule(tracks);
tracks
.into_iter()
.map(|native_track_publication| {
let native_track_publication =
swift::RemoteTrackPublication(*native_track_publication);
Arc::new(RemoteTrackPublication::new(native_track_publication))
})
.collect()
}
}
}
pub fn remote_audio_track_updates(&self) -> mpsc::UnboundedReceiver<RemoteAudioTrackUpdate> {
let (tx, rx) = mpsc::unbounded();
self.remote_audio_track_subscribers.lock().push(tx);
rx
}
pub fn remote_video_track_updates(&self) -> mpsc::UnboundedReceiver<RemoteVideoTrackUpdate> {
let (tx, rx) = mpsc::unbounded();
self.remote_video_track_subscribers.lock().push(tx);
rx
}
fn did_subscribe_to_remote_audio_track(
&self,
track: RemoteAudioTrack,
publication: RemoteTrackPublication,
) {
let track = Arc::new(track);
let publication = Arc::new(publication);
self.remote_audio_track_subscribers.lock().retain(|tx| {
tx.unbounded_send(RemoteAudioTrackUpdate::Subscribed(
track.clone(),
publication.clone(),
))
.is_ok()
});
}
fn did_unsubscribe_from_remote_audio_track(&self, publisher_id: String, track_id: String) {
self.remote_audio_track_subscribers.lock().retain(|tx| {
tx.unbounded_send(RemoteAudioTrackUpdate::Unsubscribed {
publisher_id: publisher_id.clone(),
track_id: track_id.clone(),
})
.is_ok()
});
}
fn mute_changed_from_remote_audio_track(&self, track_id: String, muted: bool) {
self.remote_audio_track_subscribers.lock().retain(|tx| {
tx.unbounded_send(RemoteAudioTrackUpdate::MuteChanged {
track_id: track_id.clone(),
muted,
})
.is_ok()
});
}
// A vec of publisher IDs
fn active_speakers_changed(&self, speakers: Vec<String>) {
self.remote_audio_track_subscribers
.lock()
.retain(move |tx| {
tx.unbounded_send(RemoteAudioTrackUpdate::ActiveSpeakersChanged {
speakers: speakers.clone(),
})
.is_ok()
});
}
fn did_subscribe_to_remote_video_track(&self, track: RemoteVideoTrack) {
let track = Arc::new(track);
self.remote_video_track_subscribers.lock().retain(|tx| {
tx.unbounded_send(RemoteVideoTrackUpdate::Subscribed(track.clone()))
.is_ok()
});
}
fn did_unsubscribe_from_remote_video_track(&self, publisher_id: String, track_id: String) {
self.remote_video_track_subscribers.lock().retain(|tx| {
tx.unbounded_send(RemoteVideoTrackUpdate::Unsubscribed {
publisher_id: publisher_id.clone(),
track_id: track_id.clone(),
})
.is_ok()
});
}
fn build_done_callback() -> (
extern "C" fn(*mut c_void, CFStringRef),
*mut c_void,
oneshot::Receiver<Result<()>>,
) {
let (tx, rx) = oneshot::channel();
extern "C" fn done_callback(tx: *mut c_void, error: CFStringRef) {
let tx = unsafe { Box::from_raw(tx as *mut oneshot::Sender<Result<()>>) };
if error.is_null() {
let _ = tx.send(Ok(()));
} else {
let error = unsafe { CFString::wrap_under_get_rule(error).to_string() };
let _ = tx.send(Err(anyhow!(error)));
}
}
(
done_callback,
Box::into_raw(Box::new(tx)) as *mut c_void,
rx,
)
}
pub fn set_display_sources(&self, _: Vec<MacOSDisplay>) {
unreachable!("This is a test-only function")
}
}
impl Drop for Room {
fn drop(&mut self) {
unsafe {
let native_room = &*self.native_room.lock();
LKRoomDisconnect(*native_room);
CFRelease(native_room.0);
}
}
}
struct RoomDelegate {
native_delegate: swift::RoomDelegate,
_weak_room: Weak<Room>,
}
impl RoomDelegate {
fn new(weak_room: Weak<Room>) -> Self {
let native_delegate = unsafe {
LKRoomDelegateCreate(
weak_room.as_ptr() as *mut c_void,
Self::on_did_disconnect,
Self::on_did_subscribe_to_remote_audio_track,
Self::on_did_unsubscribe_from_remote_audio_track,
Self::on_mute_change_from_remote_audio_track,
Self::on_active_speakers_changed,
Self::on_did_subscribe_to_remote_video_track,
Self::on_did_unsubscribe_from_remote_video_track,
)
};
Self {
native_delegate,
_weak_room: weak_room,
}
}
extern "C" fn on_did_disconnect(room: *mut c_void) {
let room = unsafe { Weak::from_raw(room as *mut Room) };
if let Some(room) = room.upgrade() {
room.did_disconnect();
}
let _ = Weak::into_raw(room);
}
extern "C" fn on_did_subscribe_to_remote_audio_track(
room: *mut c_void,
publisher_id: CFStringRef,
track_id: CFStringRef,
track: swift::RemoteAudioTrack,
publication: swift::RemoteTrackPublication,
) {
let room = unsafe { Weak::from_raw(room as *mut Room) };
let publisher_id = unsafe { CFString::wrap_under_get_rule(publisher_id).to_string() };
let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() };
let track = RemoteAudioTrack::new(track, track_id, publisher_id);
let publication = RemoteTrackPublication::new(publication);
if let Some(room) = room.upgrade() {
room.did_subscribe_to_remote_audio_track(track, publication);
}
let _ = Weak::into_raw(room);
}
extern "C" fn on_did_unsubscribe_from_remote_audio_track(
room: *mut c_void,
publisher_id: CFStringRef,
track_id: CFStringRef,
) {
let room = unsafe { Weak::from_raw(room as *mut Room) };
let publisher_id = unsafe { CFString::wrap_under_get_rule(publisher_id).to_string() };
let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() };
if let Some(room) = room.upgrade() {
room.did_unsubscribe_from_remote_audio_track(publisher_id, track_id);
}
let _ = Weak::into_raw(room);
}
extern "C" fn on_mute_change_from_remote_audio_track(
room: *mut c_void,
track_id: CFStringRef,
muted: bool,
) {
let room = unsafe { Weak::from_raw(room as *mut Room) };
let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() };
if let Some(room) = room.upgrade() {
room.mute_changed_from_remote_audio_track(track_id, muted);
}
let _ = Weak::into_raw(room);
}
extern "C" fn on_active_speakers_changed(room: *mut c_void, participants: CFArrayRef) {
if participants.is_null() {
return;
}
let room = unsafe { Weak::from_raw(room as *mut Room) };
let speakers = unsafe {
CFArray::wrap_under_get_rule(participants)
.into_iter()
.map(
|speaker: core_foundation::base::ItemRef<'_, *const c_void>| {
CFString::wrap_under_get_rule(*speaker as CFStringRef).to_string()
},
)
.collect()
};
if let Some(room) = room.upgrade() {
room.active_speakers_changed(speakers);
}
let _ = Weak::into_raw(room);
}
extern "C" fn on_did_subscribe_to_remote_video_track(
room: *mut c_void,
publisher_id: CFStringRef,
track_id: CFStringRef,
track: swift::RemoteVideoTrack,
) {
let room = unsafe { Weak::from_raw(room as *mut Room) };
let publisher_id = unsafe { CFString::wrap_under_get_rule(publisher_id).to_string() };
let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() };
let track = RemoteVideoTrack::new(track, track_id, publisher_id);
if let Some(room) = room.upgrade() {
room.did_subscribe_to_remote_video_track(track);
}
let _ = Weak::into_raw(room);
}
extern "C" fn on_did_unsubscribe_from_remote_video_track(
room: *mut c_void,
publisher_id: CFStringRef,
track_id: CFStringRef,
) {
let room = unsafe { Weak::from_raw(room as *mut Room) };
let publisher_id = unsafe { CFString::wrap_under_get_rule(publisher_id).to_string() };
let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() };
if let Some(room) = room.upgrade() {
room.did_unsubscribe_from_remote_video_track(publisher_id, track_id);
}
let _ = Weak::into_raw(room);
}
}
impl Drop for RoomDelegate {
fn drop(&mut self) {
unsafe {
CFRelease(self.native_delegate.0);
}
}
}
pub struct LocalAudioTrack(swift::LocalAudioTrack);
impl LocalAudioTrack {
pub fn create() -> Self {
Self(unsafe { LKLocalAudioTrackCreateTrack() })
}
}
impl Drop for LocalAudioTrack {
fn drop(&mut self) {
unsafe { CFRelease(self.0 .0) }
}
}
pub struct LocalVideoTrack(swift::LocalVideoTrack);
impl LocalVideoTrack {
pub fn screen_share_for_display(display: &MacOSDisplay) -> Self {
Self(unsafe { LKCreateScreenShareTrackForDisplay(display.0) })
}
}
impl Drop for LocalVideoTrack {
fn drop(&mut self) {
unsafe { CFRelease(self.0 .0) }
}
}
pub struct LocalTrackPublication(swift::LocalTrackPublication);
impl LocalTrackPublication {
pub fn new(native_track_publication: swift::LocalTrackPublication) -> Self {
unsafe {
CFRetain(native_track_publication.0);
}
Self(native_track_publication)
}
pub fn set_mute(&self, muted: bool) -> impl Future<Output = Result<()>> {
let (tx, rx) = futures::channel::oneshot::channel();
extern "C" fn complete_callback(callback_data: *mut c_void, error: CFStringRef) {
let tx = unsafe { Box::from_raw(callback_data as *mut oneshot::Sender<Result<()>>) };
if error.is_null() {
tx.send(Ok(())).ok();
} else {
let error = unsafe { CFString::wrap_under_get_rule(error).to_string() };
tx.send(Err(anyhow!(error))).ok();
}
}
unsafe {
LKLocalTrackPublicationSetMute(
self.0,
muted,
complete_callback,
Box::into_raw(Box::new(tx)) as *mut c_void,
)
}
async move { rx.await.unwrap() }
}
}
impl Drop for LocalTrackPublication {
fn drop(&mut self) {
unsafe { CFRelease(self.0 .0) }
}
}
pub struct RemoteTrackPublication {
native_publication: Mutex<swift::RemoteTrackPublication>,
}
impl RemoteTrackPublication {
pub fn new(native_track_publication: swift::RemoteTrackPublication) -> Self {
unsafe {
CFRetain(native_track_publication.0);
}
Self {
native_publication: Mutex::new(native_track_publication),
}
}
pub fn sid(&self) -> String {
unsafe {
CFString::wrap_under_get_rule(LKRemoteTrackPublicationGetSid(
*self.native_publication.lock(),
))
.to_string()
}
}
pub fn is_muted(&self) -> bool {
unsafe { LKRemoteTrackPublicationIsMuted(*self.native_publication.lock()) }
}
pub fn set_enabled(&self, enabled: bool) -> impl Future<Output = Result<()>> {
let (tx, rx) = futures::channel::oneshot::channel();
extern "C" fn complete_callback(callback_data: *mut c_void, error: CFStringRef) {
let tx = unsafe { Box::from_raw(callback_data as *mut oneshot::Sender<Result<()>>) };
if error.is_null() {
tx.send(Ok(())).ok();
} else {
let error = unsafe { CFString::wrap_under_get_rule(error).to_string() };
tx.send(Err(anyhow!(error))).ok();
}
}
unsafe {
LKRemoteTrackPublicationSetEnabled(
*self.native_publication.lock(),
enabled,
complete_callback,
Box::into_raw(Box::new(tx)) as *mut c_void,
)
}
async move { rx.await.unwrap() }
}
}
impl Drop for RemoteTrackPublication {
fn drop(&mut self) {
unsafe { CFRelease((*self.native_publication.lock()).0) }
}
}
#[derive(Debug)]
pub struct RemoteAudioTrack {
native_track: Mutex<swift::RemoteAudioTrack>,
sid: Sid,
publisher_id: String,
}
impl RemoteAudioTrack {
fn new(native_track: swift::RemoteAudioTrack, sid: Sid, publisher_id: String) -> Self {
unsafe {
CFRetain(native_track.0);
}
Self {
native_track: Mutex::new(native_track),
sid,
publisher_id,
}
}
pub fn sid(&self) -> &str {
&self.sid
}
pub fn publisher_id(&self) -> &str {
&self.publisher_id
}
pub fn enable(&self) -> impl Future<Output = Result<()>> {
async { Ok(()) }
}
pub fn disable(&self) -> impl Future<Output = Result<()>> {
async { Ok(()) }
}
}
impl Drop for RemoteAudioTrack {
fn drop(&mut self) {
unsafe { CFRelease(self.native_track.lock().0) }
}
}
#[derive(Debug)]
pub struct RemoteVideoTrack {
native_track: Mutex<swift::RemoteVideoTrack>,
sid: Sid,
publisher_id: String,
}
impl RemoteVideoTrack {
fn new(native_track: swift::RemoteVideoTrack, sid: Sid, publisher_id: String) -> Self {
unsafe {
CFRetain(native_track.0);
}
Self {
native_track: Mutex::new(native_track),
sid,
publisher_id,
}
}
pub fn sid(&self) -> &str {
&self.sid
}
pub fn publisher_id(&self) -> &str {
&self.publisher_id
}
pub fn frames(&self) -> async_broadcast::Receiver<Frame> {
extern "C" fn on_frame(callback_data: *mut c_void, frame: CVImageBufferRef) -> bool {
unsafe {
let tx = Box::from_raw(callback_data as *mut async_broadcast::Sender<Frame>);
let buffer = CVImageBuffer::wrap_under_get_rule(frame);
let result = tx.try_broadcast(Frame(buffer));
let _ = Box::into_raw(tx);
match result {
Ok(_) => true,
Err(async_broadcast::TrySendError::Closed(_))
| Err(async_broadcast::TrySendError::Inactive(_)) => {
log::warn!("no active receiver for frame");
false
}
Err(async_broadcast::TrySendError::Full(_)) => {
log::warn!("skipping frame as receiver is not keeping up");
true
}
}
}
}
extern "C" fn on_drop(callback_data: *mut c_void) {
unsafe {
let _ = Box::from_raw(callback_data as *mut async_broadcast::Sender<Frame>);
}
}
let (tx, rx) = async_broadcast::broadcast(64);
unsafe {
let renderer = LKVideoRendererCreate(
Box::into_raw(Box::new(tx)) as *mut c_void,
on_frame,
on_drop,
);
LKVideoTrackAddRenderer(*self.native_track.lock(), renderer);
rx
}
}
}
impl Drop for RemoteVideoTrack {
fn drop(&mut self) {
unsafe { CFRelease(self.native_track.lock().0) }
}
}
pub enum RemoteVideoTrackUpdate {
Subscribed(Arc<RemoteVideoTrack>),
Unsubscribed { publisher_id: Sid, track_id: Sid },
}
pub enum RemoteAudioTrackUpdate {
ActiveSpeakersChanged { speakers: Vec<Sid> },
MuteChanged { track_id: Sid, muted: bool },
Subscribed(Arc<RemoteAudioTrack>, Arc<RemoteTrackPublication>),
Unsubscribed { publisher_id: Sid, track_id: Sid },
}
pub struct MacOSDisplay(swift::MacOSDisplay);
impl MacOSDisplay {
fn new(ptr: swift::MacOSDisplay) -> Self {
unsafe {
CFRetain(ptr.0);
}
Self(ptr)
}
}
impl Drop for MacOSDisplay {
fn drop(&mut self) {
unsafe { CFRelease(self.0 .0) }
}
}
#[derive(Clone)]
pub struct Frame(CVImageBuffer);
impl Frame {
pub fn width(&self) -> usize {
self.0.width()
}
pub fn height(&self) -> usize {
self.0.height()
}
pub fn image(&self) -> CVImageBuffer {
self.0.clone()
}
}

View File

@ -0,0 +1,651 @@
use anyhow::{anyhow, Context, Result};
use async_trait::async_trait;
use collections::{BTreeMap, HashMap};
use futures::Stream;
use gpui2::Executor;
use live_kit_server::token;
use media::core_video::CVImageBuffer;
use parking_lot::Mutex;
use postage::watch;
use std::{future::Future, mem, sync::Arc};
static SERVERS: Mutex<BTreeMap<String, Arc<TestServer>>> = Mutex::new(BTreeMap::new());
pub struct TestServer {
pub url: String,
pub api_key: String,
pub secret_key: String,
rooms: Mutex<HashMap<String, TestServerRoom>>,
executor: Arc<Executor>,
}
impl TestServer {
pub fn create(
url: String,
api_key: String,
secret_key: String,
executor: Arc<Executor>,
) -> Result<Arc<TestServer>> {
let mut servers = SERVERS.lock();
if servers.contains_key(&url) {
Err(anyhow!("a server with url {:?} already exists", url))
} else {
let server = Arc::new(TestServer {
url: url.clone(),
api_key,
secret_key,
rooms: Default::default(),
executor,
});
servers.insert(url, server.clone());
Ok(server)
}
}
fn get(url: &str) -> Result<Arc<TestServer>> {
Ok(SERVERS
.lock()
.get(url)
.ok_or_else(|| anyhow!("no server found for url"))?
.clone())
}
pub fn teardown(&self) -> Result<()> {
SERVERS
.lock()
.remove(&self.url)
.ok_or_else(|| anyhow!("server with url {:?} does not exist", self.url))?;
Ok(())
}
pub fn create_api_client(&self) -> TestApiClient {
TestApiClient {
url: self.url.clone(),
}
}
pub async fn create_room(&self, room: String) -> Result<()> {
self.executor.simulate_random_delay().await;
let mut server_rooms = self.rooms.lock();
if server_rooms.contains_key(&room) {
Err(anyhow!("room {:?} already exists", room))
} else {
server_rooms.insert(room, Default::default());
Ok(())
}
}
async fn delete_room(&self, room: String) -> Result<()> {
// TODO: clear state associated with all `Room`s.
self.executor.simulate_random_delay().await;
let mut server_rooms = self.rooms.lock();
server_rooms
.remove(&room)
.ok_or_else(|| anyhow!("room {:?} does not exist", room))?;
Ok(())
}
async fn join_room(&self, token: String, client_room: Arc<Room>) -> Result<()> {
self.executor.simulate_random_delay().await;
let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
let identity = claims.sub.unwrap().to_string();
let room_name = claims.video.room.unwrap();
let mut server_rooms = self.rooms.lock();
let room = (*server_rooms).entry(room_name.to_string()).or_default();
if room.client_rooms.contains_key(&identity) {
Err(anyhow!(
"{:?} attempted to join room {:?} twice",
identity,
room_name
))
} else {
for track in &room.video_tracks {
client_room
.0
.lock()
.video_track_updates
.0
.try_broadcast(RemoteVideoTrackUpdate::Subscribed(track.clone()))
.unwrap();
}
room.client_rooms.insert(identity, client_room);
Ok(())
}
}
async fn leave_room(&self, token: String) -> Result<()> {
self.executor.simulate_random_delay().await;
let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
let identity = claims.sub.unwrap().to_string();
let room_name = claims.video.room.unwrap();
let mut server_rooms = self.rooms.lock();
let room = server_rooms
.get_mut(&*room_name)
.ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
room.client_rooms.remove(&identity).ok_or_else(|| {
anyhow!(
"{:?} attempted to leave room {:?} before joining it",
identity,
room_name
)
})?;
Ok(())
}
async fn remove_participant(&self, room_name: String, identity: String) -> Result<()> {
// TODO: clear state associated with the `Room`.
self.executor.simulate_random_delay().await;
let mut server_rooms = self.rooms.lock();
let room = server_rooms
.get_mut(&room_name)
.ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
room.client_rooms.remove(&identity).ok_or_else(|| {
anyhow!(
"participant {:?} did not join room {:?}",
identity,
room_name
)
})?;
Ok(())
}
pub async fn disconnect_client(&self, client_identity: String) {
self.executor.simulate_random_delay().await;
let mut server_rooms = self.rooms.lock();
for room in server_rooms.values_mut() {
if let Some(room) = room.client_rooms.remove(&client_identity) {
*room.0.lock().connection.0.borrow_mut() = ConnectionState::Disconnected;
}
}
}
async fn publish_video_track(&self, token: String, local_track: LocalVideoTrack) -> Result<()> {
self.executor.simulate_random_delay().await;
let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
let identity = claims.sub.unwrap().to_string();
let room_name = claims.video.room.unwrap();
let mut server_rooms = self.rooms.lock();
let room = server_rooms
.get_mut(&*room_name)
.ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
let track = Arc::new(RemoteVideoTrack {
sid: nanoid::nanoid!(17),
publisher_id: identity.clone(),
frames_rx: local_track.frames_rx.clone(),
});
room.video_tracks.push(track.clone());
for (id, client_room) in &room.client_rooms {
if *id != identity {
let _ = client_room
.0
.lock()
.video_track_updates
.0
.try_broadcast(RemoteVideoTrackUpdate::Subscribed(track.clone()))
.unwrap();
}
}
Ok(())
}
async fn publish_audio_track(
&self,
token: String,
_local_track: &LocalAudioTrack,
) -> Result<()> {
self.executor.simulate_random_delay().await;
let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
let identity = claims.sub.unwrap().to_string();
let room_name = claims.video.room.unwrap();
let mut server_rooms = self.rooms.lock();
let room = server_rooms
.get_mut(&*room_name)
.ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
let track = Arc::new(RemoteAudioTrack {
sid: nanoid::nanoid!(17),
publisher_id: identity.clone(),
});
let publication = Arc::new(RemoteTrackPublication);
room.audio_tracks.push(track.clone());
for (id, client_room) in &room.client_rooms {
if *id != identity {
let _ = client_room
.0
.lock()
.audio_track_updates
.0
.try_broadcast(RemoteAudioTrackUpdate::Subscribed(
track.clone(),
publication.clone(),
))
.unwrap();
}
}
Ok(())
}
fn video_tracks(&self, token: String) -> Result<Vec<Arc<RemoteVideoTrack>>> {
let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
let room_name = claims.video.room.unwrap();
let mut server_rooms = self.rooms.lock();
let room = server_rooms
.get_mut(&*room_name)
.ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
Ok(room.video_tracks.clone())
}
fn audio_tracks(&self, token: String) -> Result<Vec<Arc<RemoteAudioTrack>>> {
let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
let room_name = claims.video.room.unwrap();
let mut server_rooms = self.rooms.lock();
let room = server_rooms
.get_mut(&*room_name)
.ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
Ok(room.audio_tracks.clone())
}
}
#[derive(Default)]
struct TestServerRoom {
client_rooms: HashMap<Sid, Arc<Room>>,
video_tracks: Vec<Arc<RemoteVideoTrack>>,
audio_tracks: Vec<Arc<RemoteAudioTrack>>,
}
impl TestServerRoom {}
pub struct TestApiClient {
url: String,
}
#[async_trait]
impl live_kit_server::api::Client for TestApiClient {
fn url(&self) -> &str {
&self.url
}
async fn create_room(&self, name: String) -> Result<()> {
let server = TestServer::get(&self.url)?;
server.create_room(name).await?;
Ok(())
}
async fn delete_room(&self, name: String) -> Result<()> {
let server = TestServer::get(&self.url)?;
server.delete_room(name).await?;
Ok(())
}
async fn remove_participant(&self, room: String, identity: String) -> Result<()> {
let server = TestServer::get(&self.url)?;
server.remove_participant(room, identity).await?;
Ok(())
}
fn room_token(&self, room: &str, identity: &str) -> Result<String> {
let server = TestServer::get(&self.url)?;
token::create(
&server.api_key,
&server.secret_key,
Some(identity),
token::VideoGrant::to_join(room),
)
}
fn guest_token(&self, room: &str, identity: &str) -> Result<String> {
let server = TestServer::get(&self.url)?;
token::create(
&server.api_key,
&server.secret_key,
Some(identity),
token::VideoGrant::for_guest(room),
)
}
}
pub type Sid = String;
struct RoomState {
connection: (
watch::Sender<ConnectionState>,
watch::Receiver<ConnectionState>,
),
display_sources: Vec<MacOSDisplay>,
audio_track_updates: (
async_broadcast::Sender<RemoteAudioTrackUpdate>,
async_broadcast::Receiver<RemoteAudioTrackUpdate>,
),
video_track_updates: (
async_broadcast::Sender<RemoteVideoTrackUpdate>,
async_broadcast::Receiver<RemoteVideoTrackUpdate>,
),
}
#[derive(Clone, Eq, PartialEq)]
pub enum ConnectionState {
Disconnected,
Connected { url: String, token: String },
}
pub struct Room(Mutex<RoomState>);
impl Room {
pub fn new() -> Arc<Self> {
Arc::new(Self(Mutex::new(RoomState {
connection: watch::channel_with(ConnectionState::Disconnected),
display_sources: Default::default(),
video_track_updates: async_broadcast::broadcast(128),
audio_track_updates: async_broadcast::broadcast(128),
})))
}
pub fn status(&self) -> watch::Receiver<ConnectionState> {
self.0.lock().connection.1.clone()
}
pub fn connect(self: &Arc<Self>, url: &str, token: &str) -> impl Future<Output = Result<()>> {
let this = self.clone();
let url = url.to_string();
let token = token.to_string();
async move {
let server = TestServer::get(&url)?;
server
.join_room(token.clone(), this.clone())
.await
.context("room join")?;
*this.0.lock().connection.0.borrow_mut() = ConnectionState::Connected { url, token };
Ok(())
}
}
pub fn display_sources(self: &Arc<Self>) -> impl Future<Output = Result<Vec<MacOSDisplay>>> {
let this = self.clone();
async move {
let server = this.test_server();
server.executor.simulate_random_delay().await;
Ok(this.0.lock().display_sources.clone())
}
}
pub fn publish_video_track(
self: &Arc<Self>,
track: LocalVideoTrack,
) -> impl Future<Output = Result<LocalTrackPublication>> {
let this = self.clone();
let track = track.clone();
async move {
this.test_server()
.publish_video_track(this.token(), track)
.await?;
Ok(LocalTrackPublication)
}
}
pub fn publish_audio_track(
self: &Arc<Self>,
track: LocalAudioTrack,
) -> impl Future<Output = Result<LocalTrackPublication>> {
let this = self.clone();
let track = track.clone();
async move {
this.test_server()
.publish_audio_track(this.token(), &track)
.await?;
Ok(LocalTrackPublication)
}
}
pub fn unpublish_track(&self, _publication: LocalTrackPublication) {}
pub fn remote_audio_tracks(&self, publisher_id: &str) -> Vec<Arc<RemoteAudioTrack>> {
if !self.is_connected() {
return Vec::new();
}
self.test_server()
.audio_tracks(self.token())
.unwrap()
.into_iter()
.filter(|track| track.publisher_id() == publisher_id)
.collect()
}
pub fn remote_audio_track_publications(
&self,
publisher_id: &str,
) -> Vec<Arc<RemoteTrackPublication>> {
if !self.is_connected() {
return Vec::new();
}
self.test_server()
.audio_tracks(self.token())
.unwrap()
.into_iter()
.filter(|track| track.publisher_id() == publisher_id)
.map(|_track| Arc::new(RemoteTrackPublication {}))
.collect()
}
pub fn remote_video_tracks(&self, publisher_id: &str) -> Vec<Arc<RemoteVideoTrack>> {
if !self.is_connected() {
return Vec::new();
}
self.test_server()
.video_tracks(self.token())
.unwrap()
.into_iter()
.filter(|track| track.publisher_id() == publisher_id)
.collect()
}
pub fn remote_audio_track_updates(&self) -> impl Stream<Item = RemoteAudioTrackUpdate> {
self.0.lock().audio_track_updates.1.clone()
}
pub fn remote_video_track_updates(&self) -> impl Stream<Item = RemoteVideoTrackUpdate> {
self.0.lock().video_track_updates.1.clone()
}
pub fn set_display_sources(&self, sources: Vec<MacOSDisplay>) {
self.0.lock().display_sources = sources;
}
fn test_server(&self) -> Arc<TestServer> {
match self.0.lock().connection.1.borrow().clone() {
ConnectionState::Disconnected => panic!("must be connected to call this method"),
ConnectionState::Connected { url, .. } => TestServer::get(&url).unwrap(),
}
}
fn token(&self) -> String {
match self.0.lock().connection.1.borrow().clone() {
ConnectionState::Disconnected => panic!("must be connected to call this method"),
ConnectionState::Connected { token, .. } => token,
}
}
fn is_connected(&self) -> bool {
match *self.0.lock().connection.1.borrow() {
ConnectionState::Disconnected => false,
ConnectionState::Connected { .. } => true,
}
}
}
impl Drop for Room {
fn drop(&mut self) {
if let ConnectionState::Connected { token, .. } = mem::replace(
&mut *self.0.lock().connection.0.borrow_mut(),
ConnectionState::Disconnected,
) {
if let Ok(server) = TestServer::get(&token) {
let executor = server.executor.clone();
executor
.spawn(async move { server.leave_room(token).await.unwrap() })
.detach();
}
}
}
}
pub struct LocalTrackPublication;
impl LocalTrackPublication {
pub fn set_mute(&self, _mute: bool) -> impl Future<Output = Result<()>> {
async { Ok(()) }
}
}
pub struct RemoteTrackPublication;
impl RemoteTrackPublication {
pub fn set_enabled(&self, _enabled: bool) -> impl Future<Output = Result<()>> {
async { Ok(()) }
}
pub fn is_muted(&self) -> bool {
false
}
pub fn sid(&self) -> String {
"".to_string()
}
}
#[derive(Clone)]
pub struct LocalVideoTrack {
frames_rx: async_broadcast::Receiver<Frame>,
}
impl LocalVideoTrack {
pub fn screen_share_for_display(display: &MacOSDisplay) -> Self {
Self {
frames_rx: display.frames.1.clone(),
}
}
}
#[derive(Clone)]
pub struct LocalAudioTrack;
impl LocalAudioTrack {
pub fn create() -> Self {
Self
}
}
#[derive(Debug)]
pub struct RemoteVideoTrack {
sid: Sid,
publisher_id: Sid,
frames_rx: async_broadcast::Receiver<Frame>,
}
impl RemoteVideoTrack {
pub fn sid(&self) -> &str {
&self.sid
}
pub fn publisher_id(&self) -> &str {
&self.publisher_id
}
pub fn frames(&self) -> async_broadcast::Receiver<Frame> {
self.frames_rx.clone()
}
}
#[derive(Debug)]
pub struct RemoteAudioTrack {
sid: Sid,
publisher_id: Sid,
}
impl RemoteAudioTrack {
pub fn sid(&self) -> &str {
&self.sid
}
pub fn publisher_id(&self) -> &str {
&self.publisher_id
}
pub fn enable(&self) -> impl Future<Output = Result<()>> {
async { Ok(()) }
}
pub fn disable(&self) -> impl Future<Output = Result<()>> {
async { Ok(()) }
}
}
#[derive(Clone)]
pub enum RemoteVideoTrackUpdate {
Subscribed(Arc<RemoteVideoTrack>),
Unsubscribed { publisher_id: Sid, track_id: Sid },
}
#[derive(Clone)]
pub enum RemoteAudioTrackUpdate {
ActiveSpeakersChanged { speakers: Vec<Sid> },
MuteChanged { track_id: Sid, muted: bool },
Subscribed(Arc<RemoteAudioTrack>, Arc<RemoteTrackPublication>),
Unsubscribed { publisher_id: Sid, track_id: Sid },
}
#[derive(Clone)]
pub struct MacOSDisplay {
frames: (
async_broadcast::Sender<Frame>,
async_broadcast::Receiver<Frame>,
),
}
impl MacOSDisplay {
pub fn new() -> Self {
Self {
frames: async_broadcast::broadcast(128),
}
}
pub fn send_frame(&self, frame: Frame) {
self.frames.0.try_broadcast(frame).unwrap();
}
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Frame {
pub label: String,
pub width: usize,
pub height: usize,
}
impl Frame {
pub fn width(&self) -> usize {
self.width
}
pub fn height(&self) -> usize {
self.height
}
pub fn image(&self) -> CVImageBuffer {
unimplemented!("you can't call this in test mode")
}
}