From 3422eb65e81394329385a44223638fed5ea23f2a Mon Sep 17 00:00:00 2001 From: Max Brunsfeld Date: Thu, 7 Sep 2023 11:16:51 -0700 Subject: [PATCH] Restore channel chat model and panel view --- Cargo.lock | 1 + crates/channel/src/channel.rs | 9 +- crates/channel/src/channel_buffer.rs | 14 +- crates/channel/src/channel_chat.rs | 459 ++++++++++++++++++++++ crates/channel/src/channel_store.rs | 88 +++-- crates/channel/src/channel_store_tests.rs | 220 ++++++++++- crates/collab/src/tests/test_server.rs | 2 +- crates/collab_ui/Cargo.toml | 1 + crates/collab_ui/src/channel_view.rs | 11 +- crates/collab_ui/src/chat_panel.rs | 391 ++++++++++++++++++ crates/collab_ui/src/collab_ui.rs | 1 + crates/rpc/proto/zed.proto | 57 ++- crates/rpc/src/proto.rs | 12 + crates/theme/src/theme.rs | 12 + 14 files changed, 1227 insertions(+), 51 deletions(-) create mode 100644 crates/channel/src/channel_chat.rs create mode 100644 crates/collab_ui/src/chat_panel.rs diff --git a/Cargo.lock b/Cargo.lock index 121e9a28dd..6b25b47b75 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1553,6 +1553,7 @@ dependencies = [ "settings", "theme", "theme_selector", + "time 0.3.27", "util", "vcs_menu", "workspace", diff --git a/crates/channel/src/channel.rs b/crates/channel/src/channel.rs index 15631b7dd3..b4acc0c25f 100644 --- a/crates/channel/src/channel.rs +++ b/crates/channel/src/channel.rs @@ -1,10 +1,13 @@ +mod channel_buffer; +mod channel_chat; mod channel_store; -pub mod channel_buffer; -use std::sync::Arc; +pub use channel_buffer::{ChannelBuffer, ChannelBufferEvent}; +pub use channel_chat::{ChannelChat, ChannelChatEvent, ChannelMessage, ChannelMessageId}; +pub use channel_store::{Channel, ChannelEvent, ChannelId, ChannelMembership, ChannelStore}; -pub use channel_store::*; use client::Client; +use std::sync::Arc; #[cfg(test)] mod channel_store_tests; diff --git a/crates/channel/src/channel_buffer.rs b/crates/channel/src/channel_buffer.rs index e11282cf79..06f9093fb5 100644 --- a/crates/channel/src/channel_buffer.rs +++ b/crates/channel/src/channel_buffer.rs @@ -23,13 +23,13 @@ pub struct ChannelBuffer { subscription: Option, } -pub enum Event { +pub enum ChannelBufferEvent { CollaboratorsChanged, Disconnected, } impl Entity for ChannelBuffer { - type Event = Event; + type Event = ChannelBufferEvent; fn release(&mut self, _: &mut AppContext) { if self.connected { @@ -101,7 +101,7 @@ impl ChannelBuffer { } } self.collaborators = collaborators; - cx.emit(Event::CollaboratorsChanged); + cx.emit(ChannelBufferEvent::CollaboratorsChanged); cx.notify(); } @@ -141,7 +141,7 @@ impl ChannelBuffer { this.update(&mut cx, |this, cx| { this.collaborators.push(collaborator); - cx.emit(Event::CollaboratorsChanged); + cx.emit(ChannelBufferEvent::CollaboratorsChanged); cx.notify(); }); @@ -165,7 +165,7 @@ impl ChannelBuffer { true } }); - cx.emit(Event::CollaboratorsChanged); + cx.emit(ChannelBufferEvent::CollaboratorsChanged); cx.notify(); }); @@ -185,7 +185,7 @@ impl ChannelBuffer { break; } } - cx.emit(Event::CollaboratorsChanged); + cx.emit(ChannelBufferEvent::CollaboratorsChanged); cx.notify(); }); @@ -230,7 +230,7 @@ impl ChannelBuffer { if self.connected { self.connected = false; self.subscription.take(); - cx.emit(Event::Disconnected); + cx.emit(ChannelBufferEvent::Disconnected); cx.notify() } } diff --git a/crates/channel/src/channel_chat.rs b/crates/channel/src/channel_chat.rs new file mode 100644 index 0000000000..ebe491e5b8 --- /dev/null +++ b/crates/channel/src/channel_chat.rs @@ -0,0 +1,459 @@ +use crate::Channel; +use anyhow::{anyhow, Result}; +use client::{ + proto, + user::{User, UserStore}, + Client, Subscription, TypedEnvelope, +}; +use futures::lock::Mutex; +use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task}; +use rand::prelude::*; +use std::{collections::HashSet, mem, ops::Range, sync::Arc}; +use sum_tree::{Bias, SumTree}; +use time::OffsetDateTime; +use util::{post_inc, ResultExt as _, TryFutureExt}; + +pub struct ChannelChat { + channel: Arc, + messages: SumTree, + loaded_all_messages: bool, + next_pending_message_id: usize, + user_store: ModelHandle, + rpc: Arc, + outgoing_messages_lock: Arc>, + rng: StdRng, + _subscription: Subscription, +} + +#[derive(Clone, Debug)] +pub struct ChannelMessage { + pub id: ChannelMessageId, + pub body: String, + pub timestamp: OffsetDateTime, + pub sender: Arc, + pub nonce: u128, +} + +#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] +pub enum ChannelMessageId { + Saved(u64), + Pending(usize), +} + +#[derive(Clone, Debug, Default)] +pub struct ChannelMessageSummary { + max_id: ChannelMessageId, + count: usize, +} + +#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord)] +struct Count(usize); + +#[derive(Clone, Debug, PartialEq)] +pub enum ChannelChatEvent { + MessagesUpdated { + old_range: Range, + new_count: usize, + }, +} + +impl Entity for ChannelChat { + type Event = ChannelChatEvent; + + fn release(&mut self, _: &mut AppContext) { + self.rpc + .send(proto::LeaveChannelChat { + channel_id: self.channel.id, + }) + .log_err(); + } +} + +impl ChannelChat { + pub fn init(rpc: &Arc) { + rpc.add_model_message_handler(Self::handle_message_sent); + } + + pub async fn new( + channel: Arc, + user_store: ModelHandle, + client: Arc, + mut cx: AsyncAppContext, + ) -> Result> { + let channel_id = channel.id; + let subscription = client.subscribe_to_entity(channel_id).unwrap(); + + let response = client + .request(proto::JoinChannelChat { channel_id }) + .await?; + let messages = messages_from_proto(response.messages, &user_store, &mut cx).await?; + let loaded_all_messages = response.done; + + Ok(cx.add_model(|cx| { + let mut this = Self { + channel, + user_store, + rpc: client, + outgoing_messages_lock: Default::default(), + messages: Default::default(), + loaded_all_messages, + next_pending_message_id: 0, + rng: StdRng::from_entropy(), + _subscription: subscription.set_model(&cx.handle(), &mut cx.to_async()), + }; + this.insert_messages(messages, cx); + this + })) + } + + pub fn name(&self) -> &str { + &self.channel.name + } + + pub fn send_message( + &mut self, + body: String, + cx: &mut ModelContext, + ) -> Result>> { + if body.is_empty() { + Err(anyhow!("message body can't be empty"))?; + } + + let current_user = self + .user_store + .read(cx) + .current_user() + .ok_or_else(|| anyhow!("current_user is not present"))?; + + let channel_id = self.channel.id; + let pending_id = ChannelMessageId::Pending(post_inc(&mut self.next_pending_message_id)); + let nonce = self.rng.gen(); + self.insert_messages( + SumTree::from_item( + ChannelMessage { + id: pending_id, + body: body.clone(), + sender: current_user, + timestamp: OffsetDateTime::now_utc(), + nonce, + }, + &(), + ), + cx, + ); + let user_store = self.user_store.clone(); + let rpc = self.rpc.clone(); + let outgoing_messages_lock = self.outgoing_messages_lock.clone(); + Ok(cx.spawn(|this, mut cx| async move { + let outgoing_message_guard = outgoing_messages_lock.lock().await; + let request = rpc.request(proto::SendChannelMessage { + channel_id, + body, + nonce: Some(nonce.into()), + }); + let response = request.await?; + drop(outgoing_message_guard); + let message = ChannelMessage::from_proto( + response.message.ok_or_else(|| anyhow!("invalid message"))?, + &user_store, + &mut cx, + ) + .await?; + this.update(&mut cx, |this, cx| { + this.insert_messages(SumTree::from_item(message, &()), cx); + Ok(()) + }) + })) + } + + pub fn load_more_messages(&mut self, cx: &mut ModelContext) -> bool { + if !self.loaded_all_messages { + let rpc = self.rpc.clone(); + let user_store = self.user_store.clone(); + let channel_id = self.channel.id; + if let Some(before_message_id) = + self.messages.first().and_then(|message| match message.id { + ChannelMessageId::Saved(id) => Some(id), + ChannelMessageId::Pending(_) => None, + }) + { + cx.spawn(|this, mut cx| { + async move { + let response = rpc + .request(proto::GetChannelMessages { + channel_id, + before_message_id, + }) + .await?; + let loaded_all_messages = response.done; + let messages = + messages_from_proto(response.messages, &user_store, &mut cx).await?; + this.update(&mut cx, |this, cx| { + this.loaded_all_messages = loaded_all_messages; + this.insert_messages(messages, cx); + }); + anyhow::Ok(()) + } + .log_err() + }) + .detach(); + return true; + } + } + false + } + + pub fn rejoin(&mut self, cx: &mut ModelContext) { + let user_store = self.user_store.clone(); + let rpc = self.rpc.clone(); + let channel_id = self.channel.id; + cx.spawn(|this, mut cx| { + async move { + let response = rpc.request(proto::JoinChannelChat { channel_id }).await?; + let messages = messages_from_proto(response.messages, &user_store, &mut cx).await?; + let loaded_all_messages = response.done; + + let pending_messages = this.update(&mut cx, |this, cx| { + if let Some((first_new_message, last_old_message)) = + messages.first().zip(this.messages.last()) + { + if first_new_message.id > last_old_message.id { + let old_messages = mem::take(&mut this.messages); + cx.emit(ChannelChatEvent::MessagesUpdated { + old_range: 0..old_messages.summary().count, + new_count: 0, + }); + this.loaded_all_messages = loaded_all_messages; + } + } + + this.insert_messages(messages, cx); + if loaded_all_messages { + this.loaded_all_messages = loaded_all_messages; + } + + this.pending_messages().cloned().collect::>() + }); + + for pending_message in pending_messages { + let request = rpc.request(proto::SendChannelMessage { + channel_id, + body: pending_message.body, + nonce: Some(pending_message.nonce.into()), + }); + let response = request.await?; + let message = ChannelMessage::from_proto( + response.message.ok_or_else(|| anyhow!("invalid message"))?, + &user_store, + &mut cx, + ) + .await?; + this.update(&mut cx, |this, cx| { + this.insert_messages(SumTree::from_item(message, &()), cx); + }); + } + + anyhow::Ok(()) + } + .log_err() + }) + .detach(); + } + + pub fn message_count(&self) -> usize { + self.messages.summary().count + } + + pub fn messages(&self) -> &SumTree { + &self.messages + } + + pub fn message(&self, ix: usize) -> &ChannelMessage { + let mut cursor = self.messages.cursor::(); + cursor.seek(&Count(ix), Bias::Right, &()); + cursor.item().unwrap() + } + + pub fn messages_in_range(&self, range: Range) -> impl Iterator { + let mut cursor = self.messages.cursor::(); + cursor.seek(&Count(range.start), Bias::Right, &()); + cursor.take(range.len()) + } + + pub fn pending_messages(&self) -> impl Iterator { + let mut cursor = self.messages.cursor::(); + cursor.seek(&ChannelMessageId::Pending(0), Bias::Left, &()); + cursor + } + + async fn handle_message_sent( + this: ModelHandle, + message: TypedEnvelope, + _: Arc, + mut cx: AsyncAppContext, + ) -> Result<()> { + let user_store = this.read_with(&cx, |this, _| this.user_store.clone()); + let message = message + .payload + .message + .ok_or_else(|| anyhow!("empty message"))?; + + let message = ChannelMessage::from_proto(message, &user_store, &mut cx).await?; + this.update(&mut cx, |this, cx| { + this.insert_messages(SumTree::from_item(message, &()), cx) + }); + + Ok(()) + } + + fn insert_messages(&mut self, messages: SumTree, cx: &mut ModelContext) { + if let Some((first_message, last_message)) = messages.first().zip(messages.last()) { + let nonces = messages + .cursor::<()>() + .map(|m| m.nonce) + .collect::>(); + + let mut old_cursor = self.messages.cursor::<(ChannelMessageId, Count)>(); + let mut new_messages = old_cursor.slice(&first_message.id, Bias::Left, &()); + let start_ix = old_cursor.start().1 .0; + let removed_messages = old_cursor.slice(&last_message.id, Bias::Right, &()); + let removed_count = removed_messages.summary().count; + let new_count = messages.summary().count; + let end_ix = start_ix + removed_count; + + new_messages.append(messages, &()); + + let mut ranges = Vec::>::new(); + if new_messages.last().unwrap().is_pending() { + new_messages.append(old_cursor.suffix(&()), &()); + } else { + new_messages.append( + old_cursor.slice(&ChannelMessageId::Pending(0), Bias::Left, &()), + &(), + ); + + while let Some(message) = old_cursor.item() { + let message_ix = old_cursor.start().1 .0; + if nonces.contains(&message.nonce) { + if ranges.last().map_or(false, |r| r.end == message_ix) { + ranges.last_mut().unwrap().end += 1; + } else { + ranges.push(message_ix..message_ix + 1); + } + } else { + new_messages.push(message.clone(), &()); + } + old_cursor.next(&()); + } + } + + drop(old_cursor); + self.messages = new_messages; + + for range in ranges.into_iter().rev() { + cx.emit(ChannelChatEvent::MessagesUpdated { + old_range: range, + new_count: 0, + }); + } + cx.emit(ChannelChatEvent::MessagesUpdated { + old_range: start_ix..end_ix, + new_count, + }); + cx.notify(); + } + } +} + +async fn messages_from_proto( + proto_messages: Vec, + user_store: &ModelHandle, + cx: &mut AsyncAppContext, +) -> Result> { + let unique_user_ids = proto_messages + .iter() + .map(|m| m.sender_id) + .collect::>() + .into_iter() + .collect(); + user_store + .update(cx, |user_store, cx| { + user_store.get_users(unique_user_ids, cx) + }) + .await?; + + let mut messages = Vec::with_capacity(proto_messages.len()); + for message in proto_messages { + messages.push(ChannelMessage::from_proto(message, user_store, cx).await?); + } + let mut result = SumTree::new(); + result.extend(messages, &()); + Ok(result) +} + +impl ChannelMessage { + pub async fn from_proto( + message: proto::ChannelMessage, + user_store: &ModelHandle, + cx: &mut AsyncAppContext, + ) -> Result { + let sender = user_store + .update(cx, |user_store, cx| { + user_store.get_user(message.sender_id, cx) + }) + .await?; + Ok(ChannelMessage { + id: ChannelMessageId::Saved(message.id), + body: message.body, + timestamp: OffsetDateTime::from_unix_timestamp(message.timestamp as i64)?, + sender, + nonce: message + .nonce + .ok_or_else(|| anyhow!("nonce is required"))? + .into(), + }) + } + + pub fn is_pending(&self) -> bool { + matches!(self.id, ChannelMessageId::Pending(_)) + } +} + +impl sum_tree::Item for ChannelMessage { + type Summary = ChannelMessageSummary; + + fn summary(&self) -> Self::Summary { + ChannelMessageSummary { + max_id: self.id, + count: 1, + } + } +} + +impl Default for ChannelMessageId { + fn default() -> Self { + Self::Saved(0) + } +} + +impl sum_tree::Summary for ChannelMessageSummary { + type Context = (); + + fn add_summary(&mut self, summary: &Self, _: &()) { + self.max_id = summary.max_id; + self.count += summary.count; + } +} + +impl<'a> sum_tree::Dimension<'a, ChannelMessageSummary> for ChannelMessageId { + fn add_summary(&mut self, summary: &'a ChannelMessageSummary, _: &()) { + debug_assert!(summary.max_id > *self); + *self = summary.max_id; + } +} + +impl<'a> sum_tree::Dimension<'a, ChannelMessageSummary> for Count { + fn add_summary(&mut self, summary: &'a ChannelMessageSummary, _: &()) { + self.0 += summary.count; + } +} diff --git a/crates/channel/src/channel_store.rs b/crates/channel/src/channel_store.rs index a4c8da6df4..6096692ccb 100644 --- a/crates/channel/src/channel_store.rs +++ b/crates/channel/src/channel_store.rs @@ -1,4 +1,4 @@ -use crate::channel_buffer::ChannelBuffer; +use crate::{channel_buffer::ChannelBuffer, channel_chat::ChannelChat}; use anyhow::{anyhow, Result}; use client::{Client, Subscription, User, UserId, UserStore}; use collections::{hash_map, HashMap, HashSet}; @@ -20,7 +20,8 @@ pub struct ChannelStore { channels_with_admin_privileges: HashSet, outgoing_invites: HashSet<(ChannelId, UserId)>, update_channels_tx: mpsc::UnboundedSender, - opened_buffers: HashMap, + opened_buffers: HashMap>, + opened_chats: HashMap>, client: Arc, user_store: ModelHandle, _rpc_subscription: Subscription, @@ -50,15 +51,9 @@ impl Entity for ChannelStore { type Event = ChannelEvent; } -pub enum ChannelMemberStatus { - Invited, - Member, - NotMember, -} - -enum OpenedChannelBuffer { - Open(WeakModelHandle), - Loading(Shared, Arc>>>), +enum OpenedModelHandle { + Open(WeakModelHandle), + Loading(Shared, Arc>>>), } impl ChannelStore { @@ -94,6 +89,7 @@ impl ChannelStore { channels_with_admin_privileges: Default::default(), outgoing_invites: Default::default(), opened_buffers: Default::default(), + opened_chats: Default::default(), update_channels_tx, client, user_store, @@ -154,7 +150,7 @@ impl ChannelStore { pub fn has_open_channel_buffer(&self, channel_id: ChannelId, cx: &AppContext) -> bool { if let Some(buffer) = self.opened_buffers.get(&channel_id) { - if let OpenedChannelBuffer::Open(buffer) = buffer { + if let OpenedModelHandle::Open(buffer) = buffer { return buffer.upgrade(cx).is_some(); } } @@ -166,24 +162,58 @@ impl ChannelStore { channel_id: ChannelId, cx: &mut ModelContext, ) -> Task>> { - // Make sure that a given channel buffer is only opened once per + let client = self.client.clone(); + self.open_channel_resource( + channel_id, + |this| &mut this.opened_buffers, + |channel, cx| ChannelBuffer::new(channel, client, cx), + cx, + ) + } + + pub fn open_channel_chat( + &mut self, + channel_id: ChannelId, + cx: &mut ModelContext, + ) -> Task>> { + let client = self.client.clone(); + let user_store = self.user_store.clone(); + self.open_channel_resource( + channel_id, + |this| &mut this.opened_chats, + |channel, cx| ChannelChat::new(channel, user_store, client, cx), + cx, + ) + } + + fn open_channel_resource( + &mut self, + channel_id: ChannelId, + map: fn(&mut Self) -> &mut HashMap>, + load: F, + cx: &mut ModelContext, + ) -> Task>> + where + F: 'static + FnOnce(Arc, AsyncAppContext) -> Fut, + Fut: Future>>, + { + // Make sure that a given channel resource is only opened once per // app instance, even if this method is called multiple times // with the same channel id while the first task is still running. let task = loop { - match self.opened_buffers.entry(channel_id) { + match map(self).entry(channel_id) { hash_map::Entry::Occupied(e) => match e.get() { - OpenedChannelBuffer::Open(buffer) => { + OpenedModelHandle::Open(buffer) => { if let Some(buffer) = buffer.upgrade(cx) { break Task::ready(Ok(buffer)).shared(); } else { - self.opened_buffers.remove(&channel_id); + map(self).remove(&channel_id); continue; } } - OpenedChannelBuffer::Loading(task) => break task.clone(), + OpenedModelHandle::Loading(task) => break task.clone(), }, hash_map::Entry::Vacant(e) => { - let client = self.client.clone(); let task = cx .spawn(|this, cx| async move { let channel = this.read_with(&cx, |this, _| { @@ -192,12 +222,10 @@ impl ChannelStore { }) })?; - ChannelBuffer::new(channel, client, cx) - .await - .map_err(Arc::new) + load(channel, cx).await.map_err(Arc::new) }) .shared(); - e.insert(OpenedChannelBuffer::Loading(task.clone())); + e.insert(OpenedModelHandle::Loading(task.clone())); cx.spawn({ let task = task.clone(); |this, mut cx| async move { @@ -208,14 +236,14 @@ impl ChannelStore { this.opened_buffers.remove(&channel_id); }) .detach(); - this.opened_buffers.insert( + map(this).insert( channel_id, - OpenedChannelBuffer::Open(buffer.downgrade()), + OpenedModelHandle::Open(buffer.downgrade()), ); } Err(error) => { log::error!("failed to open channel buffer {error:?}"); - this.opened_buffers.remove(&channel_id); + map(this).remove(&channel_id); } }); } @@ -496,7 +524,7 @@ impl ChannelStore { let mut buffer_versions = Vec::new(); for buffer in self.opened_buffers.values() { - if let OpenedChannelBuffer::Open(buffer) = buffer { + if let OpenedModelHandle::Open(buffer) = buffer { if let Some(buffer) = buffer.upgrade(cx) { let channel_buffer = buffer.read(cx); let buffer = channel_buffer.buffer().read(cx); @@ -522,7 +550,7 @@ impl ChannelStore { this.update(&mut cx, |this, cx| { this.opened_buffers.retain(|_, buffer| match buffer { - OpenedChannelBuffer::Open(channel_buffer) => { + OpenedModelHandle::Open(channel_buffer) => { let Some(channel_buffer) = channel_buffer.upgrade(cx) else { return false; }; @@ -583,7 +611,7 @@ impl ChannelStore { false }) } - OpenedChannelBuffer::Loading(_) => true, + OpenedModelHandle::Loading(_) => true, }); }); anyhow::Ok(()) @@ -605,7 +633,7 @@ impl ChannelStore { if let Some(this) = this.upgrade(&cx) { this.update(&mut cx, |this, cx| { for (_, buffer) in this.opened_buffers.drain() { - if let OpenedChannelBuffer::Open(buffer) = buffer { + if let OpenedModelHandle::Open(buffer) = buffer { if let Some(buffer) = buffer.upgrade(cx) { buffer.update(cx, |buffer, cx| buffer.disconnect(cx)); } @@ -654,7 +682,7 @@ impl ChannelStore { for channel_id in &payload.remove_channels { let channel_id = *channel_id; - if let Some(OpenedChannelBuffer::Open(buffer)) = + if let Some(OpenedModelHandle::Open(buffer)) = self.opened_buffers.remove(&channel_id) { if let Some(buffer) = buffer.upgrade(cx) { diff --git a/crates/channel/src/channel_store_tests.rs b/crates/channel/src/channel_store_tests.rs index 18894b1f47..3ae899ecde 100644 --- a/crates/channel/src/channel_store_tests.rs +++ b/crates/channel/src/channel_store_tests.rs @@ -1,6 +1,8 @@ +use crate::channel_chat::ChannelChatEvent; + use super::*; -use client::{Client, UserStore}; -use gpui::{AppContext, ModelHandle}; +use client::{test::FakeServer, Client, UserStore}; +use gpui::{AppContext, ModelHandle, TestAppContext}; use rpc::proto; use util::http::FakeHttpClient; @@ -137,6 +139,220 @@ fn test_dangling_channel_paths(cx: &mut AppContext) { assert_channels(&channel_store, &[(0, "a".to_string(), true)], cx); } +#[gpui::test] +async fn test_channel_messages(cx: &mut TestAppContext) { + cx.foreground().forbid_parking(); + + let user_id = 5; + let http_client = FakeHttpClient::with_404_response(); + let client = cx.update(|cx| Client::new(http_client.clone(), cx)); + let server = FakeServer::for_client(user_id, &client, cx).await; + let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http_client, cx)); + crate::init(&client); + + let channel_store = cx.add_model(|cx| ChannelStore::new(client, user_store, cx)); + + let channel_id = 5; + + // Get the available channels. + server.send(proto::UpdateChannels { + channels: vec![proto::Channel { + id: channel_id, + name: "the-channel".to_string(), + parent_id: None, + }], + ..Default::default() + }); + channel_store.next_notification(cx).await; + cx.read(|cx| { + assert_channels(&channel_store, &[(0, "the-channel".to_string(), false)], cx); + }); + + let get_users = server.receive::().await.unwrap(); + assert_eq!(get_users.payload.user_ids, vec![5]); + server + .respond( + get_users.receipt(), + proto::UsersResponse { + users: vec![proto::User { + id: 5, + github_login: "nathansobo".into(), + avatar_url: "http://avatar.com/nathansobo".into(), + }], + }, + ) + .await; + + // Join a channel and populate its existing messages. + let channel = channel_store + .update(cx, |store, cx| { + let channel_id = store.channels().next().unwrap().1.id; + store.open_channel_chat(channel_id, cx) + }) + .await + .unwrap(); + channel.read_with(cx, |channel, _| assert!(channel.messages().is_empty())); + let join_channel = server.receive::().await.unwrap(); + server + .respond( + join_channel.receipt(), + proto::JoinChannelChatResponse { + messages: vec![ + proto::ChannelMessage { + id: 10, + body: "a".into(), + timestamp: 1000, + sender_id: 5, + nonce: Some(1.into()), + }, + proto::ChannelMessage { + id: 11, + body: "b".into(), + timestamp: 1001, + sender_id: 6, + nonce: Some(2.into()), + }, + ], + done: false, + }, + ) + .await; + + // Client requests all users for the received messages + let mut get_users = server.receive::().await.unwrap(); + get_users.payload.user_ids.sort(); + assert_eq!(get_users.payload.user_ids, vec![6]); + server + .respond( + get_users.receipt(), + proto::UsersResponse { + users: vec![proto::User { + id: 6, + github_login: "maxbrunsfeld".into(), + avatar_url: "http://avatar.com/maxbrunsfeld".into(), + }], + }, + ) + .await; + + assert_eq!( + channel.next_event(cx).await, + ChannelChatEvent::MessagesUpdated { + old_range: 0..0, + new_count: 2, + } + ); + channel.read_with(cx, |channel, _| { + assert_eq!( + channel + .messages_in_range(0..2) + .map(|message| (message.sender.github_login.clone(), message.body.clone())) + .collect::>(), + &[ + ("nathansobo".into(), "a".into()), + ("maxbrunsfeld".into(), "b".into()) + ] + ); + }); + + // Receive a new message. + server.send(proto::ChannelMessageSent { + channel_id, + message: Some(proto::ChannelMessage { + id: 12, + body: "c".into(), + timestamp: 1002, + sender_id: 7, + nonce: Some(3.into()), + }), + }); + + // Client requests user for message since they haven't seen them yet + let get_users = server.receive::().await.unwrap(); + assert_eq!(get_users.payload.user_ids, vec![7]); + server + .respond( + get_users.receipt(), + proto::UsersResponse { + users: vec![proto::User { + id: 7, + github_login: "as-cii".into(), + avatar_url: "http://avatar.com/as-cii".into(), + }], + }, + ) + .await; + + assert_eq!( + channel.next_event(cx).await, + ChannelChatEvent::MessagesUpdated { + old_range: 2..2, + new_count: 1, + } + ); + channel.read_with(cx, |channel, _| { + assert_eq!( + channel + .messages_in_range(2..3) + .map(|message| (message.sender.github_login.clone(), message.body.clone())) + .collect::>(), + &[("as-cii".into(), "c".into())] + ) + }); + + // Scroll up to view older messages. + channel.update(cx, |channel, cx| { + assert!(channel.load_more_messages(cx)); + }); + let get_messages = server.receive::().await.unwrap(); + assert_eq!(get_messages.payload.channel_id, 5); + assert_eq!(get_messages.payload.before_message_id, 10); + server + .respond( + get_messages.receipt(), + proto::GetChannelMessagesResponse { + done: true, + messages: vec![ + proto::ChannelMessage { + id: 8, + body: "y".into(), + timestamp: 998, + sender_id: 5, + nonce: Some(4.into()), + }, + proto::ChannelMessage { + id: 9, + body: "z".into(), + timestamp: 999, + sender_id: 6, + nonce: Some(5.into()), + }, + ], + }, + ) + .await; + + assert_eq!( + channel.next_event(cx).await, + ChannelChatEvent::MessagesUpdated { + old_range: 0..0, + new_count: 2, + } + ); + channel.read_with(cx, |channel, _| { + assert_eq!( + channel + .messages_in_range(0..2) + .map(|message| (message.sender.github_login.clone(), message.body.clone())) + .collect::>(), + &[ + ("nathansobo".into(), "y".into()), + ("maxbrunsfeld".into(), "z".into()) + ] + ); + }); +} + fn update_channels( channel_store: &ModelHandle, message: proto::UpdateChannels, diff --git a/crates/collab/src/tests/test_server.rs b/crates/collab/src/tests/test_server.rs index eef1dde967..a7dbd97239 100644 --- a/crates/collab/src/tests/test_server.rs +++ b/crates/collab/src/tests/test_server.rs @@ -6,7 +6,7 @@ use crate::{ }; use anyhow::anyhow; use call::ActiveCall; -use channel::{channel_buffer::ChannelBuffer, ChannelStore}; +use channel::{ChannelBuffer, ChannelStore}; use client::{ self, proto::PeerId, Client, Connection, Credentials, EstablishConnectionError, UserStore, }; diff --git a/crates/collab_ui/Cargo.toml b/crates/collab_ui/Cargo.toml index da32308558..0a52b9a19f 100644 --- a/crates/collab_ui/Cargo.toml +++ b/crates/collab_ui/Cargo.toml @@ -55,6 +55,7 @@ schemars.workspace = true postage.workspace = true serde.workspace = true serde_derive.workspace = true +time.workspace = true [dev-dependencies] call = { path = "../call", features = ["test-support"] } diff --git a/crates/collab_ui/src/channel_view.rs b/crates/collab_ui/src/channel_view.rs index 5086cc8b37..76b79eaf39 100644 --- a/crates/collab_ui/src/channel_view.rs +++ b/crates/collab_ui/src/channel_view.rs @@ -1,8 +1,5 @@ use anyhow::{anyhow, Result}; -use channel::{ - channel_buffer::{self, ChannelBuffer}, - ChannelId, -}; +use channel::{ChannelBuffer, ChannelBufferEvent, ChannelId}; use client::proto; use clock::ReplicaId; use collections::HashMap; @@ -118,14 +115,14 @@ impl ChannelView { fn handle_channel_buffer_event( &mut self, _: ModelHandle, - event: &channel_buffer::Event, + event: &ChannelBufferEvent, cx: &mut ViewContext, ) { match event { - channel_buffer::Event::CollaboratorsChanged => { + ChannelBufferEvent::CollaboratorsChanged => { self.refresh_replica_id_map(cx); } - channel_buffer::Event::Disconnected => self.editor.update(cx, |editor, cx| { + ChannelBufferEvent::Disconnected => self.editor.update(cx, |editor, cx| { editor.set_read_only(true); cx.notify(); }), diff --git a/crates/collab_ui/src/chat_panel.rs b/crates/collab_ui/src/chat_panel.rs new file mode 100644 index 0000000000..0105a9d27b --- /dev/null +++ b/crates/collab_ui/src/chat_panel.rs @@ -0,0 +1,391 @@ +use channel::{ChannelChat, ChannelChatEvent, ChannelMessage, ChannelStore}; +use client::Client; +use editor::Editor; +use gpui::{ + actions, + elements::*, + platform::{CursorStyle, MouseButton}, + views::{ItemType, Select, SelectStyle}, + AnyViewHandle, AppContext, Entity, ModelHandle, Subscription, View, ViewContext, ViewHandle, +}; +use language::language_settings::SoftWrap; +use menu::Confirm; +use std::sync::Arc; +use theme::Theme; +use time::{OffsetDateTime, UtcOffset}; +use util::{ResultExt, TryFutureExt}; + +const MESSAGE_LOADING_THRESHOLD: usize = 50; + +pub struct ChatPanel { + client: Arc, + channel_store: ModelHandle, + active_channel: Option<(ModelHandle, Subscription)>, + message_list: ListState, + input_editor: ViewHandle, + channel_select: ViewHandle { + let channel = &channel_store.read(cx).channel_at_index(ix).unwrap().1; + let theme = match (item_type, is_hovered) { + (ItemType::Header, _) => &theme.header, + (ItemType::Selected, false) => &theme.active_item, + (ItemType::Selected, true) => &theme.hovered_active_item, + (ItemType::Unselected, false) => &theme.item, + (ItemType::Unselected, true) => &theme.hovered_item, + }; + Flex::row() + .with_child( + Label::new("#".to_string(), theme.hash.text.clone()) + .contained() + .with_style(theme.hash.container), + ) + .with_child(Label::new(channel.name.clone(), theme.name.clone())) + .contained() + .with_style(theme.container) + .into_any() + } + + fn render_sign_in_prompt( + &self, + theme: &Arc, + cx: &mut ViewContext, + ) -> AnyElement { + enum SignInPromptLabel {} + + MouseEventHandler::new::(0, cx, |mouse_state, _| { + Label::new( + "Sign in to use chat".to_string(), + theme + .chat_panel + .sign_in_prompt + .style_for(mouse_state) + .clone(), + ) + }) + .with_cursor_style(CursorStyle::PointingHand) + .on_click(MouseButton::Left, move |_, this, cx| { + let client = this.client.clone(); + cx.spawn(|this, mut cx| async move { + if client + .authenticate_and_connect(true, &cx) + .log_err() + .await + .is_some() + { + this.update(&mut cx, |this, cx| { + if cx.handle().is_focused(cx) { + cx.focus(&this.input_editor); + } + }) + .ok(); + } + }) + .detach(); + }) + .aligned() + .into_any() + } + + fn send(&mut self, _: &Confirm, cx: &mut ViewContext) { + if let Some((channel, _)) = self.active_channel.as_ref() { + let body = self.input_editor.update(cx, |editor, cx| { + let body = editor.text(cx); + editor.clear(cx); + body + }); + + if let Some(task) = channel + .update(cx, |channel, cx| channel.send_message(body, cx)) + .log_err() + { + task.detach(); + } + } + } + + fn load_more_messages(&mut self, _: &LoadMoreMessages, cx: &mut ViewContext) { + if let Some((channel, _)) = self.active_channel.as_ref() { + channel.update(cx, |channel, cx| { + channel.load_more_messages(cx); + }) + } + } +} + +impl Entity for ChatPanel { + type Event = Event; +} + +impl View for ChatPanel { + fn ui_name() -> &'static str { + "ChatPanel" + } + + fn render(&mut self, cx: &mut ViewContext) -> AnyElement { + let theme = theme::current(cx); + let element = if self.client.user_id().is_some() { + self.render_channel(cx) + } else { + self.render_sign_in_prompt(&theme, cx) + }; + element + .contained() + .with_style(theme.chat_panel.container) + .constrained() + .with_min_width(150.) + .into_any() + } + + fn focus_in(&mut self, _: AnyViewHandle, cx: &mut ViewContext) { + if matches!( + *self.client.status().borrow(), + client::Status::Connected { .. } + ) { + cx.focus(&self.input_editor); + } + } +} + +fn format_timestamp( + mut timestamp: OffsetDateTime, + mut now: OffsetDateTime, + local_timezone: UtcOffset, +) -> String { + timestamp = timestamp.to_offset(local_timezone); + now = now.to_offset(local_timezone); + + let today = now.date(); + let date = timestamp.date(); + let mut hour = timestamp.hour(); + let mut part = "am"; + if hour > 12 { + hour -= 12; + part = "pm"; + } + if date == today { + format!("{:02}:{:02}{}", hour, timestamp.minute(), part) + } else if date.next_day() == Some(today) { + format!("yesterday at {:02}:{:02}{}", hour, timestamp.minute(), part) + } else { + format!("{:02}/{}/{}", date.month() as u32, date.day(), date.year()) + } +} diff --git a/crates/collab_ui/src/collab_ui.rs b/crates/collab_ui/src/collab_ui.rs index ee34f600fa..b40ecc6992 100644 --- a/crates/collab_ui/src/collab_ui.rs +++ b/crates/collab_ui/src/collab_ui.rs @@ -1,4 +1,5 @@ pub mod channel_view; +pub mod chat_panel; pub mod collab_panel; mod collab_titlebar_item; mod contact_notification; diff --git a/crates/rpc/proto/zed.proto b/crates/rpc/proto/zed.proto index 2e96d79f5e..eb26e81b99 100644 --- a/crates/rpc/proto/zed.proto +++ b/crates/rpc/proto/zed.proto @@ -155,7 +155,16 @@ message Envelope { RemoveChannelBufferCollaborator remove_channel_buffer_collaborator = 136; UpdateChannelBufferCollaborator update_channel_buffer_collaborator = 139; RejoinChannelBuffers rejoin_channel_buffers = 140; - RejoinChannelBuffersResponse rejoin_channel_buffers_response = 141; // Current max + RejoinChannelBuffersResponse rejoin_channel_buffers_response = 141; + + JoinChannelChat join_channel_chat = 142; + JoinChannelChatResponse join_channel_chat_response = 143; + LeaveChannelChat leave_channel_chat = 144; + SendChannelMessage send_channel_message = 145; + SendChannelMessageResponse send_channel_message_response = 146; + ChannelMessageSent channel_message_sent = 147; + GetChannelMessages get_channel_messages = 148; + GetChannelMessagesResponse get_channel_messages_response = 149; // Current max } } @@ -1021,10 +1030,56 @@ message RenameChannel { string name = 2; } +message JoinChannelChat { + uint64 channel_id = 1; +} + +message JoinChannelChatResponse { + repeated ChannelMessage messages = 1; + bool done = 2; +} + +message LeaveChannelChat { + uint64 channel_id = 1; +} + +message SendChannelMessage { + uint64 channel_id = 1; + string body = 2; + Nonce nonce = 3; +} + +message SendChannelMessageResponse { + ChannelMessage message = 1; +} + +message ChannelMessageSent { + uint64 channel_id = 1; + ChannelMessage message = 2; +} + +message GetChannelMessages { + uint64 channel_id = 1; + uint64 before_message_id = 2; +} + +message GetChannelMessagesResponse { + repeated ChannelMessage messages = 1; + bool done = 2; +} + message JoinChannelBuffer { uint64 channel_id = 1; } +message ChannelMessage { + uint64 id = 1; + string body = 2; + uint64 timestamp = 3; + uint64 sender_id = 4; + Nonce nonce = 5; +} + message RejoinChannelBuffers { repeated ChannelBufferVersion buffers = 1; } diff --git a/crates/rpc/src/proto.rs b/crates/rpc/src/proto.rs index f643a8c168..7b98f50e75 100644 --- a/crates/rpc/src/proto.rs +++ b/crates/rpc/src/proto.rs @@ -147,6 +147,7 @@ messages!( (CreateBufferForPeer, Foreground), (CreateChannel, Foreground), (ChannelResponse, Foreground), + (ChannelMessageSent, Foreground), (CreateProjectEntry, Foreground), (CreateRoom, Foreground), (CreateRoomResponse, Foreground), @@ -163,6 +164,10 @@ messages!( (GetCodeActionsResponse, Background), (GetHover, Background), (GetHoverResponse, Background), + (GetChannelMessages, Background), + (GetChannelMessagesResponse, Background), + (SendChannelMessage, Background), + (SendChannelMessageResponse, Background), (GetCompletions, Background), (GetCompletionsResponse, Background), (GetDefinition, Background), @@ -184,6 +189,9 @@ messages!( (JoinProjectResponse, Foreground), (JoinRoom, Foreground), (JoinRoomResponse, Foreground), + (JoinChannelChat, Foreground), + (JoinChannelChatResponse, Foreground), + (LeaveChannelChat, Foreground), (LeaveProject, Foreground), (LeaveRoom, Foreground), (OpenBufferById, Background), @@ -293,6 +301,7 @@ request_messages!( (InviteChannelMember, Ack), (JoinProject, JoinProjectResponse), (JoinRoom, JoinRoomResponse), + (JoinChannelChat, JoinChannelChatResponse), (LeaveRoom, Ack), (RejoinRoom, RejoinRoomResponse), (IncomingCall, Ack), @@ -313,6 +322,8 @@ request_messages!( (RespondToContactRequest, Ack), (RespondToChannelInvite, Ack), (SetChannelMemberAdmin, Ack), + (SendChannelMessage, SendChannelMessageResponse), + (GetChannelMessages, GetChannelMessagesResponse), (GetChannelMembers, GetChannelMembersResponse), (JoinChannel, JoinRoomResponse), (RemoveChannel, Ack), @@ -388,6 +399,7 @@ entity_messages!( entity_messages!( channel_id, + ChannelMessageSent, UpdateChannelBuffer, RemoveChannelBufferCollaborator, AddChannelBufferCollaborator, diff --git a/crates/theme/src/theme.rs b/crates/theme/src/theme.rs index a542249788..15d7e1a71f 100644 --- a/crates/theme/src/theme.rs +++ b/crates/theme/src/theme.rs @@ -49,6 +49,7 @@ pub struct Theme { pub copilot: Copilot, pub collab_panel: CollabPanel, pub project_panel: ProjectPanel, + pub chat_panel: ChatPanel, pub command_palette: CommandPalette, pub picker: Picker, pub editor: Editor, @@ -611,6 +612,17 @@ pub struct IconButton { pub button_width: f32, } +#[derive(Deserialize, Default, JsonSchema)] +pub struct ChatPanel { + #[serde(flatten)] + pub container: ContainerStyle, + pub channel_select: ChannelSelect, + pub input_editor: FieldEditor, + pub message: ChatMessage, + pub pending_message: ChatMessage, + pub sign_in_prompt: Interactive, +} + #[derive(Deserialize, Default, JsonSchema)] pub struct ChatMessage { #[serde(flatten)]