mirror of
https://github.com/zed-industries/zed.git
synced 2024-11-13 06:39:31 +03:00
Merge pull request #1965 from zed-industries/preserve-calls-during-server-restarts
Automatically re-join call when server is restarted
This commit is contained in:
commit
461c2400ad
@ -94,12 +94,18 @@ impl ActiveCall {
|
||||
|
||||
async fn handle_call_canceled(
|
||||
this: ModelHandle<Self>,
|
||||
_: TypedEnvelope<proto::CallCanceled>,
|
||||
envelope: TypedEnvelope<proto::CallCanceled>,
|
||||
_: Arc<Client>,
|
||||
mut cx: AsyncAppContext,
|
||||
) -> Result<()> {
|
||||
this.update(&mut cx, |this, _| {
|
||||
*this.incoming_call.0.borrow_mut() = None;
|
||||
let mut incoming_call = this.incoming_call.0.borrow_mut();
|
||||
if incoming_call
|
||||
.as_ref()
|
||||
.map_or(false, |call| call.room_id == envelope.payload.room_id)
|
||||
{
|
||||
incoming_call.take();
|
||||
}
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
|
@ -15,7 +15,7 @@ use project::Project;
|
||||
use std::{mem, sync::Arc, time::Duration};
|
||||
use util::{post_inc, ResultExt};
|
||||
|
||||
pub const RECONNECTION_TIMEOUT: Duration = client::RECEIVE_TIMEOUT;
|
||||
pub const RECONNECT_TIMEOUT: Duration = client::RECEIVE_TIMEOUT;
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Eq)]
|
||||
pub enum Event {
|
||||
@ -50,7 +50,7 @@ pub struct Room {
|
||||
user_store: ModelHandle<UserStore>,
|
||||
subscriptions: Vec<client::Subscription>,
|
||||
pending_room_update: Option<Task<()>>,
|
||||
_maintain_connection: Task<Result<()>>,
|
||||
maintain_connection: Option<Task<Result<()>>>,
|
||||
}
|
||||
|
||||
impl Entity for Room {
|
||||
@ -121,7 +121,7 @@ impl Room {
|
||||
None
|
||||
};
|
||||
|
||||
let _maintain_connection =
|
||||
let maintain_connection =
|
||||
cx.spawn_weak(|this, cx| Self::maintain_connection(this, client.clone(), cx));
|
||||
|
||||
Self {
|
||||
@ -138,7 +138,7 @@ impl Room {
|
||||
pending_room_update: None,
|
||||
client,
|
||||
user_store,
|
||||
_maintain_connection,
|
||||
maintain_connection: Some(maintain_connection),
|
||||
}
|
||||
}
|
||||
|
||||
@ -235,6 +235,8 @@ impl Room {
|
||||
self.participant_user_ids.clear();
|
||||
self.subscriptions.clear();
|
||||
self.live_kit.take();
|
||||
self.pending_room_update.take();
|
||||
self.maintain_connection.take();
|
||||
self.client.send(proto::LeaveRoom {})?;
|
||||
Ok(())
|
||||
}
|
||||
@ -262,45 +264,52 @@ impl Room {
|
||||
});
|
||||
|
||||
// Wait for client to re-establish a connection to the server.
|
||||
let mut reconnection_timeout = cx.background().timer(RECONNECTION_TIMEOUT).fuse();
|
||||
let client_reconnection = async {
|
||||
loop {
|
||||
if let Some(status) = client_status.next().await {
|
||||
if status.is_connected() {
|
||||
return true;
|
||||
{
|
||||
let mut reconnection_timeout = cx.background().timer(RECONNECT_TIMEOUT).fuse();
|
||||
let client_reconnection = async {
|
||||
let mut remaining_attempts = 3;
|
||||
while remaining_attempts > 0 {
|
||||
if let Some(status) = client_status.next().await {
|
||||
if status.is_connected() {
|
||||
let rejoin_room = async {
|
||||
let response =
|
||||
client.request(proto::JoinRoom { id: room_id }).await?;
|
||||
let room_proto =
|
||||
response.room.ok_or_else(|| anyhow!("invalid room"))?;
|
||||
this.upgrade(&cx)
|
||||
.ok_or_else(|| anyhow!("room was dropped"))?
|
||||
.update(&mut cx, |this, cx| {
|
||||
this.status = RoomStatus::Online;
|
||||
this.apply_room_update(room_proto, cx)
|
||||
})?;
|
||||
anyhow::Ok(())
|
||||
};
|
||||
|
||||
if rejoin_room.await.is_ok() {
|
||||
return true;
|
||||
} else {
|
||||
remaining_attempts -= 1;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
false
|
||||
}
|
||||
}
|
||||
.fuse();
|
||||
futures::pin_mut!(client_reconnection);
|
||||
.fuse();
|
||||
futures::pin_mut!(client_reconnection);
|
||||
|
||||
futures::select_biased! {
|
||||
reconnected = client_reconnection => {
|
||||
if reconnected {
|
||||
// Client managed to reconnect to the server. Now attempt to join the room.
|
||||
let rejoin_room = async {
|
||||
let response = client.request(proto::JoinRoom { id: room_id }).await?;
|
||||
let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
|
||||
this.upgrade(&cx)
|
||||
.ok_or_else(|| anyhow!("room was dropped"))?
|
||||
.update(&mut cx, |this, cx| {
|
||||
this.status = RoomStatus::Online;
|
||||
this.apply_room_update(room_proto, cx)
|
||||
})?;
|
||||
anyhow::Ok(())
|
||||
};
|
||||
|
||||
// If we successfully joined the room, go back around the loop
|
||||
// waiting for future connection status changes.
|
||||
if rejoin_room.await.log_err().is_some() {
|
||||
futures::select_biased! {
|
||||
reconnected = client_reconnection => {
|
||||
if reconnected {
|
||||
// If we successfully joined the room, go back around the loop
|
||||
// waiting for future connection status changes.
|
||||
continue;
|
||||
}
|
||||
}
|
||||
_ = reconnection_timeout => {}
|
||||
}
|
||||
_ = reconnection_timeout => {}
|
||||
}
|
||||
|
||||
// The client failed to re-establish a connection to the server
|
||||
|
@ -129,6 +129,7 @@ CREATE TABLE "room_participants" (
|
||||
"calling_connection_epoch" TEXT NOT NULL
|
||||
);
|
||||
CREATE UNIQUE INDEX "index_room_participants_on_user_id" ON "room_participants" ("user_id");
|
||||
CREATE INDEX "index_room_participants_on_room_id" ON "room_participants" ("room_id");
|
||||
CREATE INDEX "index_room_participants_on_answering_connection_epoch" ON "room_participants" ("answering_connection_epoch");
|
||||
CREATE INDEX "index_room_participants_on_calling_connection_epoch" ON "room_participants" ("calling_connection_epoch");
|
||||
CREATE INDEX "index_room_participants_on_answering_connection_id" ON "room_participants" ("answering_connection_id");
|
||||
|
@ -0,0 +1 @@
|
||||
CREATE INDEX "index_room_participants_on_room_id" ON "room_participants" ("room_id");
|
@ -21,6 +21,7 @@ use dashmap::DashMap;
|
||||
use futures::StreamExt;
|
||||
use hyper::StatusCode;
|
||||
use rpc::{proto, ConnectionId};
|
||||
use sea_orm::Condition;
|
||||
pub use sea_orm::ConnectOptions;
|
||||
use sea_orm::{
|
||||
entity::prelude::*, ActiveValue, ConnectionTrait, DatabaseConnection, DatabaseTransaction,
|
||||
@ -47,7 +48,7 @@ pub struct Database {
|
||||
background: Option<std::sync::Arc<gpui::executor::Background>>,
|
||||
#[cfg(test)]
|
||||
runtime: Option<tokio::runtime::Runtime>,
|
||||
epoch: Uuid,
|
||||
epoch: parking_lot::RwLock<Uuid>,
|
||||
}
|
||||
|
||||
impl Database {
|
||||
@ -60,10 +61,20 @@ impl Database {
|
||||
background: None,
|
||||
#[cfg(test)]
|
||||
runtime: None,
|
||||
epoch: Uuid::new_v4(),
|
||||
epoch: parking_lot::RwLock::new(Uuid::new_v4()),
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub fn reset(&self) {
|
||||
self.rooms.clear();
|
||||
*self.epoch.write() = Uuid::new_v4();
|
||||
}
|
||||
|
||||
fn epoch(&self) -> Uuid {
|
||||
*self.epoch.read()
|
||||
}
|
||||
|
||||
pub async fn migrate(
|
||||
&self,
|
||||
migrations_path: &Path,
|
||||
@ -105,34 +116,14 @@ impl Database {
|
||||
Ok(new_migrations)
|
||||
}
|
||||
|
||||
pub async fn clear_stale_data(&self) -> Result<()> {
|
||||
pub async fn delete_stale_projects(&self) -> Result<()> {
|
||||
self.transaction(|tx| async move {
|
||||
project_collaborator::Entity::delete_many()
|
||||
.filter(project_collaborator::Column::ConnectionEpoch.ne(self.epoch))
|
||||
.exec(&*tx)
|
||||
.await?;
|
||||
room_participant::Entity::delete_many()
|
||||
.filter(
|
||||
room_participant::Column::AnsweringConnectionEpoch
|
||||
.ne(self.epoch)
|
||||
.or(room_participant::Column::CallingConnectionEpoch.ne(self.epoch)),
|
||||
)
|
||||
.filter(project_collaborator::Column::ConnectionEpoch.ne(self.epoch()))
|
||||
.exec(&*tx)
|
||||
.await?;
|
||||
project::Entity::delete_many()
|
||||
.filter(project::Column::HostConnectionEpoch.ne(self.epoch))
|
||||
.exec(&*tx)
|
||||
.await?;
|
||||
room::Entity::delete_many()
|
||||
.filter(
|
||||
room::Column::Id.not_in_subquery(
|
||||
Query::select()
|
||||
.column(room_participant::Column::RoomId)
|
||||
.from(room_participant::Entity)
|
||||
.distinct()
|
||||
.to_owned(),
|
||||
),
|
||||
)
|
||||
.filter(project::Column::HostConnectionEpoch.ne(self.epoch()))
|
||||
.exec(&*tx)
|
||||
.await?;
|
||||
Ok(())
|
||||
@ -140,6 +131,74 @@ impl Database {
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn outdated_room_ids(&self) -> Result<Vec<RoomId>> {
|
||||
self.transaction(|tx| async move {
|
||||
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
|
||||
enum QueryAs {
|
||||
RoomId,
|
||||
}
|
||||
|
||||
Ok(room_participant::Entity::find()
|
||||
.select_only()
|
||||
.column(room_participant::Column::RoomId)
|
||||
.distinct()
|
||||
.filter(room_participant::Column::AnsweringConnectionEpoch.ne(self.epoch()))
|
||||
.into_values::<_, QueryAs>()
|
||||
.all(&*tx)
|
||||
.await?)
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn refresh_room(&self, room_id: RoomId) -> Result<RoomGuard<RefreshedRoom>> {
|
||||
self.room_transaction(|tx| async move {
|
||||
let stale_participant_filter = Condition::all()
|
||||
.add(room_participant::Column::RoomId.eq(room_id))
|
||||
.add(room_participant::Column::AnsweringConnectionId.is_not_null())
|
||||
.add(room_participant::Column::AnsweringConnectionEpoch.ne(self.epoch()));
|
||||
|
||||
let stale_participant_user_ids = room_participant::Entity::find()
|
||||
.filter(stale_participant_filter.clone())
|
||||
.all(&*tx)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|participant| participant.user_id)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
// Delete participants who failed to reconnect.
|
||||
room_participant::Entity::delete_many()
|
||||
.filter(stale_participant_filter)
|
||||
.exec(&*tx)
|
||||
.await?;
|
||||
|
||||
let room = self.get_room(room_id, &tx).await?;
|
||||
let mut canceled_calls_to_user_ids = Vec::new();
|
||||
// Delete the room if it becomes empty and cancel pending calls.
|
||||
if room.participants.is_empty() {
|
||||
canceled_calls_to_user_ids.extend(
|
||||
room.pending_participants
|
||||
.iter()
|
||||
.map(|pending_participant| UserId::from_proto(pending_participant.user_id)),
|
||||
);
|
||||
room_participant::Entity::delete_many()
|
||||
.filter(room_participant::Column::RoomId.eq(room_id))
|
||||
.exec(&*tx)
|
||||
.await?;
|
||||
room::Entity::delete_by_id(room_id).exec(&*tx).await?;
|
||||
}
|
||||
|
||||
Ok((
|
||||
room_id,
|
||||
RefreshedRoom {
|
||||
room,
|
||||
stale_participant_user_ids,
|
||||
canceled_calls_to_user_ids,
|
||||
},
|
||||
))
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
// users
|
||||
|
||||
pub async fn create_user(
|
||||
@ -1033,11 +1092,11 @@ impl Database {
|
||||
room_id: ActiveValue::set(room_id),
|
||||
user_id: ActiveValue::set(user_id),
|
||||
answering_connection_id: ActiveValue::set(Some(connection_id.0 as i32)),
|
||||
answering_connection_epoch: ActiveValue::set(Some(self.epoch)),
|
||||
answering_connection_epoch: ActiveValue::set(Some(self.epoch())),
|
||||
answering_connection_lost: ActiveValue::set(false),
|
||||
calling_user_id: ActiveValue::set(user_id),
|
||||
calling_connection_id: ActiveValue::set(connection_id.0 as i32),
|
||||
calling_connection_epoch: ActiveValue::set(self.epoch),
|
||||
calling_connection_epoch: ActiveValue::set(self.epoch()),
|
||||
..Default::default()
|
||||
}
|
||||
.insert(&*tx)
|
||||
@ -1064,7 +1123,7 @@ impl Database {
|
||||
answering_connection_lost: ActiveValue::set(false),
|
||||
calling_user_id: ActiveValue::set(calling_user_id),
|
||||
calling_connection_id: ActiveValue::set(calling_connection_id.0 as i32),
|
||||
calling_connection_epoch: ActiveValue::set(self.epoch),
|
||||
calling_connection_epoch: ActiveValue::set(self.epoch()),
|
||||
initial_project_id: ActiveValue::set(initial_project_id),
|
||||
..Default::default()
|
||||
}
|
||||
@ -1174,18 +1233,22 @@ impl Database {
|
||||
self.room_transaction(|tx| async move {
|
||||
let result = room_participant::Entity::update_many()
|
||||
.filter(
|
||||
room_participant::Column::RoomId
|
||||
.eq(room_id)
|
||||
.and(room_participant::Column::UserId.eq(user_id))
|
||||
.and(
|
||||
room_participant::Column::AnsweringConnectionId
|
||||
.is_null()
|
||||
.or(room_participant::Column::AnsweringConnectionLost.eq(true)),
|
||||
Condition::all()
|
||||
.add(room_participant::Column::RoomId.eq(room_id))
|
||||
.add(room_participant::Column::UserId.eq(user_id))
|
||||
.add(
|
||||
Condition::any()
|
||||
.add(room_participant::Column::AnsweringConnectionId.is_null())
|
||||
.add(room_participant::Column::AnsweringConnectionLost.eq(true))
|
||||
.add(
|
||||
room_participant::Column::AnsweringConnectionEpoch
|
||||
.ne(self.epoch()),
|
||||
),
|
||||
),
|
||||
)
|
||||
.set(room_participant::ActiveModel {
|
||||
answering_connection_id: ActiveValue::set(Some(connection_id.0 as i32)),
|
||||
answering_connection_epoch: ActiveValue::set(Some(self.epoch)),
|
||||
answering_connection_epoch: ActiveValue::set(Some(self.epoch())),
|
||||
answering_connection_lost: ActiveValue::set(false),
|
||||
..Default::default()
|
||||
})
|
||||
@ -1591,7 +1654,7 @@ impl Database {
|
||||
room_id: ActiveValue::set(participant.room_id),
|
||||
host_user_id: ActiveValue::set(participant.user_id),
|
||||
host_connection_id: ActiveValue::set(connection_id.0 as i32),
|
||||
host_connection_epoch: ActiveValue::set(self.epoch),
|
||||
host_connection_epoch: ActiveValue::set(self.epoch()),
|
||||
..Default::default()
|
||||
}
|
||||
.insert(&*tx)
|
||||
@ -1616,7 +1679,7 @@ impl Database {
|
||||
project_collaborator::ActiveModel {
|
||||
project_id: ActiveValue::set(project.id),
|
||||
connection_id: ActiveValue::set(connection_id.0 as i32),
|
||||
connection_epoch: ActiveValue::set(self.epoch),
|
||||
connection_epoch: ActiveValue::set(self.epoch()),
|
||||
user_id: ActiveValue::set(participant.user_id),
|
||||
replica_id: ActiveValue::set(ReplicaId(0)),
|
||||
is_host: ActiveValue::set(true),
|
||||
@ -1930,7 +1993,7 @@ impl Database {
|
||||
let new_collaborator = project_collaborator::ActiveModel {
|
||||
project_id: ActiveValue::set(project_id),
|
||||
connection_id: ActiveValue::set(connection_id.0 as i32),
|
||||
connection_epoch: ActiveValue::set(self.epoch),
|
||||
connection_epoch: ActiveValue::set(self.epoch()),
|
||||
user_id: ActiveValue::set(participant.user_id),
|
||||
replica_id: ActiveValue::set(replica_id),
|
||||
is_host: ActiveValue::set(false),
|
||||
@ -2553,6 +2616,12 @@ pub struct LeftRoom {
|
||||
pub canceled_calls_to_user_ids: Vec<UserId>,
|
||||
}
|
||||
|
||||
pub struct RefreshedRoom {
|
||||
pub room: proto::Room,
|
||||
pub stale_participant_user_ids: Vec<UserId>,
|
||||
pub canceled_calls_to_user_ids: Vec<UserId>,
|
||||
}
|
||||
|
||||
pub struct Project {
|
||||
pub collaborators: Vec<project_collaborator::Model>,
|
||||
pub worktrees: BTreeMap<u64, Worktree>,
|
||||
|
File diff suppressed because it is too large
Load Diff
@ -2,7 +2,7 @@ pub mod api;
|
||||
pub mod auth;
|
||||
pub mod db;
|
||||
pub mod env;
|
||||
mod executor;
|
||||
pub mod executor;
|
||||
#[cfg(test)]
|
||||
mod integration_tests;
|
||||
pub mod rpc;
|
||||
|
@ -1,6 +1,6 @@
|
||||
use anyhow::anyhow;
|
||||
use axum::{routing::get, Router};
|
||||
use collab::{db, env, AppState, Config, MigrateConfig, Result};
|
||||
use collab::{db, env, executor::Executor, AppState, Config, MigrateConfig, Result};
|
||||
use db::Database;
|
||||
use std::{
|
||||
env::args,
|
||||
@ -52,12 +52,12 @@ async fn main() -> Result<()> {
|
||||
init_tracing(&config);
|
||||
|
||||
let state = AppState::new(config).await?;
|
||||
state.db.clear_stale_data().await?;
|
||||
|
||||
let listener = TcpListener::bind(&format!("0.0.0.0:{}", state.config.http_port))
|
||||
.expect("failed to bind TCP listener");
|
||||
|
||||
let rpc_server = collab::rpc::Server::new(state.clone());
|
||||
let rpc_server = collab::rpc::Server::new(state.clone(), Executor::Production);
|
||||
rpc_server.start().await?;
|
||||
|
||||
let app = collab::api::routes(rpc_server.clone(), state.clone())
|
||||
.merge(collab::rpc::routes(rpc_server.clone()))
|
||||
|
@ -53,7 +53,7 @@ use std::{
|
||||
},
|
||||
time::Duration,
|
||||
};
|
||||
use tokio::sync::{watch, Mutex, MutexGuard};
|
||||
use tokio::sync::watch;
|
||||
use tower::ServiceBuilder;
|
||||
use tracing::{info_span, instrument, Instrument};
|
||||
|
||||
@ -90,14 +90,14 @@ impl<R: RequestMessage> Response<R> {
|
||||
struct Session {
|
||||
user_id: UserId,
|
||||
connection_id: ConnectionId,
|
||||
db: Arc<Mutex<DbHandle>>,
|
||||
db: Arc<tokio::sync::Mutex<DbHandle>>,
|
||||
peer: Arc<Peer>,
|
||||
connection_pool: Arc<Mutex<ConnectionPool>>,
|
||||
connection_pool: Arc<parking_lot::Mutex<ConnectionPool>>,
|
||||
live_kit_client: Option<Arc<dyn live_kit_server::api::Client>>,
|
||||
}
|
||||
|
||||
impl Session {
|
||||
async fn db(&self) -> MutexGuard<DbHandle> {
|
||||
async fn db(&self) -> tokio::sync::MutexGuard<DbHandle> {
|
||||
#[cfg(test)]
|
||||
tokio::task::yield_now().await;
|
||||
let guard = self.db.lock().await;
|
||||
@ -109,9 +109,7 @@ impl Session {
|
||||
async fn connection_pool(&self) -> ConnectionPoolGuard<'_> {
|
||||
#[cfg(test)]
|
||||
tokio::task::yield_now().await;
|
||||
let guard = self.connection_pool.lock().await;
|
||||
#[cfg(test)]
|
||||
tokio::task::yield_now().await;
|
||||
let guard = self.connection_pool.lock();
|
||||
ConnectionPoolGuard {
|
||||
guard,
|
||||
_not_send: PhantomData,
|
||||
@ -140,14 +138,15 @@ impl Deref for DbHandle {
|
||||
|
||||
pub struct Server {
|
||||
peer: Arc<Peer>,
|
||||
pub(crate) connection_pool: Arc<Mutex<ConnectionPool>>,
|
||||
pub(crate) connection_pool: Arc<parking_lot::Mutex<ConnectionPool>>,
|
||||
app_state: Arc<AppState>,
|
||||
executor: Executor,
|
||||
handlers: HashMap<TypeId, MessageHandler>,
|
||||
teardown: watch::Sender<()>,
|
||||
}
|
||||
|
||||
pub(crate) struct ConnectionPoolGuard<'a> {
|
||||
guard: MutexGuard<'a, ConnectionPool>,
|
||||
guard: parking_lot::MutexGuard<'a, ConnectionPool>,
|
||||
_not_send: PhantomData<Rc<()>>,
|
||||
}
|
||||
|
||||
@ -168,10 +167,11 @@ where
|
||||
}
|
||||
|
||||
impl Server {
|
||||
pub fn new(app_state: Arc<AppState>) -> Arc<Self> {
|
||||
pub fn new(app_state: Arc<AppState>, executor: Executor) -> Arc<Self> {
|
||||
let mut server = Self {
|
||||
peer: Peer::new(),
|
||||
app_state,
|
||||
executor,
|
||||
connection_pool: Default::default(),
|
||||
handlers: Default::default(),
|
||||
teardown: watch::channel(()).0,
|
||||
@ -237,7 +237,96 @@ impl Server {
|
||||
Arc::new(server)
|
||||
}
|
||||
|
||||
pub async fn start(&self) -> Result<()> {
|
||||
self.app_state.db.delete_stale_projects().await?;
|
||||
let db = self.app_state.db.clone();
|
||||
let peer = self.peer.clone();
|
||||
let timeout = self.executor.sleep(RECONNECT_TIMEOUT);
|
||||
let pool = self.connection_pool.clone();
|
||||
let live_kit_client = self.app_state.live_kit_client.clone();
|
||||
self.executor.spawn_detached(async move {
|
||||
timeout.await;
|
||||
if let Some(room_ids) = db.outdated_room_ids().await.trace_err() {
|
||||
for room_id in room_ids {
|
||||
let mut contacts_to_update = HashSet::default();
|
||||
let mut canceled_calls_to_user_ids = Vec::new();
|
||||
let mut live_kit_room = String::new();
|
||||
let mut delete_live_kit_room = false;
|
||||
|
||||
if let Ok(mut refreshed_room) = db.refresh_room(room_id).await {
|
||||
room_updated(&refreshed_room.room, &peer);
|
||||
contacts_to_update
|
||||
.extend(refreshed_room.stale_participant_user_ids.iter().copied());
|
||||
contacts_to_update
|
||||
.extend(refreshed_room.canceled_calls_to_user_ids.iter().copied());
|
||||
canceled_calls_to_user_ids =
|
||||
mem::take(&mut refreshed_room.canceled_calls_to_user_ids);
|
||||
live_kit_room = mem::take(&mut refreshed_room.room.live_kit_room);
|
||||
delete_live_kit_room = refreshed_room.room.participants.is_empty();
|
||||
}
|
||||
|
||||
{
|
||||
let pool = pool.lock();
|
||||
for canceled_user_id in canceled_calls_to_user_ids {
|
||||
for connection_id in pool.user_connection_ids(canceled_user_id) {
|
||||
peer.send(
|
||||
connection_id,
|
||||
proto::CallCanceled {
|
||||
room_id: room_id.to_proto(),
|
||||
},
|
||||
)
|
||||
.trace_err();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for user_id in contacts_to_update {
|
||||
let busy = db.is_user_busy(user_id).await.trace_err();
|
||||
let contacts = db.get_contacts(user_id).await.trace_err();
|
||||
if let Some((busy, contacts)) = busy.zip(contacts) {
|
||||
let pool = pool.lock();
|
||||
let updated_contact = contact_for_user(user_id, false, busy, &pool);
|
||||
for contact in contacts {
|
||||
if let db::Contact::Accepted {
|
||||
user_id: contact_user_id,
|
||||
..
|
||||
} = contact
|
||||
{
|
||||
for contact_conn_id in pool.user_connection_ids(contact_user_id)
|
||||
{
|
||||
peer.send(
|
||||
contact_conn_id,
|
||||
proto::UpdateContacts {
|
||||
contacts: vec![updated_contact.clone()],
|
||||
remove_contacts: Default::default(),
|
||||
incoming_requests: Default::default(),
|
||||
remove_incoming_requests: Default::default(),
|
||||
outgoing_requests: Default::default(),
|
||||
remove_outgoing_requests: Default::default(),
|
||||
},
|
||||
)
|
||||
.trace_err();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(live_kit) = live_kit_client.as_ref() {
|
||||
if delete_live_kit_room {
|
||||
live_kit.delete_room(live_kit_room).await.trace_err();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub fn teardown(&self) {
|
||||
self.peer.reset();
|
||||
self.connection_pool.lock().reset();
|
||||
let _ = self.teardown.send(());
|
||||
}
|
||||
|
||||
@ -339,7 +428,7 @@ impl Server {
|
||||
let user_id = user.id;
|
||||
let login = user.github_login;
|
||||
let span = info_span!("handle connection", %user_id, %login, %address);
|
||||
let teardown = self.teardown.subscribe();
|
||||
let mut teardown = self.teardown.subscribe();
|
||||
async move {
|
||||
let (connection_id, handle_io, mut incoming_rx) = this
|
||||
.peer
|
||||
@ -367,7 +456,7 @@ impl Server {
|
||||
).await?;
|
||||
|
||||
{
|
||||
let mut pool = this.connection_pool.lock().await;
|
||||
let mut pool = this.connection_pool.lock();
|
||||
pool.add_connection(connection_id, user_id, user.admin);
|
||||
this.peer.send(connection_id, build_initial_contacts_update(contacts, &pool))?;
|
||||
|
||||
@ -386,7 +475,7 @@ impl Server {
|
||||
let session = Session {
|
||||
user_id,
|
||||
connection_id,
|
||||
db: Arc::new(Mutex::new(DbHandle(this.app_state.db.clone()))),
|
||||
db: Arc::new(tokio::sync::Mutex::new(DbHandle(this.app_state.db.clone()))),
|
||||
peer: this.peer.clone(),
|
||||
connection_pool: this.connection_pool.clone(),
|
||||
live_kit_client: this.app_state.live_kit_client.clone()
|
||||
@ -409,6 +498,7 @@ impl Server {
|
||||
let next_message = incoming_rx.next().fuse();
|
||||
futures::pin_mut!(next_message);
|
||||
futures::select_biased! {
|
||||
_ = teardown.changed().fuse() => return Ok(()),
|
||||
result = handle_io => {
|
||||
if let Err(error) = result {
|
||||
tracing::error!(?error, %user_id, %login, %connection_id, %address, "error handling I/O");
|
||||
@ -460,7 +550,7 @@ impl Server {
|
||||
) -> Result<()> {
|
||||
if let Some(user) = self.app_state.db.get_user_by_id(inviter_id).await? {
|
||||
if let Some(code) = &user.invite_code {
|
||||
let pool = self.connection_pool.lock().await;
|
||||
let pool = self.connection_pool.lock();
|
||||
let invitee_contact = contact_for_user(invitee_id, true, false, &pool);
|
||||
for connection_id in pool.user_connection_ids(inviter_id) {
|
||||
self.peer.send(
|
||||
@ -486,7 +576,7 @@ impl Server {
|
||||
pub async fn invite_count_updated(self: &Arc<Self>, user_id: UserId) -> Result<()> {
|
||||
if let Some(user) = self.app_state.db.get_user_by_id(user_id).await? {
|
||||
if let Some(invite_code) = &user.invite_code {
|
||||
let pool = self.connection_pool.lock().await;
|
||||
let pool = self.connection_pool.lock();
|
||||
for connection_id in pool.user_connection_ids(user_id) {
|
||||
self.peer.send(
|
||||
connection_id,
|
||||
@ -507,7 +597,7 @@ impl Server {
|
||||
pub async fn snapshot<'a>(self: &'a Arc<Self>) -> ServerSnapshot<'a> {
|
||||
ServerSnapshot {
|
||||
connection_pool: ConnectionPoolGuard {
|
||||
guard: self.connection_pool.lock().await,
|
||||
guard: self.connection_pool.lock(),
|
||||
_not_send: PhantomData,
|
||||
},
|
||||
peer: &self.peer,
|
||||
@ -628,7 +718,6 @@ pub async fn handle_metrics(Extension(server): Extension<Arc<Server>>) -> Result
|
||||
let connections = server
|
||||
.connection_pool
|
||||
.lock()
|
||||
.await
|
||||
.connections()
|
||||
.filter(|connection| !connection.admin)
|
||||
.count();
|
||||
@ -681,7 +770,7 @@ async fn sign_out(
|
||||
{
|
||||
let db = session.db().await;
|
||||
if let Some(room) = db.decline_call(None, session.user_id).await.trace_err() {
|
||||
room_updated(&room, &session);
|
||||
room_updated(&room, &session.peer);
|
||||
}
|
||||
}
|
||||
update_user_contacts(session.user_id, &session).await?;
|
||||
@ -749,17 +838,14 @@ async fn join_room(
|
||||
response: Response<proto::JoinRoom>,
|
||||
session: Session,
|
||||
) -> Result<()> {
|
||||
let room_id = RoomId::from_proto(request.id);
|
||||
let room = {
|
||||
let room = session
|
||||
.db()
|
||||
.await
|
||||
.join_room(
|
||||
RoomId::from_proto(request.id),
|
||||
session.user_id,
|
||||
session.connection_id,
|
||||
)
|
||||
.join_room(room_id, session.user_id, session.connection_id)
|
||||
.await?;
|
||||
room_updated(&room, &session);
|
||||
room_updated(&room, &session.peer);
|
||||
room.clone()
|
||||
};
|
||||
|
||||
@ -770,7 +856,12 @@ async fn join_room(
|
||||
{
|
||||
session
|
||||
.peer
|
||||
.send(connection_id, proto::CallCanceled {})
|
||||
.send(
|
||||
connection_id,
|
||||
proto::CallCanceled {
|
||||
room_id: room_id.to_proto(),
|
||||
},
|
||||
)
|
||||
.trace_err();
|
||||
}
|
||||
|
||||
@ -834,7 +925,7 @@ async fn call(
|
||||
initial_project_id,
|
||||
)
|
||||
.await?;
|
||||
room_updated(&room, &session);
|
||||
room_updated(&room, &session.peer);
|
||||
mem::take(incoming_call)
|
||||
};
|
||||
update_user_contacts(called_user_id, &session).await?;
|
||||
@ -864,7 +955,7 @@ async fn call(
|
||||
.await
|
||||
.call_failed(room_id, called_user_id)
|
||||
.await?;
|
||||
room_updated(&room, &session);
|
||||
room_updated(&room, &session.peer);
|
||||
}
|
||||
update_user_contacts(called_user_id, &session).await?;
|
||||
|
||||
@ -884,7 +975,7 @@ async fn cancel_call(
|
||||
.await
|
||||
.cancel_call(Some(room_id), session.connection_id, called_user_id)
|
||||
.await?;
|
||||
room_updated(&room, &session);
|
||||
room_updated(&room, &session.peer);
|
||||
}
|
||||
|
||||
for connection_id in session
|
||||
@ -894,7 +985,12 @@ async fn cancel_call(
|
||||
{
|
||||
session
|
||||
.peer
|
||||
.send(connection_id, proto::CallCanceled {})
|
||||
.send(
|
||||
connection_id,
|
||||
proto::CallCanceled {
|
||||
room_id: room_id.to_proto(),
|
||||
},
|
||||
)
|
||||
.trace_err();
|
||||
}
|
||||
response.send(proto::Ack {})?;
|
||||
@ -911,7 +1007,7 @@ async fn decline_call(message: proto::DeclineCall, session: Session) -> Result<(
|
||||
.await
|
||||
.decline_call(Some(room_id), session.user_id)
|
||||
.await?;
|
||||
room_updated(&room, &session);
|
||||
room_updated(&room, &session.peer);
|
||||
}
|
||||
|
||||
for connection_id in session
|
||||
@ -921,7 +1017,12 @@ async fn decline_call(message: proto::DeclineCall, session: Session) -> Result<(
|
||||
{
|
||||
session
|
||||
.peer
|
||||
.send(connection_id, proto::CallCanceled {})
|
||||
.send(
|
||||
connection_id,
|
||||
proto::CallCanceled {
|
||||
room_id: room_id.to_proto(),
|
||||
},
|
||||
)
|
||||
.trace_err();
|
||||
}
|
||||
update_user_contacts(session.user_id, &session).await?;
|
||||
@ -942,7 +1043,7 @@ async fn update_participant_location(
|
||||
.await
|
||||
.update_room_participant_location(room_id, session.connection_id, location)
|
||||
.await?;
|
||||
room_updated(&room, &session);
|
||||
room_updated(&room, &session.peer);
|
||||
response.send(proto::Ack {})?;
|
||||
Ok(())
|
||||
}
|
||||
@ -964,7 +1065,7 @@ async fn share_project(
|
||||
response.send(proto::ShareProjectResponse {
|
||||
project_id: project_id.to_proto(),
|
||||
})?;
|
||||
room_updated(&room, &session);
|
||||
room_updated(&room, &session.peer);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@ -983,7 +1084,7 @@ async fn unshare_project(message: proto::UnshareProject, session: Session) -> Re
|
||||
guest_connection_ids.iter().copied(),
|
||||
|conn_id| session.peer.send(conn_id, message.clone()),
|
||||
);
|
||||
room_updated(&room, &session);
|
||||
room_updated(&room, &session.peer);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@ -1142,7 +1243,7 @@ async fn update_project(
|
||||
.forward_send(session.connection_id, connection_id, request.clone())
|
||||
},
|
||||
);
|
||||
room_updated(&room, &session);
|
||||
room_updated(&room, &session.peer);
|
||||
response.send(proto::Ack {})?;
|
||||
|
||||
Ok(())
|
||||
@ -1789,17 +1890,15 @@ fn contact_for_user(
|
||||
}
|
||||
}
|
||||
|
||||
fn room_updated(room: &proto::Room, session: &Session) {
|
||||
fn room_updated(room: &proto::Room, peer: &Peer) {
|
||||
for participant in &room.participants {
|
||||
session
|
||||
.peer
|
||||
.send(
|
||||
ConnectionId(participant.peer_id),
|
||||
proto::RoomUpdated {
|
||||
room: Some(room.clone()),
|
||||
},
|
||||
)
|
||||
.trace_err();
|
||||
peer.send(
|
||||
ConnectionId(participant.peer_id),
|
||||
proto::RoomUpdated {
|
||||
room: Some(room.clone()),
|
||||
},
|
||||
)
|
||||
.trace_err();
|
||||
}
|
||||
}
|
||||
|
||||
@ -1840,6 +1939,7 @@ async fn update_user_contacts(user_id: UserId, session: &Session) -> Result<()>
|
||||
async fn leave_room_for_session(session: &Session) -> Result<()> {
|
||||
let mut contacts_to_update = HashSet::default();
|
||||
|
||||
let room_id;
|
||||
let canceled_calls_to_user_ids;
|
||||
let live_kit_room;
|
||||
let delete_live_kit_room;
|
||||
@ -1851,7 +1951,8 @@ async fn leave_room_for_session(session: &Session) -> Result<()> {
|
||||
project_left(project, session);
|
||||
}
|
||||
|
||||
room_updated(&left_room.room, &session);
|
||||
room_updated(&left_room.room, &session.peer);
|
||||
room_id = RoomId::from_proto(left_room.room.id);
|
||||
canceled_calls_to_user_ids = mem::take(&mut left_room.canceled_calls_to_user_ids);
|
||||
live_kit_room = mem::take(&mut left_room.room.live_kit_room);
|
||||
delete_live_kit_room = left_room.room.participants.is_empty();
|
||||
@ -1863,7 +1964,12 @@ async fn leave_room_for_session(session: &Session) -> Result<()> {
|
||||
for connection_id in pool.user_connection_ids(canceled_user_id) {
|
||||
session
|
||||
.peer
|
||||
.send(connection_id, proto::CallCanceled {})
|
||||
.send(
|
||||
connection_id,
|
||||
proto::CallCanceled {
|
||||
room_id: room_id.to_proto(),
|
||||
},
|
||||
)
|
||||
.trace_err();
|
||||
}
|
||||
contacts_to_update.insert(canceled_user_id);
|
||||
|
@ -23,6 +23,11 @@ pub struct Connection {
|
||||
}
|
||||
|
||||
impl ConnectionPool {
|
||||
pub fn reset(&mut self) {
|
||||
self.connections.clear();
|
||||
self.connected_users.clear();
|
||||
}
|
||||
|
||||
#[instrument(skip(self))]
|
||||
pub fn add_connection(&mut self, connection_id: ConnectionId, user_id: UserId, admin: bool) {
|
||||
self.connections
|
||||
|
@ -212,7 +212,9 @@ message IncomingCall {
|
||||
optional ParticipantProject initial_project = 4;
|
||||
}
|
||||
|
||||
message CallCanceled {}
|
||||
message CallCanceled {
|
||||
uint64 room_id = 1;
|
||||
}
|
||||
|
||||
message CancelCall {
|
||||
uint64 room_id = 1;
|
||||
|
@ -6,4 +6,4 @@ pub use conn::Connection;
|
||||
pub use peer::*;
|
||||
mod macros;
|
||||
|
||||
pub const PROTOCOL_VERSION: u32 = 40;
|
||||
pub const PROTOCOL_VERSION: u32 = 41;
|
||||
|
Loading…
Reference in New Issue
Block a user