diff --git a/Cargo.lock b/Cargo.lock index 68a7ed2b82..fa167e245f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1066,6 +1066,7 @@ dependencies = [ "language", "lazy_static", "lipsum", + "live_kit_client", "live_kit_server", "log", "lsp", @@ -3163,6 +3164,7 @@ name = "live_kit_client" version = "0.1.0" dependencies = [ "anyhow", + "async-trait", "block", "byteorder", "bytes 1.2.1", @@ -3193,6 +3195,7 @@ name = "live_kit_server" version = "0.1.0" dependencies = [ "anyhow", + "async-trait", "futures 0.3.24", "hmac 0.12.1", "jwt", diff --git a/crates/collab/Cargo.toml b/crates/collab/Cargo.toml index 8ce42afb64..6145afad48 100644 --- a/crates/collab/Cargo.toml +++ b/crates/collab/Cargo.toml @@ -62,15 +62,17 @@ editor = { path = "../editor", features = ["test-support"] } language = { path = "../language", features = ["test-support"] } fs = { path = "../fs", features = ["test-support"] } git = { path = "../git", features = ["test-support"] } -log = { version = "0.4.16", features = ["kv_unstable_serde"] } +live_kit_client = { path = "../live_kit_client", features = ["test-support"] } lsp = { path = "../lsp", features = ["test-support"] } project = { path = "../project", features = ["test-support"] } rpc = { path = "../rpc", features = ["test-support"] } settings = { path = "../settings", features = ["test-support"] } theme = { path = "../theme" } workspace = { path = "../workspace", features = ["test-support"] } + ctor = "0.1" env_logger = "0.9" +log = { version = "0.4.16", features = ["kv_unstable_serde"] } util = { path = "../util" } lazy_static = "1.4" serde_json = { version = "1.0", features = ["preserve_order"] } diff --git a/crates/collab/src/integration_tests.rs b/crates/collab/src/integration_tests.rs index cd08dd8632..7cd964cd8a 100644 --- a/crates/collab/src/integration_tests.rs +++ b/crates/collab/src/integration_tests.rs @@ -47,7 +47,7 @@ use std::{ path::{Path, PathBuf}, rc::Rc, sync::{ - atomic::{AtomicBool, Ordering::SeqCst}, + atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst}, Arc, }, time::Duration, @@ -6138,6 +6138,7 @@ struct TestServer { connection_killers: Arc>>>, forbid_connections: Arc, _test_db: TestDb, + test_live_kit_server: Arc, } impl TestServer { @@ -6145,8 +6146,18 @@ impl TestServer { foreground: Rc, background: Arc, ) -> Self { + static NEXT_LIVE_KIT_SERVER_ID: AtomicUsize = AtomicUsize::new(0); + let test_db = TestDb::fake(background.clone()); - let app_state = Self::build_app_state(&test_db).await; + let live_kit_server_id = NEXT_LIVE_KIT_SERVER_ID.fetch_add(1, SeqCst); + let live_kit_server = live_kit_client::TestServer::create( + format!("http://livekit.{}.test", live_kit_server_id), + format!("devkey-{}", live_kit_server_id), + format!("secret-{}", live_kit_server_id), + background.clone(), + ) + .unwrap(); + let app_state = Self::build_app_state(&test_db, &live_kit_server).await; let peer = Peer::new(); let notifications = mpsc::unbounded(); let server = Server::new(app_state.clone(), Some(notifications.0)); @@ -6159,6 +6170,7 @@ impl TestServer { connection_killers: Default::default(), forbid_connections: Default::default(), _test_db: test_db, + test_live_kit_server: live_kit_server, } } @@ -6354,10 +6366,13 @@ impl TestServer { } } - async fn build_app_state(test_db: &TestDb) -> Arc { + async fn build_app_state( + test_db: &TestDb, + fake_server: &live_kit_client::TestServer, + ) -> Arc { Arc::new(AppState { db: test_db.db().clone(), - live_kit_client: None, + live_kit_client: Some(Arc::new(fake_server.create_api_client())), api_token: Default::default(), invite_link_prefix: Default::default(), }) @@ -6390,6 +6405,7 @@ impl Deref for TestServer { impl Drop for TestServer { fn drop(&mut self) { self.peer.reset(); + self.test_live_kit_server.teardown().unwrap(); } } diff --git a/crates/collab/src/main.rs b/crates/collab/src/main.rs index 49a82bb926..bc860abf62 100644 --- a/crates/collab/src/main.rs +++ b/crates/collab/src/main.rs @@ -41,7 +41,7 @@ pub struct AppState { db: Arc, api_token: String, invite_link_prefix: String, - live_kit_client: Option, + live_kit_client: Option>, } impl AppState { @@ -53,11 +53,11 @@ impl AppState { .zip(config.live_kit_key.as_ref()) .zip(config.live_kit_secret.as_ref()) { - Some(live_kit_server::api::Client::new( + Some(Arc::new(live_kit_server::api::LiveKitClient::new( server.clone(), key.clone(), secret.clone(), - )) + )) as Arc) } else { None }; diff --git a/crates/live_kit_client/Cargo.toml b/crates/live_kit_client/Cargo.toml index 7b77386fde..ee2c3f88dc 100644 --- a/crates/live_kit_client/Cargo.toml +++ b/crates/live_kit_client/Cargo.toml @@ -12,7 +12,13 @@ doctest = false name = "test_app" [features] -test-support = ["collections/test-support", "gpui/test-support", "lazy_static", "live_kit_server"] +test-support = [ + "async-trait", + "collections/test-support", + "gpui/test-support", + "lazy_static", + "live_kit_server" +] [dependencies] collections = { path = "../collections", optional = true } @@ -21,6 +27,7 @@ live_kit_server = { path = "../live_kit_server", optional = true } media = { path = "../media" } anyhow = "1.0.38" +async-trait = { version = "0.1", optional = true } core-foundation = "0.9.3" core-graphics = "0.22.3" futures = "0.3" @@ -30,11 +37,11 @@ parking_lot = "0.11.1" [dev-dependencies] collections = { path = "../collections", features = ["test-support"] } gpui = { path = "../gpui", features = ["test-support"] } -lazy_static = "1.4" live_kit_server = { path = "../live_kit_server" } media = { path = "../media" } anyhow = "1.0.38" +async-trait = "0.1" block = "0.1" bytes = "1.2" byteorder = "1.4" @@ -45,6 +52,7 @@ foreign-types = "0.3" futures = "0.3" hmac = "0.12" jwt = "0.16" +lazy_static = "1.4" log = { version = "0.4.16", features = ["kv_unstable_serde"] } objc = "0.2" parking_lot = "0.11.1" diff --git a/crates/live_kit_client/examples/test_app.rs b/crates/live_kit_client/examples/test_app.rs index 63f15c2685..b4f037cead 100644 --- a/crates/live_kit_client/examples/test_app.rs +++ b/crates/live_kit_client/examples/test_app.rs @@ -7,7 +7,10 @@ use gpui::{ Menu, MenuItem, ViewContext, }; use live_kit_client::{LocalVideoTrack, RemoteVideoTrackUpdate, Room}; -use live_kit_server::token::{self, VideoGrant}; +use live_kit_server::{ + api::Client, + token::{self, VideoGrant}, +}; use log::LevelFilter; use media::core_video::CVImageBuffer; use postage::watch; diff --git a/crates/live_kit_client/src/test.rs b/crates/live_kit_client/src/test.rs index 50be090c80..050cfd0a47 100644 --- a/crates/live_kit_client/src/test.rs +++ b/crates/live_kit_client/src/test.rs @@ -1,35 +1,40 @@ use anyhow::{anyhow, Result}; +use async_trait::async_trait; use collections::HashMap; use futures::{channel::mpsc, future}; use gpui::executor::Background; use lazy_static::lazy_static; +use live_kit_server::token; use media::core_video::CVImageBuffer; use parking_lot::Mutex; use std::{future::Future, sync::Arc}; lazy_static! { - static ref SERVERS: Mutex>> = Default::default(); + static ref SERVERS: Mutex>> = Default::default(); } -pub struct FakeServer { - url: String, - secret_key: String, - rooms: Mutex>, +pub struct TestServer { + pub url: String, + pub api_key: String, + pub secret_key: String, + rooms: Mutex>, background: Arc, } -impl FakeServer { +impl TestServer { pub fn create( url: String, + api_key: String, secret_key: String, background: Arc, - ) -> Result> { + ) -> Result> { let mut servers = SERVERS.lock(); if servers.contains_key(&url) { Err(anyhow!("a server with url {:?} already exists", url)) } else { - let server = Arc::new(FakeServer { + let server = Arc::new(TestServer { url: url.clone(), + api_key, secret_key, rooms: Default::default(), background, @@ -39,7 +44,7 @@ impl FakeServer { } } - fn get(url: &str) -> Result> { + fn get(url: &str) -> Result> { Ok(SERVERS .lock() .get(url) @@ -55,33 +60,151 @@ impl FakeServer { Ok(()) } + pub fn create_api_client(&self) -> TestApiClient { + TestApiClient { + url: self.url.clone(), + } + } + + async fn create_room(&self, room: String) -> Result<()> { + self.background.simulate_random_delay().await; + let mut server_rooms = self.rooms.lock(); + if server_rooms.contains_key(&room) { + Err(anyhow!("room {:?} already exists", room)) + } else { + server_rooms.insert(room, Default::default()); + Ok(()) + } + } + + async fn delete_room(&self, room: String) -> Result<()> { + // TODO: clear state associated with all `Room`s. + self.background.simulate_random_delay().await; + let mut server_rooms = self.rooms.lock(); + server_rooms + .remove(&room) + .ok_or_else(|| anyhow!("room {:?} does not exist", room))?; + Ok(()) + } + async fn join_room(&self, token: String, client_room: Arc) -> Result<()> { self.background.simulate_random_delay().await; let claims = live_kit_server::token::validate(&token, &self.secret_key)?; let identity = claims.sub.unwrap().to_string(); - let room = claims.video.room.unwrap(); + let room_name = claims.video.room.unwrap(); let mut server_rooms = self.rooms.lock(); let room = server_rooms - .get_mut(&*room) - .ok_or_else(|| anyhow!("room {} does not exist", room))?; - room.clients.insert(identity, client_room); + .get_mut(&*room_name) + .ok_or_else(|| anyhow!("room {:?} does not exist", room_name))?; + if room.clients.contains_key(&identity) { + Err(anyhow!( + "{:?} attempted to join room {:?} twice", + identity, + room_name + )) + } else { + room.clients.insert(identity, client_room); + Ok(()) + } + } + + async fn leave_room(&self, token: String) -> Result<()> { + self.background.simulate_random_delay().await; + let claims = live_kit_server::token::validate(&token, &self.secret_key)?; + let identity = claims.sub.unwrap().to_string(); + let room_name = claims.video.room.unwrap(); + let mut server_rooms = self.rooms.lock(); + let room = server_rooms + .get_mut(&*room_name) + .ok_or_else(|| anyhow!("room {} does not exist", room_name))?; + room.clients.remove(&identity).ok_or_else(|| { + anyhow!( + "{:?} attempted to leave room {:?} before joining it", + identity, + room_name + ) + })?; + Ok(()) + } + + async fn remove_participant(&self, room_name: String, identity: String) -> Result<()> { + // TODO: clear state associated with the `Room`. + + self.background.simulate_random_delay().await; + let mut server_rooms = self.rooms.lock(); + let room = server_rooms + .get_mut(&room_name) + .ok_or_else(|| anyhow!("room {} does not exist", room_name))?; + room.clients.remove(&identity).ok_or_else(|| { + anyhow!( + "participant {:?} did not join room {:?}", + identity, + room_name + ) + })?; Ok(()) } } -struct FakeServerRoom { +#[derive(Default)] +struct TestServerRoom { clients: HashMap>, } -impl FakeServerRoom {} +impl TestServerRoom {} + +pub struct TestApiClient { + url: String, +} + +#[async_trait] +impl live_kit_server::api::Client for TestApiClient { + fn url(&self) -> &str { + &self.url + } + + async fn create_room(&self, name: String) -> Result<()> { + let server = TestServer::get(&self.url)?; + server.create_room(name).await?; + Ok(()) + } + + async fn delete_room(&self, name: String) -> Result<()> { + let server = TestServer::get(&self.url)?; + server.delete_room(name).await?; + Ok(()) + } + + async fn remove_participant(&self, room: String, identity: String) -> Result<()> { + let server = TestServer::get(&self.url)?; + server.remove_participant(room, identity).await?; + Ok(()) + } + + fn room_token(&self, room: &str, identity: &str) -> Result { + let server = TestServer::get(&self.url)?; + token::create( + &server.api_key, + &server.secret_key, + Some(identity), + token::VideoGrant::to_join(room), + ) + } +} pub type Sid = String; -pub struct Room; +#[derive(Default)] +struct RoomState { + token: Option, +} + +#[derive(Default)] +pub struct Room(Mutex); impl Room { pub fn new() -> Arc { - Arc::new(Self) + Default::default() } pub fn connect(self: &Arc, url: &str, token: &str) -> impl Future> { @@ -89,8 +212,9 @@ impl Room { let url = url.to_string(); let token = token.to_string(); async move { - let server = FakeServer::get(&url)?; - server.join_room(token, this).await?; + let server = TestServer::get(&url)?; + server.join_room(token.clone(), this.clone()).await?; + this.0.lock().token = Some(token); Ok(()) } } @@ -115,7 +239,14 @@ impl Room { impl Drop for Room { fn drop(&mut self) { - todo!() + if let Some(token) = self.0.lock().token.take() { + if let Ok(server) = TestServer::get(&token) { + let background = server.background.clone(); + background + .spawn(async move { server.leave_room(token).await.unwrap() }) + .detach(); + } + } } } diff --git a/crates/live_kit_server/Cargo.toml b/crates/live_kit_server/Cargo.toml index 8cd0b36c7c..64267f62d1 100644 --- a/crates/live_kit_server/Cargo.toml +++ b/crates/live_kit_server/Cargo.toml @@ -10,6 +10,7 @@ doctest = false [dependencies] anyhow = "1.0.38" +async-trait = "0.1" futures = "0.3" hmac = "0.12" log = "0.4" diff --git a/crates/live_kit_server/src/api.rs b/crates/live_kit_server/src/api.rs index 2dafef253b..43e01ce880 100644 --- a/crates/live_kit_server/src/api.rs +++ b/crates/live_kit_server/src/api.rs @@ -1,18 +1,28 @@ use crate::{proto, token}; use anyhow::{anyhow, Result}; +use async_trait::async_trait; use prost::Message; use reqwest::header::CONTENT_TYPE; use std::{future::Future, sync::Arc}; +#[async_trait] +pub trait Client: Send + Sync { + fn url(&self) -> &str; + async fn create_room(&self, name: String) -> Result<()>; + async fn delete_room(&self, name: String) -> Result<()>; + async fn remove_participant(&self, room: String, identity: String) -> Result<()>; + fn room_token(&self, room: &str, identity: &str) -> Result; +} + #[derive(Clone)] -pub struct Client { +pub struct LiveKitClient { http: reqwest::Client, url: Arc, key: Arc, secret: Arc, } -impl Client { +impl LiveKitClient { pub fn new(mut url: String, key: String, secret: String) -> Self { if url.ends_with('/') { url.pop(); @@ -26,67 +36,6 @@ impl Client { } } - pub fn url(&self) -> &str { - &self.url - } - - pub fn create_room(&self, name: String) -> impl Future> { - self.request( - "twirp/livekit.RoomService/CreateRoom", - token::VideoGrant { - room_create: Some(true), - ..Default::default() - }, - proto::CreateRoomRequest { - name, - ..Default::default() - }, - ) - } - - pub fn delete_room(&self, name: String) -> impl Future> { - let response = self.request( - "twirp/livekit.RoomService/DeleteRoom", - token::VideoGrant { - room_create: Some(true), - ..Default::default() - }, - proto::DeleteRoomRequest { room: name }, - ); - async move { - let _: proto::DeleteRoomResponse = response.await?; - Ok(()) - } - } - - pub fn remove_participant( - &self, - room: String, - identity: String, - ) -> impl Future> { - let response = self.request( - "twirp/livekit.RoomService/RemoveParticipant", - token::VideoGrant::to_admin(&room), - proto::RoomParticipantIdentity { - room: room.clone(), - identity, - }, - ); - async move { - let _: proto::RemoveParticipantResponse = response.await?; - Ok(()) - } - } - - pub fn room_token(&self, room: &str, identity: &str) -> Result { - token::create( - &self.key, - &self.secret, - Some(identity), - token::VideoGrant::to_join(room), - ) - } - fn request( &self, path: &str, @@ -126,3 +75,65 @@ impl Client { } } } + +#[async_trait] +impl Client for LiveKitClient { + fn url(&self) -> &str { + &self.url + } + + async fn create_room(&self, name: String) -> Result<()> { + let x: proto::Room = self + .request( + "twirp/livekit.RoomService/CreateRoom", + token::VideoGrant { + room_create: Some(true), + ..Default::default() + }, + proto::CreateRoomRequest { + name, + ..Default::default() + }, + ) + .await?; + dbg!(x); + Ok(()) + } + + async fn delete_room(&self, name: String) -> Result<()> { + let _: proto::DeleteRoomResponse = self + .request( + "twirp/livekit.RoomService/DeleteRoom", + token::VideoGrant { + room_create: Some(true), + ..Default::default() + }, + proto::DeleteRoomRequest { room: name }, + ) + .await?; + Ok(()) + } + + async fn remove_participant(&self, room: String, identity: String) -> Result<()> { + let _: proto::RemoveParticipantResponse = self + .request( + "twirp/livekit.RoomService/RemoveParticipant", + token::VideoGrant::to_admin(&room), + proto::RoomParticipantIdentity { + room: room.clone(), + identity, + }, + ) + .await?; + Ok(()) + } + + fn room_token(&self, room: &str, identity: &str) -> Result { + token::create( + &self.key, + &self.secret, + Some(identity), + token::VideoGrant::to_join(room), + ) + } +}