From 99aa1219d2177fe8c8ba869d4189635c77c7ef96 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Thu, 20 Oct 2022 09:51:55 +0200 Subject: [PATCH] Simplify renderer interface for live-kit-client --- crates/call/src/room.rs | 54 +++++++++---------- crates/live_kit_client/Cargo.toml | 5 +- .../Sources/LiveKitBridge/LiveKitBridge.swift | 16 ++++-- crates/live_kit_client/examples/test_app.rs | 1 - crates/live_kit_client/src/prod.rs | 44 +++++++++------ crates/live_kit_client/src/test.rs | 20 ++----- 6 files changed, 70 insertions(+), 70 deletions(-) diff --git a/crates/call/src/room.rs b/crates/call/src/room.rs index 399b0e857e..98b223281c 100644 --- a/crates/call/src/room.rs +++ b/crates/call/src/room.rs @@ -8,7 +8,6 @@ use collections::{BTreeMap, HashSet}; use futures::StreamExt; use gpui::{AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task}; use live_kit_client::{LocalTrackPublication, LocalVideoTrack, RemoteVideoTrackUpdate}; -use postage::watch; use project::Project; use std::{mem, os::unix::prelude::OsStrExt, sync::Arc}; use util::{post_inc, ResultExt}; @@ -409,42 +408,39 @@ impl Room { .remote_participants .get_mut(&peer_id) .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?; - let (mut tx, mut rx) = watch::channel(); - track.add_renderer(move |frame| *tx.borrow_mut() = Some(frame)); + let mut frames = track.frames(); participant.tracks.insert( track_id.clone(), RemoteVideoTrack { frame: None, _live_kit_track: track, _maintain_frame: Arc::new(cx.spawn_weak(|this, mut cx| async move { - while let Some(frame) = rx.next().await { - if let Some(frame) = frame { - let this = if let Some(this) = this.upgrade(&cx) { - this + while let Some(frame) = frames.next().await { + let this = if let Some(this) = this.upgrade(&cx) { + this + } else { + break; + }; + + let done = this.update(&mut cx, |this, cx| { + if let Some(track) = + this.remote_participants.get_mut(&peer_id).and_then( + |participant| participant.tracks.get_mut(&track_id), + ) + { + track.frame = Some(frame); + cx.emit(Event::Frame { + participant_id: peer_id, + track_id: track_id.clone(), + }); + false } else { - break; - }; - - let done = this.update(&mut cx, |this, cx| { - if let Some(track) = - this.remote_participants.get_mut(&peer_id).and_then( - |participant| participant.tracks.get_mut(&track_id), - ) - { - track.frame = Some(frame); - cx.emit(Event::Frame { - participant_id: peer_id, - track_id: track_id.clone(), - }); - false - } else { - true - } - }); - - if done { - break; + true } + }); + + if done { + break; } } })), diff --git a/crates/live_kit_client/Cargo.toml b/crates/live_kit_client/Cargo.toml index cd8006314c..ce555ccbc3 100644 --- a/crates/live_kit_client/Cargo.toml +++ b/crates/live_kit_client/Cargo.toml @@ -13,7 +13,6 @@ name = "test_app" [features] test-support = [ - "async-broadcast", "async-trait", "collections/test-support", "gpui/test-support", @@ -29,12 +28,13 @@ live_kit_server = { path = "../live_kit_server", optional = true } media = { path = "../media" } anyhow = "1.0.38" +async-broadcast = "0.4" core-foundation = "0.9.3" core-graphics = "0.22.3" futures = "0.3" +log = { version = "0.4.16", features = ["kv_unstable_serde"] } parking_lot = "0.11.1" -async-broadcast = { version = "0.4", optional = true } async-trait = { version = "0.1", optional = true } lazy_static = { version = "1.4", optional = true } nanoid = { version ="0.4", optional = true} @@ -58,7 +58,6 @@ futures = "0.3" hmac = "0.12" jwt = "0.16" lazy_static = "1.4" -log = { version = "0.4.16", features = ["kv_unstable_serde"] } objc = "0.2" parking_lot = "0.11.1" postage = { version = "0.4.1", features = ["futures-traits"] } diff --git a/crates/live_kit_client/LiveKitBridge/Sources/LiveKitBridge/LiveKitBridge.swift b/crates/live_kit_client/LiveKitBridge/Sources/LiveKitBridge/LiveKitBridge.swift index 8bfa98b522..af6485ce69 100644 --- a/crates/live_kit_client/LiveKitBridge/Sources/LiveKitBridge/LiveKitBridge.swift +++ b/crates/live_kit_client/LiveKitBridge/Sources/LiveKitBridge/LiveKitBridge.swift @@ -28,12 +28,13 @@ class LKRoomDelegate: RoomDelegate { class LKVideoRenderer: NSObject, VideoRenderer { var data: UnsafeRawPointer - var onFrame: @convention(c) (UnsafeRawPointer, CVPixelBuffer) -> Void + var onFrame: @convention(c) (UnsafeRawPointer, CVPixelBuffer) -> Bool var onDrop: @convention(c) (UnsafeRawPointer) -> Void var adaptiveStreamIsEnabled: Bool = false var adaptiveStreamSize: CGSize = .zero + weak var track: VideoTrack? - init(data: UnsafeRawPointer, onFrame: @escaping @convention(c) (UnsafeRawPointer, CVPixelBuffer) -> Void, onDrop: @escaping @convention(c) (UnsafeRawPointer) -> Void) { + init(data: UnsafeRawPointer, onFrame: @escaping @convention(c) (UnsafeRawPointer, CVPixelBuffer) -> Bool, onDrop: @escaping @convention(c) (UnsafeRawPointer) -> Void) { self.data = data self.onFrame = onFrame self.onDrop = onDrop @@ -50,7 +51,11 @@ class LKVideoRenderer: NSObject, VideoRenderer { func renderFrame(_ frame: RTCVideoFrame?) { let buffer = frame?.buffer as? RTCCVPixelBuffer if let pixelBuffer = buffer?.pixelBuffer { - self.onFrame(self.data, pixelBuffer) + if !self.onFrame(self.data, pixelBuffer) { + DispatchQueue.main.async { + self.track?.remove(videoRenderer: self) + } + } } } } @@ -99,7 +104,7 @@ public func LKRoomPublishVideoTrack(room: UnsafeRawPointer, track: UnsafeRawPoin public func LKRoomUnpublishTrack(room: UnsafeRawPointer, publication: UnsafeRawPointer) { let room = Unmanaged.fromOpaque(room).takeUnretainedValue() let publication = Unmanaged.fromOpaque(publication).takeUnretainedValue() - room.localParticipant?.unpublish(publication: publication) + let _ = room.localParticipant?.unpublish(publication: publication) } @_cdecl("LKRoomVideoTracksForRemoteParticipant") @@ -123,7 +128,7 @@ public func LKCreateScreenShareTrackForDisplay(display: UnsafeMutableRawPointer) } @_cdecl("LKVideoRendererCreate") -public func LKVideoRendererCreate(data: UnsafeRawPointer, onFrame: @escaping @convention(c) (UnsafeRawPointer, CVPixelBuffer) -> Void, onDrop: @escaping @convention(c) (UnsafeRawPointer) -> Void) -> UnsafeMutableRawPointer { +public func LKVideoRendererCreate(data: UnsafeRawPointer, onFrame: @escaping @convention(c) (UnsafeRawPointer, CVPixelBuffer) -> Bool, onDrop: @escaping @convention(c) (UnsafeRawPointer) -> Void) -> UnsafeMutableRawPointer { Unmanaged.passRetained(LKVideoRenderer(data: data, onFrame: onFrame, onDrop: onDrop)).toOpaque() } @@ -131,6 +136,7 @@ public func LKVideoRendererCreate(data: UnsafeRawPointer, onFrame: @escaping @co public func LKVideoTrackAddRenderer(track: UnsafeRawPointer, renderer: UnsafeRawPointer) { let track = Unmanaged.fromOpaque(track).takeUnretainedValue() as! VideoTrack let renderer = Unmanaged.fromOpaque(renderer).takeRetainedValue() + renderer.track = track track.add(videoRenderer: renderer) } diff --git a/crates/live_kit_client/examples/test_app.rs b/crates/live_kit_client/examples/test_app.rs index 7ad1eee967..eddee785bc 100644 --- a/crates/live_kit_client/examples/test_app.rs +++ b/crates/live_kit_client/examples/test_app.rs @@ -60,7 +60,6 @@ fn main() { let remote_tracks = room_b.remote_video_tracks("test-participant-1"); assert_eq!(remote_tracks.len(), 1); assert_eq!(remote_tracks[0].publisher_id(), "test-participant-1"); - dbg!(track.sid()); assert_eq!(track.publisher_id(), "test-participant-1"); } else { panic!("unexpected message"); diff --git a/crates/live_kit_client/src/prod.rs b/crates/live_kit_client/src/prod.rs index 15f5faacca..35a5705a24 100644 --- a/crates/live_kit_client/src/prod.rs +++ b/crates/live_kit_client/src/prod.rs @@ -55,7 +55,7 @@ extern "C" { fn LKVideoRendererCreate( callback_data: *mut c_void, - on_frame: extern "C" fn(callback_data: *mut c_void, frame: CVImageBufferRef), + on_frame: extern "C" fn(callback_data: *mut c_void, frame: CVImageBufferRef) -> bool, on_drop: extern "C" fn(callback_data: *mut c_void), ) -> *const c_void; @@ -364,32 +364,43 @@ impl RemoteVideoTrack { &self.publisher_id } - pub fn add_renderer(&self, callback: F) - where - F: 'static + Send + Sync + FnMut(Frame), - { - extern "C" fn on_frame(callback_data: *mut c_void, frame: CVImageBufferRef) - where - F: FnMut(Frame), - { + pub fn frames(&self) -> async_broadcast::Receiver { + extern "C" fn on_frame(callback_data: *mut c_void, frame: CVImageBufferRef) -> bool { unsafe { + let tx = Box::from_raw(callback_data as *mut async_broadcast::Sender); let buffer = CVImageBuffer::wrap_under_get_rule(frame); - let callback = &mut *(callback_data as *mut F); - callback(Frame(buffer)); + let result = tx.try_broadcast(Frame(buffer)); + let _ = Box::into_raw(tx); + match result { + Ok(_) => true, + Err(async_broadcast::TrySendError::Closed(_)) + | Err(async_broadcast::TrySendError::Inactive(_)) => { + log::warn!("no active receiver for frame"); + false + } + Err(async_broadcast::TrySendError::Full(_)) => { + log::warn!("skipping frame as receiver is not keeping up"); + true + } + } } } - extern "C" fn on_drop(callback_data: *mut c_void) { + extern "C" fn on_drop(callback_data: *mut c_void) { unsafe { - let _ = Box::from_raw(callback_data as *mut F); + let _ = Box::from_raw(callback_data as *mut async_broadcast::Sender); } } - let callback_data = Box::into_raw(Box::new(callback)); + let (tx, rx) = async_broadcast::broadcast(64); unsafe { - let renderer = - LKVideoRendererCreate(callback_data as *mut c_void, on_frame::, on_drop::); + let renderer = LKVideoRendererCreate( + Box::into_raw(Box::new(tx)) as *mut c_void, + on_frame, + on_drop, + ); LKVideoTrackAddRenderer(self.native_track, renderer); + rx } } } @@ -422,6 +433,7 @@ impl Drop for MacOSDisplay { } } +#[derive(Clone)] pub struct Frame(CVImageBuffer); impl Frame { diff --git a/crates/live_kit_client/src/test.rs b/crates/live_kit_client/src/test.rs index 6a1a6ee2d6..23076e13a5 100644 --- a/crates/live_kit_client/src/test.rs +++ b/crates/live_kit_client/src/test.rs @@ -1,8 +1,8 @@ use anyhow::{anyhow, Result}; use async_trait::async_trait; use collections::HashMap; -use futures::{Stream, StreamExt}; -use gpui::executor::{self, Background}; +use futures::Stream; +use gpui::executor::Background; use lazy_static::lazy_static; use live_kit_server::token; use media::core_video::CVImageBuffer; @@ -160,7 +160,6 @@ impl TestServer { sid: nanoid::nanoid!(17), publisher_id: identity.clone(), frames_rx: local_track.frames_rx.clone(), - background: self.background.clone(), })); for (id, client_room) in &room.client_rooms { @@ -353,7 +352,6 @@ pub struct RemoteVideoTrack { sid: Sid, publisher_id: Sid, frames_rx: async_broadcast::Receiver, - background: Arc, } impl RemoteVideoTrack { @@ -365,18 +363,8 @@ impl RemoteVideoTrack { &self.publisher_id } - pub fn add_renderer(&self, mut callback: F) - where - F: 'static + Send + Sync + FnMut(Frame), - { - let mut frames_rx = self.frames_rx.clone(); - self.background - .spawn(async move { - while let Some(frame) = frames_rx.next().await { - callback(frame) - } - }) - .detach(); + pub fn frames(&self) -> async_broadcast::Receiver { + self.frames_rx.clone() } }