From b085569b46433b04be1c3dd90cd499587c9ad220 Mon Sep 17 00:00:00 2001 From: Max Brunsfeld Date: Tue, 31 Oct 2023 11:16:31 -0700 Subject: [PATCH] Add channel2 crate Co-authored-by: Marshall --- Cargo.lock | 37 + Cargo.toml | 1 + crates/channel2/Cargo.toml | 54 + crates/channel2/src/channel2.rs | 23 + crates/channel2/src/channel_buffer.rs | 257 +++++ crates/channel2/src/channel_chat.rs | 647 +++++++++++ crates/channel2/src/channel_store.rs | 1021 +++++++++++++++++ .../src/channel_store/channel_index.rs | 184 +++ crates/channel2/src/channel_store_tests.rs | 380 ++++++ crates/client2/src/user.rs | 33 +- crates/gpui2/src/app/test_context.rs | 19 + crates/rpc2/proto/zed.proto | 255 ++-- crates/rpc2/src/proto.rs | 192 ++-- 13 files changed, 2892 insertions(+), 211 deletions(-) create mode 100644 crates/channel2/Cargo.toml create mode 100644 crates/channel2/src/channel2.rs create mode 100644 crates/channel2/src/channel_buffer.rs create mode 100644 crates/channel2/src/channel_chat.rs create mode 100644 crates/channel2/src/channel_store.rs create mode 100644 crates/channel2/src/channel_store/channel_index.rs create mode 100644 crates/channel2/src/channel_store_tests.rs diff --git a/Cargo.lock b/Cargo.lock index d5d0493936..ff2a43d99d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1347,6 +1347,43 @@ dependencies = [ "uuid 1.4.1", ] +[[package]] +name = "channel2" +version = "0.1.0" +dependencies = [ + "anyhow", + "client2", + "clock", + "collections", + "db2", + "feature_flags2", + "futures 0.3.28", + "gpui2", + "image", + "language2", + "lazy_static", + "log", + "parking_lot 0.11.2", + "postage", + "rand 0.8.5", + "rpc2", + "schemars", + "serde", + "serde_derive", + "settings2", + "smallvec", + "smol", + "sum_tree", + "tempfile", + "text", + "thiserror", + "time", + "tiny_http", + "url", + "util", + "uuid 1.4.1", +] + [[package]] name = "chrono" version = "0.4.31" diff --git a/Cargo.toml b/Cargo.toml index 998ea081a6..772773c977 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,7 @@ members = [ "crates/call", "crates/call2", "crates/channel", + "crates/channel2", "crates/cli", "crates/client", "crates/client2", diff --git a/crates/channel2/Cargo.toml b/crates/channel2/Cargo.toml new file mode 100644 index 0000000000..c611944b89 --- /dev/null +++ b/crates/channel2/Cargo.toml @@ -0,0 +1,54 @@ +[package] +name = "channel2" +version = "0.1.0" +edition = "2021" +publish = false + +[lib] +path = "src/channel2.rs" +doctest = false + +[features] +test-support = ["collections/test-support", "gpui2/test-support", "rpc2/test-support"] + +[dependencies] +client2 = { path = "../client2" } +collections = { path = "../collections" } +db2 = { path = "../db2" } +gpui2 = { path = "../gpui2" } +util = { path = "../util" } +rpc2 = { path = "../rpc2" } +text = { path = "../text" } +language2 = { path = "../language2" } +settings2 = { path = "../settings2" } +feature_flags2 = { path = "../feature_flags2" } +sum_tree = { path = "../sum_tree" } +clock = { path = "../clock" } + +anyhow.workspace = true +futures.workspace = true +image = "0.23" +lazy_static.workspace = true +smallvec.workspace = true +log.workspace = true +parking_lot.workspace = true +postage.workspace = true +rand.workspace = true +schemars.workspace = true +smol.workspace = true +thiserror.workspace = true +time.workspace = true +tiny_http = "0.8" +uuid.workspace = true +url = "2.2" +serde.workspace = true +serde_derive.workspace = true +tempfile = "3" + +[dev-dependencies] +collections = { path = "../collections", features = ["test-support"] } +gpui2 = { path = "../gpui2", features = ["test-support"] } +rpc2 = { path = "../rpc2", features = ["test-support"] } +client2 = { path = "../client2", features = ["test-support"] } +settings2 = { path = "../settings2", features = ["test-support"] } +util = { path = "../util", features = ["test-support"] } diff --git a/crates/channel2/src/channel2.rs b/crates/channel2/src/channel2.rs new file mode 100644 index 0000000000..d300b170af --- /dev/null +++ b/crates/channel2/src/channel2.rs @@ -0,0 +1,23 @@ +mod channel_buffer; +mod channel_chat; +mod channel_store; + +use client2::{Client, UserStore}; +use gpui2::{AppContext, Model}; +use std::sync::Arc; + +pub use channel_buffer::{ChannelBuffer, ChannelBufferEvent, ACKNOWLEDGE_DEBOUNCE_INTERVAL}; +pub use channel_chat::{ + mentions_to_proto, ChannelChat, ChannelChatEvent, ChannelMessage, ChannelMessageId, + MessageParams, +}; +pub use channel_store::{Channel, ChannelEvent, ChannelId, ChannelMembership, ChannelStore}; + +#[cfg(test)] +mod channel_store_tests; + +pub fn init(client: &Arc, user_store: Model, cx: &mut AppContext) { + channel_store::init(client, user_store, cx); + channel_buffer::init(client); + channel_chat::init(client); +} diff --git a/crates/channel2/src/channel_buffer.rs b/crates/channel2/src/channel_buffer.rs new file mode 100644 index 0000000000..115896af7c --- /dev/null +++ b/crates/channel2/src/channel_buffer.rs @@ -0,0 +1,257 @@ +use crate::{Channel, ChannelId, ChannelStore}; +use anyhow::Result; +use client2::{Client, Collaborator, UserStore}; +use collections::HashMap; +use gpui2::{AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, Task}; +use language2::proto::serialize_version; +use rpc2::{ + proto::{self, PeerId}, + TypedEnvelope, +}; +use std::{sync::Arc, time::Duration}; +use util::ResultExt; + +pub const ACKNOWLEDGE_DEBOUNCE_INTERVAL: Duration = Duration::from_millis(250); + +pub(crate) fn init(client: &Arc) { + client.add_model_message_handler(ChannelBuffer::handle_update_channel_buffer); + client.add_model_message_handler(ChannelBuffer::handle_update_channel_buffer_collaborators); +} + +pub struct ChannelBuffer { + pub channel_id: ChannelId, + connected: bool, + collaborators: HashMap, + user_store: Model, + channel_store: Model, + buffer: Model, + buffer_epoch: u64, + client: Arc, + subscription: Option, + acknowledge_task: Option>>, +} + +pub enum ChannelBufferEvent { + CollaboratorsChanged, + Disconnected, + BufferEdited, + ChannelChanged, +} + +impl EventEmitter for ChannelBuffer { + type Event = ChannelBufferEvent; +} + +impl ChannelBuffer { + pub(crate) async fn new( + channel: Arc, + client: Arc, + user_store: Model, + channel_store: Model, + mut cx: AsyncAppContext, + ) -> Result> { + let response = client + .request(proto::JoinChannelBuffer { + channel_id: channel.id, + }) + .await?; + + let base_text = response.base_text; + let operations = response + .operations + .into_iter() + .map(language2::proto::deserialize_operation) + .collect::, _>>()?; + + let buffer = cx.build_model(|_| { + language2::Buffer::remote(response.buffer_id, response.replica_id as u16, base_text) + })?; + buffer.update(&mut cx, |buffer, cx| buffer.apply_ops(operations, cx))??; + + let subscription = client.subscribe_to_entity(channel.id)?; + + anyhow::Ok(cx.build_model(|cx| { + cx.subscribe(&buffer, Self::on_buffer_update).detach(); + cx.on_release(Self::release).detach(); + let mut this = Self { + buffer, + buffer_epoch: response.epoch, + client, + connected: true, + collaborators: Default::default(), + acknowledge_task: None, + channel_id: channel.id, + subscription: Some(subscription.set_model(&cx.handle(), &mut cx.to_async())), + user_store, + channel_store, + }; + this.replace_collaborators(response.collaborators, cx); + this + })?) + } + + fn release(&mut self, _: &mut AppContext) { + if self.connected { + if let Some(task) = self.acknowledge_task.take() { + task.detach(); + } + self.client + .send(proto::LeaveChannelBuffer { + channel_id: self.channel_id, + }) + .log_err(); + } + } + + pub fn remote_id(&self, cx: &AppContext) -> u64 { + self.buffer.read(cx).remote_id() + } + + pub fn user_store(&self) -> &Model { + &self.user_store + } + + pub(crate) fn replace_collaborators( + &mut self, + collaborators: Vec, + cx: &mut ModelContext, + ) { + let mut new_collaborators = HashMap::default(); + for collaborator in collaborators { + if let Ok(collaborator) = Collaborator::from_proto(collaborator) { + new_collaborators.insert(collaborator.peer_id, collaborator); + } + } + + for (_, old_collaborator) in &self.collaborators { + if !new_collaborators.contains_key(&old_collaborator.peer_id) { + self.buffer.update(cx, |buffer, cx| { + buffer.remove_peer(old_collaborator.replica_id as u16, cx) + }); + } + } + self.collaborators = new_collaborators; + cx.emit(ChannelBufferEvent::CollaboratorsChanged); + cx.notify(); + } + + async fn handle_update_channel_buffer( + this: Model, + update_channel_buffer: TypedEnvelope, + _: Arc, + mut cx: AsyncAppContext, + ) -> Result<()> { + let ops = update_channel_buffer + .payload + .operations + .into_iter() + .map(language2::proto::deserialize_operation) + .collect::, _>>()?; + + this.update(&mut cx, |this, cx| { + cx.notify(); + this.buffer + .update(cx, |buffer, cx| buffer.apply_ops(ops, cx)) + })??; + + Ok(()) + } + + async fn handle_update_channel_buffer_collaborators( + this: Model, + message: TypedEnvelope, + _: Arc, + mut cx: AsyncAppContext, + ) -> Result<()> { + this.update(&mut cx, |this, cx| { + this.replace_collaborators(message.payload.collaborators, cx); + cx.emit(ChannelBufferEvent::CollaboratorsChanged); + cx.notify(); + }) + } + + fn on_buffer_update( + &mut self, + _: Model, + event: &language2::Event, + cx: &mut ModelContext, + ) { + match event { + language2::Event::Operation(operation) => { + let operation = language2::proto::serialize_operation(operation); + self.client + .send(proto::UpdateChannelBuffer { + channel_id: self.channel_id, + operations: vec![operation], + }) + .log_err(); + } + language2::Event::Edited => { + cx.emit(ChannelBufferEvent::BufferEdited); + } + _ => {} + } + } + + pub fn acknowledge_buffer_version(&mut self, cx: &mut ModelContext<'_, ChannelBuffer>) { + let buffer = self.buffer.read(cx); + let version = buffer.version(); + let buffer_id = buffer.remote_id(); + let client = self.client.clone(); + let epoch = self.epoch(); + + self.acknowledge_task = Some(cx.spawn(move |_, cx| async move { + cx.executor().timer(ACKNOWLEDGE_DEBOUNCE_INTERVAL).await; + client + .send(proto::AckBufferOperation { + buffer_id, + epoch, + version: serialize_version(&version), + }) + .ok(); + Ok(()) + })); + } + + pub fn epoch(&self) -> u64 { + self.buffer_epoch + } + + pub fn buffer(&self) -> Model { + self.buffer.clone() + } + + pub fn collaborators(&self) -> &HashMap { + &self.collaborators + } + + pub fn channel(&self, cx: &AppContext) -> Option> { + self.channel_store + .read(cx) + .channel_for_id(self.channel_id) + .cloned() + } + + pub(crate) fn disconnect(&mut self, cx: &mut ModelContext) { + log::info!("channel buffer {} disconnected", self.channel_id); + if self.connected { + self.connected = false; + self.subscription.take(); + cx.emit(ChannelBufferEvent::Disconnected); + cx.notify() + } + } + + pub(crate) fn channel_changed(&mut self, cx: &mut ModelContext) { + cx.emit(ChannelBufferEvent::ChannelChanged); + cx.notify() + } + + pub fn is_connected(&self) -> bool { + self.connected + } + + pub fn replica_id(&self, cx: &AppContext) -> u16 { + self.buffer.read(cx).replica_id() + } +} diff --git a/crates/channel2/src/channel_chat.rs b/crates/channel2/src/channel_chat.rs new file mode 100644 index 0000000000..9306d78520 --- /dev/null +++ b/crates/channel2/src/channel_chat.rs @@ -0,0 +1,647 @@ +use crate::{Channel, ChannelId, ChannelStore}; +use anyhow::{anyhow, Result}; +use client2::{ + proto, + user::{User, UserStore}, + Client, Subscription, TypedEnvelope, UserId, +}; +use futures::lock::Mutex; +use gpui2::{AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, Task}; +use rand::prelude::*; +use std::{ + collections::HashSet, + mem, + ops::{ControlFlow, Range}, + sync::Arc, +}; +use sum_tree::{Bias, SumTree}; +use time::OffsetDateTime; +use util::{post_inc, ResultExt as _, TryFutureExt}; + +pub struct ChannelChat { + pub channel_id: ChannelId, + messages: SumTree, + acknowledged_message_ids: HashSet, + channel_store: Model, + loaded_all_messages: bool, + last_acknowledged_id: Option, + next_pending_message_id: usize, + user_store: Model, + rpc: Arc, + outgoing_messages_lock: Arc>, + rng: StdRng, + _subscription: Subscription, +} + +#[derive(Debug, PartialEq, Eq)] +pub struct MessageParams { + pub text: String, + pub mentions: Vec<(Range, UserId)>, +} + +#[derive(Clone, Debug)] +pub struct ChannelMessage { + pub id: ChannelMessageId, + pub body: String, + pub timestamp: OffsetDateTime, + pub sender: Arc, + pub nonce: u128, + pub mentions: Vec<(Range, UserId)>, +} + +#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub enum ChannelMessageId { + Saved(u64), + Pending(usize), +} + +#[derive(Clone, Debug, Default)] +pub struct ChannelMessageSummary { + max_id: ChannelMessageId, + count: usize, +} + +#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord)] +struct Count(usize); + +#[derive(Clone, Debug, PartialEq)] +pub enum ChannelChatEvent { + MessagesUpdated { + old_range: Range, + new_count: usize, + }, + NewMessage { + channel_id: ChannelId, + message_id: u64, + }, +} + +impl EventEmitter for ChannelChat { + type Event = ChannelChatEvent; +} +pub fn init(client: &Arc) { + client.add_model_message_handler(ChannelChat::handle_message_sent); + client.add_model_message_handler(ChannelChat::handle_message_removed); +} + +impl ChannelChat { + pub async fn new( + channel: Arc, + channel_store: Model, + user_store: Model, + client: Arc, + mut cx: AsyncAppContext, + ) -> Result> { + let channel_id = channel.id; + let subscription = client.subscribe_to_entity(channel_id).unwrap(); + + let response = client + .request(proto::JoinChannelChat { channel_id }) + .await?; + let messages = messages_from_proto(response.messages, &user_store, &mut cx).await?; + let loaded_all_messages = response.done; + + Ok(cx.build_model(|cx| { + cx.on_release(Self::release).detach(); + let mut this = Self { + channel_id: channel.id, + user_store, + channel_store, + rpc: client, + outgoing_messages_lock: Default::default(), + messages: Default::default(), + acknowledged_message_ids: Default::default(), + loaded_all_messages, + next_pending_message_id: 0, + last_acknowledged_id: None, + rng: StdRng::from_entropy(), + _subscription: subscription.set_model(&cx.handle(), &mut cx.to_async()), + }; + this.insert_messages(messages, cx); + this + })?) + } + + fn release(&mut self, _: &mut AppContext) { + self.rpc + .send(proto::LeaveChannelChat { + channel_id: self.channel_id, + }) + .log_err(); + } + + pub fn channel(&self, cx: &AppContext) -> Option> { + self.channel_store + .read(cx) + .channel_for_id(self.channel_id) + .cloned() + } + + pub fn client(&self) -> &Arc { + &self.rpc + } + + pub fn send_message( + &mut self, + message: MessageParams, + cx: &mut ModelContext, + ) -> Result>> { + if message.text.is_empty() { + Err(anyhow!("message body can't be empty"))?; + } + + let current_user = self + .user_store + .read(cx) + .current_user() + .ok_or_else(|| anyhow!("current_user is not present"))?; + + let channel_id = self.channel_id; + let pending_id = ChannelMessageId::Pending(post_inc(&mut self.next_pending_message_id)); + let nonce = self.rng.gen(); + self.insert_messages( + SumTree::from_item( + ChannelMessage { + id: pending_id, + body: message.text.clone(), + sender: current_user, + timestamp: OffsetDateTime::now_utc(), + mentions: message.mentions.clone(), + nonce, + }, + &(), + ), + cx, + ); + let user_store = self.user_store.clone(); + let rpc = self.rpc.clone(); + let outgoing_messages_lock = self.outgoing_messages_lock.clone(); + Ok(cx.spawn(move |this, mut cx| async move { + let outgoing_message_guard = outgoing_messages_lock.lock().await; + let request = rpc.request(proto::SendChannelMessage { + channel_id, + body: message.text, + nonce: Some(nonce.into()), + mentions: mentions_to_proto(&message.mentions), + }); + let response = request.await?; + drop(outgoing_message_guard); + let response = response.message.ok_or_else(|| anyhow!("invalid message"))?; + let id = response.id; + let message = ChannelMessage::from_proto(response, &user_store, &mut cx).await?; + this.update(&mut cx, |this, cx| { + this.insert_messages(SumTree::from_item(message, &()), cx); + })?; + Ok(id) + })) + } + + pub fn remove_message(&mut self, id: u64, cx: &mut ModelContext) -> Task> { + let response = self.rpc.request(proto::RemoveChannelMessage { + channel_id: self.channel_id, + message_id: id, + }); + cx.spawn(move |this, mut cx| async move { + response.await?; + this.update(&mut cx, |this, cx| { + this.message_removed(id, cx); + })?; + Ok(()) + }) + } + + pub fn load_more_messages(&mut self, cx: &mut ModelContext) -> Option>> { + if self.loaded_all_messages { + return None; + } + + let rpc = self.rpc.clone(); + let user_store = self.user_store.clone(); + let channel_id = self.channel_id; + let before_message_id = self.first_loaded_message_id()?; + Some(cx.spawn(move |this, mut cx| { + async move { + let response = rpc + .request(proto::GetChannelMessages { + channel_id, + before_message_id, + }) + .await?; + let loaded_all_messages = response.done; + let messages = messages_from_proto(response.messages, &user_store, &mut cx).await?; + this.update(&mut cx, |this, cx| { + this.loaded_all_messages = loaded_all_messages; + this.insert_messages(messages, cx); + })?; + anyhow::Ok(()) + } + .log_err() + })) + } + + pub fn first_loaded_message_id(&mut self) -> Option { + self.messages.first().and_then(|message| match message.id { + ChannelMessageId::Saved(id) => Some(id), + ChannelMessageId::Pending(_) => None, + }) + } + + /// Load all of the chat messages since a certain message id. + /// + /// For now, we always maintain a suffix of the channel's messages. + pub async fn load_history_since_message( + chat: Model, + message_id: u64, + mut cx: AsyncAppContext, + ) -> Option { + loop { + let step = chat + .update(&mut cx, |chat, cx| { + if let Some(first_id) = chat.first_loaded_message_id() { + if first_id <= message_id { + let mut cursor = chat.messages.cursor::<(ChannelMessageId, Count)>(); + let message_id = ChannelMessageId::Saved(message_id); + cursor.seek(&message_id, Bias::Left, &()); + return ControlFlow::Break( + if cursor + .item() + .map_or(false, |message| message.id == message_id) + { + Some(cursor.start().1 .0) + } else { + None + }, + ); + } + } + ControlFlow::Continue(chat.load_more_messages(cx)) + }) + .log_err()?; + match step { + ControlFlow::Break(ix) => return ix, + ControlFlow::Continue(task) => task?.await?, + } + } + } + + pub fn acknowledge_last_message(&mut self, cx: &mut ModelContext) { + if let ChannelMessageId::Saved(latest_message_id) = self.messages.summary().max_id { + if self + .last_acknowledged_id + .map_or(true, |acknowledged_id| acknowledged_id < latest_message_id) + { + self.rpc + .send(proto::AckChannelMessage { + channel_id: self.channel_id, + message_id: latest_message_id, + }) + .ok(); + self.last_acknowledged_id = Some(latest_message_id); + self.channel_store.update(cx, |store, cx| { + store.acknowledge_message_id(self.channel_id, latest_message_id, cx); + }); + } + } + } + + pub fn rejoin(&mut self, cx: &mut ModelContext) { + let user_store = self.user_store.clone(); + let rpc = self.rpc.clone(); + let channel_id = self.channel_id; + cx.spawn(move |this, mut cx| { + async move { + let response = rpc.request(proto::JoinChannelChat { channel_id }).await?; + let messages = messages_from_proto(response.messages, &user_store, &mut cx).await?; + let loaded_all_messages = response.done; + + let pending_messages = this.update(&mut cx, |this, cx| { + if let Some((first_new_message, last_old_message)) = + messages.first().zip(this.messages.last()) + { + if first_new_message.id > last_old_message.id { + let old_messages = mem::take(&mut this.messages); + cx.emit(ChannelChatEvent::MessagesUpdated { + old_range: 0..old_messages.summary().count, + new_count: 0, + }); + this.loaded_all_messages = loaded_all_messages; + } + } + + this.insert_messages(messages, cx); + if loaded_all_messages { + this.loaded_all_messages = loaded_all_messages; + } + + this.pending_messages().cloned().collect::>() + })?; + + for pending_message in pending_messages { + let request = rpc.request(proto::SendChannelMessage { + channel_id, + body: pending_message.body, + mentions: mentions_to_proto(&pending_message.mentions), + nonce: Some(pending_message.nonce.into()), + }); + let response = request.await?; + let message = ChannelMessage::from_proto( + response.message.ok_or_else(|| anyhow!("invalid message"))?, + &user_store, + &mut cx, + ) + .await?; + this.update(&mut cx, |this, cx| { + this.insert_messages(SumTree::from_item(message, &()), cx); + })?; + } + + anyhow::Ok(()) + } + .log_err() + }) + .detach(); + } + + pub fn message_count(&self) -> usize { + self.messages.summary().count + } + + pub fn messages(&self) -> &SumTree { + &self.messages + } + + pub fn message(&self, ix: usize) -> &ChannelMessage { + let mut cursor = self.messages.cursor::(); + cursor.seek(&Count(ix), Bias::Right, &()); + cursor.item().unwrap() + } + + pub fn acknowledge_message(&mut self, id: u64) { + if self.acknowledged_message_ids.insert(id) { + self.rpc + .send(proto::AckChannelMessage { + channel_id: self.channel_id, + message_id: id, + }) + .ok(); + } + } + + pub fn messages_in_range(&self, range: Range) -> impl Iterator { + let mut cursor = self.messages.cursor::(); + cursor.seek(&Count(range.start), Bias::Right, &()); + cursor.take(range.len()) + } + + pub fn pending_messages(&self) -> impl Iterator { + let mut cursor = self.messages.cursor::(); + cursor.seek(&ChannelMessageId::Pending(0), Bias::Left, &()); + cursor + } + + async fn handle_message_sent( + this: Model, + message: TypedEnvelope, + _: Arc, + mut cx: AsyncAppContext, + ) -> Result<()> { + let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?; + let message = message + .payload + .message + .ok_or_else(|| anyhow!("empty message"))?; + let message_id = message.id; + + let message = ChannelMessage::from_proto(message, &user_store, &mut cx).await?; + this.update(&mut cx, |this, cx| { + this.insert_messages(SumTree::from_item(message, &()), cx); + cx.emit(ChannelChatEvent::NewMessage { + channel_id: this.channel_id, + message_id, + }) + })?; + + Ok(()) + } + + async fn handle_message_removed( + this: Model, + message: TypedEnvelope, + _: Arc, + mut cx: AsyncAppContext, + ) -> Result<()> { + this.update(&mut cx, |this, cx| { + this.message_removed(message.payload.message_id, cx) + })?; + Ok(()) + } + + fn insert_messages(&mut self, messages: SumTree, cx: &mut ModelContext) { + if let Some((first_message, last_message)) = messages.first().zip(messages.last()) { + let nonces = messages + .cursor::<()>() + .map(|m| m.nonce) + .collect::>(); + + let mut old_cursor = self.messages.cursor::<(ChannelMessageId, Count)>(); + let mut new_messages = old_cursor.slice(&first_message.id, Bias::Left, &()); + let start_ix = old_cursor.start().1 .0; + let removed_messages = old_cursor.slice(&last_message.id, Bias::Right, &()); + let removed_count = removed_messages.summary().count; + let new_count = messages.summary().count; + let end_ix = start_ix + removed_count; + + new_messages.append(messages, &()); + + let mut ranges = Vec::>::new(); + if new_messages.last().unwrap().is_pending() { + new_messages.append(old_cursor.suffix(&()), &()); + } else { + new_messages.append( + old_cursor.slice(&ChannelMessageId::Pending(0), Bias::Left, &()), + &(), + ); + + while let Some(message) = old_cursor.item() { + let message_ix = old_cursor.start().1 .0; + if nonces.contains(&message.nonce) { + if ranges.last().map_or(false, |r| r.end == message_ix) { + ranges.last_mut().unwrap().end += 1; + } else { + ranges.push(message_ix..message_ix + 1); + } + } else { + new_messages.push(message.clone(), &()); + } + old_cursor.next(&()); + } + } + + drop(old_cursor); + self.messages = new_messages; + + for range in ranges.into_iter().rev() { + cx.emit(ChannelChatEvent::MessagesUpdated { + old_range: range, + new_count: 0, + }); + } + cx.emit(ChannelChatEvent::MessagesUpdated { + old_range: start_ix..end_ix, + new_count, + }); + + cx.notify(); + } + } + + fn message_removed(&mut self, id: u64, cx: &mut ModelContext) { + let mut cursor = self.messages.cursor::(); + let mut messages = cursor.slice(&ChannelMessageId::Saved(id), Bias::Left, &()); + if let Some(item) = cursor.item() { + if item.id == ChannelMessageId::Saved(id) { + let ix = messages.summary().count; + cursor.next(&()); + messages.append(cursor.suffix(&()), &()); + drop(cursor); + self.messages = messages; + cx.emit(ChannelChatEvent::MessagesUpdated { + old_range: ix..ix + 1, + new_count: 0, + }); + } + } + } +} + +async fn messages_from_proto( + proto_messages: Vec, + user_store: &Model, + cx: &mut AsyncAppContext, +) -> Result> { + let messages = ChannelMessage::from_proto_vec(proto_messages, user_store, cx).await?; + let mut result = SumTree::new(); + result.extend(messages, &()); + Ok(result) +} + +impl ChannelMessage { + pub async fn from_proto( + message: proto::ChannelMessage, + user_store: &Model, + cx: &mut AsyncAppContext, + ) -> Result { + let sender = user_store + .update(cx, |user_store, cx| { + user_store.get_user(message.sender_id, cx) + })? + .await?; + Ok(ChannelMessage { + id: ChannelMessageId::Saved(message.id), + body: message.body, + mentions: message + .mentions + .into_iter() + .filter_map(|mention| { + let range = mention.range?; + Some((range.start as usize..range.end as usize, mention.user_id)) + }) + .collect(), + timestamp: OffsetDateTime::from_unix_timestamp(message.timestamp as i64)?, + sender, + nonce: message + .nonce + .ok_or_else(|| anyhow!("nonce is required"))? + .into(), + }) + } + + pub fn is_pending(&self) -> bool { + matches!(self.id, ChannelMessageId::Pending(_)) + } + + pub async fn from_proto_vec( + proto_messages: Vec, + user_store: &Model, + cx: &mut AsyncAppContext, + ) -> Result> { + let unique_user_ids = proto_messages + .iter() + .map(|m| m.sender_id) + .collect::>() + .into_iter() + .collect(); + user_store + .update(cx, |user_store, cx| { + user_store.get_users(unique_user_ids, cx) + })? + .await?; + + let mut messages = Vec::with_capacity(proto_messages.len()); + for message in proto_messages { + messages.push(ChannelMessage::from_proto(message, user_store, cx).await?); + } + Ok(messages) + } +} + +pub fn mentions_to_proto(mentions: &[(Range, UserId)]) -> Vec { + mentions + .iter() + .map(|(range, user_id)| proto::ChatMention { + range: Some(proto::Range { + start: range.start as u64, + end: range.end as u64, + }), + user_id: *user_id as u64, + }) + .collect() +} + +impl sum_tree::Item for ChannelMessage { + type Summary = ChannelMessageSummary; + + fn summary(&self) -> Self::Summary { + ChannelMessageSummary { + max_id: self.id, + count: 1, + } + } +} + +impl Default for ChannelMessageId { + fn default() -> Self { + Self::Saved(0) + } +} + +impl sum_tree::Summary for ChannelMessageSummary { + type Context = (); + + fn add_summary(&mut self, summary: &Self, _: &()) { + self.max_id = summary.max_id; + self.count += summary.count; + } +} + +impl<'a> sum_tree::Dimension<'a, ChannelMessageSummary> for ChannelMessageId { + fn add_summary(&mut self, summary: &'a ChannelMessageSummary, _: &()) { + debug_assert!(summary.max_id > *self); + *self = summary.max_id; + } +} + +impl<'a> sum_tree::Dimension<'a, ChannelMessageSummary> for Count { + fn add_summary(&mut self, summary: &'a ChannelMessageSummary, _: &()) { + self.0 += summary.count; + } +} + +impl<'a> From<&'a str> for MessageParams { + fn from(value: &'a str) -> Self { + Self { + text: value.into(), + mentions: Vec::new(), + } + } +} diff --git a/crates/channel2/src/channel_store.rs b/crates/channel2/src/channel_store.rs new file mode 100644 index 0000000000..389b96ba42 --- /dev/null +++ b/crates/channel2/src/channel_store.rs @@ -0,0 +1,1021 @@ +mod channel_index; + +use crate::{channel_buffer::ChannelBuffer, channel_chat::ChannelChat, ChannelMessage}; +use anyhow::{anyhow, Result}; +use channel_index::ChannelIndex; +use client2::{Client, Subscription, User, UserId, UserStore}; +use collections::{hash_map, HashMap, HashSet}; +use db2::RELEASE_CHANNEL; +use futures::{channel::mpsc, future::Shared, Future, FutureExt, StreamExt}; +use gpui2::{ + AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, Task, WeakModel, +}; +use rpc2::{ + proto::{self, ChannelVisibility}, + TypedEnvelope, +}; +use std::{mem, sync::Arc, time::Duration}; +use util::{async_maybe, ResultExt}; + +pub fn init(client: &Arc, user_store: Model, cx: &mut AppContext) { + let channel_store = + cx.build_model(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx)); + cx.set_global(channel_store); +} + +pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30); + +pub type ChannelId = u64; + +pub struct ChannelStore { + pub channel_index: ChannelIndex, + channel_invitations: Vec>, + channel_participants: HashMap>>, + outgoing_invites: HashSet<(ChannelId, UserId)>, + update_channels_tx: mpsc::UnboundedSender, + opened_buffers: HashMap>, + opened_chats: HashMap>, + client: Arc, + user_store: Model, + _rpc_subscription: Subscription, + _watch_connection_status: Task>, + disconnect_channel_buffers_task: Option>, + _update_channels: Task<()>, +} + +#[derive(Clone, Debug, PartialEq)] +pub struct Channel { + pub id: ChannelId, + pub name: String, + pub visibility: proto::ChannelVisibility, + pub role: proto::ChannelRole, + pub unseen_note_version: Option<(u64, clock::Global)>, + pub unseen_message_id: Option, + pub parent_path: Vec, +} + +impl Channel { + pub fn link(&self) -> String { + RELEASE_CHANNEL.link_prefix().to_owned() + + "channel/" + + &self.slug() + + "-" + + &self.id.to_string() + } + + pub fn slug(&self) -> String { + let slug: String = self + .name + .chars() + .map(|c| if c.is_alphanumeric() { c } else { '-' }) + .collect(); + + slug.trim_matches(|c| c == '-').to_string() + } + + pub fn can_edit_notes(&self) -> bool { + self.role == proto::ChannelRole::Member || self.role == proto::ChannelRole::Admin + } +} + +pub struct ChannelMembership { + pub user: Arc, + pub kind: proto::channel_member::Kind, + pub role: proto::ChannelRole, +} +impl ChannelMembership { + pub fn sort_key(&self) -> MembershipSortKey { + MembershipSortKey { + role_order: match self.role { + proto::ChannelRole::Admin => 0, + proto::ChannelRole::Member => 1, + proto::ChannelRole::Banned => 2, + proto::ChannelRole::Guest => 3, + }, + kind_order: match self.kind { + proto::channel_member::Kind::Member => 0, + proto::channel_member::Kind::AncestorMember => 1, + proto::channel_member::Kind::Invitee => 2, + }, + username_order: self.user.github_login.as_str(), + } + } +} + +#[derive(PartialOrd, Ord, PartialEq, Eq)] +pub struct MembershipSortKey<'a> { + role_order: u8, + kind_order: u8, + username_order: &'a str, +} + +pub enum ChannelEvent { + ChannelCreated(ChannelId), + ChannelRenamed(ChannelId), +} + +impl EventEmitter for ChannelStore { + type Event = ChannelEvent; +} + +enum OpenedModelHandle { + Open(WeakModel), + Loading(Shared, Arc>>>), +} + +impl ChannelStore { + pub fn global(cx: &AppContext) -> Model { + cx.global::>().clone() + } + + pub fn new( + client: Arc, + user_store: Model, + cx: &mut ModelContext, + ) -> Self { + let rpc_subscription = + client.add_message_handler(cx.weak_model(), Self::handle_update_channels); + + let mut connection_status = client.status(); + let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded(); + let watch_connection_status = cx.spawn(|this, mut cx| async move { + while let Some(status) = connection_status.next().await { + let this = this.upgrade()?; + match status { + client2::Status::Connected { .. } => { + this.update(&mut cx, |this, cx| this.handle_connect(cx)) + .ok()? + .await + .log_err()?; + } + client2::Status::SignedOut | client2::Status::UpgradeRequired => { + this.update(&mut cx, |this, cx| this.handle_disconnect(false, cx)) + .ok(); + } + _ => { + this.update(&mut cx, |this, cx| this.handle_disconnect(true, cx)) + .ok(); + } + } + } + Some(()) + }); + + Self { + channel_invitations: Vec::default(), + channel_index: ChannelIndex::default(), + channel_participants: Default::default(), + outgoing_invites: Default::default(), + opened_buffers: Default::default(), + opened_chats: Default::default(), + update_channels_tx, + client, + user_store, + _rpc_subscription: rpc_subscription, + _watch_connection_status: watch_connection_status, + disconnect_channel_buffers_task: None, + _update_channels: cx.spawn(|this, mut cx| async move { + async_maybe!({ + while let Some(update_channels) = update_channels_rx.next().await { + if let Some(this) = this.upgrade() { + let update_task = this.update(&mut cx, |this, cx| { + this.update_channels(update_channels, cx) + })?; + if let Some(update_task) = update_task { + update_task.await.log_err(); + } + } + } + anyhow::Ok(()) + }) + .await + .log_err(); + }), + } + } + + pub fn client(&self) -> Arc { + self.client.clone() + } + + /// Returns the number of unique channels in the store + pub fn channel_count(&self) -> usize { + self.channel_index.by_id().len() + } + + /// Returns the index of a channel ID in the list of unique channels + pub fn index_of_channel(&self, channel_id: ChannelId) -> Option { + self.channel_index + .by_id() + .keys() + .position(|id| *id == channel_id) + } + + /// Returns an iterator over all unique channels + pub fn channels(&self) -> impl '_ + Iterator> { + self.channel_index.by_id().values() + } + + /// Iterate over all entries in the channel DAG + pub fn ordered_channels(&self) -> impl '_ + Iterator)> { + self.channel_index + .ordered_channels() + .iter() + .filter_map(move |id| { + let channel = self.channel_index.by_id().get(id)?; + Some((channel.parent_path.len(), channel)) + }) + } + + pub fn channel_at_index(&self, ix: usize) -> Option<&Arc> { + let channel_id = self.channel_index.ordered_channels().get(ix)?; + self.channel_index.by_id().get(channel_id) + } + + pub fn channel_at(&self, ix: usize) -> Option<&Arc> { + self.channel_index.by_id().values().nth(ix) + } + + pub fn has_channel_invitation(&self, channel_id: ChannelId) -> bool { + self.channel_invitations + .iter() + .any(|channel| channel.id == channel_id) + } + + pub fn channel_invitations(&self) -> &[Arc] { + &self.channel_invitations + } + + pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc> { + self.channel_index.by_id().get(&channel_id) + } + + pub fn has_open_channel_buffer(&self, channel_id: ChannelId, _cx: &AppContext) -> bool { + if let Some(buffer) = self.opened_buffers.get(&channel_id) { + if let OpenedModelHandle::Open(buffer) = buffer { + return buffer.upgrade().is_some(); + } + } + false + } + + pub fn open_channel_buffer( + &mut self, + channel_id: ChannelId, + cx: &mut ModelContext, + ) -> Task>> { + let client = self.client.clone(); + let user_store = self.user_store.clone(); + let channel_store = cx.handle(); + self.open_channel_resource( + channel_id, + |this| &mut this.opened_buffers, + |channel, cx| ChannelBuffer::new(channel, client, user_store, channel_store, cx), + cx, + ) + } + + pub fn fetch_channel_messages( + &self, + message_ids: Vec, + cx: &mut ModelContext, + ) -> Task>> { + let request = if message_ids.is_empty() { + None + } else { + Some( + self.client + .request(proto::GetChannelMessagesById { message_ids }), + ) + }; + cx.spawn(|this, mut cx| async move { + if let Some(request) = request { + let response = request.await?; + let this = this + .upgrade() + .ok_or_else(|| anyhow!("channel store dropped"))?; + let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?; + ChannelMessage::from_proto_vec(response.messages, &user_store, &mut cx).await + } else { + Ok(Vec::new()) + } + }) + } + + pub fn has_channel_buffer_changed(&self, channel_id: ChannelId) -> Option { + self.channel_index + .by_id() + .get(&channel_id) + .map(|channel| channel.unseen_note_version.is_some()) + } + + pub fn has_new_messages(&self, channel_id: ChannelId) -> Option { + self.channel_index + .by_id() + .get(&channel_id) + .map(|channel| channel.unseen_message_id.is_some()) + } + + pub fn notes_changed( + &mut self, + channel_id: ChannelId, + epoch: u64, + version: &clock::Global, + cx: &mut ModelContext, + ) { + self.channel_index.note_changed(channel_id, epoch, version); + cx.notify(); + } + + pub fn new_message( + &mut self, + channel_id: ChannelId, + message_id: u64, + cx: &mut ModelContext, + ) { + self.channel_index.new_message(channel_id, message_id); + cx.notify(); + } + + pub fn acknowledge_message_id( + &mut self, + channel_id: ChannelId, + message_id: u64, + cx: &mut ModelContext, + ) { + self.channel_index + .acknowledge_message_id(channel_id, message_id); + cx.notify(); + } + + pub fn acknowledge_notes_version( + &mut self, + channel_id: ChannelId, + epoch: u64, + version: &clock::Global, + cx: &mut ModelContext, + ) { + self.channel_index + .acknowledge_note_version(channel_id, epoch, version); + cx.notify(); + } + + pub fn open_channel_chat( + &mut self, + channel_id: ChannelId, + cx: &mut ModelContext, + ) -> Task>> { + let client = self.client.clone(); + let user_store = self.user_store.clone(); + let this = cx.handle(); + self.open_channel_resource( + channel_id, + |this| &mut this.opened_chats, + |channel, cx| ChannelChat::new(channel, this, user_store, client, cx), + cx, + ) + } + + /// Asynchronously open a given resource associated with a channel. + /// + /// Make sure that the resource is only opened once, even if this method + /// is called multiple times with the same channel id while the first task + /// is still running. + fn open_channel_resource( + &mut self, + channel_id: ChannelId, + get_map: fn(&mut Self) -> &mut HashMap>, + load: F, + cx: &mut ModelContext, + ) -> Task>> + where + F: 'static + Send + FnOnce(Arc, AsyncAppContext) -> Fut, + Fut: Send + Future>>, + T: 'static, + { + let task = loop { + match get_map(self).entry(channel_id) { + hash_map::Entry::Occupied(e) => match e.get() { + OpenedModelHandle::Open(model) => { + if let Some(model) = model.upgrade() { + break Task::ready(Ok(model)).shared(); + } else { + get_map(self).remove(&channel_id); + continue; + } + } + OpenedModelHandle::Loading(task) => { + break task.clone(); + } + }, + hash_map::Entry::Vacant(e) => { + let task = cx + .spawn(move |this, mut cx| async move { + let channel = this.update(&mut cx, |this, _| { + this.channel_for_id(channel_id).cloned().ok_or_else(|| { + Arc::new(anyhow!("no channel for id: {}", channel_id)) + }) + })??; + + load(channel, cx).await.map_err(Arc::new) + }) + .shared(); + + e.insert(OpenedModelHandle::Loading(task.clone())); + cx.spawn({ + let task = task.clone(); + move |this, mut cx| async move { + let result = task.await; + this.update(&mut cx, |this, _| match result { + Ok(model) => { + get_map(this).insert( + channel_id, + OpenedModelHandle::Open(model.downgrade()), + ); + } + Err(_) => { + get_map(this).remove(&channel_id); + } + }) + .ok(); + } + }) + .detach(); + break task; + } + } + }; + cx.executor() + .spawn(async move { task.await.map_err(|error| anyhow!("{}", error)) }) + } + + pub fn is_channel_admin(&self, channel_id: ChannelId) -> bool { + let Some(channel) = self.channel_for_id(channel_id) else { + return false; + }; + channel.role == proto::ChannelRole::Admin + } + + pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc] { + self.channel_participants + .get(&channel_id) + .map_or(&[], |v| v.as_slice()) + } + + pub fn create_channel( + &self, + name: &str, + parent_id: Option, + cx: &mut ModelContext, + ) -> Task> { + let client = self.client.clone(); + let name = name.trim_start_matches("#").to_owned(); + cx.spawn(move |this, mut cx| async move { + let response = client + .request(proto::CreateChannel { name, parent_id }) + .await?; + + let channel = response + .channel + .ok_or_else(|| anyhow!("missing channel in response"))?; + let channel_id = channel.id; + + this.update(&mut cx, |this, cx| { + let task = this.update_channels( + proto::UpdateChannels { + channels: vec![channel], + ..Default::default() + }, + cx, + ); + assert!(task.is_none()); + + // This event is emitted because the collab panel wants to clear the pending edit state + // before this frame is rendered. But we can't guarantee that the collab panel's future + // will resolve before this flush_effects finishes. Synchronously emitting this event + // ensures that the collab panel will observe this creation before the frame completes + cx.emit(ChannelEvent::ChannelCreated(channel_id)); + })?; + + Ok(channel_id) + }) + } + + pub fn move_channel( + &mut self, + channel_id: ChannelId, + to: Option, + cx: &mut ModelContext, + ) -> Task> { + let client = self.client.clone(); + cx.spawn(move |_, _| async move { + let _ = client + .request(proto::MoveChannel { channel_id, to }) + .await?; + + Ok(()) + }) + } + + pub fn set_channel_visibility( + &mut self, + channel_id: ChannelId, + visibility: ChannelVisibility, + cx: &mut ModelContext, + ) -> Task> { + let client = self.client.clone(); + cx.spawn(move |_, _| async move { + let _ = client + .request(proto::SetChannelVisibility { + channel_id, + visibility: visibility.into(), + }) + .await?; + + Ok(()) + }) + } + + pub fn invite_member( + &mut self, + channel_id: ChannelId, + user_id: UserId, + role: proto::ChannelRole, + cx: &mut ModelContext, + ) -> Task> { + if !self.outgoing_invites.insert((channel_id, user_id)) { + return Task::ready(Err(anyhow!("invite request already in progress"))); + } + + cx.notify(); + let client = self.client.clone(); + cx.spawn(move |this, mut cx| async move { + let result = client + .request(proto::InviteChannelMember { + channel_id, + user_id, + role: role.into(), + }) + .await; + + this.update(&mut cx, |this, cx| { + this.outgoing_invites.remove(&(channel_id, user_id)); + cx.notify(); + })?; + + result?; + + Ok(()) + }) + } + + pub fn remove_member( + &mut self, + channel_id: ChannelId, + user_id: u64, + cx: &mut ModelContext, + ) -> Task> { + if !self.outgoing_invites.insert((channel_id, user_id)) { + return Task::ready(Err(anyhow!("invite request already in progress"))); + } + + cx.notify(); + let client = self.client.clone(); + cx.spawn(move |this, mut cx| async move { + let result = client + .request(proto::RemoveChannelMember { + channel_id, + user_id, + }) + .await; + + this.update(&mut cx, |this, cx| { + this.outgoing_invites.remove(&(channel_id, user_id)); + cx.notify(); + })?; + result?; + Ok(()) + }) + } + + pub fn set_member_role( + &mut self, + channel_id: ChannelId, + user_id: UserId, + role: proto::ChannelRole, + cx: &mut ModelContext, + ) -> Task> { + if !self.outgoing_invites.insert((channel_id, user_id)) { + return Task::ready(Err(anyhow!("member request already in progress"))); + } + + cx.notify(); + let client = self.client.clone(); + cx.spawn(move |this, mut cx| async move { + let result = client + .request(proto::SetChannelMemberRole { + channel_id, + user_id, + role: role.into(), + }) + .await; + + this.update(&mut cx, |this, cx| { + this.outgoing_invites.remove(&(channel_id, user_id)); + cx.notify(); + })?; + + result?; + Ok(()) + }) + } + + pub fn rename( + &mut self, + channel_id: ChannelId, + new_name: &str, + cx: &mut ModelContext, + ) -> Task> { + let client = self.client.clone(); + let name = new_name.to_string(); + cx.spawn(move |this, mut cx| async move { + let channel = client + .request(proto::RenameChannel { channel_id, name }) + .await? + .channel + .ok_or_else(|| anyhow!("missing channel in response"))?; + this.update(&mut cx, |this, cx| { + let task = this.update_channels( + proto::UpdateChannels { + channels: vec![channel], + ..Default::default() + }, + cx, + ); + assert!(task.is_none()); + + // This event is emitted because the collab panel wants to clear the pending edit state + // before this frame is rendered. But we can't guarantee that the collab panel's future + // will resolve before this flush_effects finishes. Synchronously emitting this event + // ensures that the collab panel will observe this creation before the frame complete + cx.emit(ChannelEvent::ChannelRenamed(channel_id)) + })?; + Ok(()) + }) + } + + pub fn respond_to_channel_invite( + &mut self, + channel_id: ChannelId, + accept: bool, + cx: &mut ModelContext, + ) -> Task> { + let client = self.client.clone(); + cx.executor().spawn(async move { + client + .request(proto::RespondToChannelInvite { channel_id, accept }) + .await?; + Ok(()) + }) + } + + pub fn get_channel_member_details( + &self, + channel_id: ChannelId, + cx: &mut ModelContext, + ) -> Task>> { + let client = self.client.clone(); + let user_store = self.user_store.downgrade(); + cx.spawn(move |_, mut cx| async move { + let response = client + .request(proto::GetChannelMembers { channel_id }) + .await?; + + let user_ids = response.members.iter().map(|m| m.user_id).collect(); + let user_store = user_store + .upgrade() + .ok_or_else(|| anyhow!("user store dropped"))?; + let users = user_store + .update(&mut cx, |user_store, cx| user_store.get_users(user_ids, cx))? + .await?; + + Ok(users + .into_iter() + .zip(response.members) + .filter_map(|(user, member)| { + Some(ChannelMembership { + user, + role: member.role(), + kind: member.kind(), + }) + }) + .collect()) + }) + } + + pub fn remove_channel(&self, channel_id: ChannelId) -> impl Future> { + let client = self.client.clone(); + async move { + client.request(proto::DeleteChannel { channel_id }).await?; + Ok(()) + } + } + + pub fn has_pending_channel_invite_response(&self, _: &Arc) -> bool { + false + } + + pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool { + self.outgoing_invites.contains(&(channel_id, user_id)) + } + + async fn handle_update_channels( + this: Model, + message: TypedEnvelope, + _: Arc, + mut cx: AsyncAppContext, + ) -> Result<()> { + this.update(&mut cx, |this, _| { + this.update_channels_tx + .unbounded_send(message.payload) + .unwrap(); + })?; + Ok(()) + } + + fn handle_connect(&mut self, cx: &mut ModelContext) -> Task> { + self.channel_index.clear(); + self.channel_invitations.clear(); + self.channel_participants.clear(); + self.channel_index.clear(); + self.outgoing_invites.clear(); + self.disconnect_channel_buffers_task.take(); + + for chat in self.opened_chats.values() { + if let OpenedModelHandle::Open(chat) = chat { + if let Some(chat) = chat.upgrade() { + chat.update(cx, |chat, cx| { + chat.rejoin(cx); + }); + } + } + } + + let mut buffer_versions = Vec::new(); + for buffer in self.opened_buffers.values() { + if let OpenedModelHandle::Open(buffer) = buffer { + if let Some(buffer) = buffer.upgrade() { + let channel_buffer = buffer.read(cx); + let buffer = channel_buffer.buffer().read(cx); + buffer_versions.push(proto::ChannelBufferVersion { + channel_id: channel_buffer.channel_id, + epoch: channel_buffer.epoch(), + version: language2::proto::serialize_version(&buffer.version()), + }); + } + } + } + + if buffer_versions.is_empty() { + return Task::ready(Ok(())); + } + + let response = self.client.request(proto::RejoinChannelBuffers { + buffers: buffer_versions, + }); + + cx.spawn(|this, mut cx| async move { + let mut response = response.await?; + + this.update(&mut cx, |this, cx| { + this.opened_buffers.retain(|_, buffer| match buffer { + OpenedModelHandle::Open(channel_buffer) => { + let Some(channel_buffer) = channel_buffer.upgrade() else { + return false; + }; + + channel_buffer.update(cx, |channel_buffer, cx| { + let channel_id = channel_buffer.channel_id; + if let Some(remote_buffer) = response + .buffers + .iter_mut() + .find(|buffer| buffer.channel_id == channel_id) + { + let channel_id = channel_buffer.channel_id; + let remote_version = + language2::proto::deserialize_version(&remote_buffer.version); + + channel_buffer.replace_collaborators( + mem::take(&mut remote_buffer.collaborators), + cx, + ); + + let operations = channel_buffer + .buffer() + .update(cx, |buffer, cx| { + let outgoing_operations = + buffer.serialize_ops(Some(remote_version), cx); + let incoming_operations = + mem::take(&mut remote_buffer.operations) + .into_iter() + .map(language2::proto::deserialize_operation) + .collect::>>()?; + buffer.apply_ops(incoming_operations, cx)?; + anyhow::Ok(outgoing_operations) + }) + .log_err(); + + if let Some(operations) = operations { + let client = this.client.clone(); + cx.executor() + .spawn(async move { + let operations = operations.await; + for chunk in + language2::proto::split_operations(operations) + { + client + .send(proto::UpdateChannelBuffer { + channel_id, + operations: chunk, + }) + .ok(); + } + }) + .detach(); + return true; + } + } + + channel_buffer.disconnect(cx); + false + }) + } + OpenedModelHandle::Loading(_) => true, + }); + }) + .ok(); + anyhow::Ok(()) + }) + } + + fn handle_disconnect(&mut self, wait_for_reconnect: bool, cx: &mut ModelContext) { + cx.notify(); + + self.disconnect_channel_buffers_task.get_or_insert_with(|| { + cx.spawn(move |this, mut cx| async move { + if wait_for_reconnect { + cx.executor().timer(RECONNECT_TIMEOUT).await; + } + + if let Some(this) = this.upgrade() { + this.update(&mut cx, |this, cx| { + for (_, buffer) in this.opened_buffers.drain() { + if let OpenedModelHandle::Open(buffer) = buffer { + if let Some(buffer) = buffer.upgrade() { + buffer.update(cx, |buffer, cx| buffer.disconnect(cx)); + } + } + } + }) + .ok(); + } + }) + }); + } + + pub(crate) fn update_channels( + &mut self, + payload: proto::UpdateChannels, + cx: &mut ModelContext, + ) -> Option>> { + if !payload.remove_channel_invitations.is_empty() { + self.channel_invitations + .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id)); + } + for channel in payload.channel_invitations { + match self + .channel_invitations + .binary_search_by_key(&channel.id, |c| c.id) + { + Ok(ix) => Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name, + Err(ix) => self.channel_invitations.insert( + ix, + Arc::new(Channel { + id: channel.id, + visibility: channel.visibility(), + role: channel.role(), + name: channel.name, + unseen_note_version: None, + unseen_message_id: None, + parent_path: channel.parent_path, + }), + ), + } + } + + let channels_changed = !payload.channels.is_empty() + || !payload.delete_channels.is_empty() + || !payload.unseen_channel_messages.is_empty() + || !payload.unseen_channel_buffer_changes.is_empty(); + + if channels_changed { + if !payload.delete_channels.is_empty() { + self.channel_index.delete_channels(&payload.delete_channels); + self.channel_participants + .retain(|channel_id, _| !&payload.delete_channels.contains(channel_id)); + + for channel_id in &payload.delete_channels { + let channel_id = *channel_id; + if payload + .channels + .iter() + .any(|channel| channel.id == channel_id) + { + continue; + } + if let Some(OpenedModelHandle::Open(buffer)) = + self.opened_buffers.remove(&channel_id) + { + if let Some(buffer) = buffer.upgrade() { + buffer.update(cx, ChannelBuffer::disconnect); + } + } + } + } + + let mut index = self.channel_index.bulk_insert(); + for channel in payload.channels { + let id = channel.id; + let channel_changed = index.insert(channel); + + if channel_changed { + if let Some(OpenedModelHandle::Open(buffer)) = self.opened_buffers.get(&id) { + if let Some(buffer) = buffer.upgrade() { + buffer.update(cx, ChannelBuffer::channel_changed); + } + } + } + } + + for unseen_buffer_change in payload.unseen_channel_buffer_changes { + let version = language2::proto::deserialize_version(&unseen_buffer_change.version); + index.note_changed( + unseen_buffer_change.channel_id, + unseen_buffer_change.epoch, + &version, + ); + } + + for unseen_channel_message in payload.unseen_channel_messages { + index.new_messages( + unseen_channel_message.channel_id, + unseen_channel_message.message_id, + ); + } + } + + cx.notify(); + if payload.channel_participants.is_empty() { + return None; + } + + let mut all_user_ids = Vec::new(); + let channel_participants = payload.channel_participants; + for entry in &channel_participants { + for user_id in entry.participant_user_ids.iter() { + if let Err(ix) = all_user_ids.binary_search(user_id) { + all_user_ids.insert(ix, *user_id); + } + } + } + + let users = self + .user_store + .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx)); + Some(cx.spawn(|this, mut cx| async move { + let users = users.await?; + + this.update(&mut cx, |this, cx| { + for entry in &channel_participants { + let mut participants: Vec<_> = entry + .participant_user_ids + .iter() + .filter_map(|user_id| { + users + .binary_search_by_key(&user_id, |user| &user.id) + .ok() + .map(|ix| users[ix].clone()) + }) + .collect(); + + participants.sort_by_key(|u| u.id); + + this.channel_participants + .insert(entry.channel_id, participants); + } + + cx.notify(); + }) + })) + } +} diff --git a/crates/channel2/src/channel_store/channel_index.rs b/crates/channel2/src/channel_store/channel_index.rs new file mode 100644 index 0000000000..9b8abb0c95 --- /dev/null +++ b/crates/channel2/src/channel_store/channel_index.rs @@ -0,0 +1,184 @@ +use crate::{Channel, ChannelId}; +use collections::BTreeMap; +use rpc2::proto; +use std::sync::Arc; + +#[derive(Default, Debug)] +pub struct ChannelIndex { + channels_ordered: Vec, + channels_by_id: BTreeMap>, +} + +impl ChannelIndex { + pub fn by_id(&self) -> &BTreeMap> { + &self.channels_by_id + } + + pub fn ordered_channels(&self) -> &[ChannelId] { + &self.channels_ordered + } + + pub fn clear(&mut self) { + self.channels_ordered.clear(); + self.channels_by_id.clear(); + } + + /// Delete the given channels from this index. + pub fn delete_channels(&mut self, channels: &[ChannelId]) { + self.channels_by_id + .retain(|channel_id, _| !channels.contains(channel_id)); + self.channels_ordered + .retain(|channel_id| !channels.contains(channel_id)); + } + + pub fn bulk_insert(&mut self) -> ChannelPathsInsertGuard { + ChannelPathsInsertGuard { + channels_ordered: &mut self.channels_ordered, + channels_by_id: &mut self.channels_by_id, + } + } + + pub fn acknowledge_note_version( + &mut self, + channel_id: ChannelId, + epoch: u64, + version: &clock::Global, + ) { + if let Some(channel) = self.channels_by_id.get_mut(&channel_id) { + let channel = Arc::make_mut(channel); + if let Some((unseen_epoch, unseen_version)) = &channel.unseen_note_version { + if epoch > *unseen_epoch + || epoch == *unseen_epoch && version.observed_all(unseen_version) + { + channel.unseen_note_version = None; + } + } + } + } + + pub fn acknowledge_message_id(&mut self, channel_id: ChannelId, message_id: u64) { + if let Some(channel) = self.channels_by_id.get_mut(&channel_id) { + let channel = Arc::make_mut(channel); + if let Some(unseen_message_id) = channel.unseen_message_id { + if message_id >= unseen_message_id { + channel.unseen_message_id = None; + } + } + } + } + + pub fn note_changed(&mut self, channel_id: ChannelId, epoch: u64, version: &clock::Global) { + insert_note_changed(&mut self.channels_by_id, channel_id, epoch, version); + } + + pub fn new_message(&mut self, channel_id: ChannelId, message_id: u64) { + insert_new_message(&mut self.channels_by_id, channel_id, message_id) + } +} + +/// A guard for ensuring that the paths index maintains its sort and uniqueness +/// invariants after a series of insertions +#[derive(Debug)] +pub struct ChannelPathsInsertGuard<'a> { + channels_ordered: &'a mut Vec, + channels_by_id: &'a mut BTreeMap>, +} + +impl<'a> ChannelPathsInsertGuard<'a> { + pub fn note_changed(&mut self, channel_id: ChannelId, epoch: u64, version: &clock::Global) { + insert_note_changed(&mut self.channels_by_id, channel_id, epoch, &version); + } + + pub fn new_messages(&mut self, channel_id: ChannelId, message_id: u64) { + insert_new_message(&mut self.channels_by_id, channel_id, message_id) + } + + pub fn insert(&mut self, channel_proto: proto::Channel) -> bool { + let mut ret = false; + if let Some(existing_channel) = self.channels_by_id.get_mut(&channel_proto.id) { + let existing_channel = Arc::make_mut(existing_channel); + + ret = existing_channel.visibility != channel_proto.visibility() + || existing_channel.role != channel_proto.role() + || existing_channel.name != channel_proto.name; + + existing_channel.visibility = channel_proto.visibility(); + existing_channel.role = channel_proto.role(); + existing_channel.name = channel_proto.name; + } else { + self.channels_by_id.insert( + channel_proto.id, + Arc::new(Channel { + id: channel_proto.id, + visibility: channel_proto.visibility(), + role: channel_proto.role(), + name: channel_proto.name, + unseen_note_version: None, + unseen_message_id: None, + parent_path: channel_proto.parent_path, + }), + ); + self.insert_root(channel_proto.id); + } + ret + } + + fn insert_root(&mut self, channel_id: ChannelId) { + self.channels_ordered.push(channel_id); + } +} + +impl<'a> Drop for ChannelPathsInsertGuard<'a> { + fn drop(&mut self) { + self.channels_ordered.sort_by(|a, b| { + let a = channel_path_sorting_key(*a, &self.channels_by_id); + let b = channel_path_sorting_key(*b, &self.channels_by_id); + a.cmp(b) + }); + self.channels_ordered.dedup(); + } +} + +fn channel_path_sorting_key<'a>( + id: ChannelId, + channels_by_id: &'a BTreeMap>, +) -> impl Iterator { + let (parent_path, name) = channels_by_id + .get(&id) + .map_or((&[] as &[_], None), |channel| { + (channel.parent_path.as_slice(), Some(channel.name.as_str())) + }); + parent_path + .iter() + .filter_map(|id| Some(channels_by_id.get(id)?.name.as_str())) + .chain(name) +} + +fn insert_note_changed( + channels_by_id: &mut BTreeMap>, + channel_id: u64, + epoch: u64, + version: &clock::Global, +) { + if let Some(channel) = channels_by_id.get_mut(&channel_id) { + let unseen_version = Arc::make_mut(channel) + .unseen_note_version + .get_or_insert((0, clock::Global::new())); + if epoch > unseen_version.0 { + *unseen_version = (epoch, version.clone()); + } else { + unseen_version.1.join(&version); + } + } +} + +fn insert_new_message( + channels_by_id: &mut BTreeMap>, + channel_id: u64, + message_id: u64, +) { + if let Some(channel) = channels_by_id.get_mut(&channel_id) { + let unseen_message_id = Arc::make_mut(channel).unseen_message_id.get_or_insert(0); + *unseen_message_id = message_id.max(*unseen_message_id); + } +} diff --git a/crates/channel2/src/channel_store_tests.rs b/crates/channel2/src/channel_store_tests.rs new file mode 100644 index 0000000000..036c7015ac --- /dev/null +++ b/crates/channel2/src/channel_store_tests.rs @@ -0,0 +1,380 @@ +use crate::channel_chat::ChannelChatEvent; + +use super::*; +use client2::{test::FakeServer, Client, UserStore}; +use gpui2::{AppContext, Context, Model, TestAppContext}; +use rpc2::proto::{self}; +use settings2::SettingsStore; +use util::http::FakeHttpClient; + +#[gpui2::test] +fn test_update_channels(cx: &mut AppContext) { + let channel_store = init_test(cx); + + update_channels( + &channel_store, + proto::UpdateChannels { + channels: vec![ + proto::Channel { + id: 1, + name: "b".to_string(), + visibility: proto::ChannelVisibility::Members as i32, + role: proto::ChannelRole::Admin.into(), + parent_path: Vec::new(), + }, + proto::Channel { + id: 2, + name: "a".to_string(), + visibility: proto::ChannelVisibility::Members as i32, + role: proto::ChannelRole::Member.into(), + parent_path: Vec::new(), + }, + ], + ..Default::default() + }, + cx, + ); + assert_channels( + &channel_store, + &[ + // + (0, "a".to_string(), proto::ChannelRole::Member), + (0, "b".to_string(), proto::ChannelRole::Admin), + ], + cx, + ); + + update_channels( + &channel_store, + proto::UpdateChannels { + channels: vec![ + proto::Channel { + id: 3, + name: "x".to_string(), + visibility: proto::ChannelVisibility::Members as i32, + role: proto::ChannelRole::Admin.into(), + parent_path: vec![1], + }, + proto::Channel { + id: 4, + name: "y".to_string(), + visibility: proto::ChannelVisibility::Members as i32, + role: proto::ChannelRole::Member.into(), + parent_path: vec![2], + }, + ], + ..Default::default() + }, + cx, + ); + assert_channels( + &channel_store, + &[ + (0, "a".to_string(), proto::ChannelRole::Member), + (1, "y".to_string(), proto::ChannelRole::Member), + (0, "b".to_string(), proto::ChannelRole::Admin), + (1, "x".to_string(), proto::ChannelRole::Admin), + ], + cx, + ); +} + +#[gpui2::test] +fn test_dangling_channel_paths(cx: &mut AppContext) { + let channel_store = init_test(cx); + + update_channels( + &channel_store, + proto::UpdateChannels { + channels: vec![ + proto::Channel { + id: 0, + name: "a".to_string(), + visibility: proto::ChannelVisibility::Members as i32, + role: proto::ChannelRole::Admin.into(), + parent_path: vec![], + }, + proto::Channel { + id: 1, + name: "b".to_string(), + visibility: proto::ChannelVisibility::Members as i32, + role: proto::ChannelRole::Admin.into(), + parent_path: vec![0], + }, + proto::Channel { + id: 2, + name: "c".to_string(), + visibility: proto::ChannelVisibility::Members as i32, + role: proto::ChannelRole::Admin.into(), + parent_path: vec![0, 1], + }, + ], + ..Default::default() + }, + cx, + ); + // Sanity check + assert_channels( + &channel_store, + &[ + // + (0, "a".to_string(), proto::ChannelRole::Admin), + (1, "b".to_string(), proto::ChannelRole::Admin), + (2, "c".to_string(), proto::ChannelRole::Admin), + ], + cx, + ); + + update_channels( + &channel_store, + proto::UpdateChannels { + delete_channels: vec![1, 2], + ..Default::default() + }, + cx, + ); + + // Make sure that the 1/2/3 path is gone + assert_channels( + &channel_store, + &[(0, "a".to_string(), proto::ChannelRole::Admin)], + cx, + ); +} + +#[gpui2::test] +async fn test_channel_messages(cx: &mut TestAppContext) { + let user_id = 5; + let channel_id = 5; + let channel_store = cx.update(init_test); + let client = channel_store.update(cx, |s, _| s.client()); + let server = FakeServer::for_client(user_id, &client, cx).await; + + // Get the available channels. + server.send(proto::UpdateChannels { + channels: vec![proto::Channel { + id: channel_id, + name: "the-channel".to_string(), + visibility: proto::ChannelVisibility::Members as i32, + role: proto::ChannelRole::Member.into(), + parent_path: vec![], + }], + ..Default::default() + }); + cx.executor().run_until_parked(); + cx.update(|cx| { + assert_channels( + &channel_store, + &[(0, "the-channel".to_string(), proto::ChannelRole::Member)], + cx, + ); + }); + + let get_users = server.receive::().await.unwrap(); + assert_eq!(get_users.payload.user_ids, vec![5]); + server.respond( + get_users.receipt(), + proto::UsersResponse { + users: vec![proto::User { + id: 5, + github_login: "nathansobo".into(), + avatar_url: "http://avatar.com/nathansobo".into(), + }], + }, + ); + + // Join a channel and populate its existing messages. + let channel = channel_store.update(cx, |store, cx| { + let channel_id = store.ordered_channels().next().unwrap().1.id; + store.open_channel_chat(channel_id, cx) + }); + let join_channel = server.receive::().await.unwrap(); + server.respond( + join_channel.receipt(), + proto::JoinChannelChatResponse { + messages: vec![ + proto::ChannelMessage { + id: 10, + body: "a".into(), + timestamp: 1000, + sender_id: 5, + mentions: vec![], + nonce: Some(1.into()), + }, + proto::ChannelMessage { + id: 11, + body: "b".into(), + timestamp: 1001, + sender_id: 6, + mentions: vec![], + nonce: Some(2.into()), + }, + ], + done: false, + }, + ); + + cx.executor().start_waiting(); + + // Client requests all users for the received messages + let mut get_users = server.receive::().await.unwrap(); + get_users.payload.user_ids.sort(); + assert_eq!(get_users.payload.user_ids, vec![6]); + server.respond( + get_users.receipt(), + proto::UsersResponse { + users: vec![proto::User { + id: 6, + github_login: "maxbrunsfeld".into(), + avatar_url: "http://avatar.com/maxbrunsfeld".into(), + }], + }, + ); + + let channel = channel.await.unwrap(); + channel.update(cx, |channel, _| { + assert_eq!( + channel + .messages_in_range(0..2) + .map(|message| (message.sender.github_login.clone(), message.body.clone())) + .collect::>(), + &[ + ("nathansobo".into(), "a".into()), + ("maxbrunsfeld".into(), "b".into()) + ] + ); + }); + + // Receive a new message. + server.send(proto::ChannelMessageSent { + channel_id, + message: Some(proto::ChannelMessage { + id: 12, + body: "c".into(), + timestamp: 1002, + sender_id: 7, + mentions: vec![], + nonce: Some(3.into()), + }), + }); + + // Client requests user for message since they haven't seen them yet + let get_users = server.receive::().await.unwrap(); + assert_eq!(get_users.payload.user_ids, vec![7]); + server.respond( + get_users.receipt(), + proto::UsersResponse { + users: vec![proto::User { + id: 7, + github_login: "as-cii".into(), + avatar_url: "http://avatar.com/as-cii".into(), + }], + }, + ); + + assert_eq!( + channel.next_event(cx), + ChannelChatEvent::MessagesUpdated { + old_range: 2..2, + new_count: 1, + } + ); + channel.update(cx, |channel, _| { + assert_eq!( + channel + .messages_in_range(2..3) + .map(|message| (message.sender.github_login.clone(), message.body.clone())) + .collect::>(), + &[("as-cii".into(), "c".into())] + ) + }); + + // Scroll up to view older messages. + channel.update(cx, |channel, cx| { + channel.load_more_messages(cx).unwrap().detach(); + }); + let get_messages = server.receive::().await.unwrap(); + assert_eq!(get_messages.payload.channel_id, 5); + assert_eq!(get_messages.payload.before_message_id, 10); + server.respond( + get_messages.receipt(), + proto::GetChannelMessagesResponse { + done: true, + messages: vec![ + proto::ChannelMessage { + id: 8, + body: "y".into(), + timestamp: 998, + sender_id: 5, + nonce: Some(4.into()), + mentions: vec![], + }, + proto::ChannelMessage { + id: 9, + body: "z".into(), + timestamp: 999, + sender_id: 6, + nonce: Some(5.into()), + mentions: vec![], + }, + ], + }, + ); + + assert_eq!( + channel.next_event(cx), + ChannelChatEvent::MessagesUpdated { + old_range: 0..0, + new_count: 2, + } + ); + channel.update(cx, |channel, _| { + assert_eq!( + channel + .messages_in_range(0..2) + .map(|message| (message.sender.github_login.clone(), message.body.clone())) + .collect::>(), + &[ + ("nathansobo".into(), "y".into()), + ("maxbrunsfeld".into(), "z".into()) + ] + ); + }); +} + +fn init_test(cx: &mut AppContext) -> Model { + let http = FakeHttpClient::with_404_response(); + let client = Client::new(http.clone(), cx); + let user_store = cx.build_model(|cx| UserStore::new(client.clone(), http, cx)); + + let settings_store = SettingsStore::test(cx); + cx.set_global(settings_store); + client2::init(&client, cx); + crate::init(&client, user_store, cx); + + ChannelStore::global(cx) +} + +fn update_channels( + channel_store: &Model, + message: proto::UpdateChannels, + cx: &mut AppContext, +) { + let task = channel_store.update(cx, |store, cx| store.update_channels(message, cx)); + assert!(task.is_none()); +} + +#[track_caller] +fn assert_channels( + channel_store: &Model, + expected_channels: &[(usize, String, proto::ChannelRole)], + cx: &mut AppContext, +) { + let actual = channel_store.update(cx, |store, _| { + store + .ordered_channels() + .map(|(depth, channel)| (depth, channel.name.to_string(), channel.role)) + .collect::>() + }); + assert_eq!(actual, expected_channels); +} diff --git a/crates/client2/src/user.rs b/crates/client2/src/user.rs index baf3a19dad..8ff134e6b7 100644 --- a/crates/client2/src/user.rs +++ b/crates/client2/src/user.rs @@ -292,22 +292,18 @@ impl UserStore { .upgrade() .ok_or_else(|| anyhow!("can't upgrade user store handle"))?; for contact in message.contacts { - let should_notify = contact.should_notify; - updated_contacts.push(( - Arc::new(Contact::from_proto(contact, &this, &mut cx).await?), - should_notify, + updated_contacts.push(Arc::new( + Contact::from_proto(contact, &this, &mut cx).await?, )); } let mut incoming_requests = Vec::new(); for request in message.incoming_requests { incoming_requests.push({ - let user = this - .update(&mut cx, |this, cx| { - this.get_user(request.requester_id, cx) - })? - .await?; - (user, request.should_notify) + this.update(&mut cx, |this, cx| { + this.get_user(request.requester_id, cx) + })? + .await? }); } @@ -331,13 +327,7 @@ impl UserStore { this.contacts .retain(|contact| !removed_contacts.contains(&contact.user.id)); // Update existing contacts and insert new ones - for (updated_contact, should_notify) in updated_contacts { - if should_notify { - cx.emit(Event::Contact { - user: updated_contact.user.clone(), - kind: ContactEventKind::Accepted, - }); - } + for updated_contact in updated_contacts { match this.contacts.binary_search_by_key( &&updated_contact.user.github_login, |contact| &contact.user.github_login, @@ -360,14 +350,7 @@ impl UserStore { } }); // Update existing incoming requests and insert new ones - for (user, should_notify) in incoming_requests { - if should_notify { - cx.emit(Event::Contact { - user: user.clone(), - kind: ContactEventKind::Requested, - }); - } - + for user in incoming_requests { match this .incoming_contact_requests .binary_search_by_key(&&user.github_login, |contact| { diff --git a/crates/gpui2/src/app/test_context.rs b/crates/gpui2/src/app/test_context.rs index e731dccc6e..856fce75b4 100644 --- a/crates/gpui2/src/app/test_context.rs +++ b/crates/gpui2/src/app/test_context.rs @@ -189,3 +189,22 @@ impl TestAppContext { .unwrap(); } } + +impl Model { + pub fn next_event(&self, cx: &mut TestAppContext) -> T::Event + where + T::Event: Send + Clone, + { + let (tx, mut rx) = futures::channel::mpsc::unbounded(); + let _subscription = self.update(cx, |_, cx| { + cx.subscribe(self, move |_, _, event, _| { + tx.unbounded_send(event.clone()).ok(); + }) + }); + + cx.executor().run_until_parked(); + rx.try_next() + .expect("no event received") + .expect("model was dropped") + } +} diff --git a/crates/rpc2/proto/zed.proto b/crates/rpc2/proto/zed.proto index 3501e70e6a..206777879b 100644 --- a/crates/rpc2/proto/zed.proto +++ b/crates/rpc2/proto/zed.proto @@ -89,88 +89,96 @@ message Envelope { FormatBuffersResponse format_buffers_response = 70; GetCompletions get_completions = 71; GetCompletionsResponse get_completions_response = 72; - ApplyCompletionAdditionalEdits apply_completion_additional_edits = 73; - ApplyCompletionAdditionalEditsResponse apply_completion_additional_edits_response = 74; - GetCodeActions get_code_actions = 75; - GetCodeActionsResponse get_code_actions_response = 76; - GetHover get_hover = 77; - GetHoverResponse get_hover_response = 78; - ApplyCodeAction apply_code_action = 79; - ApplyCodeActionResponse apply_code_action_response = 80; - PrepareRename prepare_rename = 81; - PrepareRenameResponse prepare_rename_response = 82; - PerformRename perform_rename = 83; - PerformRenameResponse perform_rename_response = 84; - SearchProject search_project = 85; - SearchProjectResponse search_project_response = 86; + ResolveCompletionDocumentation resolve_completion_documentation = 73; + ResolveCompletionDocumentationResponse resolve_completion_documentation_response = 74; + ApplyCompletionAdditionalEdits apply_completion_additional_edits = 75; + ApplyCompletionAdditionalEditsResponse apply_completion_additional_edits_response = 76; + GetCodeActions get_code_actions = 77; + GetCodeActionsResponse get_code_actions_response = 78; + GetHover get_hover = 79; + GetHoverResponse get_hover_response = 80; + ApplyCodeAction apply_code_action = 81; + ApplyCodeActionResponse apply_code_action_response = 82; + PrepareRename prepare_rename = 83; + PrepareRenameResponse prepare_rename_response = 84; + PerformRename perform_rename = 85; + PerformRenameResponse perform_rename_response = 86; + SearchProject search_project = 87; + SearchProjectResponse search_project_response = 88; - UpdateContacts update_contacts = 87; - UpdateInviteInfo update_invite_info = 88; - ShowContacts show_contacts = 89; + UpdateContacts update_contacts = 89; + UpdateInviteInfo update_invite_info = 90; + ShowContacts show_contacts = 91; - GetUsers get_users = 90; - FuzzySearchUsers fuzzy_search_users = 91; - UsersResponse users_response = 92; - RequestContact request_contact = 93; - RespondToContactRequest respond_to_contact_request = 94; - RemoveContact remove_contact = 95; + GetUsers get_users = 92; + FuzzySearchUsers fuzzy_search_users = 93; + UsersResponse users_response = 94; + RequestContact request_contact = 95; + RespondToContactRequest respond_to_contact_request = 96; + RemoveContact remove_contact = 97; - Follow follow = 96; - FollowResponse follow_response = 97; - UpdateFollowers update_followers = 98; - Unfollow unfollow = 99; - GetPrivateUserInfo get_private_user_info = 100; - GetPrivateUserInfoResponse get_private_user_info_response = 101; - UpdateDiffBase update_diff_base = 102; + Follow follow = 98; + FollowResponse follow_response = 99; + UpdateFollowers update_followers = 100; + Unfollow unfollow = 101; + GetPrivateUserInfo get_private_user_info = 102; + GetPrivateUserInfoResponse get_private_user_info_response = 103; + UpdateDiffBase update_diff_base = 104; - OnTypeFormatting on_type_formatting = 103; - OnTypeFormattingResponse on_type_formatting_response = 104; + OnTypeFormatting on_type_formatting = 105; + OnTypeFormattingResponse on_type_formatting_response = 106; - UpdateWorktreeSettings update_worktree_settings = 105; + UpdateWorktreeSettings update_worktree_settings = 107; - InlayHints inlay_hints = 106; - InlayHintsResponse inlay_hints_response = 107; - ResolveInlayHint resolve_inlay_hint = 108; - ResolveInlayHintResponse resolve_inlay_hint_response = 109; - RefreshInlayHints refresh_inlay_hints = 110; + InlayHints inlay_hints = 108; + InlayHintsResponse inlay_hints_response = 109; + ResolveInlayHint resolve_inlay_hint = 110; + ResolveInlayHintResponse resolve_inlay_hint_response = 111; + RefreshInlayHints refresh_inlay_hints = 112; - CreateChannel create_channel = 111; - CreateChannelResponse create_channel_response = 112; - InviteChannelMember invite_channel_member = 113; - RemoveChannelMember remove_channel_member = 114; - RespondToChannelInvite respond_to_channel_invite = 115; - UpdateChannels update_channels = 116; - JoinChannel join_channel = 117; - DeleteChannel delete_channel = 118; - GetChannelMembers get_channel_members = 119; - GetChannelMembersResponse get_channel_members_response = 120; - SetChannelMemberAdmin set_channel_member_admin = 121; - RenameChannel rename_channel = 122; - RenameChannelResponse rename_channel_response = 123; + CreateChannel create_channel = 113; + CreateChannelResponse create_channel_response = 114; + InviteChannelMember invite_channel_member = 115; + RemoveChannelMember remove_channel_member = 116; + RespondToChannelInvite respond_to_channel_invite = 117; + UpdateChannels update_channels = 118; + JoinChannel join_channel = 119; + DeleteChannel delete_channel = 120; + GetChannelMembers get_channel_members = 121; + GetChannelMembersResponse get_channel_members_response = 122; + SetChannelMemberRole set_channel_member_role = 123; + RenameChannel rename_channel = 124; + RenameChannelResponse rename_channel_response = 125; - JoinChannelBuffer join_channel_buffer = 124; - JoinChannelBufferResponse join_channel_buffer_response = 125; - UpdateChannelBuffer update_channel_buffer = 126; - LeaveChannelBuffer leave_channel_buffer = 127; - UpdateChannelBufferCollaborators update_channel_buffer_collaborators = 128; - RejoinChannelBuffers rejoin_channel_buffers = 129; - RejoinChannelBuffersResponse rejoin_channel_buffers_response = 130; - AckBufferOperation ack_buffer_operation = 143; + JoinChannelBuffer join_channel_buffer = 126; + JoinChannelBufferResponse join_channel_buffer_response = 127; + UpdateChannelBuffer update_channel_buffer = 128; + LeaveChannelBuffer leave_channel_buffer = 129; + UpdateChannelBufferCollaborators update_channel_buffer_collaborators = 130; + RejoinChannelBuffers rejoin_channel_buffers = 131; + RejoinChannelBuffersResponse rejoin_channel_buffers_response = 132; + AckBufferOperation ack_buffer_operation = 133; - JoinChannelChat join_channel_chat = 131; - JoinChannelChatResponse join_channel_chat_response = 132; - LeaveChannelChat leave_channel_chat = 133; - SendChannelMessage send_channel_message = 134; - SendChannelMessageResponse send_channel_message_response = 135; - ChannelMessageSent channel_message_sent = 136; - GetChannelMessages get_channel_messages = 137; - GetChannelMessagesResponse get_channel_messages_response = 138; - RemoveChannelMessage remove_channel_message = 139; - AckChannelMessage ack_channel_message = 144; + JoinChannelChat join_channel_chat = 134; + JoinChannelChatResponse join_channel_chat_response = 135; + LeaveChannelChat leave_channel_chat = 136; + SendChannelMessage send_channel_message = 137; + SendChannelMessageResponse send_channel_message_response = 138; + ChannelMessageSent channel_message_sent = 139; + GetChannelMessages get_channel_messages = 140; + GetChannelMessagesResponse get_channel_messages_response = 141; + RemoveChannelMessage remove_channel_message = 142; + AckChannelMessage ack_channel_message = 143; + GetChannelMessagesById get_channel_messages_by_id = 144; - LinkChannel link_channel = 140; - UnlinkChannel unlink_channel = 141; - MoveChannel move_channel = 142; // current max: 144 + MoveChannel move_channel = 147; + SetChannelVisibility set_channel_visibility = 148; + + AddNotification add_notification = 149; + GetNotifications get_notifications = 150; + GetNotificationsResponse get_notifications_response = 151; + DeleteNotification delete_notification = 152; + MarkNotificationRead mark_notification_read = 153; // Current max } } @@ -332,6 +340,7 @@ message RoomUpdated { message LiveKitConnectionInfo { string server_url = 1; string token = 2; + bool can_publish = 3; } message ShareProject { @@ -832,6 +841,17 @@ message ResolveState { } } +message ResolveCompletionDocumentation { + uint64 project_id = 1; + uint64 language_server_id = 2; + bytes lsp_completion = 3; +} + +message ResolveCompletionDocumentationResponse { + string text = 1; + bool is_markdown = 2; +} + message ResolveInlayHint { uint64 project_id = 1; uint64 buffer_id = 2; @@ -950,13 +970,10 @@ message LspDiskBasedDiagnosticsUpdated {} message UpdateChannels { repeated Channel channels = 1; - repeated ChannelEdge insert_edge = 2; - repeated ChannelEdge delete_edge = 3; repeated uint64 delete_channels = 4; repeated Channel channel_invitations = 5; repeated uint64 remove_channel_invitations = 6; repeated ChannelParticipants channel_participants = 7; - repeated ChannelPermission channel_permissions = 8; repeated UnseenChannelMessage unseen_channel_messages = 9; repeated UnseenChannelBufferChange unseen_channel_buffer_changes = 10; } @@ -972,14 +989,9 @@ message UnseenChannelBufferChange { repeated VectorClockEntry version = 3; } -message ChannelEdge { - uint64 channel_id = 1; - uint64 parent_id = 2; -} - message ChannelPermission { uint64 channel_id = 1; - bool is_admin = 2; + ChannelRole role = 3; } message ChannelParticipants { @@ -1005,8 +1017,8 @@ message GetChannelMembersResponse { message ChannelMember { uint64 user_id = 1; - bool admin = 2; Kind kind = 3; + ChannelRole role = 4; enum Kind { Member = 0; @@ -1028,7 +1040,7 @@ message CreateChannelResponse { message InviteChannelMember { uint64 channel_id = 1; uint64 user_id = 2; - bool admin = 3; + ChannelRole role = 4; } message RemoveChannelMember { @@ -1036,10 +1048,22 @@ message RemoveChannelMember { uint64 user_id = 2; } -message SetChannelMemberAdmin { +enum ChannelRole { + Admin = 0; + Member = 1; + Guest = 2; + Banned = 3; +} + +message SetChannelMemberRole { uint64 channel_id = 1; uint64 user_id = 2; - bool admin = 3; + ChannelRole role = 3; +} + +message SetChannelVisibility { + uint64 channel_id = 1; + ChannelVisibility visibility = 2; } message RenameChannel { @@ -1068,6 +1092,7 @@ message SendChannelMessage { uint64 channel_id = 1; string body = 2; Nonce nonce = 3; + repeated ChatMention mentions = 4; } message RemoveChannelMessage { @@ -1099,20 +1124,13 @@ message GetChannelMessagesResponse { bool done = 2; } -message LinkChannel { - uint64 channel_id = 1; - uint64 to = 2; -} - -message UnlinkChannel { - uint64 channel_id = 1; - uint64 from = 2; +message GetChannelMessagesById { + repeated uint64 message_ids = 1; } message MoveChannel { uint64 channel_id = 1; - uint64 from = 2; - uint64 to = 3; + optional uint64 to = 2; } message JoinChannelBuffer { @@ -1125,6 +1143,12 @@ message ChannelMessage { uint64 timestamp = 3; uint64 sender_id = 4; Nonce nonce = 5; + repeated ChatMention mentions = 6; +} + +message ChatMention { + Range range = 1; + uint64 user_id = 2; } message RejoinChannelBuffers { @@ -1216,7 +1240,6 @@ message ShowContacts {} message IncomingContactRequest { uint64 requester_id = 1; - bool should_notify = 2; } message UpdateDiagnostics { @@ -1533,16 +1556,23 @@ message Nonce { uint64 lower_half = 2; } +enum ChannelVisibility { + Public = 0; + Members = 1; +} + message Channel { uint64 id = 1; string name = 2; + ChannelVisibility visibility = 3; + ChannelRole role = 4; + repeated uint64 parent_path = 5; } message Contact { uint64 user_id = 1; bool online = 2; bool busy = 3; - bool should_notify = 4; } message WorktreeMetadata { @@ -1557,3 +1587,34 @@ message UpdateDiffBase { uint64 buffer_id = 2; optional string diff_base = 3; } + +message GetNotifications { + optional uint64 before_id = 1; +} + +message AddNotification { + Notification notification = 1; +} + +message GetNotificationsResponse { + repeated Notification notifications = 1; + bool done = 2; +} + +message DeleteNotification { + uint64 notification_id = 1; +} + +message MarkNotificationRead { + uint64 notification_id = 1; +} + +message Notification { + uint64 id = 1; + uint64 timestamp = 2; + string kind = 3; + optional uint64 entity_id = 4; + string content = 5; + bool is_read = 6; + optional bool response = 7; +} diff --git a/crates/rpc2/src/proto.rs b/crates/rpc2/src/proto.rs index f0d7937f6f..77a69122c2 100644 --- a/crates/rpc2/src/proto.rs +++ b/crates/rpc2/src/proto.rs @@ -133,6 +133,9 @@ impl fmt::Display for PeerId { messages!( (Ack, Foreground), + (AckBufferOperation, Background), + (AckChannelMessage, Background), + (AddNotification, Foreground), (AddProjectCollaborator, Foreground), (ApplyCodeAction, Background), (ApplyCodeActionResponse, Background), @@ -143,57 +146,74 @@ messages!( (Call, Foreground), (CallCanceled, Foreground), (CancelCall, Foreground), + (ChannelMessageSent, Foreground), (CopyProjectEntry, Foreground), (CreateBufferForPeer, Foreground), (CreateChannel, Foreground), (CreateChannelResponse, Foreground), - (ChannelMessageSent, Foreground), (CreateProjectEntry, Foreground), (CreateRoom, Foreground), (CreateRoomResponse, Foreground), (DeclineCall, Foreground), + (DeleteChannel, Foreground), + (DeleteNotification, Foreground), (DeleteProjectEntry, Foreground), (Error, Foreground), (ExpandProjectEntry, Foreground), + (ExpandProjectEntryResponse, Foreground), (Follow, Foreground), (FollowResponse, Foreground), (FormatBuffers, Foreground), (FormatBuffersResponse, Foreground), (FuzzySearchUsers, Foreground), + (GetChannelMembers, Foreground), + (GetChannelMembersResponse, Foreground), + (GetChannelMessages, Background), + (GetChannelMessagesById, Background), + (GetChannelMessagesResponse, Background), (GetCodeActions, Background), (GetCodeActionsResponse, Background), - (GetHover, Background), - (GetHoverResponse, Background), - (GetChannelMessages, Background), - (GetChannelMessagesResponse, Background), - (SendChannelMessage, Background), - (SendChannelMessageResponse, Background), (GetCompletions, Background), (GetCompletionsResponse, Background), (GetDefinition, Background), (GetDefinitionResponse, Background), - (GetTypeDefinition, Background), - (GetTypeDefinitionResponse, Background), (GetDocumentHighlights, Background), (GetDocumentHighlightsResponse, Background), - (GetReferences, Background), - (GetReferencesResponse, Background), + (GetHover, Background), + (GetHoverResponse, Background), + (GetNotifications, Foreground), + (GetNotificationsResponse, Foreground), + (GetPrivateUserInfo, Foreground), + (GetPrivateUserInfoResponse, Foreground), (GetProjectSymbols, Background), (GetProjectSymbolsResponse, Background), + (GetReferences, Background), + (GetReferencesResponse, Background), + (GetTypeDefinition, Background), + (GetTypeDefinitionResponse, Background), (GetUsers, Foreground), (Hello, Foreground), (IncomingCall, Foreground), + (InlayHints, Background), + (InlayHintsResponse, Background), (InviteChannelMember, Foreground), - (UsersResponse, Foreground), + (JoinChannel, Foreground), + (JoinChannelBuffer, Foreground), + (JoinChannelBufferResponse, Foreground), + (JoinChannelChat, Foreground), + (JoinChannelChatResponse, Foreground), (JoinProject, Foreground), (JoinProjectResponse, Foreground), (JoinRoom, Foreground), (JoinRoomResponse, Foreground), - (JoinChannelChat, Foreground), - (JoinChannelChatResponse, Foreground), + (LeaveChannelBuffer, Background), (LeaveChannelChat, Foreground), (LeaveProject, Foreground), (LeaveRoom, Foreground), + (MarkNotificationRead, Foreground), + (MoveChannel, Foreground), + (OnTypeFormatting, Background), + (OnTypeFormattingResponse, Background), (OpenBufferById, Background), (OpenBufferByPath, Background), (OpenBufferForSymbol, Background), @@ -201,58 +221,56 @@ messages!( (OpenBufferResponse, Background), (PerformRename, Background), (PerformRenameResponse, Background), - (OnTypeFormatting, Background), - (OnTypeFormattingResponse, Background), - (InlayHints, Background), - (InlayHintsResponse, Background), - (ResolveInlayHint, Background), - (ResolveInlayHintResponse, Background), - (RefreshInlayHints, Foreground), (Ping, Foreground), (PrepareRename, Background), (PrepareRenameResponse, Background), - (ExpandProjectEntryResponse, Foreground), (ProjectEntryResponse, Foreground), + (RefreshInlayHints, Foreground), + (RejoinChannelBuffers, Foreground), + (RejoinChannelBuffersResponse, Foreground), (RejoinRoom, Foreground), (RejoinRoomResponse, Foreground), - (RemoveContact, Foreground), - (RemoveChannelMember, Foreground), - (RemoveChannelMessage, Foreground), (ReloadBuffers, Foreground), (ReloadBuffersResponse, Foreground), + (RemoveChannelMember, Foreground), + (RemoveChannelMessage, Foreground), + (RemoveContact, Foreground), (RemoveProjectCollaborator, Foreground), - (RenameProjectEntry, Foreground), - (RequestContact, Foreground), - (RespondToContactRequest, Foreground), - (RespondToChannelInvite, Foreground), - (JoinChannel, Foreground), - (RoomUpdated, Foreground), - (SaveBuffer, Foreground), (RenameChannel, Foreground), (RenameChannelResponse, Foreground), - (SetChannelMemberAdmin, Foreground), + (RenameProjectEntry, Foreground), + (RequestContact, Foreground), + (ResolveCompletionDocumentation, Background), + (ResolveCompletionDocumentationResponse, Background), + (ResolveInlayHint, Background), + (ResolveInlayHintResponse, Background), + (RespondToChannelInvite, Foreground), + (RespondToContactRequest, Foreground), + (RoomUpdated, Foreground), + (SaveBuffer, Foreground), + (SetChannelMemberRole, Foreground), + (SetChannelVisibility, Foreground), (SearchProject, Background), (SearchProjectResponse, Background), + (SendChannelMessage, Background), + (SendChannelMessageResponse, Background), (ShareProject, Foreground), (ShareProjectResponse, Foreground), (ShowContacts, Foreground), (StartLanguageServer, Foreground), (SynchronizeBuffers, Foreground), (SynchronizeBuffersResponse, Foreground), - (RejoinChannelBuffers, Foreground), - (RejoinChannelBuffersResponse, Foreground), (Test, Foreground), (Unfollow, Foreground), (UnshareProject, Foreground), (UpdateBuffer, Foreground), (UpdateBufferFile, Foreground), - (UpdateContacts, Foreground), - (DeleteChannel, Foreground), - (MoveChannel, Foreground), - (LinkChannel, Foreground), - (UnlinkChannel, Foreground), + (UpdateChannelBuffer, Foreground), + (UpdateChannelBufferCollaborators, Foreground), (UpdateChannels, Foreground), + (UpdateContacts, Foreground), (UpdateDiagnosticSummary, Foreground), + (UpdateDiffBase, Foreground), (UpdateFollowers, Foreground), (UpdateInviteInfo, Foreground), (UpdateLanguageServer, Foreground), @@ -261,18 +279,7 @@ messages!( (UpdateProjectCollaborator, Foreground), (UpdateWorktree, Foreground), (UpdateWorktreeSettings, Foreground), - (UpdateDiffBase, Foreground), - (GetPrivateUserInfo, Foreground), - (GetPrivateUserInfoResponse, Foreground), - (GetChannelMembers, Foreground), - (GetChannelMembersResponse, Foreground), - (JoinChannelBuffer, Foreground), - (JoinChannelBufferResponse, Foreground), - (LeaveChannelBuffer, Background), - (UpdateChannelBuffer, Foreground), - (UpdateChannelBufferCollaborators, Foreground), - (AckBufferOperation, Background), - (AckChannelMessage, Background), + (UsersResponse, Foreground), ); request_messages!( @@ -284,72 +291,78 @@ request_messages!( (Call, Ack), (CancelCall, Ack), (CopyProjectEntry, ProjectEntryResponse), + (CreateChannel, CreateChannelResponse), (CreateProjectEntry, ProjectEntryResponse), (CreateRoom, CreateRoomResponse), - (CreateChannel, CreateChannelResponse), (DeclineCall, Ack), + (DeleteChannel, Ack), (DeleteProjectEntry, ProjectEntryResponse), (ExpandProjectEntry, ExpandProjectEntryResponse), (Follow, FollowResponse), (FormatBuffers, FormatBuffersResponse), + (FuzzySearchUsers, UsersResponse), + (GetChannelMembers, GetChannelMembersResponse), + (GetChannelMessages, GetChannelMessagesResponse), + (GetChannelMessagesById, GetChannelMessagesResponse), (GetCodeActions, GetCodeActionsResponse), - (GetHover, GetHoverResponse), (GetCompletions, GetCompletionsResponse), (GetDefinition, GetDefinitionResponse), - (GetTypeDefinition, GetTypeDefinitionResponse), (GetDocumentHighlights, GetDocumentHighlightsResponse), - (GetReferences, GetReferencesResponse), + (GetHover, GetHoverResponse), + (GetNotifications, GetNotificationsResponse), (GetPrivateUserInfo, GetPrivateUserInfoResponse), (GetProjectSymbols, GetProjectSymbolsResponse), - (FuzzySearchUsers, UsersResponse), + (GetReferences, GetReferencesResponse), + (GetTypeDefinition, GetTypeDefinitionResponse), (GetUsers, UsersResponse), + (IncomingCall, Ack), + (InlayHints, InlayHintsResponse), (InviteChannelMember, Ack), + (JoinChannel, JoinRoomResponse), + (JoinChannelBuffer, JoinChannelBufferResponse), + (JoinChannelChat, JoinChannelChatResponse), (JoinProject, JoinProjectResponse), (JoinRoom, JoinRoomResponse), - (JoinChannelChat, JoinChannelChatResponse), + (LeaveChannelBuffer, Ack), (LeaveRoom, Ack), - (RejoinRoom, RejoinRoomResponse), - (IncomingCall, Ack), + (MarkNotificationRead, Ack), + (MoveChannel, Ack), + (OnTypeFormatting, OnTypeFormattingResponse), (OpenBufferById, OpenBufferResponse), (OpenBufferByPath, OpenBufferResponse), (OpenBufferForSymbol, OpenBufferForSymbolResponse), - (Ping, Ack), (PerformRename, PerformRenameResponse), + (Ping, Ack), (PrepareRename, PrepareRenameResponse), - (OnTypeFormatting, OnTypeFormattingResponse), - (InlayHints, InlayHintsResponse), - (ResolveInlayHint, ResolveInlayHintResponse), (RefreshInlayHints, Ack), + (RejoinChannelBuffers, RejoinChannelBuffersResponse), + (RejoinRoom, RejoinRoomResponse), (ReloadBuffers, ReloadBuffersResponse), - (RequestContact, Ack), (RemoveChannelMember, Ack), - (RemoveContact, Ack), - (RespondToContactRequest, Ack), - (RespondToChannelInvite, Ack), - (SetChannelMemberAdmin, Ack), - (SendChannelMessage, SendChannelMessageResponse), - (GetChannelMessages, GetChannelMessagesResponse), - (GetChannelMembers, GetChannelMembersResponse), - (JoinChannel, JoinRoomResponse), (RemoveChannelMessage, Ack), - (DeleteChannel, Ack), - (RenameProjectEntry, ProjectEntryResponse), + (RemoveContact, Ack), (RenameChannel, RenameChannelResponse), - (LinkChannel, Ack), - (UnlinkChannel, Ack), - (MoveChannel, Ack), + (RenameProjectEntry, ProjectEntryResponse), + (RequestContact, Ack), + ( + ResolveCompletionDocumentation, + ResolveCompletionDocumentationResponse + ), + (ResolveInlayHint, ResolveInlayHintResponse), + (RespondToChannelInvite, Ack), + (RespondToContactRequest, Ack), (SaveBuffer, BufferSaved), (SearchProject, SearchProjectResponse), + (SendChannelMessage, SendChannelMessageResponse), + (SetChannelMemberRole, Ack), + (SetChannelVisibility, Ack), (ShareProject, ShareProjectResponse), (SynchronizeBuffers, SynchronizeBuffersResponse), - (RejoinChannelBuffers, RejoinChannelBuffersResponse), (Test, Test), (UpdateBuffer, Ack), (UpdateParticipantLocation, Ack), (UpdateProject, Ack), (UpdateWorktree, Ack), - (JoinChannelBuffer, JoinChannelBufferResponse), - (LeaveChannelBuffer, Ack) ); entity_messages!( @@ -368,25 +381,26 @@ entity_messages!( GetCodeActions, GetCompletions, GetDefinition, - GetTypeDefinition, GetDocumentHighlights, GetHover, - GetReferences, GetProjectSymbols, + GetReferences, + GetTypeDefinition, + InlayHints, JoinProject, LeaveProject, + OnTypeFormatting, OpenBufferById, OpenBufferByPath, OpenBufferForSymbol, PerformRename, - OnTypeFormatting, - InlayHints, - ResolveInlayHint, - RefreshInlayHints, PrepareRename, + RefreshInlayHints, ReloadBuffers, RemoveProjectCollaborator, RenameProjectEntry, + ResolveCompletionDocumentation, + ResolveInlayHint, SaveBuffer, SearchProject, StartLanguageServer, @@ -395,19 +409,19 @@ entity_messages!( UpdateBuffer, UpdateBufferFile, UpdateDiagnosticSummary, + UpdateDiffBase, UpdateLanguageServer, UpdateProject, UpdateProjectCollaborator, UpdateWorktree, UpdateWorktreeSettings, - UpdateDiffBase ); entity_messages!( channel_id, ChannelMessageSent, - UpdateChannelBuffer, RemoveChannelMessage, + UpdateChannelBuffer, UpdateChannelBufferCollaborators, );