Restore channel chat model and panel view

This commit is contained in:
Max Brunsfeld 2023-09-07 11:16:51 -07:00
parent 98999b1e9a
commit 3422eb65e8
14 changed files with 1227 additions and 51 deletions

1
Cargo.lock generated
View File

@ -1553,6 +1553,7 @@ dependencies = [
"settings",
"theme",
"theme_selector",
"time 0.3.27",
"util",
"vcs_menu",
"workspace",

View File

@ -1,10 +1,13 @@
mod channel_buffer;
mod channel_chat;
mod channel_store;
pub mod channel_buffer;
use std::sync::Arc;
pub use channel_buffer::{ChannelBuffer, ChannelBufferEvent};
pub use channel_chat::{ChannelChat, ChannelChatEvent, ChannelMessage, ChannelMessageId};
pub use channel_store::{Channel, ChannelEvent, ChannelId, ChannelMembership, ChannelStore};
pub use channel_store::*;
use client::Client;
use std::sync::Arc;
#[cfg(test)]
mod channel_store_tests;

View File

@ -23,13 +23,13 @@ pub struct ChannelBuffer {
subscription: Option<client::Subscription>,
}
pub enum Event {
pub enum ChannelBufferEvent {
CollaboratorsChanged,
Disconnected,
}
impl Entity for ChannelBuffer {
type Event = Event;
type Event = ChannelBufferEvent;
fn release(&mut self, _: &mut AppContext) {
if self.connected {
@ -101,7 +101,7 @@ impl ChannelBuffer {
}
}
self.collaborators = collaborators;
cx.emit(Event::CollaboratorsChanged);
cx.emit(ChannelBufferEvent::CollaboratorsChanged);
cx.notify();
}
@ -141,7 +141,7 @@ impl ChannelBuffer {
this.update(&mut cx, |this, cx| {
this.collaborators.push(collaborator);
cx.emit(Event::CollaboratorsChanged);
cx.emit(ChannelBufferEvent::CollaboratorsChanged);
cx.notify();
});
@ -165,7 +165,7 @@ impl ChannelBuffer {
true
}
});
cx.emit(Event::CollaboratorsChanged);
cx.emit(ChannelBufferEvent::CollaboratorsChanged);
cx.notify();
});
@ -185,7 +185,7 @@ impl ChannelBuffer {
break;
}
}
cx.emit(Event::CollaboratorsChanged);
cx.emit(ChannelBufferEvent::CollaboratorsChanged);
cx.notify();
});
@ -230,7 +230,7 @@ impl ChannelBuffer {
if self.connected {
self.connected = false;
self.subscription.take();
cx.emit(Event::Disconnected);
cx.emit(ChannelBufferEvent::Disconnected);
cx.notify()
}
}

View File

