Move incoming calls into ActiveCall

This commit is contained in:
Antonio Scandurra 2022-10-06 09:50:26 +02:00
parent fa31c9659b
commit 55cc142319
11 changed files with 362 additions and 373 deletions

1
Cargo.lock generated
View File

@ -693,6 +693,7 @@ dependencies = [
"collections",
"futures",
"gpui",
"postage",
"project",
"util",
]

View File

@ -25,6 +25,7 @@ util = { path = "../util" }
anyhow = "1.0.38"
futures = "0.3"
postage = { version = "0.4.1", features = ["futures-traits"] }
[dev-dependencies]
client = { path = "../client", features = ["test-support"] }

View File

@ -2,22 +2,31 @@ mod participant;
pub mod room;
use anyhow::{anyhow, Result};
use client::{incoming_call::IncomingCall, Client, UserStore};
use gpui::{AppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Subscription, Task};
use client::{incoming_call::IncomingCall, proto, Client, TypedEnvelope, UserStore};
use gpui::{
AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext,
Subscription, Task,
};
pub use participant::ParticipantLocation;
use postage::watch;
use project::Project;
pub use room::Room;
use std::sync::Arc;
pub fn init(client: Arc<Client>, user_store: ModelHandle<UserStore>, cx: &mut MutableAppContext) {
let active_call = cx.add_model(|_| ActiveCall::new(client, user_store));
let active_call = cx.add_model(|cx| ActiveCall::new(client, user_store, cx));
cx.set_global(active_call);
}
pub struct ActiveCall {
room: Option<(ModelHandle<Room>, Vec<Subscription>)>,
incoming_call: (
watch::Sender<Option<IncomingCall>>,
watch::Receiver<Option<IncomingCall>>,
),
client: Arc<Client>,
user_store: ModelHandle<UserStore>,
_subscriptions: Vec<client::Subscription>,
}
impl Entity for ActiveCall {
@ -25,14 +34,63 @@ impl Entity for ActiveCall {
}
impl ActiveCall {
fn new(client: Arc<Client>, user_store: ModelHandle<UserStore>) -> Self {
fn new(
client: Arc<Client>,
user_store: ModelHandle<UserStore>,
cx: &mut ModelContext<Self>,
) -> Self {
Self {
room: None,
incoming_call: watch::channel(),
_subscriptions: vec![
client.add_request_handler(cx.handle(), Self::handle_incoming_call),
client.add_message_handler(cx.handle(), Self::handle_cancel_call),
],
client,
user_store,
}
}
async fn handle_incoming_call(
this: ModelHandle<Self>,
envelope: TypedEnvelope<proto::IncomingCall>,
_: Arc<Client>,
mut cx: AsyncAppContext,
) -> Result<proto::Ack> {
let user_store = this.read_with(&cx, |this, _| this.user_store.clone());
let call = IncomingCall {
room_id: envelope.payload.room_id,
participants: user_store
.update(&mut cx, |user_store, cx| {
user_store.get_users(envelope.payload.participant_user_ids, cx)
})
.await?,
caller: user_store
.update(&mut cx, |user_store, cx| {
user_store.get_user(envelope.payload.caller_user_id, cx)
})
.await?,
initial_project_id: envelope.payload.initial_project_id,
};
this.update(&mut cx, |this, _| {
*this.incoming_call.0.borrow_mut() = Some(call);
});
Ok(proto::Ack {})
}
async fn handle_cancel_call(
this: ModelHandle<Self>,
_: TypedEnvelope<proto::CancelCall>,
_: Arc<Client>,
mut cx: AsyncAppContext,
) -> Result<()> {
this.update(&mut cx, |this, _| {
*this.incoming_call.0.borrow_mut() = None;
});
Ok(())
}
pub fn global(cx: &AppContext) -> ModelHandle<Self> {
cx.global::<ModelHandle<Self>>().clone()
}
@ -74,12 +132,22 @@ impl ActiveCall {
})
}
pub fn join(&mut self, call: &IncomingCall, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
pub fn incoming(&self) -> watch::Receiver<Option<IncomingCall>> {
self.incoming_call.1.clone()
}
pub fn accept_incoming(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
if self.room.is_some() {
return Task::ready(Err(anyhow!("cannot join while on another call")));
}
let join = Room::join(call, self.client.clone(), self.user_store.clone(), cx);
let call = if let Some(call) = self.incoming_call.1.borrow().clone() {
call
} else {
return Task::ready(Err(anyhow!("no incoming call")));
};
let join = Room::join(&call, self.client.clone(), self.user_store.clone(), cx);
cx.spawn(|this, mut cx| async move {
let room = join.await?;
this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx));
@ -87,6 +155,19 @@ impl ActiveCall {
})
}
pub fn decline_incoming(&mut self) -> Result<()> {
*self.incoming_call.0.borrow_mut() = None;
self.client.send(proto::DeclineCall {})?;
Ok(())
}
pub fn hang_up(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
if let Some((room, _)) = self.room.take() {
room.update(cx, |room, cx| room.leave(cx))?;
}
Ok(())
}
fn set_room(&mut self, room: Option<ModelHandle<Room>>, cx: &mut ModelContext<Self>) {
if room.as_ref() != self.room.as_ref().map(|room| &room.0) {
if let Some(room) = room {

View File

@ -66,7 +66,7 @@ impl Room {
}
}
pub fn create(
pub(crate) fn create(
client: Arc<Client>,
user_store: ModelHandle<UserStore>,
cx: &mut MutableAppContext,
@ -77,7 +77,7 @@ impl Room {
})
}
pub fn join(
pub(crate) fn join(
call: &IncomingCall,
client: Arc<Client>,
user_store: ModelHandle<UserStore>,
@ -93,7 +93,7 @@ impl Room {
})
}
pub fn leave(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
if self.status.is_offline() {
return Err(anyhow!("room is offline"));
}
@ -213,7 +213,7 @@ impl Room {
Ok(())
}
pub fn call(
pub(crate) fn call(
&mut self,
recipient_user_id: u64,
initial_project_id: Option<u64>,

View File

@ -1,5 +1,4 @@
use super::{http::HttpClient, proto, Client, Status, TypedEnvelope};
use crate::incoming_call::IncomingCall;
use anyhow::{anyhow, Context, Result};
use collections::{hash_map::Entry, HashMap, HashSet};
use futures::{channel::mpsc, future, AsyncReadExt, Future, StreamExt};
@ -59,10 +58,6 @@ pub struct UserStore {
outgoing_contact_requests: Vec<Arc<User>>,
pending_contact_requests: HashMap<u64, usize>,
invite_info: Option<InviteInfo>,
incoming_call: (
watch::Sender<Option<IncomingCall>>,
watch::Receiver<Option<IncomingCall>>,
),
client: Weak<Client>,
http: Arc<dyn HttpClient>,
_maintain_contacts: Task<()>,
@ -112,8 +107,6 @@ impl UserStore {
client.add_message_handler(cx.handle(), Self::handle_update_contacts),
client.add_message_handler(cx.handle(), Self::handle_update_invite_info),
client.add_message_handler(cx.handle(), Self::handle_show_contacts),
client.add_request_handler(cx.handle(), Self::handle_incoming_call),
client.add_message_handler(cx.handle(), Self::handle_cancel_call),
];
Self {
users: Default::default(),
@ -122,7 +115,6 @@ impl UserStore {
incoming_contact_requests: Default::default(),
outgoing_contact_requests: Default::default(),
invite_info: None,
incoming_call: watch::channel(),
client: Arc::downgrade(&client),
update_contacts_tx,
http,
@ -194,60 +186,10 @@ impl UserStore {
Ok(())
}
async fn handle_incoming_call(
this: ModelHandle<Self>,
envelope: TypedEnvelope<proto::IncomingCall>,
_: Arc<Client>,
mut cx: AsyncAppContext,
) -> Result<proto::Ack> {
let call = IncomingCall {
room_id: envelope.payload.room_id,
participants: this
.update(&mut cx, |this, cx| {
this.get_users(envelope.payload.participant_user_ids, cx)
})
.await?,
caller: this
.update(&mut cx, |this, cx| {
this.get_user(envelope.payload.caller_user_id, cx)
})
.await?,
initial_project_id: envelope.payload.initial_project_id,
};
this.update(&mut cx, |this, _| {
*this.incoming_call.0.borrow_mut() = Some(call);
});
Ok(proto::Ack {})
}
async fn handle_cancel_call(
this: ModelHandle<Self>,
_: TypedEnvelope<proto::CancelCall>,
_: Arc<Client>,
mut cx: AsyncAppContext,
) -> Result<()> {
this.update(&mut cx, |this, _| {
*this.incoming_call.0.borrow_mut() = None;
});
Ok(())
}
pub fn invite_info(&self) -> Option<&InviteInfo> {
self.invite_info.as_ref()
}
pub fn incoming_call(&self) -> watch::Receiver<Option<IncomingCall>> {
self.incoming_call.1.clone()
}
pub fn decline_call(&mut self) -> Result<()> {
if let Some(client) = self.client.upgrade() {
client.send(proto::DeclineCall {})?;
}
Ok(())
}
async fn handle_update_contacts(
this: ModelHandle<Self>,
message: TypedEnvelope<proto::UpdateContacts>,

File diff suppressed because it is too large Load Diff

View File

@ -13,7 +13,7 @@ use workspace::{AppState, JoinProject, ToggleFollow, Workspace};
pub fn init(app_state: Arc<AppState>, cx: &mut MutableAppContext) {
contacts_popover::init(cx);
collab_titlebar_item::init(cx);
incoming_call_notification::init(app_state.user_store.clone(), cx);
incoming_call_notification::init(cx);
project_shared_notification::init(cx);
cx.add_global_action(move |action: &JoinProject, cx| {

View File

@ -1,11 +1,11 @@
use call::ActiveCall;
use client::{incoming_call::IncomingCall, UserStore};
use client::incoming_call::IncomingCall;
use futures::StreamExt;
use gpui::{
elements::*,
geometry::{rect::RectF, vector::vec2f},
impl_internal_actions, Entity, ModelHandle, MouseButton, MutableAppContext, RenderContext,
View, ViewContext, WindowBounds, WindowKind, WindowOptions,
impl_internal_actions, Entity, MouseButton, MutableAppContext, RenderContext, View,
ViewContext, WindowBounds, WindowKind, WindowOptions,
};
use settings::Settings;
use util::ResultExt;
@ -13,10 +13,10 @@ use workspace::JoinProject;
impl_internal_actions!(incoming_call_notification, [RespondToCall]);
pub fn init(user_store: ModelHandle<UserStore>, cx: &mut MutableAppContext) {
pub fn init(cx: &mut MutableAppContext) {
cx.add_action(IncomingCallNotification::respond_to_call);
let mut incoming_call = user_store.read(cx).incoming_call();
let mut incoming_call = ActiveCall::global(cx).read(cx).incoming();
cx.spawn(|mut cx| async move {
let mut notification_window = None;
while let Some(incoming_call) = incoming_call.next().await {
@ -33,7 +33,7 @@ pub fn init(user_store: ModelHandle<UserStore>, cx: &mut MutableAppContext) {
kind: WindowKind::PopUp,
is_movable: false,
},
|_| IncomingCallNotification::new(incoming_call, user_store.clone()),
|_| IncomingCallNotification::new(incoming_call),
);
notification_window = Some(window_id);
}
@ -49,18 +49,17 @@ struct RespondToCall {
pub struct IncomingCallNotification {
call: IncomingCall,
user_store: ModelHandle<UserStore>,
}
impl IncomingCallNotification {
pub fn new(call: IncomingCall, user_store: ModelHandle<UserStore>) -> Self {
Self { call, user_store }
pub fn new(call: IncomingCall) -> Self {
Self { call }
}
fn respond_to_call(&mut self, action: &RespondToCall, cx: &mut ViewContext<Self>) {
let active_call = ActiveCall::global(cx);
if action.accept {
let join = ActiveCall::global(cx)
.update(cx, |active_call, cx| active_call.join(&self.call, cx));
let join = active_call.update(cx, |active_call, cx| active_call.accept_incoming(cx));
let caller_user_id = self.call.caller.id;
let initial_project_id = self.call.initial_project_id;
cx.spawn_weak(|_, mut cx| async move {
@ -77,12 +76,10 @@ impl IncomingCallNotification {
})
.detach_and_log_err(cx);
} else {
self.user_store
.update(cx, |user_store, _| user_store.decline_call().log_err());
active_call.update(cx, |active_call, _| {
active_call.decline_incoming().log_err();
});
}
let window_id = cx.window_id();
cx.remove_window(window_id);
}
fn render_caller(&self, cx: &mut RenderContext<Self>) -> ElementBox {

View File

@ -1906,6 +1906,10 @@ impl MutableAppContext {
})
}
pub fn clear_globals(&mut self) {
self.cx.globals.clear();
}
pub fn add_model<T, F>(&mut self, build_model: F) -> ModelHandle<T>
where
T: Entity,

View File

@ -91,7 +91,7 @@ pub fn run_test(
cx.update(|cx| cx.remove_all_windows());
deterministic.run_until_parked();
cx.update(|_| {}); // flush effects
cx.update(|cx| cx.clear_globals());
leak_detector.lock().detect();
if is_last_iteration {

View File

@ -122,7 +122,7 @@ pub fn test(args: TokenStream, function: TokenStream) -> TokenStream {
cx_teardowns.extend(quote!(
#cx_varname.update(|cx| cx.remove_all_windows());
deterministic.run_until_parked();
#cx_varname.update(|_| {}); // flush effects
#cx_varname.update(|cx| cx.clear_globals());
));
inner_fn_args.extend(quote!(&mut #cx_varname,));
continue;