diff --git a/.cargo/config.toml b/.cargo/config.toml index 35049cbcb1..9da6b3be08 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -1,2 +1,6 @@ [alias] xtask = "run --package xtask --" + +[build] +# v0 mangling scheme provides more detailed backtraces around closures +rustflags = ["-C", "symbol-mangling-version=v0"] diff --git a/Cargo.lock b/Cargo.lock index 2f549c568d..c620cb2b88 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -79,18 +79,9 @@ dependencies = [ [[package]] name = "aho-corasick" -version = "0.7.20" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc936419f96fa211c1b9166887b38e5e40b19958e5b895be7c1f93adec7071ac" -dependencies = [ - "memchr", -] - -[[package]] -name = "aho-corasick" -version = "1.0.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6748e8def348ed4d14996fa801f4122cd763fff530258cdc03f64b25f89d3a5a" +checksum = "0f2135563fb5c609d2b2b87c1e8ce7bc41b0b45430fa9661f457981503dd5bf0" dependencies = [ "memchr", ] @@ -101,6 +92,7 @@ version = "0.1.0" dependencies = [ "anyhow", "chrono", + "client", "collections", "ctor", "editor", @@ -127,6 +119,7 @@ dependencies = [ "theme", "tiktoken-rs 0.4.5", "util", + "uuid 1.4.1", "workspace", ] @@ -1554,6 +1547,7 @@ dependencies = [ "settings", "theme", "theme_selector", + "time 0.3.27", "util", "vcs_menu", "workspace", @@ -2329,7 +2323,7 @@ checksum = "bbfc4744c1b8f2a09adc0e55242f60b1af195d88596bd8700be74418c056c555" name = "editor" version = "0.1.0" dependencies = [ - "aho-corasick 0.7.20", + "aho-corasick", "anyhow", "client", "clock", @@ -3077,7 +3071,7 @@ version = "0.4.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "759c97c1e17c55525b57192c06a267cda0ac5210b222d6b82189a2338fa1c13d" dependencies = [ - "aho-corasick 1.0.4", + "aho-corasick", "bstr", "fnv", "log", @@ -5459,7 +5453,7 @@ dependencies = [ name = "project" version = "0.1.0" dependencies = [ - "aho-corasick 0.7.20", + "aho-corasick", "anyhow", "async-trait", "backtrace", @@ -5969,7 +5963,7 @@ version = "1.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "81bc1d4caf89fac26a70747fe603c130093b53c773888797a6329091246d651a" dependencies = [ - "aho-corasick 1.0.4", + "aho-corasick", "memchr", "regex-automata 0.3.6", "regex-syntax 0.7.4", @@ -5990,7 +5984,7 @@ version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fed1ceff11a1dddaee50c9dc8e4938bd106e9d89ae372f192311e7da498e3b69" dependencies = [ - "aho-corasick 1.0.4", + "aho-corasick", "memchr", "regex-syntax 0.7.4", ] diff --git a/assets/icons/logo_96.svg b/assets/icons/logo_96.svg new file mode 100644 index 0000000000..dc98bb8bc2 --- /dev/null +++ b/assets/icons/logo_96.svg @@ -0,0 +1,3 @@ + + + diff --git a/assets/icons/user_group_16.svg b/assets/icons/user_group_16.svg new file mode 100644 index 0000000000..aa99277646 --- /dev/null +++ b/assets/icons/user_group_16.svg @@ -0,0 +1,3 @@ + + + diff --git a/assets/keymaps/vim.json b/assets/keymaps/vim.json index 44dbf2533f..c824fc7589 100644 --- a/assets/keymaps/vim.json +++ b/assets/keymaps/vim.json @@ -455,6 +455,7 @@ "shift-d": "vim::VisualDelete", "shift-x": "vim::VisualDelete", "y": "vim::VisualYank", + "shift-y": "vim::VisualYank", "p": "vim::Paste", "shift-p": [ "vim::Paste", diff --git a/assets/settings/default.json b/assets/settings/default.json index 6739819e71..86def54d32 100644 --- a/assets/settings/default.json +++ b/assets/settings/default.json @@ -131,6 +131,14 @@ // Default width of the channels panel. "default_width": 240 }, + "chat_panel": { + // Whether to show the collaboration panel button in the status bar. + "button": true, + // Where to dock channels panel. Can be 'left' or 'right'. + "dock": "right", + // Default width of the channels panel. + "default_width": 240 + }, "assistant": { // Whether to show the assistant panel button in the status bar. "button": true, diff --git a/crates/ai/Cargo.toml b/crates/ai/Cargo.toml index d96e470d5c..8002b0d35d 100644 --- a/crates/ai/Cargo.toml +++ b/crates/ai/Cargo.toml @@ -9,6 +9,7 @@ path = "src/ai.rs" doctest = false [dependencies] +client = { path = "../client" } collections = { path = "../collections"} editor = { path = "../editor" } fs = { path = "../fs" } @@ -19,6 +20,7 @@ search = { path = "../search" } settings = { path = "../settings" } theme = { path = "../theme" } util = { path = "../util" } +uuid = { version = "1.1.2", features = ["v4"] } workspace = { path = "../workspace" } anyhow.workspace = true diff --git a/crates/ai/src/ai.rs b/crates/ai/src/ai.rs index 7d9b93b0a7..dfd9a523b4 100644 --- a/crates/ai/src/ai.rs +++ b/crates/ai/src/ai.rs @@ -61,6 +61,7 @@ struct SavedMessage { #[derive(Serialize, Deserialize)] struct SavedConversation { + id: Option, zed: String, version: String, text: String, diff --git a/crates/ai/src/assistant.rs b/crates/ai/src/assistant.rs index 7fa66f26fb..263382c03e 100644 --- a/crates/ai/src/assistant.rs +++ b/crates/ai/src/assistant.rs @@ -6,6 +6,7 @@ use crate::{ }; use anyhow::{anyhow, Result}; use chrono::{DateTime, Local}; +use client::{telemetry::AssistantKind, ClickhouseEvent, TelemetrySettings}; use collections::{hash_map, HashMap, HashSet, VecDeque}; use editor::{ display_map::{ @@ -48,6 +49,7 @@ use theme::{ AssistantStyle, }; use util::{paths::CONVERSATIONS_DIR, post_inc, ResultExt, TryFutureExt}; +use uuid::Uuid; use workspace::{ dock::{DockPosition, Panel}, searchable::Direction, @@ -296,6 +298,7 @@ impl AssistantPanel { self.include_conversation_in_next_inline_assist, self.inline_prompt_history.clone(), codegen.clone(), + self.workspace.clone(), cx, ); cx.focus_self(); @@ -724,6 +727,7 @@ impl AssistantPanel { self.api_key.clone(), self.languages.clone(), self.fs.clone(), + self.workspace.clone(), cx, ) }); @@ -1059,6 +1063,7 @@ impl AssistantPanel { } let fs = self.fs.clone(); + let workspace = self.workspace.clone(); let api_key = self.api_key.clone(); let languages = self.languages.clone(); cx.spawn(|this, mut cx| async move { @@ -1073,8 +1078,9 @@ impl AssistantPanel { if let Some(ix) = this.editor_index_for_path(&path, cx) { this.set_active_editor_index(Some(ix), cx); } else { - let editor = cx - .add_view(|cx| ConversationEditor::for_conversation(conversation, fs, cx)); + let editor = cx.add_view(|cx| { + ConversationEditor::for_conversation(conversation, fs, workspace, cx) + }); this.add_conversation(editor, cx); } })?; @@ -1348,6 +1354,7 @@ struct Summary { } struct Conversation { + id: Option, buffer: ModelHandle, message_anchors: Vec, messages_metadata: HashMap, @@ -1398,6 +1405,7 @@ impl Conversation { let model = settings.default_open_ai_model.clone(); let mut this = Self { + id: Some(Uuid::new_v4().to_string()), message_anchors: Default::default(), messages_metadata: Default::default(), next_message_id: Default::default(), @@ -1435,6 +1443,7 @@ impl Conversation { fn serialize(&self, cx: &AppContext) -> SavedConversation { SavedConversation { + id: self.id.clone(), zed: "conversation".into(), version: SavedConversation::VERSION.into(), text: self.buffer.read(cx).text(), @@ -1462,6 +1471,10 @@ impl Conversation { language_registry: Arc, cx: &mut ModelContext, ) -> Self { + let id = match saved_conversation.id { + Some(id) => Some(id), + None => Some(Uuid::new_v4().to_string()), + }; let model = saved_conversation.model; let markdown = language_registry.language_for_name("Markdown"); let mut message_anchors = Vec::new(); @@ -1491,6 +1504,7 @@ impl Conversation { }); let mut this = Self { + id, message_anchors, messages_metadata: saved_conversation.message_metadata, next_message_id, @@ -2108,6 +2122,7 @@ struct ScrollPosition { struct ConversationEditor { conversation: ModelHandle, fs: Arc, + workspace: WeakViewHandle, editor: ViewHandle, blocks: HashSet, scroll_position: Option, @@ -2119,15 +2134,17 @@ impl ConversationEditor { api_key: Rc>>, language_registry: Arc, fs: Arc, + workspace: WeakViewHandle, cx: &mut ViewContext, ) -> Self { let conversation = cx.add_model(|cx| Conversation::new(api_key, language_registry, cx)); - Self::for_conversation(conversation, fs, cx) + Self::for_conversation(conversation, fs, workspace, cx) } fn for_conversation( conversation: ModelHandle, fs: Arc, + workspace: WeakViewHandle, cx: &mut ViewContext, ) -> Self { let editor = cx.add_view(|cx| { @@ -2150,6 +2167,7 @@ impl ConversationEditor { blocks: Default::default(), scroll_position: None, fs, + workspace, _subscriptions, }; this.update_message_headers(cx); @@ -2157,6 +2175,13 @@ impl ConversationEditor { } fn assist(&mut self, _: &Assist, cx: &mut ViewContext) { + report_assistant_event( + self.workspace.clone(), + self.conversation.read(cx).id.clone(), + AssistantKind::Panel, + cx, + ); + let cursors = self.cursors(cx); let user_messages = self.conversation.update(cx, |conversation, cx| { @@ -2665,6 +2690,7 @@ enum InlineAssistantEvent { struct InlineAssistant { id: usize, prompt_editor: ViewHandle, + workspace: WeakViewHandle, confirmed: bool, has_focus: bool, include_conversation: bool, @@ -2780,6 +2806,7 @@ impl InlineAssistant { include_conversation: bool, prompt_history: VecDeque, codegen: ModelHandle, + workspace: WeakViewHandle, cx: &mut ViewContext, ) -> Self { let prompt_editor = cx.add_view(|cx| { @@ -2801,6 +2828,7 @@ impl InlineAssistant { Self { id, prompt_editor, + workspace, confirmed: false, has_focus: false, include_conversation, @@ -2859,6 +2887,8 @@ impl InlineAssistant { if self.confirmed { cx.emit(InlineAssistantEvent::Dismissed); } else { + report_assistant_event(self.workspace.clone(), None, AssistantKind::Inline, cx); + let prompt = self.prompt_editor.read(cx).text(cx); self.prompt_editor.update(cx, |editor, cx| { editor.set_read_only(true); @@ -3347,3 +3377,30 @@ mod tests { .collect() } } + +fn report_assistant_event( + workspace: WeakViewHandle, + conversation_id: Option, + assistant_kind: AssistantKind, + cx: &AppContext, +) { + let Some(workspace) = workspace.upgrade(cx) else { + return; + }; + + let client = workspace.read(cx).project().read(cx).client(); + let telemetry = client.telemetry(); + + let model = settings::get::(cx) + .default_open_ai_model + .clone(); + + let event = ClickhouseEvent::Assistant { + conversation_id, + kind: assistant_kind, + model: model.full_name(), + }; + let telemetry_settings = *settings::get::(cx); + + telemetry.report_clickhouse_event(event, telemetry_settings) +} diff --git a/crates/call/src/room.rs b/crates/call/src/room.rs index cc7445dbcc..e7899ab2d8 100644 --- a/crates/call/src/room.rs +++ b/crates/call/src/room.rs @@ -172,7 +172,7 @@ impl Room { cx.spawn(|this, mut cx| async move { connect.await?; - if !cx.read(|cx| settings::get::(cx).mute_on_join) { + if !cx.read(Self::mute_on_join) { this.update(&mut cx, |this, cx| this.share_microphone(cx)) .await?; } @@ -301,6 +301,10 @@ impl Room { }) } + pub fn mute_on_join(cx: &AppContext) -> bool { + settings::get::(cx).mute_on_join || client::IMPERSONATE_LOGIN.is_some() + } + fn from_join_response( response: proto::JoinRoomResponse, client: Arc, @@ -1124,7 +1128,7 @@ impl Room { self.live_kit .as_ref() .and_then(|live_kit| match &live_kit.microphone_track { - LocalTrack::None => Some(settings::get::(cx).mute_on_join), + LocalTrack::None => Some(Self::mute_on_join(cx)), LocalTrack::Pending { muted, .. } => Some(*muted), LocalTrack::Published { muted, .. } => Some(*muted), }) diff --git a/crates/channel/Cargo.toml b/crates/channel/Cargo.toml index c2191fdfa3..00e9135bc1 100644 --- a/crates/channel/Cargo.toml +++ b/crates/channel/Cargo.toml @@ -47,5 +47,6 @@ tempfile = "3" collections = { path = "../collections", features = ["test-support"] } gpui = { path = "../gpui", features = ["test-support"] } rpc = { path = "../rpc", features = ["test-support"] } +client = { path = "../client", features = ["test-support"] } settings = { path = "../settings", features = ["test-support"] } util = { path = "../util", features = ["test-support"] } diff --git a/crates/channel/src/channel.rs b/crates/channel/src/channel.rs index 15631b7dd3..37f1c0ce44 100644 --- a/crates/channel/src/channel.rs +++ b/crates/channel/src/channel.rs @@ -1,14 +1,18 @@ +mod channel_buffer; +mod channel_chat; mod channel_store; -pub mod channel_buffer; -use std::sync::Arc; +pub use channel_buffer::{ChannelBuffer, ChannelBufferEvent}; +pub use channel_chat::{ChannelChat, ChannelChatEvent, ChannelMessage, ChannelMessageId}; +pub use channel_store::{Channel, ChannelEvent, ChannelId, ChannelMembership, ChannelStore}; -pub use channel_store::*; use client::Client; +use std::sync::Arc; #[cfg(test)] mod channel_store_tests; pub fn init(client: &Arc) { channel_buffer::init(client); + channel_chat::init(client); } diff --git a/crates/channel/src/channel_buffer.rs b/crates/channel/src/channel_buffer.rs index e11282cf79..06f9093fb5 100644 --- a/crates/channel/src/channel_buffer.rs +++ b/crates/channel/src/channel_buffer.rs @@ -23,13 +23,13 @@ pub struct ChannelBuffer { subscription: Option, } -pub enum Event { +pub enum ChannelBufferEvent { CollaboratorsChanged, Disconnected, } impl Entity for ChannelBuffer { - type Event = Event; + type Event = ChannelBufferEvent; fn release(&mut self, _: &mut AppContext) { if self.connected { @@ -101,7 +101,7 @@ impl ChannelBuffer { } } self.collaborators = collaborators; - cx.emit(Event::CollaboratorsChanged); + cx.emit(ChannelBufferEvent::CollaboratorsChanged); cx.notify(); } @@ -141,7 +141,7 @@ impl ChannelBuffer { this.update(&mut cx, |this, cx| { this.collaborators.push(collaborator); - cx.emit(Event::CollaboratorsChanged); + cx.emit(ChannelBufferEvent::CollaboratorsChanged); cx.notify(); }); @@ -165,7 +165,7 @@ impl ChannelBuffer { true } }); - cx.emit(Event::CollaboratorsChanged); + cx.emit(ChannelBufferEvent::CollaboratorsChanged); cx.notify(); }); @@ -185,7 +185,7 @@ impl ChannelBuffer { break; } } - cx.emit(Event::CollaboratorsChanged); + cx.emit(ChannelBufferEvent::CollaboratorsChanged); cx.notify(); }); @@ -230,7 +230,7 @@ impl ChannelBuffer { if self.connected { self.connected = false; self.subscription.take(); - cx.emit(Event::Disconnected); + cx.emit(ChannelBufferEvent::Disconnected); cx.notify() } } diff --git a/crates/channel/src/channel_chat.rs b/crates/channel/src/channel_chat.rs new file mode 100644 index 0000000000..8e03a3b6fd --- /dev/null +++ b/crates/channel/src/channel_chat.rs @@ -0,0 +1,505 @@ +use crate::Channel; +use anyhow::{anyhow, Result}; +use client::{ + proto, + user::{User, UserStore}, + Client, Subscription, TypedEnvelope, +}; +use futures::lock::Mutex; +use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task}; +use rand::prelude::*; +use std::{collections::HashSet, mem, ops::Range, sync::Arc}; +use sum_tree::{Bias, SumTree}; +use time::OffsetDateTime; +use util::{post_inc, ResultExt as _, TryFutureExt}; + +pub struct ChannelChat { + channel: Arc, + messages: SumTree, + loaded_all_messages: bool, + next_pending_message_id: usize, + user_store: ModelHandle, + rpc: Arc, + outgoing_messages_lock: Arc>, + rng: StdRng, + _subscription: Subscription, +} + +#[derive(Clone, Debug)] +pub struct ChannelMessage { + pub id: ChannelMessageId, + pub body: String, + pub timestamp: OffsetDateTime, + pub sender: Arc, + pub nonce: u128, +} + +#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] +pub enum ChannelMessageId { + Saved(u64), + Pending(usize), +} + +#[derive(Clone, Debug, Default)] +pub struct ChannelMessageSummary { + max_id: ChannelMessageId, + count: usize, +} + +#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord)] +struct Count(usize); + +#[derive(Clone, Debug, PartialEq)] +pub enum ChannelChatEvent { + MessagesUpdated { + old_range: Range, + new_count: usize, + }, +} + +pub fn init(client: &Arc) { + client.add_model_message_handler(ChannelChat::handle_message_sent); + client.add_model_message_handler(ChannelChat::handle_message_removed); +} + +impl Entity for ChannelChat { + type Event = ChannelChatEvent; + + fn release(&mut self, _: &mut AppContext) { + self.rpc + .send(proto::LeaveChannelChat { + channel_id: self.channel.id, + }) + .log_err(); + } +} + +impl ChannelChat { + pub async fn new( + channel: Arc, + user_store: ModelHandle, + client: Arc, + mut cx: AsyncAppContext, + ) -> Result> { + let channel_id = channel.id; + let subscription = client.subscribe_to_entity(channel_id).unwrap(); + + let response = client + .request(proto::JoinChannelChat { channel_id }) + .await?; + let messages = messages_from_proto(response.messages, &user_store, &mut cx).await?; + let loaded_all_messages = response.done; + + Ok(cx.add_model(|cx| { + let mut this = Self { + channel, + user_store, + rpc: client, + outgoing_messages_lock: Default::default(), + messages: Default::default(), + loaded_all_messages, + next_pending_message_id: 0, + rng: StdRng::from_entropy(), + _subscription: subscription.set_model(&cx.handle(), &mut cx.to_async()), + }; + this.insert_messages(messages, cx); + this + })) + } + + pub fn channel(&self) -> &Arc { + &self.channel + } + + pub fn send_message( + &mut self, + body: String, + cx: &mut ModelContext, + ) -> Result>> { + if body.is_empty() { + Err(anyhow!("message body can't be empty"))?; + } + + let current_user = self + .user_store + .read(cx) + .current_user() + .ok_or_else(|| anyhow!("current_user is not present"))?; + + let channel_id = self.channel.id; + let pending_id = ChannelMessageId::Pending(post_inc(&mut self.next_pending_message_id)); + let nonce = self.rng.gen(); + self.insert_messages( + SumTree::from_item( + ChannelMessage { + id: pending_id, + body: body.clone(), + sender: current_user, + timestamp: OffsetDateTime::now_utc(), + nonce, + }, + &(), + ), + cx, + ); + let user_store = self.user_store.clone(); + let rpc = self.rpc.clone(); + let outgoing_messages_lock = self.outgoing_messages_lock.clone(); + Ok(cx.spawn(|this, mut cx| async move { + let outgoing_message_guard = outgoing_messages_lock.lock().await; + let request = rpc.request(proto::SendChannelMessage { + channel_id, + body, + nonce: Some(nonce.into()), + }); + let response = request.await?; + drop(outgoing_message_guard); + let message = ChannelMessage::from_proto( + response.message.ok_or_else(|| anyhow!("invalid message"))?, + &user_store, + &mut cx, + ) + .await?; + this.update(&mut cx, |this, cx| { + this.insert_messages(SumTree::from_item(message, &()), cx); + Ok(()) + }) + })) + } + + pub fn remove_message(&mut self, id: u64, cx: &mut ModelContext) -> Task> { + let response = self.rpc.request(proto::RemoveChannelMessage { + channel_id: self.channel.id, + message_id: id, + }); + cx.spawn(|this, mut cx| async move { + response.await?; + + this.update(&mut cx, |this, cx| { + this.message_removed(id, cx); + Ok(()) + }) + }) + } + + pub fn load_more_messages(&mut self, cx: &mut ModelContext) -> bool { + if !self.loaded_all_messages { + let rpc = self.rpc.clone(); + let user_store = self.user_store.clone(); + let channel_id = self.channel.id; + if let Some(before_message_id) = + self.messages.first().and_then(|message| match message.id { + ChannelMessageId::Saved(id) => Some(id), + ChannelMessageId::Pending(_) => None, + }) + { + cx.spawn(|this, mut cx| { + async move { + let response = rpc + .request(proto::GetChannelMessages { + channel_id, + before_message_id, + }) + .await?; + let loaded_all_messages = response.done; + let messages = + messages_from_proto(response.messages, &user_store, &mut cx).await?; + this.update(&mut cx, |this, cx| { + this.loaded_all_messages = loaded_all_messages; + this.insert_messages(messages, cx); + }); + anyhow::Ok(()) + } + .log_err() + }) + .detach(); + return true; + } + } + false + } + + pub fn rejoin(&mut self, cx: &mut ModelContext) { + let user_store = self.user_store.clone(); + let rpc = self.rpc.clone(); + let channel_id = self.channel.id; + cx.spawn(|this, mut cx| { + async move { + let response = rpc.request(proto::JoinChannelChat { channel_id }).await?; + let messages = messages_from_proto(response.messages, &user_store, &mut cx).await?; + let loaded_all_messages = response.done; + + let pending_messages = this.update(&mut cx, |this, cx| { + if let Some((first_new_message, last_old_message)) = + messages.first().zip(this.messages.last()) + { + if first_new_message.id > last_old_message.id { + let old_messages = mem::take(&mut this.messages); + cx.emit(ChannelChatEvent::MessagesUpdated { + old_range: 0..old_messages.summary().count, + new_count: 0, + }); + this.loaded_all_messages = loaded_all_messages; + } + } + + this.insert_messages(messages, cx); + if loaded_all_messages { + this.loaded_all_messages = loaded_all_messages; + } + + this.pending_messages().cloned().collect::>() + }); + + for pending_message in pending_messages { + let request = rpc.request(proto::SendChannelMessage { + channel_id, + body: pending_message.body, + nonce: Some(pending_message.nonce.into()), + }); + let response = request.await?; + let message = ChannelMessage::from_proto( + response.message.ok_or_else(|| anyhow!("invalid message"))?, + &user_store, + &mut cx, + ) + .await?; + this.update(&mut cx, |this, cx| { + this.insert_messages(SumTree::from_item(message, &()), cx); + }); + } + + anyhow::Ok(()) + } + .log_err() + }) + .detach(); + } + + pub fn message_count(&self) -> usize { + self.messages.summary().count + } + + pub fn messages(&self) -> &SumTree { + &self.messages + } + + pub fn message(&self, ix: usize) -> &ChannelMessage { + let mut cursor = self.messages.cursor::(); + cursor.seek(&Count(ix), Bias::Right, &()); + cursor.item().unwrap() + } + + pub fn messages_in_range(&self, range: Range) -> impl Iterator { + let mut cursor = self.messages.cursor::(); + cursor.seek(&Count(range.start), Bias::Right, &()); + cursor.take(range.len()) + } + + pub fn pending_messages(&self) -> impl Iterator { + let mut cursor = self.messages.cursor::(); + cursor.seek(&ChannelMessageId::Pending(0), Bias::Left, &()); + cursor + } + + async fn handle_message_sent( + this: ModelHandle, + message: TypedEnvelope, + _: Arc, + mut cx: AsyncAppContext, + ) -> Result<()> { + let user_store = this.read_with(&cx, |this, _| this.user_store.clone()); + let message = message + .payload + .message + .ok_or_else(|| anyhow!("empty message"))?; + + let message = ChannelMessage::from_proto(message, &user_store, &mut cx).await?; + this.update(&mut cx, |this, cx| { + this.insert_messages(SumTree::from_item(message, &()), cx) + }); + + Ok(()) + } + + async fn handle_message_removed( + this: ModelHandle, + message: TypedEnvelope, + _: Arc, + mut cx: AsyncAppContext, + ) -> Result<()> { + this.update(&mut cx, |this, cx| { + this.message_removed(message.payload.message_id, cx) + }); + Ok(()) + } + + fn insert_messages(&mut self, messages: SumTree, cx: &mut ModelContext) { + if let Some((first_message, last_message)) = messages.first().zip(messages.last()) { + let nonces = messages + .cursor::<()>() + .map(|m| m.nonce) + .collect::>(); + + let mut old_cursor = self.messages.cursor::<(ChannelMessageId, Count)>(); + let mut new_messages = old_cursor.slice(&first_message.id, Bias::Left, &()); + let start_ix = old_cursor.start().1 .0; + let removed_messages = old_cursor.slice(&last_message.id, Bias::Right, &()); + let removed_count = removed_messages.summary().count; + let new_count = messages.summary().count; + let end_ix = start_ix + removed_count; + + new_messages.append(messages, &()); + + let mut ranges = Vec::>::new(); + if new_messages.last().unwrap().is_pending() { + new_messages.append(old_cursor.suffix(&()), &()); + } else { + new_messages.append( + old_cursor.slice(&ChannelMessageId::Pending(0), Bias::Left, &()), + &(), + ); + + while let Some(message) = old_cursor.item() { + let message_ix = old_cursor.start().1 .0; + if nonces.contains(&message.nonce) { + if ranges.last().map_or(false, |r| r.end == message_ix) { + ranges.last_mut().unwrap().end += 1; + } else { + ranges.push(message_ix..message_ix + 1); + } + } else { + new_messages.push(message.clone(), &()); + } + old_cursor.next(&()); + } + } + + drop(old_cursor); + self.messages = new_messages; + + for range in ranges.into_iter().rev() { + cx.emit(ChannelChatEvent::MessagesUpdated { + old_range: range, + new_count: 0, + }); + } + cx.emit(ChannelChatEvent::MessagesUpdated { + old_range: start_ix..end_ix, + new_count, + }); + cx.notify(); + } + } + + fn message_removed(&mut self, id: u64, cx: &mut ModelContext) { + let mut cursor = self.messages.cursor::(); + let mut messages = cursor.slice(&ChannelMessageId::Saved(id), Bias::Left, &()); + if let Some(item) = cursor.item() { + if item.id == ChannelMessageId::Saved(id) { + let ix = messages.summary().count; + cursor.next(&()); + messages.append(cursor.suffix(&()), &()); + drop(cursor); + self.messages = messages; + cx.emit(ChannelChatEvent::MessagesUpdated { + old_range: ix..ix + 1, + new_count: 0, + }); + } + } + } +} + +async fn messages_from_proto( + proto_messages: Vec, + user_store: &ModelHandle, + cx: &mut AsyncAppContext, +) -> Result> { + let unique_user_ids = proto_messages + .iter() + .map(|m| m.sender_id) + .collect::>() + .into_iter() + .collect(); + user_store + .update(cx, |user_store, cx| { + user_store.get_users(unique_user_ids, cx) + }) + .await?; + + let mut messages = Vec::with_capacity(proto_messages.len()); + for message in proto_messages { + messages.push(ChannelMessage::from_proto(message, user_store, cx).await?); + } + let mut result = SumTree::new(); + result.extend(messages, &()); + Ok(result) +} + +impl ChannelMessage { + pub async fn from_proto( + message: proto::ChannelMessage, + user_store: &ModelHandle, + cx: &mut AsyncAppContext, + ) -> Result { + let sender = user_store + .update(cx, |user_store, cx| { + user_store.get_user(message.sender_id, cx) + }) + .await?; + Ok(ChannelMessage { + id: ChannelMessageId::Saved(message.id), + body: message.body, + timestamp: OffsetDateTime::from_unix_timestamp(message.timestamp as i64)?, + sender, + nonce: message + .nonce + .ok_or_else(|| anyhow!("nonce is required"))? + .into(), + }) + } + + pub fn is_pending(&self) -> bool { + matches!(self.id, ChannelMessageId::Pending(_)) + } +} + +impl sum_tree::Item for ChannelMessage { + type Summary = ChannelMessageSummary; + + fn summary(&self) -> Self::Summary { + ChannelMessageSummary { + max_id: self.id, + count: 1, + } + } +} + +impl Default for ChannelMessageId { + fn default() -> Self { + Self::Saved(0) + } +} + +impl sum_tree::Summary for ChannelMessageSummary { + type Context = (); + + fn add_summary(&mut self, summary: &Self, _: &()) { + self.max_id = summary.max_id; + self.count += summary.count; + } +} + +impl<'a> sum_tree::Dimension<'a, ChannelMessageSummary> for ChannelMessageId { + fn add_summary(&mut self, summary: &'a ChannelMessageSummary, _: &()) { + debug_assert!(summary.max_id > *self); + *self = summary.max_id; + } +} + +impl<'a> sum_tree::Dimension<'a, ChannelMessageSummary> for Count { + fn add_summary(&mut self, summary: &'a ChannelMessageSummary, _: &()) { + self.0 += summary.count; + } +} diff --git a/crates/channel/src/channel_store.rs b/crates/channel/src/channel_store.rs index a4c8da6df4..e61e520b47 100644 --- a/crates/channel/src/channel_store.rs +++ b/crates/channel/src/channel_store.rs @@ -1,4 +1,4 @@ -use crate::channel_buffer::ChannelBuffer; +use crate::{channel_buffer::ChannelBuffer, channel_chat::ChannelChat}; use anyhow::{anyhow, Result}; use client::{Client, Subscription, User, UserId, UserStore}; use collections::{hash_map, HashMap, HashSet}; @@ -20,7 +20,8 @@ pub struct ChannelStore { channels_with_admin_privileges: HashSet, outgoing_invites: HashSet<(ChannelId, UserId)>, update_channels_tx: mpsc::UnboundedSender, - opened_buffers: HashMap, + opened_buffers: HashMap>, + opened_chats: HashMap>, client: Arc, user_store: ModelHandle, _rpc_subscription: Subscription, @@ -50,15 +51,9 @@ impl Entity for ChannelStore { type Event = ChannelEvent; } -pub enum ChannelMemberStatus { - Invited, - Member, - NotMember, -} - -enum OpenedChannelBuffer { - Open(WeakModelHandle), - Loading(Shared, Arc>>>), +enum OpenedModelHandle { + Open(WeakModelHandle), + Loading(Shared, Arc>>>), } impl ChannelStore { @@ -94,6 +89,7 @@ impl ChannelStore { channels_with_admin_privileges: Default::default(), outgoing_invites: Default::default(), opened_buffers: Default::default(), + opened_chats: Default::default(), update_channels_tx, client, user_store, @@ -115,6 +111,10 @@ impl ChannelStore { } } + pub fn client(&self) -> Arc { + self.client.clone() + } + pub fn has_children(&self, channel_id: ChannelId) -> bool { self.channel_paths.iter().any(|path| { if let Some(ix) = path.iter().position(|id| *id == channel_id) { @@ -129,6 +129,12 @@ impl ChannelStore { self.channel_paths.len() } + pub fn index_of_channel(&self, channel_id: ChannelId) -> Option { + self.channel_paths + .iter() + .position(|path| path.ends_with(&[channel_id])) + } + pub fn channels(&self) -> impl '_ + Iterator)> { self.channel_paths.iter().map(move |path| { let id = path.last().unwrap(); @@ -154,7 +160,7 @@ impl ChannelStore { pub fn has_open_channel_buffer(&self, channel_id: ChannelId, cx: &AppContext) -> bool { if let Some(buffer) = self.opened_buffers.get(&channel_id) { - if let OpenedChannelBuffer::Open(buffer) = buffer { + if let OpenedModelHandle::Open(buffer) = buffer { return buffer.upgrade(cx).is_some(); } } @@ -166,24 +172,62 @@ impl ChannelStore { channel_id: ChannelId, cx: &mut ModelContext, ) -> Task>> { - // Make sure that a given channel buffer is only opened once per - // app instance, even if this method is called multiple times - // with the same channel id while the first task is still running. + let client = self.client.clone(); + self.open_channel_resource( + channel_id, + |this| &mut this.opened_buffers, + |channel, cx| ChannelBuffer::new(channel, client, cx), + cx, + ) + } + + pub fn open_channel_chat( + &mut self, + channel_id: ChannelId, + cx: &mut ModelContext, + ) -> Task>> { + let client = self.client.clone(); + let user_store = self.user_store.clone(); + self.open_channel_resource( + channel_id, + |this| &mut this.opened_chats, + |channel, cx| ChannelChat::new(channel, user_store, client, cx), + cx, + ) + } + + /// Asynchronously open a given resource associated with a channel. + /// + /// Make sure that the resource is only opened once, even if this method + /// is called multiple times with the same channel id while the first task + /// is still running. + fn open_channel_resource( + &mut self, + channel_id: ChannelId, + get_map: fn(&mut Self) -> &mut HashMap>, + load: F, + cx: &mut ModelContext, + ) -> Task>> + where + F: 'static + FnOnce(Arc, AsyncAppContext) -> Fut, + Fut: Future>>, + { let task = loop { - match self.opened_buffers.entry(channel_id) { + match get_map(self).entry(channel_id) { hash_map::Entry::Occupied(e) => match e.get() { - OpenedChannelBuffer::Open(buffer) => { - if let Some(buffer) = buffer.upgrade(cx) { - break Task::ready(Ok(buffer)).shared(); + OpenedModelHandle::Open(model) => { + if let Some(model) = model.upgrade(cx) { + break Task::ready(Ok(model)).shared(); } else { - self.opened_buffers.remove(&channel_id); + get_map(self).remove(&channel_id); continue; } } - OpenedChannelBuffer::Loading(task) => break task.clone(), + OpenedModelHandle::Loading(task) => { + break task.clone(); + } }, hash_map::Entry::Vacant(e) => { - let client = self.client.clone(); let task = cx .spawn(|this, cx| async move { let channel = this.read_with(&cx, |this, _| { @@ -192,30 +236,24 @@ impl ChannelStore { }) })?; - ChannelBuffer::new(channel, client, cx) - .await - .map_err(Arc::new) + load(channel, cx).await.map_err(Arc::new) }) .shared(); - e.insert(OpenedChannelBuffer::Loading(task.clone())); + + e.insert(OpenedModelHandle::Loading(task.clone())); cx.spawn({ let task = task.clone(); |this, mut cx| async move { let result = task.await; - this.update(&mut cx, |this, cx| match result { - Ok(buffer) => { - cx.observe_release(&buffer, move |this, _, _| { - this.opened_buffers.remove(&channel_id); - }) - .detach(); - this.opened_buffers.insert( + this.update(&mut cx, |this, _| match result { + Ok(model) => { + get_map(this).insert( channel_id, - OpenedChannelBuffer::Open(buffer.downgrade()), + OpenedModelHandle::Open(model.downgrade()), ); } - Err(error) => { - log::error!("failed to open channel buffer {error:?}"); - this.opened_buffers.remove(&channel_id); + Err(_) => { + get_map(this).remove(&channel_id); } }); } @@ -494,9 +532,19 @@ impl ChannelStore { fn handle_connect(&mut self, cx: &mut ModelContext) -> Task> { self.disconnect_channel_buffers_task.take(); + for chat in self.opened_chats.values() { + if let OpenedModelHandle::Open(chat) = chat { + if let Some(chat) = chat.upgrade(cx) { + chat.update(cx, |chat, cx| { + chat.rejoin(cx); + }); + } + } + } + let mut buffer_versions = Vec::new(); for buffer in self.opened_buffers.values() { - if let OpenedChannelBuffer::Open(buffer) = buffer { + if let OpenedModelHandle::Open(buffer) = buffer { if let Some(buffer) = buffer.upgrade(cx) { let channel_buffer = buffer.read(cx); let buffer = channel_buffer.buffer().read(cx); @@ -522,7 +570,7 @@ impl ChannelStore { this.update(&mut cx, |this, cx| { this.opened_buffers.retain(|_, buffer| match buffer { - OpenedChannelBuffer::Open(channel_buffer) => { + OpenedModelHandle::Open(channel_buffer) => { let Some(channel_buffer) = channel_buffer.upgrade(cx) else { return false; }; @@ -583,7 +631,7 @@ impl ChannelStore { false }) } - OpenedChannelBuffer::Loading(_) => true, + OpenedModelHandle::Loading(_) => true, }); }); anyhow::Ok(()) @@ -605,7 +653,7 @@ impl ChannelStore { if let Some(this) = this.upgrade(&cx) { this.update(&mut cx, |this, cx| { for (_, buffer) in this.opened_buffers.drain() { - if let OpenedChannelBuffer::Open(buffer) = buffer { + if let OpenedModelHandle::Open(buffer) = buffer { if let Some(buffer) = buffer.upgrade(cx) { buffer.update(cx, |buffer, cx| buffer.disconnect(cx)); } @@ -654,7 +702,7 @@ impl ChannelStore { for channel_id in &payload.remove_channels { let channel_id = *channel_id; - if let Some(OpenedChannelBuffer::Open(buffer)) = + if let Some(OpenedModelHandle::Open(buffer)) = self.opened_buffers.remove(&channel_id) { if let Some(buffer) = buffer.upgrade(cx) { diff --git a/crates/channel/src/channel_store_tests.rs b/crates/channel/src/channel_store_tests.rs index 18894b1f47..22174f161b 100644 --- a/crates/channel/src/channel_store_tests.rs +++ b/crates/channel/src/channel_store_tests.rs @@ -1,16 +1,15 @@ +use crate::channel_chat::ChannelChatEvent; + use super::*; -use client::{Client, UserStore}; -use gpui::{AppContext, ModelHandle}; +use client::{test::FakeServer, Client, UserStore}; +use gpui::{AppContext, ModelHandle, TestAppContext}; use rpc::proto; +use settings::SettingsStore; use util::http::FakeHttpClient; #[gpui::test] fn test_update_channels(cx: &mut AppContext) { - let http = FakeHttpClient::with_404_response(); - let client = Client::new(http.clone(), cx); - let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx)); - - let channel_store = cx.add_model(|cx| ChannelStore::new(client, user_store, cx)); + let channel_store = init_test(cx); update_channels( &channel_store, @@ -78,11 +77,7 @@ fn test_update_channels(cx: &mut AppContext) { #[gpui::test] fn test_dangling_channel_paths(cx: &mut AppContext) { - let http = FakeHttpClient::with_404_response(); - let client = Client::new(http.clone(), cx); - let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx)); - - let channel_store = cx.add_model(|cx| ChannelStore::new(client, user_store, cx)); + let channel_store = init_test(cx); update_channels( &channel_store, @@ -137,6 +132,208 @@ fn test_dangling_channel_paths(cx: &mut AppContext) { assert_channels(&channel_store, &[(0, "a".to_string(), true)], cx); } +#[gpui::test] +async fn test_channel_messages(cx: &mut TestAppContext) { + let user_id = 5; + let channel_id = 5; + let channel_store = cx.update(init_test); + let client = channel_store.read_with(cx, |s, _| s.client()); + let server = FakeServer::for_client(user_id, &client, cx).await; + + // Get the available channels. + server.send(proto::UpdateChannels { + channels: vec![proto::Channel { + id: channel_id, + name: "the-channel".to_string(), + parent_id: None, + }], + ..Default::default() + }); + cx.foreground().run_until_parked(); + cx.read(|cx| { + assert_channels(&channel_store, &[(0, "the-channel".to_string(), false)], cx); + }); + + let get_users = server.receive::().await.unwrap(); + assert_eq!(get_users.payload.user_ids, vec![5]); + server.respond( + get_users.receipt(), + proto::UsersResponse { + users: vec![proto::User { + id: 5, + github_login: "nathansobo".into(), + avatar_url: "http://avatar.com/nathansobo".into(), + }], + }, + ); + + // Join a channel and populate its existing messages. + let channel = channel_store.update(cx, |store, cx| { + let channel_id = store.channels().next().unwrap().1.id; + store.open_channel_chat(channel_id, cx) + }); + let join_channel = server.receive::().await.unwrap(); + server.respond( + join_channel.receipt(), + proto::JoinChannelChatResponse { + messages: vec![ + proto::ChannelMessage { + id: 10, + body: "a".into(), + timestamp: 1000, + sender_id: 5, + nonce: Some(1.into()), + }, + proto::ChannelMessage { + id: 11, + body: "b".into(), + timestamp: 1001, + sender_id: 6, + nonce: Some(2.into()), + }, + ], + done: false, + }, + ); + + cx.foreground().start_waiting(); + + // Client requests all users for the received messages + let mut get_users = server.receive::().await.unwrap(); + get_users.payload.user_ids.sort(); + assert_eq!(get_users.payload.user_ids, vec![6]); + server.respond( + get_users.receipt(), + proto::UsersResponse { + users: vec![proto::User { + id: 6, + github_login: "maxbrunsfeld".into(), + avatar_url: "http://avatar.com/maxbrunsfeld".into(), + }], + }, + ); + + let channel = channel.await.unwrap(); + channel.read_with(cx, |channel, _| { + assert_eq!( + channel + .messages_in_range(0..2) + .map(|message| (message.sender.github_login.clone(), message.body.clone())) + .collect::>(), + &[ + ("nathansobo".into(), "a".into()), + ("maxbrunsfeld".into(), "b".into()) + ] + ); + }); + + // Receive a new message. + server.send(proto::ChannelMessageSent { + channel_id, + message: Some(proto::ChannelMessage { + id: 12, + body: "c".into(), + timestamp: 1002, + sender_id: 7, + nonce: Some(3.into()), + }), + }); + + // Client requests user for message since they haven't seen them yet + let get_users = server.receive::().await.unwrap(); + assert_eq!(get_users.payload.user_ids, vec![7]); + server.respond( + get_users.receipt(), + proto::UsersResponse { + users: vec![proto::User { + id: 7, + github_login: "as-cii".into(), + avatar_url: "http://avatar.com/as-cii".into(), + }], + }, + ); + + assert_eq!( + channel.next_event(cx).await, + ChannelChatEvent::MessagesUpdated { + old_range: 2..2, + new_count: 1, + } + ); + channel.read_with(cx, |channel, _| { + assert_eq!( + channel + .messages_in_range(2..3) + .map(|message| (message.sender.github_login.clone(), message.body.clone())) + .collect::>(), + &[("as-cii".into(), "c".into())] + ) + }); + + // Scroll up to view older messages. + channel.update(cx, |channel, cx| { + assert!(channel.load_more_messages(cx)); + }); + let get_messages = server.receive::().await.unwrap(); + assert_eq!(get_messages.payload.channel_id, 5); + assert_eq!(get_messages.payload.before_message_id, 10); + server.respond( + get_messages.receipt(), + proto::GetChannelMessagesResponse { + done: true, + messages: vec![ + proto::ChannelMessage { + id: 8, + body: "y".into(), + timestamp: 998, + sender_id: 5, + nonce: Some(4.into()), + }, + proto::ChannelMessage { + id: 9, + body: "z".into(), + timestamp: 999, + sender_id: 6, + nonce: Some(5.into()), + }, + ], + }, + ); + + assert_eq!( + channel.next_event(cx).await, + ChannelChatEvent::MessagesUpdated { + old_range: 0..0, + new_count: 2, + } + ); + channel.read_with(cx, |channel, _| { + assert_eq!( + channel + .messages_in_range(0..2) + .map(|message| (message.sender.github_login.clone(), message.body.clone())) + .collect::>(), + &[ + ("nathansobo".into(), "y".into()), + ("maxbrunsfeld".into(), "z".into()) + ] + ); + }); +} + +fn init_test(cx: &mut AppContext) -> ModelHandle { + let http = FakeHttpClient::with_404_response(); + let client = Client::new(http.clone(), cx); + let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx)); + + cx.foreground().forbid_parking(); + cx.set_global(SettingsStore::test(cx)); + crate::init(&client); + client::init(&client, cx); + + cx.add_model(|cx| ChannelStore::new(client, user_store, cx)) +} + fn update_channels( channel_store: &ModelHandle, message: proto::UpdateChannels, diff --git a/crates/client/src/telemetry.rs b/crates/client/src/telemetry.rs index f8642dd7fa..1596e6d850 100644 --- a/crates/client/src/telemetry.rs +++ b/crates/client/src/telemetry.rs @@ -56,6 +56,13 @@ struct ClickhouseEventWrapper { event: ClickhouseEvent, } +#[derive(Serialize, Debug)] +#[serde(rename_all = "snake_case")] +pub enum AssistantKind { + Panel, + Inline, +} + #[derive(Serialize, Debug)] #[serde(tag = "type")] pub enum ClickhouseEvent { @@ -76,6 +83,11 @@ pub enum ClickhouseEvent { room_id: Option, channel_id: Option, }, + Assistant { + conversation_id: Option, + kind: AssistantKind, + model: &'static str, + }, } #[cfg(debug_assertions)] diff --git a/crates/client/src/test.rs b/crates/client/src/test.rs index 00e7cd1508..38cd12f21c 100644 --- a/crates/client/src/test.rs +++ b/crates/client/src/test.rs @@ -170,8 +170,7 @@ impl FakeServer { staff: false, flags: Default::default(), }, - ) - .await; + ); continue; } @@ -182,11 +181,7 @@ impl FakeServer { } } - pub async fn respond( - &self, - receipt: Receipt, - response: T::Response, - ) { + pub fn respond(&self, receipt: Receipt, response: T::Response) { self.peer.respond(receipt, response).unwrap() } diff --git a/crates/collab/migrations.sqlite/20221109000000_test_schema.sql b/crates/collab/migrations.sqlite/20221109000000_test_schema.sql index 80477dcb3c..ab12039b10 100644 --- a/crates/collab/migrations.sqlite/20221109000000_test_schema.sql +++ b/crates/collab/migrations.sqlite/20221109000000_test_schema.sql @@ -192,6 +192,26 @@ CREATE TABLE "channels" ( "created_at" TIMESTAMP NOT NULL DEFAULT now ); +CREATE TABLE IF NOT EXISTS "channel_chat_participants" ( + "id" INTEGER PRIMARY KEY AUTOINCREMENT, + "user_id" INTEGER NOT NULL REFERENCES users (id), + "channel_id" INTEGER NOT NULL REFERENCES channels (id) ON DELETE CASCADE, + "connection_id" INTEGER NOT NULL, + "connection_server_id" INTEGER NOT NULL REFERENCES servers (id) ON DELETE CASCADE +); +CREATE INDEX "index_channel_chat_participants_on_channel_id" ON "channel_chat_participants" ("channel_id"); + +CREATE TABLE IF NOT EXISTS "channel_messages" ( + "id" INTEGER PRIMARY KEY AUTOINCREMENT, + "channel_id" INTEGER NOT NULL REFERENCES channels (id) ON DELETE CASCADE, + "sender_id" INTEGER NOT NULL REFERENCES users (id), + "body" TEXT NOT NULL, + "sent_at" TIMESTAMP, + "nonce" BLOB NOT NULL +); +CREATE INDEX "index_channel_messages_on_channel_id" ON "channel_messages" ("channel_id"); +CREATE UNIQUE INDEX "index_channel_messages_on_nonce" ON "channel_messages" ("nonce"); + CREATE TABLE "channel_paths" ( "id_path" TEXT NOT NULL PRIMARY KEY, "channel_id" INTEGER NOT NULL REFERENCES channels (id) ON DELETE CASCADE diff --git a/crates/collab/migrations/20230907114200_add_channel_messages.sql b/crates/collab/migrations/20230907114200_add_channel_messages.sql new file mode 100644 index 0000000000..abe7753ca6 --- /dev/null +++ b/crates/collab/migrations/20230907114200_add_channel_messages.sql @@ -0,0 +1,19 @@ +CREATE TABLE IF NOT EXISTS "channel_messages" ( + "id" SERIAL PRIMARY KEY, + "channel_id" INTEGER NOT NULL REFERENCES channels (id) ON DELETE CASCADE, + "sender_id" INTEGER NOT NULL REFERENCES users (id), + "body" TEXT NOT NULL, + "sent_at" TIMESTAMP, + "nonce" UUID NOT NULL +); +CREATE INDEX "index_channel_messages_on_channel_id" ON "channel_messages" ("channel_id"); +CREATE UNIQUE INDEX "index_channel_messages_on_nonce" ON "channel_messages" ("nonce"); + +CREATE TABLE IF NOT EXISTS "channel_chat_participants" ( + "id" SERIAL PRIMARY KEY, + "user_id" INTEGER NOT NULL REFERENCES users (id), + "channel_id" INTEGER NOT NULL REFERENCES channels (id) ON DELETE CASCADE, + "connection_id" INTEGER NOT NULL, + "connection_server_id" INTEGER NOT NULL REFERENCES servers (id) ON DELETE CASCADE +); +CREATE INDEX "index_channel_chat_participants_on_channel_id" ON "channel_chat_participants" ("channel_id"); diff --git a/crates/collab/src/db/ids.rs b/crates/collab/src/db/ids.rs index b33ea57183..865a39fd71 100644 --- a/crates/collab/src/db/ids.rs +++ b/crates/collab/src/db/ids.rs @@ -112,8 +112,10 @@ fn value_to_integer(v: Value) -> Result { id_type!(BufferId); id_type!(AccessTokenId); +id_type!(ChannelChatParticipantId); id_type!(ChannelId); id_type!(ChannelMemberId); +id_type!(MessageId); id_type!(ContactId); id_type!(FollowerId); id_type!(RoomId); diff --git a/crates/collab/src/db/queries.rs b/crates/collab/src/db/queries.rs index d132596438..80bd8704b2 100644 --- a/crates/collab/src/db/queries.rs +++ b/crates/collab/src/db/queries.rs @@ -4,6 +4,7 @@ pub mod access_tokens; pub mod buffers; pub mod channels; pub mod contacts; +pub mod messages; pub mod projects; pub mod rooms; pub mod servers; diff --git a/crates/collab/src/db/queries/buffers.rs b/crates/collab/src/db/queries/buffers.rs index 00de201403..62ead11932 100644 --- a/crates/collab/src/db/queries/buffers.rs +++ b/crates/collab/src/db/queries/buffers.rs @@ -249,6 +249,29 @@ impl Database { .await } + pub async fn channel_buffer_connection_lost( + &self, + connection: ConnectionId, + tx: &DatabaseTransaction, + ) -> Result<()> { + channel_buffer_collaborator::Entity::update_many() + .filter( + Condition::all() + .add(channel_buffer_collaborator::Column::ConnectionId.eq(connection.id as i32)) + .add( + channel_buffer_collaborator::Column::ConnectionServerId + .eq(connection.owner_id as i32), + ), + ) + .set(channel_buffer_collaborator::ActiveModel { + connection_lost: ActiveValue::set(true), + ..Default::default() + }) + .exec(&*tx) + .await?; + Ok(()) + } + pub async fn leave_channel_buffers( &self, connection: ConnectionId, diff --git a/crates/collab/src/db/queries/messages.rs b/crates/collab/src/db/queries/messages.rs new file mode 100644 index 0000000000..0b88df6716 --- /dev/null +++ b/crates/collab/src/db/queries/messages.rs @@ -0,0 +1,214 @@ +use super::*; +use time::OffsetDateTime; + +impl Database { + pub async fn join_channel_chat( + &self, + channel_id: ChannelId, + connection_id: ConnectionId, + user_id: UserId, + ) -> Result<()> { + self.transaction(|tx| async move { + self.check_user_is_channel_member(channel_id, user_id, &*tx) + .await?; + channel_chat_participant::ActiveModel { + id: ActiveValue::NotSet, + channel_id: ActiveValue::Set(channel_id), + user_id: ActiveValue::Set(user_id), + connection_id: ActiveValue::Set(connection_id.id as i32), + connection_server_id: ActiveValue::Set(ServerId(connection_id.owner_id as i32)), + } + .insert(&*tx) + .await?; + Ok(()) + }) + .await + } + + pub async fn channel_chat_connection_lost( + &self, + connection_id: ConnectionId, + tx: &DatabaseTransaction, + ) -> Result<()> { + channel_chat_participant::Entity::delete_many() + .filter( + Condition::all() + .add( + channel_chat_participant::Column::ConnectionServerId + .eq(connection_id.owner_id), + ) + .add(channel_chat_participant::Column::ConnectionId.eq(connection_id.id)), + ) + .exec(tx) + .await?; + Ok(()) + } + + pub async fn leave_channel_chat( + &self, + channel_id: ChannelId, + connection_id: ConnectionId, + _user_id: UserId, + ) -> Result<()> { + self.transaction(|tx| async move { + channel_chat_participant::Entity::delete_many() + .filter( + Condition::all() + .add( + channel_chat_participant::Column::ConnectionServerId + .eq(connection_id.owner_id), + ) + .add(channel_chat_participant::Column::ConnectionId.eq(connection_id.id)) + .add(channel_chat_participant::Column::ChannelId.eq(channel_id)), + ) + .exec(&*tx) + .await?; + + Ok(()) + }) + .await + } + + pub async fn get_channel_messages( + &self, + channel_id: ChannelId, + user_id: UserId, + count: usize, + before_message_id: Option, + ) -> Result> { + self.transaction(|tx| async move { + self.check_user_is_channel_member(channel_id, user_id, &*tx) + .await?; + + let mut condition = + Condition::all().add(channel_message::Column::ChannelId.eq(channel_id)); + + if let Some(before_message_id) = before_message_id { + condition = condition.add(channel_message::Column::Id.lt(before_message_id)); + } + + let mut rows = channel_message::Entity::find() + .filter(condition) + .limit(count as u64) + .stream(&*tx) + .await?; + + let mut messages = Vec::new(); + while let Some(row) = rows.next().await { + let row = row?; + let nonce = row.nonce.as_u64_pair(); + messages.push(proto::ChannelMessage { + id: row.id.to_proto(), + sender_id: row.sender_id.to_proto(), + body: row.body, + timestamp: row.sent_at.assume_utc().unix_timestamp() as u64, + nonce: Some(proto::Nonce { + upper_half: nonce.0, + lower_half: nonce.1, + }), + }); + } + + Ok(messages) + }) + .await + } + + pub async fn create_channel_message( + &self, + channel_id: ChannelId, + user_id: UserId, + body: &str, + timestamp: OffsetDateTime, + nonce: u128, + ) -> Result<(MessageId, Vec)> { + self.transaction(|tx| async move { + let mut rows = channel_chat_participant::Entity::find() + .filter(channel_chat_participant::Column::ChannelId.eq(channel_id)) + .stream(&*tx) + .await?; + + let mut is_participant = false; + let mut participant_connection_ids = Vec::new(); + while let Some(row) = rows.next().await { + let row = row?; + if row.user_id == user_id { + is_participant = true; + } + participant_connection_ids.push(row.connection()); + } + drop(rows); + + if !is_participant { + Err(anyhow!("not a chat participant"))?; + } + + let timestamp = timestamp.to_offset(time::UtcOffset::UTC); + let timestamp = time::PrimitiveDateTime::new(timestamp.date(), timestamp.time()); + + let message = channel_message::Entity::insert(channel_message::ActiveModel { + channel_id: ActiveValue::Set(channel_id), + sender_id: ActiveValue::Set(user_id), + body: ActiveValue::Set(body.to_string()), + sent_at: ActiveValue::Set(timestamp), + nonce: ActiveValue::Set(Uuid::from_u128(nonce)), + id: ActiveValue::NotSet, + }) + .on_conflict( + OnConflict::column(channel_message::Column::Nonce) + .update_column(channel_message::Column::Nonce) + .to_owned(), + ) + .exec(&*tx) + .await?; + + #[derive(Debug, Clone, Copy, EnumIter, DeriveColumn)] + enum QueryConnectionId { + ConnectionId, + } + + Ok((message.last_insert_id, participant_connection_ids)) + }) + .await + } + + pub async fn remove_channel_message( + &self, + channel_id: ChannelId, + message_id: MessageId, + user_id: UserId, + ) -> Result> { + self.transaction(|tx| async move { + let mut rows = channel_chat_participant::Entity::find() + .filter(channel_chat_participant::Column::ChannelId.eq(channel_id)) + .stream(&*tx) + .await?; + + let mut is_participant = false; + let mut participant_connection_ids = Vec::new(); + while let Some(row) = rows.next().await { + let row = row?; + if row.user_id == user_id { + is_participant = true; + } + participant_connection_ids.push(row.connection()); + } + drop(rows); + + if !is_participant { + Err(anyhow!("not a chat participant"))?; + } + + let result = channel_message::Entity::delete_by_id(message_id) + .filter(channel_message::Column::SenderId.eq(user_id)) + .exec(&*tx) + .await?; + if result.rows_affected == 0 { + Err(anyhow!("no such message"))?; + } + + Ok(participant_connection_ids) + }) + .await + } +} diff --git a/crates/collab/src/db/queries/rooms.rs b/crates/collab/src/db/queries/rooms.rs index e348b50bee..fb81fef176 100644 --- a/crates/collab/src/db/queries/rooms.rs +++ b/crates/collab/src/db/queries/rooms.rs @@ -890,54 +890,43 @@ impl Database { pub async fn connection_lost(&self, connection: ConnectionId) -> Result<()> { self.transaction(|tx| async move { - let participant = room_participant::Entity::find() - .filter( - Condition::all() - .add( - room_participant::Column::AnsweringConnectionId - .eq(connection.id as i32), - ) - .add( - room_participant::Column::AnsweringConnectionServerId - .eq(connection.owner_id as i32), - ), - ) - .one(&*tx) + self.room_connection_lost(connection, &*tx).await?; + self.channel_buffer_connection_lost(connection, &*tx) .await?; - - if let Some(participant) = participant { - room_participant::Entity::update(room_participant::ActiveModel { - answering_connection_lost: ActiveValue::set(true), - ..participant.into_active_model() - }) - .exec(&*tx) - .await?; - } - - channel_buffer_collaborator::Entity::update_many() - .filter( - Condition::all() - .add( - channel_buffer_collaborator::Column::ConnectionId - .eq(connection.id as i32), - ) - .add( - channel_buffer_collaborator::Column::ConnectionServerId - .eq(connection.owner_id as i32), - ), - ) - .set(channel_buffer_collaborator::ActiveModel { - connection_lost: ActiveValue::set(true), - ..Default::default() - }) - .exec(&*tx) - .await?; - + self.channel_chat_connection_lost(connection, &*tx).await?; Ok(()) }) .await } + pub async fn room_connection_lost( + &self, + connection: ConnectionId, + tx: &DatabaseTransaction, + ) -> Result<()> { + let participant = room_participant::Entity::find() + .filter( + Condition::all() + .add(room_participant::Column::AnsweringConnectionId.eq(connection.id as i32)) + .add( + room_participant::Column::AnsweringConnectionServerId + .eq(connection.owner_id as i32), + ), + ) + .one(&*tx) + .await?; + + if let Some(participant) = participant { + room_participant::Entity::update(room_participant::ActiveModel { + answering_connection_lost: ActiveValue::set(true), + ..participant.into_active_model() + }) + .exec(&*tx) + .await?; + } + Ok(()) + } + fn build_incoming_call( room: &proto::Room, called_user_id: UserId, diff --git a/crates/collab/src/db/tables.rs b/crates/collab/src/db/tables.rs index 1765cee065..81200df3e7 100644 --- a/crates/collab/src/db/tables.rs +++ b/crates/collab/src/db/tables.rs @@ -4,7 +4,9 @@ pub mod buffer_operation; pub mod buffer_snapshot; pub mod channel; pub mod channel_buffer_collaborator; +pub mod channel_chat_participant; pub mod channel_member; +pub mod channel_message; pub mod channel_path; pub mod contact; pub mod feature_flag; diff --git a/crates/collab/src/db/tables/channel.rs b/crates/collab/src/db/tables/channel.rs index 05895ede4c..54f12defc1 100644 --- a/crates/collab/src/db/tables/channel.rs +++ b/crates/collab/src/db/tables/channel.rs @@ -21,6 +21,8 @@ pub enum Relation { Member, #[sea_orm(has_many = "super::channel_buffer_collaborator::Entity")] BufferCollaborators, + #[sea_orm(has_many = "super::channel_chat_participant::Entity")] + ChatParticipants, } impl Related for Entity { @@ -46,3 +48,9 @@ impl Related for Entity { Relation::BufferCollaborators.def() } } + +impl Related for Entity { + fn to() -> RelationDef { + Relation::ChatParticipants.def() + } +} diff --git a/crates/collab/src/db/tables/channel_chat_participant.rs b/crates/collab/src/db/tables/channel_chat_participant.rs new file mode 100644 index 0000000000..f3ef36c289 --- /dev/null +++ b/crates/collab/src/db/tables/channel_chat_participant.rs @@ -0,0 +1,41 @@ +use crate::db::{ChannelChatParticipantId, ChannelId, ServerId, UserId}; +use rpc::ConnectionId; +use sea_orm::entity::prelude::*; + +#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel)] +#[sea_orm(table_name = "channel_chat_participants")] +pub struct Model { + #[sea_orm(primary_key)] + pub id: ChannelChatParticipantId, + pub channel_id: ChannelId, + pub user_id: UserId, + pub connection_id: i32, + pub connection_server_id: ServerId, +} + +impl Model { + pub fn connection(&self) -> ConnectionId { + ConnectionId { + owner_id: self.connection_server_id.0 as u32, + id: self.connection_id as u32, + } + } +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation { + #[sea_orm( + belongs_to = "super::channel::Entity", + from = "Column::ChannelId", + to = "super::channel::Column::Id" + )] + Channel, +} + +impl Related for Entity { + fn to() -> RelationDef { + Relation::Channel.def() + } +} + +impl ActiveModelBehavior for ActiveModel {} diff --git a/crates/collab/src/db/tables/channel_message.rs b/crates/collab/src/db/tables/channel_message.rs new file mode 100644 index 0000000000..ff49c63ba7 --- /dev/null +++ b/crates/collab/src/db/tables/channel_message.rs @@ -0,0 +1,45 @@ +use crate::db::{ChannelId, MessageId, UserId}; +use sea_orm::entity::prelude::*; +use time::PrimitiveDateTime; + +#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel)] +#[sea_orm(table_name = "channel_messages")] +pub struct Model { + #[sea_orm(primary_key)] + pub id: MessageId, + pub channel_id: ChannelId, + pub sender_id: UserId, + pub body: String, + pub sent_at: PrimitiveDateTime, + pub nonce: Uuid, +} + +impl ActiveModelBehavior for ActiveModel {} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation { + #[sea_orm( + belongs_to = "super::channel::Entity", + from = "Column::ChannelId", + to = "super::channel::Column::Id" + )] + Channel, + #[sea_orm( + belongs_to = "super::user::Entity", + from = "Column::SenderId", + to = "super::user::Column::Id" + )] + Sender, +} + +impl Related for Entity { + fn to() -> RelationDef { + Relation::Channel.def() + } +} + +impl Related for Entity { + fn to() -> RelationDef { + Relation::Sender.def() + } +} diff --git a/crates/collab/src/db/tests.rs b/crates/collab/src/db/tests.rs index ee961006cb..1d6c550865 100644 --- a/crates/collab/src/db/tests.rs +++ b/crates/collab/src/db/tests.rs @@ -1,6 +1,7 @@ mod buffer_tests; mod db_tests; mod feature_flag_tests; +mod message_tests; use super::*; use gpui::executor::Background; diff --git a/crates/collab/src/db/tests/message_tests.rs b/crates/collab/src/db/tests/message_tests.rs new file mode 100644 index 0000000000..c40e53d355 --- /dev/null +++ b/crates/collab/src/db/tests/message_tests.rs @@ -0,0 +1,59 @@ +use crate::{ + db::{Database, NewUserParams}, + test_both_dbs, +}; +use std::sync::Arc; +use time::OffsetDateTime; + +test_both_dbs!( + test_channel_message_nonces, + test_channel_message_nonces_postgres, + test_channel_message_nonces_sqlite +); + +async fn test_channel_message_nonces(db: &Arc) { + let user = db + .create_user( + "user@example.com", + false, + NewUserParams { + github_login: "user".into(), + github_user_id: 1, + invite_count: 0, + }, + ) + .await + .unwrap() + .user_id; + let channel = db + .create_channel("channel", None, "room", user) + .await + .unwrap(); + + let owner_id = db.create_server("test").await.unwrap().0 as u32; + + db.join_channel_chat(channel, rpc::ConnectionId { owner_id, id: 0 }, user) + .await + .unwrap(); + + let msg1_id = db + .create_channel_message(channel, user, "1", OffsetDateTime::now_utc(), 1) + .await + .unwrap(); + let msg2_id = db + .create_channel_message(channel, user, "2", OffsetDateTime::now_utc(), 2) + .await + .unwrap(); + let msg3_id = db + .create_channel_message(channel, user, "3", OffsetDateTime::now_utc(), 1) + .await + .unwrap(); + let msg4_id = db + .create_channel_message(channel, user, "4", OffsetDateTime::now_utc(), 2) + .await + .unwrap(); + + assert_ne!(msg1_id, msg2_id); + assert_eq!(msg1_id, msg3_id); + assert_eq!(msg2_id, msg4_id); +} diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index 2d66b43b93..3289daf6ca 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -2,7 +2,10 @@ mod connection_pool; use crate::{ auth, - db::{self, ChannelId, ChannelsForUser, Database, ProjectId, RoomId, ServerId, User, UserId}, + db::{ + self, ChannelId, ChannelsForUser, Database, MessageId, ProjectId, RoomId, ServerId, User, + UserId, + }, executor::Executor, AppState, Result, }; @@ -56,6 +59,7 @@ use std::{ }, time::{Duration, Instant}, }; +use time::OffsetDateTime; use tokio::sync::{watch, Semaphore}; use tower::ServiceBuilder; use tracing::{info_span, instrument, Instrument}; @@ -63,6 +67,9 @@ use tracing::{info_span, instrument, Instrument}; pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30); pub const CLEANUP_TIMEOUT: Duration = Duration::from_secs(10); +const MESSAGE_COUNT_PER_PAGE: usize = 100; +const MAX_MESSAGE_LEN: usize = 1024; + lazy_static! { static ref METRIC_CONNECTIONS: IntGauge = register_int_gauge!("connections", "number of connections").unwrap(); @@ -255,6 +262,11 @@ impl Server { .add_request_handler(get_channel_members) .add_request_handler(respond_to_channel_invite) .add_request_handler(join_channel) + .add_request_handler(join_channel_chat) + .add_message_handler(leave_channel_chat) + .add_request_handler(send_channel_message) + .add_request_handler(remove_channel_message) + .add_request_handler(get_channel_messages) .add_request_handler(follow) .add_message_handler(unfollow) .add_message_handler(update_followers) @@ -885,9 +897,8 @@ async fn connection_lost( room_updated(&room, &session.peer); } } + update_user_contacts(session.user_id, &session).await?; - - } _ = teardown.changed().fuse() => {} } @@ -2633,6 +2644,131 @@ fn channel_buffer_updated( }); } +async fn send_channel_message( + request: proto::SendChannelMessage, + response: Response, + session: Session, +) -> Result<()> { + // Validate the message body. + let body = request.body.trim().to_string(); + if body.len() > MAX_MESSAGE_LEN { + return Err(anyhow!("message is too long"))?; + } + if body.is_empty() { + return Err(anyhow!("message can't be blank"))?; + } + + let timestamp = OffsetDateTime::now_utc(); + let nonce = request + .nonce + .ok_or_else(|| anyhow!("nonce can't be blank"))?; + + let channel_id = ChannelId::from_proto(request.channel_id); + let (message_id, connection_ids) = session + .db() + .await + .create_channel_message( + channel_id, + session.user_id, + &body, + timestamp, + nonce.clone().into(), + ) + .await?; + let message = proto::ChannelMessage { + sender_id: session.user_id.to_proto(), + id: message_id.to_proto(), + body, + timestamp: timestamp.unix_timestamp() as u64, + nonce: Some(nonce), + }; + broadcast(Some(session.connection_id), connection_ids, |connection| { + session.peer.send( + connection, + proto::ChannelMessageSent { + channel_id: channel_id.to_proto(), + message: Some(message.clone()), + }, + ) + }); + response.send(proto::SendChannelMessageResponse { + message: Some(message), + })?; + Ok(()) +} + +async fn remove_channel_message( + request: proto::RemoveChannelMessage, + response: Response, + session: Session, +) -> Result<()> { + let channel_id = ChannelId::from_proto(request.channel_id); + let message_id = MessageId::from_proto(request.message_id); + let connection_ids = session + .db() + .await + .remove_channel_message(channel_id, message_id, session.user_id) + .await?; + broadcast(Some(session.connection_id), connection_ids, |connection| { + session.peer.send(connection, request.clone()) + }); + response.send(proto::Ack {})?; + Ok(()) +} + +async fn join_channel_chat( + request: proto::JoinChannelChat, + response: Response, + session: Session, +) -> Result<()> { + let channel_id = ChannelId::from_proto(request.channel_id); + + let db = session.db().await; + db.join_channel_chat(channel_id, session.connection_id, session.user_id) + .await?; + let messages = db + .get_channel_messages(channel_id, session.user_id, MESSAGE_COUNT_PER_PAGE, None) + .await?; + response.send(proto::JoinChannelChatResponse { + done: messages.len() < MESSAGE_COUNT_PER_PAGE, + messages, + })?; + Ok(()) +} + +async fn leave_channel_chat(request: proto::LeaveChannelChat, session: Session) -> Result<()> { + let channel_id = ChannelId::from_proto(request.channel_id); + session + .db() + .await + .leave_channel_chat(channel_id, session.connection_id, session.user_id) + .await?; + Ok(()) +} + +async fn get_channel_messages( + request: proto::GetChannelMessages, + response: Response, + session: Session, +) -> Result<()> { + let channel_id = ChannelId::from_proto(request.channel_id); + let messages = session + .db() + .await + .get_channel_messages( + channel_id, + session.user_id, + MESSAGE_COUNT_PER_PAGE, + Some(MessageId::from_proto(request.before_message_id)), + ) + .await?; + response.send(proto::GetChannelMessagesResponse { + done: messages.len() < MESSAGE_COUNT_PER_PAGE, + messages, + })?; + Ok(()) +} + async fn update_diff_base(request: proto::UpdateDiffBase, session: Session) -> Result<()> { let project_id = ProjectId::from_proto(request.project_id); let project_connection_ids = session diff --git a/crates/collab/src/tests.rs b/crates/collab/src/tests.rs index 3000f0d8c3..b0f5b96fde 100644 --- a/crates/collab/src/tests.rs +++ b/crates/collab/src/tests.rs @@ -2,6 +2,7 @@ use call::Room; use gpui::{ModelHandle, TestAppContext}; mod channel_buffer_tests; +mod channel_message_tests; mod channel_tests; mod integration_tests; mod random_channel_buffer_tests; diff --git a/crates/collab/src/tests/channel_message_tests.rs b/crates/collab/src/tests/channel_message_tests.rs new file mode 100644 index 0000000000..1a9460c6cf --- /dev/null +++ b/crates/collab/src/tests/channel_message_tests.rs @@ -0,0 +1,214 @@ +use crate::{rpc::RECONNECT_TIMEOUT, tests::TestServer}; +use channel::{ChannelChat, ChannelMessageId}; +use gpui::{executor::Deterministic, ModelHandle, TestAppContext}; +use std::sync::Arc; + +#[gpui::test] +async fn test_basic_channel_messages( + deterministic: Arc, + cx_a: &mut TestAppContext, + cx_b: &mut TestAppContext, +) { + deterministic.forbid_parking(); + let mut server = TestServer::start(&deterministic).await; + let client_a = server.create_client(cx_a, "user_a").await; + let client_b = server.create_client(cx_b, "user_b").await; + + let channel_id = server + .make_channel("the-channel", (&client_a, cx_a), &mut [(&client_b, cx_b)]) + .await; + + let channel_chat_a = client_a + .channel_store() + .update(cx_a, |store, cx| store.open_channel_chat(channel_id, cx)) + .await + .unwrap(); + let channel_chat_b = client_b + .channel_store() + .update(cx_b, |store, cx| store.open_channel_chat(channel_id, cx)) + .await + .unwrap(); + + channel_chat_a + .update(cx_a, |c, cx| c.send_message("one".into(), cx).unwrap()) + .await + .unwrap(); + channel_chat_a + .update(cx_a, |c, cx| c.send_message("two".into(), cx).unwrap()) + .await + .unwrap(); + + deterministic.run_until_parked(); + channel_chat_b + .update(cx_b, |c, cx| c.send_message("three".into(), cx).unwrap()) + .await + .unwrap(); + + deterministic.run_until_parked(); + channel_chat_a.update(cx_a, |c, _| { + assert_eq!( + c.messages() + .iter() + .map(|m| m.body.as_str()) + .collect::>(), + vec!["one", "two", "three"] + ); + }) +} + +#[gpui::test] +async fn test_rejoin_channel_chat( + deterministic: Arc, + cx_a: &mut TestAppContext, + cx_b: &mut TestAppContext, +) { + deterministic.forbid_parking(); + let mut server = TestServer::start(&deterministic).await; + let client_a = server.create_client(cx_a, "user_a").await; + let client_b = server.create_client(cx_b, "user_b").await; + + let channel_id = server + .make_channel("the-channel", (&client_a, cx_a), &mut [(&client_b, cx_b)]) + .await; + + let channel_chat_a = client_a + .channel_store() + .update(cx_a, |store, cx| store.open_channel_chat(channel_id, cx)) + .await + .unwrap(); + let channel_chat_b = client_b + .channel_store() + .update(cx_b, |store, cx| store.open_channel_chat(channel_id, cx)) + .await + .unwrap(); + + channel_chat_a + .update(cx_a, |c, cx| c.send_message("one".into(), cx).unwrap()) + .await + .unwrap(); + channel_chat_b + .update(cx_b, |c, cx| c.send_message("two".into(), cx).unwrap()) + .await + .unwrap(); + + server.forbid_connections(); + server.disconnect_client(client_a.peer_id().unwrap()); + + // While client A is disconnected, clients A and B both send new messages. + channel_chat_a + .update(cx_a, |c, cx| c.send_message("three".into(), cx).unwrap()) + .await + .unwrap_err(); + channel_chat_a + .update(cx_a, |c, cx| c.send_message("four".into(), cx).unwrap()) + .await + .unwrap_err(); + channel_chat_b + .update(cx_b, |c, cx| c.send_message("five".into(), cx).unwrap()) + .await + .unwrap(); + channel_chat_b + .update(cx_b, |c, cx| c.send_message("six".into(), cx).unwrap()) + .await + .unwrap(); + + // Client A reconnects. + server.allow_connections(); + deterministic.advance_clock(RECONNECT_TIMEOUT); + + // Client A fetches the messages that were sent while they were disconnected + // and resends their own messages which failed to send. + let expected_messages = &["one", "two", "five", "six", "three", "four"]; + assert_messages(&channel_chat_a, expected_messages, cx_a); + assert_messages(&channel_chat_b, expected_messages, cx_b); +} + +#[gpui::test] +async fn test_remove_channel_message( + deterministic: Arc, + cx_a: &mut TestAppContext, + cx_b: &mut TestAppContext, + cx_c: &mut TestAppContext, +) { + deterministic.forbid_parking(); + let mut server = TestServer::start(&deterministic).await; + let client_a = server.create_client(cx_a, "user_a").await; + let client_b = server.create_client(cx_b, "user_b").await; + let client_c = server.create_client(cx_c, "user_c").await; + + let channel_id = server + .make_channel( + "the-channel", + (&client_a, cx_a), + &mut [(&client_b, cx_b), (&client_c, cx_c)], + ) + .await; + + let channel_chat_a = client_a + .channel_store() + .update(cx_a, |store, cx| store.open_channel_chat(channel_id, cx)) + .await + .unwrap(); + let channel_chat_b = client_b + .channel_store() + .update(cx_b, |store, cx| store.open_channel_chat(channel_id, cx)) + .await + .unwrap(); + + // Client A sends some messages. + channel_chat_a + .update(cx_a, |c, cx| c.send_message("one".into(), cx).unwrap()) + .await + .unwrap(); + channel_chat_a + .update(cx_a, |c, cx| c.send_message("two".into(), cx).unwrap()) + .await + .unwrap(); + channel_chat_a + .update(cx_a, |c, cx| c.send_message("three".into(), cx).unwrap()) + .await + .unwrap(); + + // Clients A and B see all of the messages. + deterministic.run_until_parked(); + let expected_messages = &["one", "two", "three"]; + assert_messages(&channel_chat_a, expected_messages, cx_a); + assert_messages(&channel_chat_b, expected_messages, cx_b); + + // Client A deletes one of their messages. + channel_chat_a + .update(cx_a, |c, cx| { + let ChannelMessageId::Saved(id) = c.message(1).id else { + panic!("message not saved") + }; + c.remove_message(id, cx) + }) + .await + .unwrap(); + + // Client B sees that the message is gone. + deterministic.run_until_parked(); + let expected_messages = &["one", "three"]; + assert_messages(&channel_chat_a, expected_messages, cx_a); + assert_messages(&channel_chat_b, expected_messages, cx_b); + + // Client C joins the channel chat, and does not see the deleted message. + let channel_chat_c = client_c + .channel_store() + .update(cx_c, |store, cx| store.open_channel_chat(channel_id, cx)) + .await + .unwrap(); + assert_messages(&channel_chat_c, expected_messages, cx_c); +} + +#[track_caller] +fn assert_messages(chat: &ModelHandle, messages: &[&str], cx: &mut TestAppContext) { + assert_eq!( + chat.read_with(cx, |chat, _| chat + .messages() + .iter() + .map(|m| m.body.clone()) + .collect::>(),), + messages + ); +} diff --git a/crates/collab/src/tests/integration_tests.rs b/crates/collab/src/tests/integration_tests.rs index a9f4a31eb7..10d6baec19 100644 --- a/crates/collab/src/tests/integration_tests.rs +++ b/crates/collab/src/tests/integration_tests.rs @@ -4825,7 +4825,7 @@ async fn test_project_search( let mut results = HashMap::default(); let mut search_rx = project_b.update(cx_b, |project, cx| { project.search( - SearchQuery::text("world", false, false, Vec::new(), Vec::new()), + SearchQuery::text("world", false, false, Vec::new(), Vec::new()).unwrap(), cx, ) }); diff --git a/crates/collab/src/tests/random_project_collaboration_tests.rs b/crates/collab/src/tests/random_project_collaboration_tests.rs index 7570768249..6f9513c325 100644 --- a/crates/collab/src/tests/random_project_collaboration_tests.rs +++ b/crates/collab/src/tests/random_project_collaboration_tests.rs @@ -869,7 +869,7 @@ impl RandomizedTest for ProjectCollaborationTest { let mut search = project.update(cx, |project, cx| { project.search( - SearchQuery::text(query, false, false, Vec::new(), Vec::new()), + SearchQuery::text(query, false, false, Vec::new(), Vec::new()).unwrap(), cx, ) }); diff --git a/crates/collab/src/tests/test_server.rs b/crates/collab/src/tests/test_server.rs index eef1dde967..a7dbd97239 100644 --- a/crates/collab/src/tests/test_server.rs +++ b/crates/collab/src/tests/test_server.rs @@ -6,7 +6,7 @@ use crate::{ }; use anyhow::anyhow; use call::ActiveCall; -use channel::{channel_buffer::ChannelBuffer, ChannelStore}; +use channel::{ChannelBuffer, ChannelStore}; use client::{ self, proto::PeerId, Client, Connection, Credentials, EstablishConnectionError, UserStore, }; diff --git a/crates/collab_ui/Cargo.toml b/crates/collab_ui/Cargo.toml index da32308558..0a52b9a19f 100644 --- a/crates/collab_ui/Cargo.toml +++ b/crates/collab_ui/Cargo.toml @@ -55,6 +55,7 @@ schemars.workspace = true postage.workspace = true serde.workspace = true serde_derive.workspace = true +time.workspace = true [dev-dependencies] call = { path = "../call", features = ["test-support"] } diff --git a/crates/collab_ui/src/channel_view.rs b/crates/collab_ui/src/channel_view.rs index a09073c55d..c6f32cecd2 100644 --- a/crates/collab_ui/src/channel_view.rs +++ b/crates/collab_ui/src/channel_view.rs @@ -1,8 +1,6 @@ use anyhow::{anyhow, Result}; -use channel::{ - channel_buffer::{self, ChannelBuffer}, - ChannelId, -}; +use call::ActiveCall; +use channel::{ChannelBuffer, ChannelBufferEvent, ChannelId}; use client::proto; use clock::ReplicaId; use collections::HashMap; @@ -38,6 +36,30 @@ pub struct ChannelView { } impl ChannelView { + pub fn deploy(channel_id: ChannelId, workspace: ViewHandle, cx: &mut AppContext) { + let pane = workspace.read(cx).active_pane().clone(); + let channel_view = Self::open(channel_id, pane.clone(), workspace.clone(), cx); + cx.spawn(|mut cx| async move { + let channel_view = channel_view.await?; + pane.update(&mut cx, |pane, cx| { + let room_id = ActiveCall::global(cx) + .read(cx) + .room() + .map(|room| room.read(cx).id()); + ActiveCall::report_call_event_for_room( + "open channel notes", + room_id, + Some(channel_id), + &workspace.read(cx).app_state().client, + cx, + ); + pane.add_item(Box::new(channel_view), true, true, None, cx); + }); + anyhow::Ok(()) + }) + .detach(); + } + pub fn open( channel_id: ChannelId, pane: ViewHandle, @@ -56,6 +78,7 @@ impl ChannelView { cx.spawn(|mut cx| async move { let channel_buffer = channel_buffer.await?; + let markdown = markdown.await?; channel_buffer.update(&mut cx, |buffer, cx| { buffer.buffer().update(cx, |buffer, cx| { @@ -78,7 +101,6 @@ impl ChannelView { cx: &mut ViewContext, ) -> Self { let buffer = channel_buffer.read(cx).buffer(); - // buffer.update(cx, |buffer, cx| buffer.set_language(language, cx)); let editor = cx.add_view(|cx| Editor::for_buffer(buffer, None, cx)); let _editor_event_subscription = cx.subscribe(&editor, |_, _, e, cx| cx.emit(e.clone())); @@ -118,14 +140,14 @@ impl ChannelView { fn handle_channel_buffer_event( &mut self, _: ModelHandle, - event: &channel_buffer::Event, + event: &ChannelBufferEvent, cx: &mut ViewContext, ) { match event { - channel_buffer::Event::CollaboratorsChanged => { + ChannelBufferEvent::CollaboratorsChanged => { self.refresh_replica_id_map(cx); } - channel_buffer::Event::Disconnected => self.editor.update(cx, |editor, cx| { + ChannelBufferEvent::Disconnected => self.editor.update(cx, |editor, cx| { editor.set_read_only(true); cx.notify(); }), diff --git a/crates/collab_ui/src/chat_panel.rs b/crates/collab_ui/src/chat_panel.rs new file mode 100644 index 0000000000..087f2e1b8e --- /dev/null +++ b/crates/collab_ui/src/chat_panel.rs @@ -0,0 +1,701 @@ +use crate::{channel_view::ChannelView, ChatPanelSettings}; +use anyhow::Result; +use call::ActiveCall; +use channel::{ChannelChat, ChannelChatEvent, ChannelMessageId, ChannelStore}; +use client::Client; +use db::kvp::KEY_VALUE_STORE; +use editor::Editor; +use feature_flags::{ChannelsAlpha, FeatureFlagAppExt}; +use gpui::{ + actions, + elements::*, + platform::{CursorStyle, MouseButton}, + serde_json, + views::{ItemType, Select, SelectStyle}, + AnyViewHandle, AppContext, AsyncAppContext, Entity, ModelHandle, Subscription, Task, View, + ViewContext, ViewHandle, WeakViewHandle, +}; +use language::language_settings::SoftWrap; +use menu::Confirm; +use project::Fs; +use serde::{Deserialize, Serialize}; +use settings::SettingsStore; +use std::sync::Arc; +use theme::{IconButton, Theme}; +use time::{OffsetDateTime, UtcOffset}; +use util::{ResultExt, TryFutureExt}; +use workspace::{ + dock::{DockPosition, Panel}, + Workspace, +}; + +const MESSAGE_LOADING_THRESHOLD: usize = 50; +const CHAT_PANEL_KEY: &'static str = "ChatPanel"; + +pub struct ChatPanel { + client: Arc, + channel_store: ModelHandle, + active_chat: Option<(ModelHandle, Subscription)>, + message_list: ListState, + input_editor: ViewHandle, + channel_select: ViewHandle, + ) -> AnyElement