@ -0,0 +1,459 @@
use crate::Channel;
use anyhow::{anyhow, Result};
use client::{
proto,
user::{User, UserStore},
Client, Subscription, TypedEnvelope,
};
use futures::lock::Mutex;
use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task};
use rand::prelude::*;
use std::{collections::HashSet, mem, ops::Range, sync::Arc};
use sum_tree::{Bias, SumTree};
use time::OffsetDateTime;
use util::{post_inc, ResultExt as _, TryFutureExt};
pub struct ChannelChat {
channel: Arc<Channel>,
messages: SumTree<ChannelMessage>,
loaded_all_messages: bool,
next_pending_message_id: usize,
user_store: ModelHandle<UserStore>,
rpc: Arc<Client>,
outgoing_messages_lock: Arc<Mutex<()>>,
rng: StdRng,
_subscription: Subscription,
}
#[derive(Clone, Debug)]
pub struct ChannelMessage {
pub id: ChannelMessageId,
pub body: String,
pub timestamp: OffsetDateTime,
pub sender: Arc<User>,
pub nonce: u128,
}
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum ChannelMessageId {
Saved(u64),
Pending(usize),
}
#[derive(Clone, Debug, Default)]
pub struct ChannelMessageSummary {
max_id: ChannelMessageId,
count: usize,
}
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
struct Count(usize);
#[derive(Clone, Debug, PartialEq)]
pub enum ChannelChatEvent {
MessagesUpdated {
old_range: Range<usize>,
new_count: usize,
},
}
impl Entity for ChannelChat {
type Event = ChannelChatEvent;
fn release(&mut self, _: &mut AppContext) {
self.rpc
.send(proto::LeaveChannelChat {
channel_id: self.channel.id,
})
.log_err();
}
}
impl ChannelChat {
pub fn init(rpc: &Arc<Client>) {
rpc.add_model_message_handler(Self::handle_message_sent);
}
pub async fn new(
channel: Arc<Channel>,
user_store: ModelHandle<UserStore>,
client: Arc<Client>,
mut cx: AsyncAppContext,
) -> Result<ModelHandle<Self>> {
let channel_id = channel.id;
let subscription = client.subscribe_to_entity(channel_id).unwrap();
let response = client
.request(proto::JoinChannelChat { channel_id })
.await?;
let messages = messages_from_proto(response.messages, &user_store, &mut cx).await?;
let loaded_all_messages = response.done;
Ok(cx.add_model(|cx| {
let mut this = Self {
channel,
user_store,
rpc: client,
outgoing_messages_lock: Default::default(),
messages: Default::default(),
loaded_all_messages,
next_pending_message_id: 0,
rng: StdRng::from_entropy(),
_subscription: subscription.set_model(&cx.handle(), &mut cx.to_async()),
};
this.insert_messages(messages, cx);
this
}))
}
pub fn name(&self) -> &str {
&self.channel.name
}
pub fn send_message(
&mut self,
body: String,
cx: &mut ModelContext<Self>,
) -> Result<Task<Result<()>>> {
if body.is_empty() {
Err(anyhow!("message body can't be empty"))?;
}
let current_user = self
.user_store
.read(cx)
.current_user()
.ok_or_else(|| anyhow!("current_user is not present"))?;
let channel_id = self.channel.id;
let pending_id = ChannelMessageId::Pending(post_inc(&mut self.next_pending_message_id));
let nonce = self.rng.gen();
self.insert_messages(
SumTree::from_item(
ChannelMessage {
id: pending_id,
body: body.clone(),
sender: current_user,
timestamp: OffsetDateTime::now_utc(),
nonce,
},
&(),
),
cx,
);
let user_store = self.user_store.clone();
let rpc = self.rpc.clone();
let outgoing_messages_lock = self.outgoing_messages_lock.clone();
Ok(cx.spawn(|this, mut cx| async move {
let outgoing_message_guard = outgoing_messages_lock.lock().await;
let request = rpc.request(proto::SendChannelMessage {
channel_id,
body,
nonce: Some(nonce.into()),
});
let response = request.await?;
drop(outgoing_message_guard);
let message = ChannelMessage::from_proto(
response.message.ok_or_else(|| anyhow!("invalid message"))?,
&user_store,
&mut cx,
)
.await?;
this.update(&mut cx, |this, cx| {
this.insert_messages(SumTree::from_item(message, &()), cx);
Ok(())
})
}))
}
pub fn load_more_messages(&mut self, cx: &mut ModelContext<Self>) -> bool {
if !self.loaded_all_messages {
let rpc = self.rpc.clone();
let user_store = self.user_store.clone();
let channel_id = self.channel.id;
if let Some(before_message_id) =
self.messages.first().and_then(|message| match message.id {
ChannelMessageId::Saved(id) => Some(id),
ChannelMessageId::Pending(_) => None,
})
{
cx.spawn(|this, mut cx| {
async move {
let response = rpc
.request(proto::GetChannelMessages {
channel_id,
before_message_id,
})
.await?;
let loaded_all_messages = response.done;
let messages =
messages_from_proto(response.messages, &user_store, &mut cx).await?;
this.update(&mut cx, |this, cx| {
this.loaded_all_messages = loaded_all_messages;
this.insert_messages(messages, cx);
});
anyhow::Ok(())
}
.log_err()
})
.detach();
return true;
}
}
false
}
pub fn rejoin(&mut self, cx: &mut ModelContext<Self>) {
let user_store = self.user_store.clone();
let rpc = self.rpc.clone();
let channel_id = self.channel.id;
cx.spawn(|this, mut cx| {
async move {
let response = rpc.request(proto::JoinChannelChat { channel_id }).await?;
let messages = messages_from_proto(response.messages, &user_store, &mut cx).await?;
let loaded_all_messages = response.done;
let pending_messages = this.update(&mut cx, |this, cx| {
if let Some((first_new_message, last_old_message)) =
messages.first().zip(this.messages.last())
{
if first_new_message.id > last_old_message.id {
let old_messages = mem::take(&mut this.messages);
cx.emit(ChannelChatEvent::MessagesUpdated {
old_range: 0..old_messages.summary().count,
new_count: 0,
});
this.loaded_all_messages = loaded_all_messages;
}
}
this.insert_messages(messages, cx);
if loaded_all_messages {
this.loaded_all_messages = loaded_all_messages;
}
this.pending_messages().cloned().collect::<Vec<_>>()
});
for pending_message in pending_messages {
let request = rpc.request(proto::SendChannelMessage {
channel_id,
body: pending_message.body,
nonce: Some(pending_message.nonce.into()),
});
let response = request.await?;
let message = ChannelMessage::from_proto(
response.message.ok_or_else(|| anyhow!("invalid message"))?,
&user_store,
&mut cx,
)
.await?;
this.update(&mut cx, |this, cx| {
this.insert_messages(SumTree::from_item(message, &()), cx);
});
}
anyhow::Ok(())
}
.log_err()
})
.detach();
}
pub fn message_count(&self) -> usize {
self.messages.summary().count
}
pub fn messages(&self) -> &SumTree<ChannelMessage> {
&self.messages
}
pub fn message(&self, ix: usize) -> &ChannelMessage {
let mut cursor = self.messages.cursor::<Count>();
cursor.seek(&Count(ix), Bias::Right, &());
cursor.item().unwrap()
}
pub fn messages_in_range(&self, range: Range<usize>) -> impl Iterator<Item = &ChannelMessage> {
let mut cursor = self.messages.cursor::<Count>();
cursor.seek(&Count(range.start), Bias::Right, &());
cursor.take(range.len())
}
pub fn pending_messages(&self) -> impl Iterator<Item = &ChannelMessage> {
let mut cursor = self.messages.cursor::<ChannelMessageId>();
cursor.seek(&ChannelMessageId::Pending(0), Bias::Left, &());
cursor
}
async fn handle_message_sent(
this: ModelHandle<Self>,
message: TypedEnvelope<proto::ChannelMessageSent>,
_: Arc<Client>,
mut cx: AsyncAppContext,
) -> Result<()> {
let user_store = this.read_with(&cx, |this, _| this.user_store.clone());
let message = message
.payload
.message
.ok_or_else(|| anyhow!("empty message"))?;
let message = ChannelMessage::from_proto(message, &user_store, &mut cx).await?;
this.update(&mut cx, |this, cx| {
this.insert_messages(SumTree::from_item(message, &()), cx)
});
Ok(())
}
fn insert_messages(&mut self, messages: SumTree<ChannelMessage>, cx: &mut ModelContext<Self>) {
if let Some((first_message, last_message)) = messages.first().zip(messages.last()) {
let nonces = messages
.cursor::<()>()
.map(|m| m.nonce)
.collect::<HashSet<_>>();
let mut old_cursor = self.messages.cursor::<(ChannelMessageId, Count)>();
let mut new_messages = old_cursor.slice(&first_message.id, Bias::Left, &());
let start_ix = old_cursor.start().1 .0;
let removed_messages = old_cursor.slice(&last_message.id, Bias::Right, &());
let removed_count = removed_messages.summary().count;
let new_count = messages.summary().count;
let end_ix = start_ix + removed_count;
new_messages.append(messages, &());
let mut ranges = Vec::<Range<usize>>::new();
if new_messages.last().unwrap().is_pending() {
new_messages.append(old_cursor.suffix(&()), &());
} else {
new_messages.append(
old_cursor.slice(&ChannelMessageId::Pending(0), Bias::Left, &()),
&(),
);
while let Some(message) = old_cursor.item() {
let message_ix = old_cursor.start().1 .0;
if nonces.contains(&message.nonce) {
if ranges.last().map_or(false, |r| r.end == message_ix) {
ranges.last_mut().unwrap().end += 1;
} else {
ranges.push(message_ix..message_ix + 1);
}
} else {
new_messages.push(message.clone(), &());
}
old_cursor.next(&());
}
}
drop(old_cursor);
self.messages = new_messages;
for range in ranges.into_iter().rev() {
cx.emit(ChannelChatEvent::MessagesUpdated {
old_range: range,
new_count: 0,
});
}
cx.emit(ChannelChatEvent::MessagesUpdated {
old_range: start_ix..end_ix,
new_count,
});
cx.notify();
}
}
}
async fn messages_from_proto(
proto_messages: Vec<proto::ChannelMessage>,
user_store: &ModelHandle<UserStore>,
cx: &mut AsyncAppContext,
) -> Result<SumTree<ChannelMessage>> {
let unique_user_ids = proto_messages
.iter()
.map(|m| m.sender_id)
.collect::<HashSet<_>>()
.into_iter()
.collect();
user_store
.update(cx, |user_store, cx| {
user_store.get_users(unique_user_ids, cx)
})
.await?;
let mut messages = Vec::with_capacity(proto_messages.len());
for message in proto_messages {
messages.push(ChannelMessage::from_proto(message, user_store, cx).await?);
}
let mut result = SumTree::new();
result.extend(messages, &());
Ok(result)
}
impl ChannelMessage {
pub async fn from_proto(
message: proto::ChannelMessage,
user_store: &ModelHandle<UserStore>,
cx: &mut AsyncAppContext,
) -> Result<Self> {
let sender = user_store
.update(cx, |user_store, cx| {
user_store.get_user(message.sender_id, cx)
})
.await?;
Ok(ChannelMessage {
id: ChannelMessageId::Saved(message.id),
body: message.body,
timestamp: OffsetDateTime::from_unix_timestamp(message.timestamp as i64)?,
sender,
nonce: message
.nonce
.ok_or_else(|| anyhow!("nonce is required"))?
.into(),
})
}
pub fn is_pending(&self) -> bool {
matches!(self.id, ChannelMessageId::Pending(_))
}
}
impl sum_tree::Item for ChannelMessage {
type Summary = ChannelMessageSummary;
fn summary(&self) -> Self::Summary {
ChannelMessageSummary {
max_id: self.id,
count: 1,
}
}
}
impl Default for ChannelMessageId {
fn default() -> Self {
Self::Saved(0)
}
}
impl sum_tree::Summary for ChannelMessageSummary {
type Context = ();
fn add_summary(&mut self, summary: &Self, _: &()) {
self.max_id = summary.max_id;
self.count += summary.count;
}
}
impl<'a> sum_tree::Dimension<'a, ChannelMessageSummary> for ChannelMessageId {
fn add_summary(&mut self, summary: &'a ChannelMessageSummary, _: &()) {
debug_assert!(summary.max_id > *self);
*self = summary.max_id;
}
}
impl<'a> sum_tree::Dimension<'a, ChannelMessageSummary> for Count {
fn add_summary(&mut self, summary: &'a ChannelMessageSummary, _: &()) {
self.0 += summary.count;
}
}

