From d7cea646fc66e3fc97086d7f8b025a04fe2522d6 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Fri, 7 Oct 2022 12:21:56 +0200 Subject: [PATCH] Include a `busy` field in `proto::Contact` --- crates/client/src/user.rs | 2 + crates/collab/src/integration_tests.rs | 200 +++++++++++++++++++++++-- crates/collab/src/rpc.rs | 148 ++++++++++-------- crates/collab/src/rpc/store.rs | 9 ++ crates/rpc/proto/zed.proto | 3 +- 5 files changed, 291 insertions(+), 71 deletions(-) diff --git a/crates/client/src/user.rs b/crates/client/src/user.rs index 252fb4d455..e0c5713dab 100644 --- a/crates/client/src/user.rs +++ b/crates/client/src/user.rs @@ -39,6 +39,7 @@ impl Eq for User {} pub struct Contact { pub user: Arc, pub online: bool, + pub busy: bool, } #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -625,6 +626,7 @@ impl Contact { Ok(Self { user, online: contact.online, + busy: contact.busy, }) } } diff --git a/crates/collab/src/integration_tests.rs b/crates/collab/src/integration_tests.rs index ba344d0aab..72ebe937ab 100644 --- a/crates/collab/src/integration_tests.rs +++ b/crates/collab/src/integration_tests.rs @@ -435,12 +435,14 @@ async fn test_calls_on_multiple_connections( }) .await .unwrap(); + deterministic.run_until_parked(); assert!(incoming_call_b1.next().await.unwrap().is_some()); assert!(incoming_call_b2.next().await.unwrap().is_some()); // User B declines the call on one of the two connections, causing both connections // to stop ringing. active_call_b2.update(cx_b2, |call, _| call.decline_incoming().unwrap()); + deterministic.run_until_parked(); assert!(incoming_call_b1.next().await.unwrap().is_none()); assert!(incoming_call_b2.next().await.unwrap().is_none()); @@ -451,6 +453,7 @@ async fn test_calls_on_multiple_connections( }) .await .unwrap(); + deterministic.run_until_parked(); assert!(incoming_call_b1.next().await.unwrap().is_some()); assert!(incoming_call_b2.next().await.unwrap().is_some()); @@ -460,6 +463,7 @@ async fn test_calls_on_multiple_connections( .update(cx_b2, |call, cx| call.accept_incoming(cx)) .await .unwrap(); + deterministic.run_until_parked(); assert!(incoming_call_b1.next().await.unwrap().is_none()); assert!(incoming_call_b2.next().await.unwrap().is_none()); @@ -472,6 +476,7 @@ async fn test_calls_on_multiple_connections( }) .await .unwrap(); + deterministic.run_until_parked(); assert!(incoming_call_b1.next().await.unwrap().is_some()); assert!(incoming_call_b2.next().await.unwrap().is_some()); @@ -482,6 +487,7 @@ async fn test_calls_on_multiple_connections( }) .await .unwrap(); + deterministic.run_until_parked(); assert!(incoming_call_b1.next().await.unwrap().is_none()); assert!(incoming_call_b2.next().await.unwrap().is_none()); } @@ -4015,19 +4021,31 @@ async fn test_contacts( server .make_contacts(&mut [(&client_a, cx_a), (&client_b, cx_b), (&client_c, cx_c)]) .await; + let active_call_a = cx_a.read(ActiveCall::global); + let active_call_b = cx_b.read(ActiveCall::global); + let active_call_c = cx_c.read(ActiveCall::global); deterministic.run_until_parked(); assert_eq!( contacts(&client_a, cx_a), - [("user_b".to_string(), true), ("user_c".to_string(), true)] + [ + ("user_b".to_string(), "online", "free"), + ("user_c".to_string(), "online", "free") + ] ); assert_eq!( contacts(&client_b, cx_b), - [("user_a".to_string(), true), ("user_c".to_string(), true)] + [ + ("user_a".to_string(), "online", "free"), + ("user_c".to_string(), "online", "free") + ] ); assert_eq!( contacts(&client_c, cx_c), - [("user_a".to_string(), true), ("user_b".to_string(), true)] + [ + ("user_a".to_string(), "online", "free"), + ("user_b".to_string(), "online", "free") + ] ); server.disconnect_client(client_c.current_user_id(cx_c)); @@ -4035,11 +4053,17 @@ async fn test_contacts( deterministic.advance_clock(rpc::RECEIVE_TIMEOUT); assert_eq!( contacts(&client_a, cx_a), - [("user_b".to_string(), true), ("user_c".to_string(), false)] + [ + ("user_b".to_string(), "online", "free"), + ("user_c".to_string(), "offline", "free") + ] ); assert_eq!( contacts(&client_b, cx_b), - [("user_a".to_string(), true), ("user_c".to_string(), false)] + [ + ("user_a".to_string(), "online", "free"), + ("user_c".to_string(), "offline", "free") + ] ); assert_eq!(contacts(&client_c, cx_c), []); @@ -4052,24 +4076,180 @@ async fn test_contacts( deterministic.run_until_parked(); assert_eq!( contacts(&client_a, cx_a), - [("user_b".to_string(), true), ("user_c".to_string(), true)] + [ + ("user_b".to_string(), "online", "free"), + ("user_c".to_string(), "online", "free") + ] ); assert_eq!( contacts(&client_b, cx_b), - [("user_a".to_string(), true), ("user_c".to_string(), true)] + [ + ("user_a".to_string(), "online", "free"), + ("user_c".to_string(), "online", "free") + ] ); assert_eq!( contacts(&client_c, cx_c), - [("user_a".to_string(), true), ("user_b".to_string(), true)] + [ + ("user_a".to_string(), "online", "free"), + ("user_b".to_string(), "online", "free") + ] + ); + + active_call_a + .update(cx_a, |call, cx| { + call.invite(client_b.user_id().unwrap(), None, cx) + }) + .await + .unwrap(); + deterministic.run_until_parked(); + assert_eq!( + contacts(&client_a, cx_a), + [ + ("user_b".to_string(), "online", "busy"), + ("user_c".to_string(), "online", "free") + ] + ); + assert_eq!( + contacts(&client_b, cx_b), + [ + ("user_a".to_string(), "online", "busy"), + ("user_c".to_string(), "online", "free") + ] + ); + assert_eq!( + contacts(&client_c, cx_c), + [ + ("user_a".to_string(), "online", "busy"), + ("user_b".to_string(), "online", "busy") + ] + ); + + active_call_b.update(cx_b, |call, _| call.decline_incoming().unwrap()); + deterministic.run_until_parked(); + assert_eq!( + contacts(&client_a, cx_a), + [ + ("user_b".to_string(), "online", "free"), + ("user_c".to_string(), "online", "free") + ] + ); + assert_eq!( + contacts(&client_b, cx_b), + [ + ("user_a".to_string(), "online", "free"), + ("user_c".to_string(), "online", "free") + ] + ); + assert_eq!( + contacts(&client_c, cx_c), + [ + ("user_a".to_string(), "online", "free"), + ("user_b".to_string(), "online", "free") + ] + ); + + active_call_c + .update(cx_c, |call, cx| { + call.invite(client_a.user_id().unwrap(), None, cx) + }) + .await + .unwrap(); + deterministic.run_until_parked(); + assert_eq!( + contacts(&client_a, cx_a), + [ + ("user_b".to_string(), "online", "free"), + ("user_c".to_string(), "online", "busy") + ] + ); + assert_eq!( + contacts(&client_b, cx_b), + [ + ("user_a".to_string(), "online", "busy"), + ("user_c".to_string(), "online", "busy") + ] + ); + assert_eq!( + contacts(&client_c, cx_c), + [ + ("user_a".to_string(), "online", "busy"), + ("user_b".to_string(), "online", "free") + ] + ); + + active_call_a + .update(cx_a, |call, cx| call.accept_incoming(cx)) + .await + .unwrap(); + deterministic.run_until_parked(); + assert_eq!( + contacts(&client_a, cx_a), + [ + ("user_b".to_string(), "online", "free"), + ("user_c".to_string(), "online", "busy") + ] + ); + assert_eq!( + contacts(&client_b, cx_b), + [ + ("user_a".to_string(), "online", "busy"), + ("user_c".to_string(), "online", "busy") + ] + ); + assert_eq!( + contacts(&client_c, cx_c), + [ + ("user_a".to_string(), "online", "busy"), + ("user_b".to_string(), "online", "free") + ] + ); + + active_call_a + .update(cx_a, |call, cx| { + call.invite(client_b.user_id().unwrap(), None, cx) + }) + .await + .unwrap(); + deterministic.run_until_parked(); + assert_eq!( + contacts(&client_a, cx_a), + [ + ("user_b".to_string(), "online", "busy"), + ("user_c".to_string(), "online", "busy") + ] + ); + assert_eq!( + contacts(&client_b, cx_b), + [ + ("user_a".to_string(), "online", "busy"), + ("user_c".to_string(), "online", "busy") + ] + ); + assert_eq!( + contacts(&client_c, cx_c), + [ + ("user_a".to_string(), "online", "busy"), + ("user_b".to_string(), "online", "busy") + ] ); #[allow(clippy::type_complexity)] - fn contacts(client: &TestClient, cx: &TestAppContext) -> Vec<(String, bool)> { + fn contacts( + client: &TestClient, + cx: &TestAppContext, + ) -> Vec<(String, &'static str, &'static str)> { client.user_store.read_with(cx, |store, _| { store .contacts() .iter() - .map(|contact| (contact.user.github_login.clone(), contact.online)) + .map(|contact| { + ( + contact.user.github_login.clone(), + if contact.online { "online" } else { "offline" }, + if contact.busy { "busy" } else { "free" }, + ) + }) .collect() }) } diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index 50d1c82fc8..bd7afba775 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -585,8 +585,15 @@ impl Server { request: TypedEnvelope, response: Response, ) -> Result<()> { - let room_id = self.store().await.create_room(request.sender_id)?; + let user_id; + let room_id; + { + let mut store = self.store().await; + user_id = store.user_id_for_connection(request.sender_id)?; + room_id = store.create_room(request.sender_id)?; + } response.send(proto::CreateRoomResponse { id: room_id })?; + self.update_user_contacts(user_id).await?; Ok(()) } @@ -595,61 +602,71 @@ impl Server { request: TypedEnvelope, response: Response, ) -> Result<()> { - let room_id = request.payload.id; - let mut store = self.store().await; - let (room, recipient_connection_ids) = store.join_room(room_id, request.sender_id)?; - for recipient_id in recipient_connection_ids { - self.peer - .send(recipient_id, proto::CallCanceled {}) - .trace_err(); + let user_id; + { + let mut store = self.store().await; + user_id = store.user_id_for_connection(request.sender_id)?; + let (room, recipient_connection_ids) = + store.join_room(request.payload.id, request.sender_id)?; + for recipient_id in recipient_connection_ids { + self.peer + .send(recipient_id, proto::CallCanceled {}) + .trace_err(); + } + response.send(proto::JoinRoomResponse { + room: Some(room.clone()), + })?; + self.room_updated(room); } - response.send(proto::JoinRoomResponse { - room: Some(room.clone()), - })?; - self.room_updated(room); + self.update_user_contacts(user_id).await?; Ok(()) } async fn leave_room(self: Arc, message: TypedEnvelope) -> Result<()> { - let room_id = message.payload.id; - let mut store = self.store().await; - let left_room = store.leave_room(room_id, message.sender_id)?; + let user_id; + { + let mut store = self.store().await; + user_id = store.user_id_for_connection(message.sender_id)?; + let left_room = store.leave_room(message.payload.id, message.sender_id)?; - for project in left_room.unshared_projects { - for connection_id in project.connection_ids() { - self.peer.send( - connection_id, - proto::UnshareProject { - project_id: project.id.to_proto(), - }, - )?; - } - } - - for project in left_room.left_projects { - if project.remove_collaborator { - for connection_id in project.connection_ids { + for project in left_room.unshared_projects { + for connection_id in project.connection_ids() { self.peer.send( connection_id, - proto::RemoveProjectCollaborator { + proto::UnshareProject { project_id: project.id.to_proto(), - peer_id: message.sender_id.0, }, )?; } + } - self.peer.send( - message.sender_id, - proto::UnshareProject { - project_id: project.id.to_proto(), - }, - )?; + for project in left_room.left_projects { + if project.remove_collaborator { + for connection_id in project.connection_ids { + self.peer.send( + connection_id, + proto::RemoveProjectCollaborator { + project_id: project.id.to_proto(), + peer_id: message.sender_id.0, + }, + )?; + } + + self.peer.send( + message.sender_id, + proto::UnshareProject { + project_id: project.id.to_proto(), + }, + )?; + } + } + + if let Some(room) = left_room.room { + self.room_updated(room); } } + self.update_user_contacts(user_id).await?; - if let Some(room) = left_room.room { - self.room_updated(room); - } Ok(()) } @@ -694,6 +711,7 @@ impl Server { }) .collect::>() }; + self.update_user_contacts(recipient_user_id).await?; while let Some(call_response) = calls.next().await { match call_response.as_ref() { @@ -712,6 +730,7 @@ impl Server { let room = store.call_failed(room_id, recipient_user_id)?; self.room_updated(&room); } + self.update_user_contacts(recipient_user_id).await?; Err(anyhow!("failed to ring call recipient"))? } @@ -721,19 +740,23 @@ impl Server { request: TypedEnvelope, response: Response, ) -> Result<()> { - let mut store = self.store().await; - let (room, recipient_connection_ids) = store.cancel_call( - request.payload.room_id, - UserId::from_proto(request.payload.recipient_user_id), - request.sender_id, - )?; - for recipient_id in recipient_connection_ids { - self.peer - .send(recipient_id, proto::CallCanceled {}) - .trace_err(); + let recipient_user_id = UserId::from_proto(request.payload.recipient_user_id); + { + let mut store = self.store().await; + let (room, recipient_connection_ids) = store.cancel_call( + request.payload.room_id, + recipient_user_id, + request.sender_id, + )?; + for recipient_id in recipient_connection_ids { + self.peer + .send(recipient_id, proto::CallCanceled {}) + .trace_err(); + } + self.room_updated(room); + response.send(proto::Ack {})?; } - self.room_updated(room); - response.send(proto::Ack {})?; + self.update_user_contacts(recipient_user_id).await?; Ok(()) } @@ -741,15 +764,20 @@ impl Server { self: Arc, message: TypedEnvelope, ) -> Result<()> { - let mut store = self.store().await; - let (room, recipient_connection_ids) = - store.decline_call(message.payload.room_id, message.sender_id)?; - for recipient_id in recipient_connection_ids { - self.peer - .send(recipient_id, proto::CallCanceled {}) - .trace_err(); + let recipient_user_id; + { + let mut store = self.store().await; + recipient_user_id = store.user_id_for_connection(message.sender_id)?; + let (room, recipient_connection_ids) = + store.decline_call(message.payload.room_id, message.sender_id)?; + for recipient_id in recipient_connection_ids { + self.peer + .send(recipient_id, proto::CallCanceled {}) + .trace_err(); + } + self.room_updated(room); } - self.room_updated(room); + self.update_user_contacts(recipient_user_id).await?; Ok(()) } diff --git a/crates/collab/src/rpc/store.rs b/crates/collab/src/rpc/store.rs index a9ae91aba0..be7f798685 100644 --- a/crates/collab/src/rpc/store.rs +++ b/crates/collab/src/rpc/store.rs @@ -314,6 +314,14 @@ impl Store { .is_empty() } + fn is_user_busy(&self, user_id: UserId) -> bool { + self.connected_users + .get(&user_id) + .unwrap_or(&Default::default()) + .active_call + .is_some() + } + pub fn build_initial_contacts_update( &self, contacts: Vec, @@ -352,6 +360,7 @@ impl Store { proto::Contact { user_id: user_id.to_proto(), online: self.is_user_online(user_id), + busy: self.is_user_busy(user_id), should_notify, } } diff --git a/crates/rpc/proto/zed.proto b/crates/rpc/proto/zed.proto index 334bcfbf90..5f62e3585e 100644 --- a/crates/rpc/proto/zed.proto +++ b/crates/rpc/proto/zed.proto @@ -1023,7 +1023,8 @@ message ChannelMessage { message Contact { uint64 user_id = 1; bool online = 2; - bool should_notify = 3; + bool busy = 3; + bool should_notify = 4; } message WorktreeMetadata {