From 4d61d01943f9c1e406d86359dbdf2aca3f0ebe86 Mon Sep 17 00:00:00 2001 From: Mikayla Date: Wed, 4 Oct 2023 11:46:41 -0700 Subject: [PATCH] Add an RPC handler for channel buffer acks co-authored-by: max --- crates/channel/src/channel.rs | 2 +- crates/channel/src/channel_buffer.rs | 2 +- crates/collab/src/rpc.rs | 25 ++++++++++++++++--- .../collab/src/tests/channel_buffer_tests.rs | 21 +++++++++++----- crates/collab/src/tests/test_server.rs | 16 +++++++++++- crates/collab_ui/src/collab_panel.rs | 2 -- 6 files changed, 54 insertions(+), 14 deletions(-) diff --git a/crates/channel/src/channel.rs b/crates/channel/src/channel.rs index 724ff75d60..160b8441ff 100644 --- a/crates/channel/src/channel.rs +++ b/crates/channel/src/channel.rs @@ -2,7 +2,7 @@ mod channel_buffer; mod channel_chat; mod channel_store; -pub use channel_buffer::{ChannelBuffer, ChannelBufferEvent}; +pub use channel_buffer::{ChannelBuffer, ChannelBufferEvent, ACKNOWLEDGE_DEBOUNCE_INTERVAL}; pub use channel_chat::{ChannelChat, ChannelChatEvent, ChannelMessage, ChannelMessageId}; pub use channel_store::{ Channel, ChannelData, ChannelEvent, ChannelId, ChannelMembership, ChannelPath, ChannelStore, diff --git a/crates/channel/src/channel_buffer.rs b/crates/channel/src/channel_buffer.rs index a097cc5467..7de8b956f1 100644 --- a/crates/channel/src/channel_buffer.rs +++ b/crates/channel/src/channel_buffer.rs @@ -11,7 +11,7 @@ use rpc::{ use std::{sync::Arc, time::Duration}; use util::ResultExt; -const ACKNOWLEDGE_DEBOUNCE_INTERVAL: Duration = Duration::from_millis(250); +pub const ACKNOWLEDGE_DEBOUNCE_INTERVAL: Duration = Duration::from_millis(250); pub(crate) fn init(client: &Arc) { client.add_model_message_handler(ChannelBuffer::handle_update_channel_buffer); diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index 1f0ecce2bf..6171803341 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -3,8 +3,8 @@ mod connection_pool; use crate::{ auth, db::{ - self, ChannelId, ChannelsForUser, Database, MessageId, ProjectId, RoomId, ServerId, User, - UserId, + self, BufferId, ChannelId, ChannelsForUser, Database, MessageId, ProjectId, RoomId, + ServerId, User, UserId, }, executor::Executor, AppState, Result, @@ -275,7 +275,8 @@ impl Server { .add_message_handler(update_followers) .add_message_handler(update_diff_base) .add_request_handler(get_private_user_info) - .add_message_handler(acknowledge_channel_message); + .add_message_handler(acknowledge_channel_message) + .add_message_handler(acknowledge_buffer_version); Arc::new(server) } @@ -2912,6 +2913,24 @@ async fn acknowledge_channel_message( Ok(()) } +async fn acknowledge_buffer_version( + request: proto::AckBufferOperation, + session: Session, +) -> Result<()> { + let buffer_id = BufferId::from_proto(request.buffer_id); + session + .db() + .await + .observe_buffer_version( + buffer_id, + session.user_id, + request.epoch as i32, + &request.version, + ) + .await?; + Ok(()) +} + async fn join_channel_chat( request: proto::JoinChannelChat, response: Response, diff --git a/crates/collab/src/tests/channel_buffer_tests.rs b/crates/collab/src/tests/channel_buffer_tests.rs index 82bd7e6afe..a0b9b52484 100644 --- a/crates/collab/src/tests/channel_buffer_tests.rs +++ b/crates/collab/src/tests/channel_buffer_tests.rs @@ -3,7 +3,7 @@ use crate::{ tests::TestServer, }; use call::ActiveCall; -use channel::Channel; +use channel::{Channel, ACKNOWLEDGE_DEBOUNCE_INTERVAL}; use client::ParticipantIndex; use client::{Collaborator, UserId}; use collab_ui::channel_view::ChannelView; @@ -800,7 +800,6 @@ async fn test_channel_buffer_changes( .has_channel_buffer_changed(channel_id) .unwrap() }); - assert!(has_buffer_changed); // Opening the buffer should clear the changed flag. @@ -810,7 +809,6 @@ async fn test_channel_buffer_changes( .update(|cx| ChannelView::open(channel_id, workspace_b.clone(), cx)) .await .unwrap(); - deterministic.run_until_parked(); let has_buffer_changed = cx_b.read(|cx| { @@ -820,10 +818,9 @@ async fn test_channel_buffer_changes( .has_channel_buffer_changed(channel_id) .unwrap() }); - assert!(!has_buffer_changed); - // Editing the channel while the buffer is open shuold not show that the buffer has changed. + // Editing the channel while the buffer is open should not show that the buffer has changed. channel_buffer_a.update(cx_a, |buffer, cx| { buffer.buffer().update(cx, |buffer, cx| { buffer.edit([(0..0, "2")], None, cx); @@ -838,7 +835,20 @@ async fn test_channel_buffer_changes( .has_channel_buffer_changed(channel_id) .unwrap() }); + assert!(!has_buffer_changed); + deterministic.advance_clock(ACKNOWLEDGE_DEBOUNCE_INTERVAL); + + // Test that the server is tracking things correctly, and we retain our 'not changed' + // state across a disconnect + server.simulate_long_connection_interruption(client_b.peer_id().unwrap(), &deterministic); + let has_buffer_changed = cx_b.read(|cx| { + client_b + .channel_store() + .read(cx) + .has_channel_buffer_changed(channel_id) + .unwrap() + }); assert!(!has_buffer_changed); // Closing the buffer should re-enable change tracking @@ -866,7 +876,6 @@ async fn test_channel_buffer_changes( .has_channel_buffer_changed(channel_id) .unwrap() }); - assert!(has_buffer_changed); } diff --git a/crates/collab/src/tests/test_server.rs b/crates/collab/src/tests/test_server.rs index cf5b58703c..e10ded7d95 100644 --- a/crates/collab/src/tests/test_server.rs +++ b/crates/collab/src/tests/test_server.rs @@ -1,7 +1,7 @@ use crate::{ db::{tests::TestDb, NewUserParams, UserId}, executor::Executor, - rpc::{Server, CLEANUP_TIMEOUT}, + rpc::{Server, CLEANUP_TIMEOUT, RECONNECT_TIMEOUT}, AppState, }; use anyhow::anyhow; @@ -17,6 +17,7 @@ use gpui::{executor::Deterministic, ModelHandle, Task, TestAppContext, WindowHan use language::LanguageRegistry; use parking_lot::Mutex; use project::{Project, WorktreeId}; +use rpc::RECEIVE_TIMEOUT; use settings::SettingsStore; use std::{ cell::{Ref, RefCell, RefMut}, @@ -255,6 +256,19 @@ impl TestServer { .store(true, SeqCst); } + pub fn simulate_long_connection_interruption( + &self, + peer_id: PeerId, + deterministic: &Arc, + ) { + self.forbid_connections(); + self.disconnect_client(peer_id); + deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT); + self.allow_connections(); + deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT); + deterministic.run_until_parked(); + } + pub fn forbid_connections(&self) { self.forbid_connections.store(true, SeqCst); } diff --git a/crates/collab_ui/src/collab_panel.rs b/crates/collab_ui/src/collab_panel.rs index 8933067109..39543c8def 100644 --- a/crates/collab_ui/src/collab_panel.rs +++ b/crates/collab_ui/src/collab_panel.rs @@ -2760,11 +2760,9 @@ impl CollabPanel { .read(cx) .channel_id()?; - dbg!(call_channel, channel.id); Some(call_channel == channel.id) }) .unwrap_or(false); - dbg!(is_active); if is_active { self.open_channel_notes( &OpenChannelNotes {