View File

@ -1,4 +1,4 @@
use crate::channel_buffer::ChannelBuffer;
use crate::{channel_buffer::ChannelBuffer, channel_chat::ChannelChat};
use anyhow::{anyhow, Result};
use client::{Client, Subscription, User, UserId, UserStore};
use collections::{hash_map, HashMap, HashSet};
@ -20,7 +20,8 @@ pub struct ChannelStore {
channels_with_admin_privileges: HashSet<ChannelId>,
outgoing_invites: HashSet<(ChannelId, UserId)>,
update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
opened_buffers: HashMap<ChannelId, OpenedChannelBuffer>,
opened_buffers: HashMap<ChannelId, OpenedModelHandle<ChannelBuffer>>,
opened_chats: HashMap<ChannelId, OpenedModelHandle<ChannelChat>>,
client: Arc<Client>,
user_store: ModelHandle<UserStore>,
_rpc_subscription: Subscription,
@ -50,15 +51,9 @@ impl Entity for ChannelStore {
type Event = ChannelEvent;
}
pub enum ChannelMemberStatus {
Invited,
Member,
NotMember,
}
enum OpenedChannelBuffer {
Open(WeakModelHandle<ChannelBuffer>),
Loading(Shared<Task<Result<ModelHandle<ChannelBuffer>, Arc<anyhow::Error>>>>),
enum OpenedModelHandle<E: Entity> {
Open(WeakModelHandle<E>),
Loading(Shared<Task<Result<ModelHandle<E>, Arc<anyhow::Error>>>>),
}
impl ChannelStore {
@ -94,6 +89,7 @@ impl ChannelStore {
channels_with_admin_privileges: Default::default(),
outgoing_invites: Default::default(),
opened_buffers: Default::default(),
opened_chats: Default::default(),
update_channels_tx,
client,
user_store,
@ -154,7 +150,7 @@ impl ChannelStore {
pub fn has_open_channel_buffer(&self, channel_id: ChannelId, cx: &AppContext) -> bool {
if let Some(buffer) = self.opened_buffers.get(&channel_id) {
if let OpenedChannelBuffer::Open(buffer) = buffer {
if let OpenedModelHandle::Open(buffer) = buffer {
return buffer.upgrade(cx).is_some();
}
}
@ -166,24 +162,58 @@ impl ChannelStore {
channel_id: ChannelId,
cx: &mut ModelContext<Self>,
) -> Task<Result<ModelHandle<ChannelBuffer>>> {
// Make sure that a given channel buffer is only opened once per
let client = self.client.clone();
self.open_channel_resource(
channel_id,
|this| &mut this.opened_buffers,
|channel, cx| ChannelBuffer::new(channel, client, cx),
cx,
)
}
pub fn open_channel_chat(
&mut self,
channel_id: ChannelId,
cx: &mut ModelContext<Self>,
) -> Task<Result<ModelHandle<ChannelChat>>> {
let client = self.client.clone();
let user_store = self.user_store.clone();
self.open_channel_resource(
channel_id,
|this| &mut this.opened_chats,
|channel, cx| ChannelChat::new(channel, user_store, client, cx),
cx,
)
}
fn open_channel_resource<T: Entity, F, Fut>(
&mut self,
channel_id: ChannelId,
map: fn(&mut Self) -> &mut HashMap<ChannelId, OpenedModelHandle<T>>,
load: F,
cx: &mut ModelContext<Self>,
) -> Task<Result<ModelHandle<T>>>
where
F: 'static + FnOnce(Arc<Channel>, AsyncAppContext) -> Fut,
Fut: Future<Output = Result<ModelHandle<T>>>,
{
// Make sure that a given channel resource is only opened once per
// app instance, even if this method is called multiple times
// with the same channel id while the first task is still running.
let task = loop {
match self.opened_buffers.entry(channel_id) {
match map(self).entry(channel_id) {
hash_map::Entry::Occupied(e) => match e.get() {
OpenedChannelBuffer::Open(buffer) => {
OpenedModelHandle::Open(buffer) => {
if let Some(buffer) = buffer.upgrade(cx) {
break Task::ready(Ok(buffer)).shared();
} else {
self.opened_buffers.remove(&channel_id);
map(self).remove(&channel_id);
continue;
}
}
OpenedChannelBuffer::Loading(task) => break task.clone(),
OpenedModelHandle::Loading(task) => break task.clone(),
},
hash_map::Entry::Vacant(e) => {
let client = self.client.clone();
let task = cx
.spawn(|this, cx| async move {
let channel = this.read_with(&cx, |this, _| {
@ -192,12 +222,10 @@ impl ChannelStore {
})
})?;
ChannelBuffer::new(channel, client, cx)
.await
.map_err(Arc::new)
load(channel, cx).await.map_err(Arc::new)
})
.shared();
e.insert(OpenedChannelBuffer::Loading(task.clone()));
e.insert(OpenedModelHandle::Loading(task.clone()));
cx.spawn({
let task = task.clone();
|this, mut cx| async move {
@ -208,14 +236,14 @@ impl ChannelStore {
this.opened_buffers.remove(&channel_id);
})
.detach();
this.opened_buffers.insert(
map(this).insert(
channel_id,
OpenedChannelBuffer::Open(buffer.downgrade()),
OpenedModelHandle::Open(buffer.downgrade()),
);
}
Err(error) => {
log::error!("failed to open channel buffer {error:?}");
this.opened_buffers.remove(&channel_id);
map(this).remove(&channel_id);
}
});
}
@ -496,7 +524,7 @@ impl ChannelStore {
let mut buffer_versions = Vec::new();
for buffer in self.opened_buffers.values() {
if let OpenedChannelBuffer::Open(buffer) = buffer {
if let OpenedModelHandle::Open(buffer) = buffer {
if let Some(buffer) = buffer.upgrade(cx) {
let channel_buffer = buffer.read(cx);
let buffer = channel_buffer.buffer().read(cx);
@ -522,7 +550,7 @@ impl ChannelStore {
this.update(&mut cx, |this, cx| {
this.opened_buffers.retain(|_, buffer| match buffer {
OpenedChannelBuffer::Open(channel_buffer) => {
OpenedModelHandle::Open(channel_buffer) => {
let Some(channel_buffer) = channel_buffer.upgrade(cx) else {
return false;
};
@ -583,7 +611,7 @@ impl ChannelStore {
false
})
}
OpenedChannelBuffer::Loading(_) => true,
OpenedModelHandle::Loading(_) => true,
});
});
anyhow::Ok(())
@ -605,7 +633,7 @@ impl ChannelStore {
if let Some(this) = this.upgrade(&cx) {
this.update(&mut cx, |this, cx| {
for (_, buffer) in this.opened_buffers.drain() {
if let OpenedChannelBuffer::Open(buffer) = buffer {
if let OpenedModelHandle::Open(buffer) = buffer {
if let Some(buffer) = buffer.upgrade(cx) {
buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
}
@ -654,7 +682,7 @@ impl ChannelStore {
for channel_id in &payload.remove_channels {
let channel_id = *channel_id;
if let Some(OpenedChannelBuffer::Open(buffer)) =
if let Some(OpenedModelHandle::Open(buffer)) =
self.opened_buffers.remove(&channel_id)
{
if let Some(buffer) = buffer.upgrade(cx) {

View File

@ -1,6 +1,8 @@
use crate::channel_chat::ChannelChatEvent;
use super::*;
use client::{Client, UserStore};
use gpui::{AppContext, ModelHandle};
use client::{test::FakeServer, Client, UserStore};
use gpui::{AppContext, ModelHandle, TestAppContext};
use rpc::proto;
use util::http::FakeHttpClient;
@ -137,6 +139,220 @@ fn test_dangling_channel_paths(cx: &mut AppContext) {
assert_channels(&channel_store, &[(0, "a".to_string(), true)], cx);
}
#[gpui::test]
async fn test_channel_messages(cx: &mut TestAppContext) {
cx.foreground().forbid_parking();
let user_id = 5;
let http_client = FakeHttpClient::with_404_response();
let client = cx.update(|cx| Client::new(http_client.clone(), cx));
let server = FakeServer::for_client(user_id, &client, cx).await;
let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http_client, cx));
crate::init(&client);
let channel_store = cx.add_model(|cx| ChannelStore::new(client, user_store, cx));
let channel_id = 5;
// Get the available channels.
server.send(proto::UpdateChannels {
channels: vec![proto::Channel {
id: channel_id,
name: "the-channel".to_string(),
parent_id: None,
}],
..Default::default()
});
channel_store.next_notification(cx).await;
cx.read(|cx| {
assert_channels(&channel_store, &[(0, "the-channel".to_string(), false)], cx);
});
let get_users = server.receive::<proto::GetUsers>().await.unwrap();
assert_eq!(get_users.payload.user_ids, vec![5]);
server
.respond(
get_users.receipt(),
proto::UsersResponse {
users: vec![proto::User {
id: 5,
github_login: "nathansobo".into(),
avatar_url: "http://avatar.com/nathansobo".into(),
}],
},
)
.await;
// Join a channel and populate its existing messages.
let channel = channel_store
.update(cx, |store, cx| {
let channel_id = store.channels().next().unwrap().1.id;
store.open_channel_chat(channel_id, cx)
})
.await
.unwrap();
channel.read_with(cx, |channel, _| assert!(channel.messages().is_empty()));
let join_channel = server.receive::<proto::JoinChannelChat>().await.unwrap();
server
.respond(
join_channel.receipt(),
proto::JoinChannelChatResponse {
messages: vec![
proto::ChannelMessage {
id: 10,
body: "a".into(),
timestamp: 1000,
sender_id: 5,
nonce: Some(1.into()),
},
proto::ChannelMessage {
id: 11,
body: "b".into(),
timestamp: 1001,
sender_id: 6,
nonce: Some(2.into()),
},
],
done: false,
},
)
.await;
// Client requests all users for the received messages
let mut get_users = server.receive::<proto::GetUsers>().await.unwrap();
get_users.payload.user_ids.sort();
assert_eq!(get_users.payload.user_ids, vec![6]);
server
.respond(
get_users.receipt(),
proto::UsersResponse {
users: vec![proto::User {
id: 6,
github_login: "maxbrunsfeld".into(),
avatar_url: "http://avatar.com/maxbrunsfeld".into(),
}],
},
)
.await;
assert_eq!(
channel.next_event(cx).await,
ChannelChatEvent::MessagesUpdated {
old_range: 0..0,
new_count: 2,
}
);
channel.read_with(cx, |channel, _| {
assert_eq!(
channel
.messages_in_range(0..2)
.map(|message| (message.sender.github_login.clone(), message.body.clone()))
.collect::<Vec<_>>(),
&[
("nathansobo".into(), "a".into()),
("maxbrunsfeld".into(), "b".into())
]
);
});
// Receive a new message.
server.send(proto::ChannelMessageSent {
channel_id,
message: Some(proto::ChannelMessage {
id: 12,
body: "c".into(),
timestamp: 1002,
sender_id: 7,
nonce: Some(3.into()),
}),
});
// Client requests user for message since they haven't seen them yet
let get_users = server.receive::<proto::GetUsers>().await.unwrap();
assert_eq!(get_users.payload.user_ids, vec![7]);
server
.respond(
get_users.receipt(),
proto::UsersResponse {
users: vec![proto::User {
id: 7,
github_login: "as-cii".into(),
avatar_url: "http://avatar.com/as-cii".into(),
}],
},
)
.await;
assert_eq!(
channel.next_event(cx).await,
ChannelChatEvent::MessagesUpdated {
old_range: 2..2,
new_count: 1,
}
);
channel.read_with(cx, |channel, _| {
assert_eq!(
channel
.messages_in_range(2..3)
.map(|message| (message.sender.github_login.clone(), message.body.clone()))
.collect::<Vec<_>>(),
&[("as-cii".into(), "c".into())]
)
});
// Scroll up to view older messages.
channel.update(cx, |channel, cx| {
assert!(channel.load_more_messages(cx));
});
let get_messages = server.receive::<proto::GetChannelMessages>().await.unwrap();
assert_eq!(get_messages.payload.channel_id, 5);
assert_eq!(get_messages.payload.before_message_id, 10);
server
.respond(
get_messages.receipt(),
proto::GetChannelMessagesResponse {
done: true,
messages: vec![
proto::ChannelMessage {
id: 8,
body: "y".into(),
timestamp: 998,
sender_id: 5,
nonce: Some(4.into()),
},
proto::ChannelMessage {
id: 9,
body: "z".into(),
timestamp: 999,
sender_id: 6,
nonce: Some(5.into()),
},
],
},
)
.await;
assert_eq!(
channel.next_event(cx).await,
ChannelChatEvent::MessagesUpdated {
old_range: 0..0,
new_count: 2,
}
);
channel.read_with(cx, |channel, _| {
assert_eq!(
channel
.messages_in_range(0..2)
.map(|message| (message.sender.github_login.clone(), message.body.clone()))
.collect::<Vec<_>>(),
&[
("nathansobo".into(), "y".into()),
("maxbrunsfeld".into(), "z".into())
]
);
});
}
fn update_channels(
channel_store: &ModelHandle<ChannelStore>,
message: proto::UpdateChannels,

View File

@ -6,7 +6,7 @@ use crate::{
};
use anyhow::anyhow;
use call::ActiveCall;
use channel::{channel_buffer::ChannelBuffer, ChannelStore};
use channel::{ChannelBuffer, ChannelStore};
use client::{
self, proto::PeerId, Client, Connection, Credentials, EstablishConnectionError, UserStore,
};

View File

@ -55,6 +55,7 @@ schemars.workspace = true
postage.workspace = true
serde.workspace = true
serde_derive.workspace = true
time.workspace = true
[dev-dependencies]
call = { path = "../call", features = ["test-support"] }

View File

@ -1,8 +1,5 @@
use anyhow::{anyhow, Result};
use channel::{
channel_buffer::{self, ChannelBuffer},
ChannelId,
};
use channel::{ChannelBuffer, ChannelBufferEvent, ChannelId};
use client::proto;
use clock::ReplicaId;
use collections::HashMap;
@ -118,14 +115,14 @@ impl ChannelView {
fn handle_channel_buffer_event(
&mut self,
_: ModelHandle<ChannelBuffer>,
event: &channel_buffer::Event,
event: &ChannelBufferEvent,
cx: &mut ViewContext<Self>,
) {
match event {
channel_buffer::Event::CollaboratorsChanged => {
ChannelBufferEvent::CollaboratorsChanged => {
self.refresh_replica_id_map(cx);
}
channel_buffer::Event::Disconnected => self.editor.update(cx, |editor, cx| {
ChannelBufferEvent::Disconnected => self.editor.update(cx, |editor, cx| {
editor.set_read_only(true);
cx.notify();
}),

View File

@ -0,0 +1,391 @@
use channel::{ChannelChat, ChannelChatEvent, ChannelMessage, ChannelStore};
use client::Client;
use editor::Editor;
use gpui::{
actions,
elements::*,
platform::{CursorStyle, MouseButton},
views::{ItemType, Select, SelectStyle},
AnyViewHandle, AppContext, Entity, ModelHandle, Subscription, View, ViewContext, ViewHandle,
};
use language::language_settings::SoftWrap;
use menu::Confirm;
use std::sync::Arc;
use theme::Theme;
use time::{OffsetDateTime, UtcOffset};
use util::{ResultExt, TryFutureExt};
const MESSAGE_LOADING_THRESHOLD: usize = 50;
pub struct ChatPanel {
client: Arc<Client>,
channel_store: ModelHandle<ChannelStore>,
active_channel: Option<(ModelHandle<ChannelChat>, Subscription)>,
message_list: ListState<ChatPanel>,
input_editor: ViewHandle<Editor>,
channel_select: ViewHandle<Select>,
local_timezone: UtcOffset,
}
pub enum Event {}
actions!(chat_panel, [LoadMoreMessages]);
pub fn init(cx: &mut AppContext) {
cx.add_action(ChatPanel::send);
cx.add_action(ChatPanel::load_more_messages);
}
impl ChatPanel {
pub fn new(
rpc: Arc<Client>,
channel_list: ModelHandle<ChannelStore>,
cx: &mut ViewContext<Self>,
) -> Self {
let input_editor = cx.add_view(|cx| {
let mut editor = Editor::auto_height(
4,
Some(Arc::new(|theme| theme.chat_panel.input_editor.clone())),
cx,
);
editor.set_soft_wrap_mode(SoftWrap::EditorWidth, cx);
editor
});
let channel_select = cx.add_view(|cx| {
let channel_list = channel_list.clone();
Select::new(0, cx, {
move |ix, item_type, is_hovered, cx| {
Self::render_channel_name(
&channel_list,
ix,
item_type,
is_hovered,
&theme::current(cx).chat_panel.channel_select,
cx,
)
}
})
.with_style(move |cx| {
let style = &theme::current(cx).chat_panel.channel_select;
SelectStyle {
header: style.header.container,
menu: style.menu,
}
})
});
let mut message_list =
ListState::<Self>::new(0, Orientation::Bottom, 1000., move |this, ix, cx| {
let message = this.active_channel.as_ref().unwrap().0.read(cx).message(ix);
this.render_message(message, cx)
});
message_list.set_scroll_handler(|visible_range, this, cx| {
if visible_range.start < MESSAGE_LOADING_THRESHOLD {
this.load_more_messages(&LoadMoreMessages, cx);
}
});
let mut this = Self {
client: rpc,
channel_store: channel_list,
active_channel: Default::default(),
message_list,
input_editor,
channel_select,
local_timezone: cx.platform().local_timezone(),
};
this.init_active_channel(cx);
cx.observe(&this.channel_store, |this, _, cx| {
this.init_active_channel(cx);
})
.detach();
cx.observe(&this.channel_select, |this, channel_select, cx| {
let selected_ix = channel_select.read(cx).selected_index();
let selected_channel_id = this
.channel_store
.read(cx)
.channel_at_index(selected_ix)
.map(|e| e.1.id);
if let Some(selected_channel_id) = selected_channel_id {
let open_chat = this.channel_store.update(cx, |store, cx| {
store.open_channel_chat(selected_channel_id, cx)
});
cx.spawn(|this, mut cx| async move {
let chat = open_chat.await?;
this.update(&mut cx, |this, cx| {
this.set_active_channel(chat, cx);
})
})
.detach_and_log_err(cx);
}
})
.detach();
this
}
fn init_active_channel(&mut self, cx: &mut ViewContext<Self>) {
let channel_count = self.channel_store.read(cx).channel_count();
self.message_list.reset(0);
self.active_channel = None;
self.channel_select.update(cx, |select, cx| {
select.set_item_count(channel_count, cx);
});
}
fn set_active_channel(
&mut self,
channel: ModelHandle<ChannelChat>,
cx: &mut ViewContext<Self>,
) {
if self.active_channel.as_ref().map(|e| &e.0) != Some(&channel) {
{
let channel = channel.read(cx);
self.message_list.reset(channel.message_count());
let placeholder = format!("Message #{}", channel.name());
self.input_editor.update(cx, move |editor, cx| {
editor.set_placeholder_text(placeholder, cx);
});
}
let subscription = cx.subscribe(&channel, Self::channel_did_change);
self.active_channel = Some((channel, subscription));
}
}
fn channel_did_change(
&mut self,
_: ModelHandle<ChannelChat>,
event: &ChannelChatEvent,
cx: &mut ViewContext<Self>,
) {
match event {
ChannelChatEvent::MessagesUpdated {
old_range,
new_count,
} => {
self.message_list.splice(old_range.clone(), *new_count);
}
}
cx.notify();
}
fn render_channel(&self, cx: &mut ViewContext<Self>) -> AnyElement<Self> {
let theme = theme::current(cx);
Flex::column()
.with_child(
ChildView::new(&self.channel_select, cx)
.contained()
.with_style(theme.chat_panel.channel_select.container),
)
.with_child(self.render_active_channel_messages())
.with_child(self.render_input_box(&theme, cx))
.into_any()
}
fn render_active_channel_messages(&self) -> AnyElement<Self> {
let messages = if self.active_channel.is_some() {
List::new(self.message_list.clone()).into_any()
} else {
Empty::new().into_any()
};
messages.flex(1., true).into_any()
}
fn render_message(&self, message: &ChannelMessage, cx: &AppContext) -> AnyElement<Self> {
let now = OffsetDateTime::now_utc();
let theme = theme::current(cx);
let theme = if message.is_pending() {
&theme.chat_panel.pending_message
} else {
&theme.chat_panel.message
};
Flex::column()
.with_child(
Flex::row()
.with_child(
Label::new(
message.sender.github_login.clone(),
theme.sender.text.clone(),
)
.contained()
.with_style(theme.sender.container),
)
.with_child(
Label::new(
format_timestamp(message.timestamp, now, self.local_timezone),
theme.timestamp.text.clone(),
)
.contained()
.with_style(theme.timestamp.container),
),
)
.with_child(Text::new(message.body.clone(), theme.body.clone()))
.contained()
.with_style(theme.container)
.into_any()
}
fn render_input_box(&self, theme: &Arc<Theme>, cx: &AppContext) -> AnyElement<Self> {
ChildView::new(&self.input_editor, cx)
.contained()
.with_style(theme.chat_panel.input_editor.container)
.into_any()
}
fn render_channel_name(
channel_store: &ModelHandle<ChannelStore>,
ix: usize,
item_type: ItemType,
is_hovered: bool,
theme: &theme::ChannelSelect,
cx: &AppContext,
) -> AnyElement<Select> {
let channel = &channel_store.read(cx).channel_at_index(ix).unwrap().1;
let theme = match (item_type, is_hovered) {
(ItemType::Header, _) => &theme.header,
(ItemType::Selected, false) => &theme.active_item,
(ItemType::Selected, true) => &theme.hovered_active_item,
(ItemType::Unselected, false) => &theme.item,
(ItemType::Unselected, true) => &theme.hovered_item,
};
Flex::row()
.with_child(
Label::new("#".to_string(), theme.hash.text.clone())
.contained()
.with_style(theme.hash.container),
)
.with_child(Label::new(channel.name.clone(), theme.name.clone()))
.contained()
.with_style(theme.container)
.into_any()
}
fn render_sign_in_prompt(
&self,
theme: &Arc<Theme>,
cx: &mut ViewContext<Self>,
) -> AnyElement<Self> {
enum SignInPromptLabel {}
MouseEventHandler::new::<SignInPromptLabel, _>(0, cx, |mouse_state, _| {
Label::new(
"Sign in to use chat".to_string(),
theme
.chat_panel
.sign_in_prompt
.style_for(mouse_state)
.clone(),
)
})
.with_cursor_style(CursorStyle::PointingHand)
.on_click(MouseButton::Left, move |_, this, cx| {
let client = this.client.clone();
cx.spawn(|this, mut cx| async move {
if client
.authenticate_and_connect(true, &cx)
.log_err()
.await
.is_some()
{
this.update(&mut cx, |this, cx| {
if cx.handle().is_focused(cx) {
cx.focus(&this.input_editor);
}
})
.ok();
}
})
.detach();
})
.aligned()
.into_any()
}
fn send(&mut self, _: &Confirm, cx: &mut ViewContext<Self>) {
if let Some((channel, _)) = self.active_channel.as_ref() {
let body = self.input_editor.update(cx, |editor, cx| {
let body = editor.text(cx);
editor.clear(cx);
body
});
if let Some(task) = channel
.update(cx, |channel, cx| channel.send_message(body, cx))
.log_err()
{
task.detach();
}
}
}
fn load_more_messages(&mut self, _: &LoadMoreMessages, cx: &mut ViewContext<Self>) {
if let Some((channel, _)) = self.active_channel.as_ref() {
channel.update(cx, |channel, cx| {
channel.load_more_messages(cx);
})
}
}
}
impl Entity for ChatPanel {
type Event = Event;
}
impl View for ChatPanel {
fn ui_name() -> &'static str {
"ChatPanel"
}
fn render(&mut self, cx: &mut ViewContext<Self>) -> AnyElement<Self> {
let theme = theme::current(cx);
let element = if self.client.user_id().is_some() {
self.render_channel(cx)
} else {
self.render_sign_in_prompt(&theme, cx)
};
element
.contained()
.with_style(theme.chat_panel.container)
.constrained()
.with_min_width(150.)
.into_any()
}
fn focus_in(&mut self, _: AnyViewHandle, cx: &mut ViewContext<Self>) {
if matches!(
*self.client.status().borrow(),
client::Status::Connected { .. }
) {
cx.focus(&self.input_editor);
}
}
}
fn format_timestamp(
mut timestamp: OffsetDateTime,
mut now: OffsetDateTime,
local_timezone: UtcOffset,
) -> String {
timestamp = timestamp.to_offset(local_timezone);
now = now.to_offset(local_timezone);
let today = now.date();
let date = timestamp.date();
let mut hour = timestamp.hour();
let mut part = "am";
if hour > 12 {
hour -= 12;
part = "pm";
}
if date == today {
format!("{:02}:{:02}{}", hour, timestamp.minute(), part)
} else if date.next_day() == Some(today) {
format!("yesterday at {:02}:{:02}{}", hour, timestamp.minute(), part)
} else {
format!("{:02}/{}/{}", date.month() as u32, date.day(), date.year())
}
}

View File

@ -1,4 +1,5 @@
pub mod channel_view;
pub mod chat_panel;
pub mod collab_panel;
mod collab_titlebar_item;
mod contact_notification;

View File

@ -155,7 +155,16 @@ message Envelope {
RemoveChannelBufferCollaborator remove_channel_buffer_collaborator = 136;
UpdateChannelBufferCollaborator update_channel_buffer_collaborator = 139;
RejoinChannelBuffers rejoin_channel_buffers = 140;
RejoinChannelBuffersResponse rejoin_channel_buffers_response = 141; // Current max
RejoinChannelBuffersResponse rejoin_channel_buffers_response = 141;
JoinChannelChat join_channel_chat = 142;
JoinChannelChatResponse join_channel_chat_response = 143;
LeaveChannelChat leave_channel_chat = 144;
SendChannelMessage send_channel_message = 145;
SendChannelMessageResponse send_channel_message_response = 146;
ChannelMessageSent channel_message_sent = 147;
GetChannelMessages get_channel_messages = 148;
GetChannelMessagesResponse get_channel_messages_response = 149; // Current max
}
}
@ -1021,10 +1030,56 @@ message RenameChannel {
string name = 2;
}
message JoinChannelChat {
uint64 channel_id = 1;
}
message JoinChannelChatResponse {
repeated ChannelMessage messages = 1;
bool done = 2;
}
message LeaveChannelChat {
uint64 channel_id = 1;
}
message SendChannelMessage {
uint64 channel_id = 1;
string body = 2;
Nonce nonce = 3;
}
message SendChannelMessageResponse {
ChannelMessage message = 1;
}
message ChannelMessageSent {
uint64 channel_id = 1;
ChannelMessage message = 2;
}
message GetChannelMessages {
uint64 channel_id = 1;
uint64 before_message_id = 2;
}
message GetChannelMessagesResponse {
repeated ChannelMessage messages = 1;
bool done = 2;
}
message JoinChannelBuffer {
uint64 channel_id = 1;
}
message ChannelMessage {
uint64 id = 1;
string body = 2;
uint64 timestamp = 3;
uint64 sender_id = 4;
Nonce nonce = 5;
}
message RejoinChannelBuffers {
repeated ChannelBufferVersion buffers = 1;
}

View File

@ -147,6 +147,7 @@ messages!(
(CreateBufferForPeer, Foreground),
(CreateChannel, Foreground),
(ChannelResponse, Foreground),
(ChannelMessageSent, Foreground),
(CreateProjectEntry, Foreground),
(CreateRoom, Foreground),
(CreateRoomResponse, Foreground),
@ -163,6 +164,10 @@ messages!(
(GetCodeActionsResponse, Background),
(GetHover, Background),
(GetHoverResponse, Background),
(GetChannelMessages, Background),
(GetChannelMessagesResponse, Background),
(SendChannelMessage, Background),
(SendChannelMessageResponse, Background),
(GetCompletions, Background),
(GetCompletionsResponse, Background),
(GetDefinition, Background),
@ -184,6 +189,9 @@ messages!(
(JoinProjectResponse, Foreground),
(JoinRoom, Foreground),
(JoinRoomResponse, Foreground),
(JoinChannelChat, Foreground),
(JoinChannelChatResponse, Foreground),
(LeaveChannelChat, Foreground),
(LeaveProject, Foreground),
(LeaveRoom, Foreground),
(OpenBufferById, Background),
@ -293,6 +301,7 @@ request_messages!(
(InviteChannelMember, Ack),
(JoinProject, JoinProjectResponse),
(JoinRoom, JoinRoomResponse),
(JoinChannelChat, JoinChannelChatResponse),
(LeaveRoom, Ack),
(RejoinRoom, RejoinRoomResponse),
(IncomingCall, Ack),
@ -313,6 +322,8 @@ request_messages!(
(RespondToContactRequest, Ack),
(RespondToChannelInvite, Ack),
(SetChannelMemberAdmin, Ack),
(SendChannelMessage, SendChannelMessageResponse),
(GetChannelMessages, GetChannelMessagesResponse),
(GetChannelMembers, GetChannelMembersResponse),
(JoinChannel, JoinRoomResponse),
(RemoveChannel, Ack),
@ -388,6 +399,7 @@ entity_messages!(
entity_messages!(
channel_id,
ChannelMessageSent,
UpdateChannelBuffer,
RemoveChannelBufferCollaborator,
AddChannelBufferCollaborator,

View File

@ -49,6 +49,7 @@ pub struct Theme {
pub copilot: Copilot,
pub collab_panel: CollabPanel,
pub project_panel: ProjectPanel,
pub chat_panel: ChatPanel,
pub command_palette: CommandPalette,
pub picker: Picker,
pub editor: Editor,
@ -611,6 +612,17 @@ pub struct IconButton {
pub button_width: f32,
}
#[derive(Deserialize, Default, JsonSchema)]
pub struct ChatPanel {
#[serde(flatten)]
pub container: ContainerStyle,
pub channel_select: ChannelSelect,
pub input_editor: FieldEditor,
pub message: ChatMessage,
pub pending_message: ChatMessage,
pub sign_in_prompt: Interactive<TextStyle>,
}
#[derive(Deserialize, Default, JsonSchema)]
pub struct ChatMessage {
#[serde(flatten)]