Add notifications2

This commit is contained in:
Mikayla 2023-11-08 21:23:31 -08:00
parent 8330fb5f10
commit 7888dc4592
No known key found for this signature in database
11 changed files with 786 additions and 267 deletions

21
Cargo.lock generated
View File

@ -1748,6 +1748,7 @@ dependencies = [
"lsp2",
"nanoid",
"node_runtime",
"notifications2",
"parking_lot 0.11.2",
"pretty_assertions",
"project2",
@ -5484,6 +5485,26 @@ dependencies = [
"util",
]
[[package]]
name = "notifications2"
version = "0.1.0"
dependencies = [
"anyhow",
"channel2",
"client2",
"clock",
"collections",
"db2",
"feature_flags2",
"gpui2",
"rpc2",
"settings2",
"sum_tree",
"text2",
"time",
"util",
]
[[package]]
name = "ntapi"
version = "0.3.7"

View File

@ -67,6 +67,7 @@ members = [
"crates/multi_buffer2",
"crates/node_runtime",
"crates/notifications",
"crates/notifications2",
"crates/outline",
"crates/picker",
"crates/picker2",

View File

@ -72,10 +72,8 @@ fs = { package = "fs2", path = "../fs2", features = ["test-support"] }
git = { package = "git3", path = "../git3", features = ["test-support"] }
live_kit_client = { package = "live_kit_client2", path = "../live_kit_client2", features = ["test-support"] }
lsp = { package = "lsp2", path = "../lsp2", features = ["test-support"] }
node_runtime = { path = "../node_runtime" }
#todo!(notifications)
#notifications = { path = "../notifications", features = ["test-support"] }
notifications = { package = "notifications2", path = "../notifications2", features = ["test-support"] }
project = { package = "project2", path = "../project2", features = ["test-support"] }
rpc = { package = "rpc2", path = "../rpc2", features = ["test-support"] }

View File

@ -1,115 +1,115 @@
use crate::{rpc::RECONNECT_TIMEOUT, tests::TestServer};
use channel::{ChannelChat, ChannelMessageId};
use channel::{ChannelChat, ChannelMessageId, MessageParams};
use gpui::{BackgroundExecutor, Model, TestAppContext};
use rpc::Notification;
// todo!(notifications)
// #[gpui::test]
// async fn test_basic_channel_messages(
// executor: BackgroundExecutor,
// mut cx_a: &mut TestAppContext,
// mut cx_b: &mut TestAppContext,
// mut cx_c: &mut TestAppContext,
// ) {
// let mut server = TestServer::start(executor.clone()).await;
// let client_a = server.create_client(cx_a, "user_a").await;
// let client_b = server.create_client(cx_b, "user_b").await;
// let client_c = server.create_client(cx_c, "user_c").await;
#[gpui::test]
async fn test_basic_channel_messages(
executor: BackgroundExecutor,
mut cx_a: &mut TestAppContext,
mut cx_b: &mut TestAppContext,
mut cx_c: &mut TestAppContext,
) {
let mut server = TestServer::start(executor.clone()).await;
let client_a = server.create_client(cx_a, "user_a").await;
let client_b = server.create_client(cx_b, "user_b").await;
let client_c = server.create_client(cx_c, "user_c").await;
// let channel_id = server
// .make_channel(
// "the-channel",
// None,
// (&client_a, cx_a),
// &mut [(&client_b, cx_b), (&client_c, cx_c)],
// )
// .await;
let channel_id = server
.make_channel(
"the-channel",
None,
(&client_a, cx_a),
&mut [(&client_b, cx_b), (&client_c, cx_c)],
)
.await;
// let channel_chat_a = client_a
// .channel_store()
// .update(cx_a, |store, cx| store.open_channel_chat(channel_id, cx))
// .await
// .unwrap();
// let channel_chat_b = client_b
// .channel_store()
// .update(cx_b, |store, cx| store.open_channel_chat(channel_id, cx))
// .await
// .unwrap();
let channel_chat_a = client_a
.channel_store()
.update(cx_a, |store, cx| store.open_channel_chat(channel_id, cx))
.await
.unwrap();
let channel_chat_b = client_b
.channel_store()
.update(cx_b, |store, cx| store.open_channel_chat(channel_id, cx))
.await
.unwrap();
// let message_id = channel_chat_a
// .update(cx_a, |c, cx| {
// c.send_message(
// MessageParams {
// text: "hi @user_c!".into(),
// mentions: vec![(3..10, client_c.id())],
// },
// cx,
// )
// .unwrap()
// })
// .await
// .unwrap();
// channel_chat_a
// .update(cx_a, |c, cx| c.send_message("two".into(), cx).unwrap())
// .await
// .unwrap();
let message_id = channel_chat_a
.update(cx_a, |c, cx| {
c.send_message(
MessageParams {
text: "hi @user_c!".into(),
mentions: vec![(3..10, client_c.id())],
},
cx,
)
.unwrap()
})
.await
.unwrap();
channel_chat_a
.update(cx_a, |c, cx| c.send_message("two".into(), cx).unwrap())
.await
.unwrap();
// executor.run_until_parked();
// channel_chat_b
// .update(cx_b, |c, cx| c.send_message("three".into(), cx).unwrap())
// .await
// .unwrap();
executor.run_until_parked();
channel_chat_b
.update(cx_b, |c, cx| c.send_message("three".into(), cx).unwrap())
.await
.unwrap();
// executor.run_until_parked();
executor.run_until_parked();
// let channel_chat_c = client_c
// .channel_store()
// .update(cx_c, |store, cx| store.open_channel_chat(channel_id, cx))
// .await
// .unwrap();
let channel_chat_c = client_c
.channel_store()
.update(cx_c, |store, cx| store.open_channel_chat(channel_id, cx))
.await
.unwrap();
// for (chat, cx) in [
// (&channel_chat_a, &mut cx_a),
// (&channel_chat_b, &mut cx_b),
// (&channel_chat_c, &mut cx_c),
// ] {
// chat.update(*cx, |c, _| {
// assert_eq!(
// c.messages()
// .iter()
// .map(|m| (m.body.as_str(), m.mentions.as_slice()))
// .collect::<Vec<_>>(),
// vec![
// ("hi @user_c!", [(3..10, client_c.id())].as_slice()),
// ("two", &[]),
// ("three", &[])
// ],
// "results for user {}",
// c.client().id(),
// );
// });
// }
for (chat, cx) in [
(&channel_chat_a, &mut cx_a),
(&channel_chat_b, &mut cx_b),
(&channel_chat_c, &mut cx_c),
] {
chat.update(*cx, |c, _| {
assert_eq!(
c.messages()
.iter()
.map(|m| (m.body.as_str(), m.mentions.as_slice()))
.collect::<Vec<_>>(),
vec![
("hi @user_c!", [(3..10, client_c.id())].as_slice()),
("two", &[]),
("three", &[])
],
"results for user {}",
c.client().id(),
);
});
}
// client_c.notification_store().update(cx_c, |store, _| {
// assert_eq!(store.notification_count(), 2);
// assert_eq!(store.unread_notification_count(), 1);
// assert_eq!(
// store.notification_at(0).unwrap().notification,
// Notification::ChannelMessageMention {
// message_id,
// sender_id: client_a.id(),
// channel_id,
// }
// );
// assert_eq!(
// store.notification_at(1).unwrap().notification,
// Notification::ChannelInvitation {
// channel_id,
// channel_name: "the-channel".to_string(),
// inviter_id: client_a.id()
// }
// );
// });
// }
client_c.notification_store().update(cx_c, |store, _| {
assert_eq!(store.notification_count(), 2);
assert_eq!(store.unread_notification_count(), 1);
assert_eq!(
store.notification_at(0).unwrap().notification,
Notification::ChannelMessageMention {
message_id,
sender_id: client_a.id(),
channel_id,
}
);
assert_eq!(
store.notification_at(1).unwrap().notification,
Notification::ChannelInvitation {
channel_id,
channel_name: "the-channel".to_string(),
inviter_id: client_a.id()
}
);
});
}
#[gpui::test]
async fn test_rejoin_channel_chat(

View File

@ -1128,6 +1128,8 @@ async fn test_channel_link_notifications(
.await
.unwrap();
executor.run_until_parked();
// the members-only channel is still shown for c, but hidden for b
assert_channels_list_shape(
client_b.channel_store(),

View File

@ -1,160 +1,160 @@
//todo!(notifications)
// use crate::tests::TestServer;
// use gpui::{executor::Deterministic, TestAppContext};
// use notifications::NotificationEvent;
// use parking_lot::Mutex;
// use rpc::{proto, Notification};
// use std::sync::Arc;
use std::sync::Arc;
// #[gpui::test]
// async fn test_notifications(
// deterministic: Arc<Deterministic>,
// cx_a: &mut TestAppContext,
// cx_b: &mut TestAppContext,
// ) {
// deterministic.forbid_parking();
// let mut server = TestServer::start(&deterministic).await;
// let client_a = server.create_client(cx_a, "user_a").await;
// let client_b = server.create_client(cx_b, "user_b").await;
use gpui::{BackgroundExecutor, TestAppContext};
use notifications::NotificationEvent;
use parking_lot::Mutex;
use rpc::{proto, Notification};
// let notification_events_a = Arc::new(Mutex::new(Vec::new()));
// let notification_events_b = Arc::new(Mutex::new(Vec::new()));
// client_a.notification_store().update(cx_a, |_, cx| {
// let events = notification_events_a.clone();
// cx.subscribe(&cx.handle(), move |_, _, event, _| {
// events.lock().push(event.clone());
// })
// .detach()
// });
// client_b.notification_store().update(cx_b, |_, cx| {
// let events = notification_events_b.clone();
// cx.subscribe(&cx.handle(), move |_, _, event, _| {
// events.lock().push(event.clone());
// })
// .detach()
// });
use crate::tests::TestServer;
// // Client A sends a contact request to client B.
// client_a
// .user_store()
// .update(cx_a, |store, cx| store.request_contact(client_b.id(), cx))
// .await
// .unwrap();
#[gpui::test]
async fn test_notifications(
executor: BackgroundExecutor,
cx_a: &mut TestAppContext,
cx_b: &mut TestAppContext,
) {
let mut server = TestServer::start(executor.clone()).await;
let client_a = server.create_client(cx_a, "user_a").await;
let client_b = server.create_client(cx_b, "user_b").await;
// // Client B receives a contact request notification and responds to the
// // request, accepting it.
// deterministic.run_until_parked();
// client_b.notification_store().update(cx_b, |store, cx| {
// assert_eq!(store.notification_count(), 1);
// assert_eq!(store.unread_notification_count(), 1);
let notification_events_a = Arc::new(Mutex::new(Vec::new()));
let notification_events_b = Arc::new(Mutex::new(Vec::new()));
client_a.notification_store().update(cx_a, |_, cx| {
let events = notification_events_a.clone();
cx.subscribe(&cx.handle(), move |_, _, event, _| {
events.lock().push(event.clone());
})
.detach()
});
client_b.notification_store().update(cx_b, |_, cx| {
let events = notification_events_b.clone();
cx.subscribe(&cx.handle(), move |_, _, event, _| {
events.lock().push(event.clone());
})
.detach()
});
// let entry = store.notification_at(0).unwrap();
// assert_eq!(
// entry.notification,
// Notification::ContactRequest {
// sender_id: client_a.id()
// }
// );
// assert!(!entry.is_read);
// assert_eq!(
// &notification_events_b.lock()[0..],
// &[
// NotificationEvent::NewNotification {
// entry: entry.clone(),
// },
// NotificationEvent::NotificationsUpdated {
// old_range: 0..0,
// new_count: 1
// }
// ]
// );
// Client A sends a contact request to client B.
client_a
.user_store()
.update(cx_a, |store, cx| store.request_contact(client_b.id(), cx))
.await
.unwrap();
// store.respond_to_notification(entry.notification.clone(), true, cx);
// });
// Client B receives a contact request notification and responds to the
// request, accepting it.
executor.run_until_parked();
client_b.notification_store().update(cx_b, |store, cx| {
assert_eq!(store.notification_count(), 1);
assert_eq!(store.unread_notification_count(), 1);
// // Client B sees the notification is now read, and that they responded.
// deterministic.run_until_parked();
// client_b.notification_store().read_with(cx_b, |store, _| {
// assert_eq!(store.notification_count(), 1);
// assert_eq!(store.unread_notification_count(), 0);
let entry = store.notification_at(0).unwrap();
assert_eq!(
entry.notification,
Notification::ContactRequest {
sender_id: client_a.id()
}
);
assert!(!entry.is_read);
assert_eq!(
&notification_events_b.lock()[0..],
&[
NotificationEvent::NewNotification {
entry: entry.clone(),
},
NotificationEvent::NotificationsUpdated {
old_range: 0..0,
new_count: 1
}
]
);
// let entry = store.notification_at(0).unwrap();
// assert!(entry.is_read);
// assert_eq!(entry.response, Some(true));
// assert_eq!(
// &notification_events_b.lock()[2..],
// &[
// NotificationEvent::NotificationRead {
// entry: entry.clone(),
// },
// NotificationEvent::NotificationsUpdated {
// old_range: 0..1,
// new_count: 1
// }
// ]
// );
// });
store.respond_to_notification(entry.notification.clone(), true, cx);
});
// // Client A receives a notification that client B accepted their request.
// client_a.notification_store().read_with(cx_a, |store, _| {
// assert_eq!(store.notification_count(), 1);
// assert_eq!(store.unread_notification_count(), 1);
// Client B sees the notification is now read, and that they responded.
executor.run_until_parked();
client_b.notification_store().read_with(cx_b, |store, _| {
assert_eq!(store.notification_count(), 1);
assert_eq!(store.unread_notification_count(), 0);
// let entry = store.notification_at(0).unwrap();
// assert_eq!(
// entry.notification,
// Notification::ContactRequestAccepted {
// responder_id: client_b.id()
// }
// );
// assert!(!entry.is_read);
// });
let entry = store.notification_at(0).unwrap();
assert!(entry.is_read);
assert_eq!(entry.response, Some(true));
assert_eq!(
&notification_events_b.lock()[2..],
&[
NotificationEvent::NotificationRead {
entry: entry.clone(),
},
NotificationEvent::NotificationsUpdated {
old_range: 0..1,
new_count: 1
}
]
);
});
// // Client A creates a channel and invites client B to be a member.
// let channel_id = client_a
// .channel_store()
// .update(cx_a, |store, cx| {
// store.create_channel("the-channel", None, cx)
// })
// .await
// .unwrap();
// client_a
// .channel_store()
// .update(cx_a, |store, cx| {
// store.invite_member(channel_id, client_b.id(), proto::ChannelRole::Member, cx)
// })
// .await
// .unwrap();
// Client A receives a notification that client B accepted their request.
client_a.notification_store().read_with(cx_a, |store, _| {
assert_eq!(store.notification_count(), 1);
assert_eq!(store.unread_notification_count(), 1);
// // Client B receives a channel invitation notification and responds to the
// // invitation, accepting it.
// deterministic.run_until_parked();
// client_b.notification_store().update(cx_b, |store, cx| {
// assert_eq!(store.notification_count(), 2);
// assert_eq!(store.unread_notification_count(), 1);
let entry = store.notification_at(0).unwrap();
assert_eq!(
entry.notification,
Notification::ContactRequestAccepted {
responder_id: client_b.id()
}
);
assert!(!entry.is_read);
});
// let entry = store.notification_at(0).unwrap();
// assert_eq!(
// entry.notification,
// Notification::ChannelInvitation {
// channel_id,
// channel_name: "the-channel".to_string(),
// inviter_id: client_a.id()
// }
// );
// assert!(!entry.is_read);
// Client A creates a channel and invites client B to be a member.
let channel_id = client_a
.channel_store()
.update(cx_a, |store, cx| {
store.create_channel("the-channel", None, cx)
})
.await
.unwrap();
client_a
.channel_store()
.update(cx_a, |store, cx| {
store.invite_member(channel_id, client_b.id(), proto::ChannelRole::Member, cx)
})
.await
.unwrap();
// store.respond_to_notification(entry.notification.clone(), true, cx);
// });
// Client B receives a channel invitation notification and responds to the
// invitation, accepting it.
executor.run_until_parked();
client_b.notification_store().update(cx_b, |store, cx| {
assert_eq!(store.notification_count(), 2);
assert_eq!(store.unread_notification_count(), 1);
// // Client B sees the notification is now read, and that they responded.
// deterministic.run_until_parked();
// client_b.notification_store().read_with(cx_b, |store, _| {
// assert_eq!(store.notification_count(), 2);
// assert_eq!(store.unread_notification_count(), 0);
let entry = store.notification_at(0).unwrap();
assert_eq!(
entry.notification,
Notification::ChannelInvitation {
channel_id,
channel_name: "the-channel".to_string(),
inviter_id: client_a.id()
}
);
assert!(!entry.is_read);
// let entry = store.notification_at(0).unwrap();
// assert!(entry.is_read);
// assert_eq!(entry.response, Some(true));
// });
// }
store.respond_to_notification(entry.notification.clone(), true, cx);
});
// Client B sees the notification is now read, and that they responded.
executor.run_until_parked();
client_b.notification_store().read_with(cx_b, |store, _| {
assert_eq!(store.notification_count(), 2);
assert_eq!(store.unread_notification_count(), 0);
let entry = store.notification_at(0).unwrap();
assert!(entry.is_read);
assert_eq!(entry.response, Some(true));
});
}

View File

@ -220,14 +220,6 @@ impl RandomizedTest for RandomChannelBufferTest {
Ok(())
}
async fn on_client_added(client: &Rc<TestClient>, cx: &mut TestAppContext) {
let channel_store = client.channel_store();
while channel_store.read_with(cx, |store, _| store.channel_count() == 0) {
// todo!(notifications)
// channel_store.next_notification(cx).await;
}
}
async fn on_quiesce(server: &mut TestServer, clients: &mut [(Rc<TestClient>, TestAppContext)]) {
let channels = server.app_state.db.all_channels().await.unwrap();

View File

@ -115,7 +115,7 @@ pub trait RandomizedTest: 'static + Sized {
async fn initialize(server: &mut TestServer, users: &[UserTestPlan]);
async fn on_client_added(client: &Rc<TestClient>, cx: &mut TestAppContext);
async fn on_client_added(_client: &Rc<TestClient>, _cx: &mut TestAppContext) {}
async fn on_quiesce(server: &mut TestServer, client: &mut [(Rc<TestClient>, TestAppContext)]);
}

View File

@ -17,6 +17,7 @@ use gpui::{BackgroundExecutor, Context, Model, TestAppContext, WindowHandle};
use language::LanguageRegistry;
use node_runtime::FakeNodeRuntime;
use notifications::NotificationStore;
use parking_lot::Mutex;
use project::{Project, WorktreeId};
use rpc::{proto::ChannelRole, RECEIVE_TIMEOUT};
@ -47,8 +48,7 @@ pub struct TestClient {
pub username: String,
pub app_state: Arc<workspace::AppState>,
channel_store: Model<ChannelStore>,
// todo!(notifications)
// notification_store: Model<NotificationStore>,
notification_store: Model<NotificationStore>,
state: RefCell<TestClientState>,
}
@ -234,8 +234,7 @@ impl TestServer {
audio::init((), cx);
call::init(client.clone(), user_store.clone(), cx);
channel::init(&client, user_store.clone(), cx);
//todo(notifications)
// notifications::init(client.clone(), user_store, cx);
notifications::init(client.clone(), user_store, cx);
});
client
@ -247,8 +246,7 @@ impl TestServer {
app_state,
username: name.to_string(),
channel_store: cx.read(ChannelStore::global).clone(),
// todo!(notifications)
// notification_store: cx.read(NotificationStore::global).clone(),
notification_store: cx.read(NotificationStore::global).clone(),
state: Default::default(),
};
client.wait_for_current_user(cx).await;
@ -456,10 +454,9 @@ impl TestClient {
&self.channel_store
}
// todo!(notifications)
// pub fn notification_store(&self) -> &Model<NotificationStore> {
// &self.notification_store
// }
pub fn notification_store(&self) -> &Model<NotificationStore> {
&self.notification_store
}
pub fn user_store(&self) -> &Model<UserStore> {
&self.app_state.user_store

View File

@ -0,0 +1,42 @@
[package]
name = "notifications2"
version = "0.1.0"
edition = "2021"
publish = false
[lib]
path = "src/notification_store2.rs"
doctest = false
[features]
test-support = [
"channel/test-support",
"collections/test-support",
"gpui/test-support",
"rpc/test-support",
]
[dependencies]
channel = { package = "channel2", path = "../channel2" }
client = { package = "client2", path = "../client2" }
clock = { path = "../clock" }
collections = { path = "../collections" }
db = { package = "db2", path = "../db2" }
feature_flags = { package = "feature_flags2", path = "../feature_flags2" }
gpui = { package = "gpui2", path = "../gpui2" }
rpc = { package = "rpc2", path = "../rpc2" }
settings = { package = "settings2", path = "../settings2" }
sum_tree = { path = "../sum_tree" }
text = { package = "text2", path = "../text2" }
util = { path = "../util" }
anyhow.workspace = true
time.workspace = true
[dev-dependencies]
client = { package = "client2", path = "../client2", features = ["test-support"] }
collections = { path = "../collections", features = ["test-support"] }
gpui = { package = "gpui2", path = "../gpui2", features = ["test-support"] }
rpc = { package = "rpc2", path = "../rpc2", features = ["test-support"] }
settings = { package = "settings2", path = "../settings2", features = ["test-support"] }
util = { path = "../util", features = ["test-support"] }

View File

@ -0,0 +1,466 @@
use anyhow::{Context, Result};
use channel::{ChannelMessage, ChannelMessageId, ChannelStore};
use client::{Client, UserStore};
use collections::HashMap;
use db::smol::stream::StreamExt;
use gpui::{AppContext, AsyncAppContext, Context as _, EventEmitter, Model, ModelContext, Task};
use rpc::{proto, Notification, TypedEnvelope};
use std::{ops::Range, sync::Arc};
use sum_tree::{Bias, SumTree};
use time::OffsetDateTime;
use util::ResultExt;
pub fn init(client: Arc<Client>, user_store: Model<UserStore>, cx: &mut AppContext) {
let notification_store = cx.build_model(|cx| NotificationStore::new(client, user_store, cx));
cx.set_global(notification_store);
}
pub struct NotificationStore {
client: Arc<Client>,
user_store: Model<UserStore>,
channel_messages: HashMap<u64, ChannelMessage>,
channel_store: Model<ChannelStore>,
notifications: SumTree<NotificationEntry>,
loaded_all_notifications: bool,
_watch_connection_status: Task<Option<()>>,
_subscriptions: Vec<client::Subscription>,
}
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum NotificationEvent {
NotificationsUpdated {
old_range: Range<usize>,
new_count: usize,
},
NewNotification {
entry: NotificationEntry,
},
NotificationRemoved {
entry: NotificationEntry,
},
NotificationRead {
entry: NotificationEntry,
},
}
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct NotificationEntry {
pub id: u64,
pub notification: Notification,
pub timestamp: OffsetDateTime,
pub is_read: bool,
pub response: Option<bool>,
}
#[derive(Clone, Debug, Default)]
pub struct NotificationSummary {
max_id: u64,
count: usize,
unread_count: usize,
}
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
struct Count(usize);
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
struct UnreadCount(usize);
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
struct NotificationId(u64);
impl NotificationStore {
pub fn global(cx: &AppContext) -> Model<Self> {
cx.global::<Model<Self>>().clone()
}
pub fn new(
client: Arc<Client>,
user_store: Model<UserStore>,
cx: &mut ModelContext<Self>,
) -> Self {
let mut connection_status = client.status();
let watch_connection_status = cx.spawn(|this, mut cx| async move {
while let Some(status) = connection_status.next().await {
let this = this.upgrade()?;
match status {
client::Status::Connected { .. } => {
if let Some(task) = this
.update(&mut cx, |this, cx| this.handle_connect(cx))
.log_err()?
{
task.await.log_err()?;
}
}
_ => this
.update(&mut cx, |this, cx| this.handle_disconnect(cx))
.log_err()?,
}
}
Some(())
});
Self {
channel_store: ChannelStore::global(cx),
notifications: Default::default(),
loaded_all_notifications: false,
channel_messages: Default::default(),
_watch_connection_status: watch_connection_status,
_subscriptions: vec![
client.add_message_handler(cx.weak_model(), Self::handle_new_notification),
client.add_message_handler(cx.weak_model(), Self::handle_delete_notification),
],
user_store,
client,
}
}
pub fn notification_count(&self) -> usize {
self.notifications.summary().count
}
pub fn unread_notification_count(&self) -> usize {
self.notifications.summary().unread_count
}
pub fn channel_message_for_id(&self, id: u64) -> Option<&ChannelMessage> {
self.channel_messages.get(&id)
}
// Get the nth newest notification.
pub fn notification_at(&self, ix: usize) -> Option<&NotificationEntry> {
let count = self.notifications.summary().count;
if ix >= count {
return None;
}
let ix = count - 1 - ix;
let mut cursor = self.notifications.cursor::<Count>();
cursor.seek(&Count(ix), Bias::Right, &());
cursor.item()
}
pub fn notification_for_id(&self, id: u64) -> Option<&NotificationEntry> {
let mut cursor = self.notifications.cursor::<NotificationId>();
cursor.seek(&NotificationId(id), Bias::Left, &());
if let Some(item) = cursor.item() {
if item.id == id {
return Some(item);
}
}
None
}
pub fn load_more_notifications(
&self,
clear_old: bool,
cx: &mut ModelContext<Self>,
) -> Option<Task<Result<()>>> {
if self.loaded_all_notifications && !clear_old {
return None;
}
let before_id = if clear_old {
None
} else {
self.notifications.first().map(|entry| entry.id)
};
let request = self.client.request(proto::GetNotifications { before_id });
Some(cx.spawn(|this, mut cx| async move {
let this = this
.upgrade()
.context("Notification store was dropped while loading notifications")?;
let response = request.await?;
this.update(&mut cx, |this, _| {
this.loaded_all_notifications = response.done
})?;
Self::add_notifications(
this,
response.notifications,
AddNotificationsOptions {
is_new: false,
clear_old,
includes_first: response.done,
},
cx,
)
.await?;
Ok(())
}))
}
fn handle_connect(&mut self, cx: &mut ModelContext<Self>) -> Option<Task<Result<()>>> {
self.notifications = Default::default();
self.channel_messages = Default::default();
cx.notify();
self.load_more_notifications(true, cx)
}
fn handle_disconnect(&mut self, cx: &mut ModelContext<Self>) {
cx.notify()
}
async fn handle_new_notification(
this: Model<Self>,
envelope: TypedEnvelope<proto::AddNotification>,
_: Arc<Client>,
cx: AsyncAppContext,
) -> Result<()> {
Self::add_notifications(
this,
envelope.payload.notification.into_iter().collect(),
AddNotificationsOptions {
is_new: true,
clear_old: false,
includes_first: false,
},
cx,
)
.await
}
async fn handle_delete_notification(
this: Model<Self>,
envelope: TypedEnvelope<proto::DeleteNotification>,
_: Arc<Client>,
mut cx: AsyncAppContext,
) -> Result<()> {
this.update(&mut cx, |this, cx| {
this.splice_notifications([(envelope.payload.notification_id, None)], false, cx);
Ok(())
})?
}
async fn add_notifications(
this: Model<Self>,
notifications: Vec<proto::Notification>,
options: AddNotificationsOptions,
mut cx: AsyncAppContext,
) -> Result<()> {
let mut user_ids = Vec::new();
let mut message_ids = Vec::new();
let notifications = notifications
.into_iter()
.filter_map(|message| {
Some(NotificationEntry {
id: message.id,
is_read: message.is_read,
timestamp: OffsetDateTime::from_unix_timestamp(message.timestamp as i64)
.ok()?,
notification: Notification::from_proto(&message)?,
response: message.response,
})
})
.collect::<Vec<_>>();
if notifications.is_empty() {
return Ok(());
}
for entry in &notifications {
match entry.notification {
Notification::ChannelInvitation { inviter_id, .. } => {
user_ids.push(inviter_id);
}
Notification::ContactRequest {
sender_id: requester_id,
} => {
user_ids.push(requester_id);
}
Notification::ContactRequestAccepted {
responder_id: contact_id,
} => {
user_ids.push(contact_id);
}
Notification::ChannelMessageMention {
sender_id,
message_id,
..
} => {
user_ids.push(sender_id);
message_ids.push(message_id);
}
}
}
let (user_store, channel_store) = this.read_with(&cx, |this, _| {
(this.user_store.clone(), this.channel_store.clone())
})?;
user_store
.update(&mut cx, |store, cx| store.get_users(user_ids, cx))?
.await?;
let messages = channel_store
.update(&mut cx, |store, cx| {
store.fetch_channel_messages(message_ids, cx)
})?
.await?;
this.update(&mut cx, |this, cx| {
if options.clear_old {
cx.emit(NotificationEvent::NotificationsUpdated {
old_range: 0..this.notifications.summary().count,
new_count: 0,
});
this.notifications = SumTree::default();
this.channel_messages.clear();
this.loaded_all_notifications = false;
}
if options.includes_first {
this.loaded_all_notifications = true;
}
this.channel_messages
.extend(messages.into_iter().filter_map(|message| {
if let ChannelMessageId::Saved(id) = message.id {
Some((id, message))
} else {
None
}
}));
this.splice_notifications(
notifications
.into_iter()
.map(|notification| (notification.id, Some(notification))),
options.is_new,
cx,
);
})
.log_err();
Ok(())
}
fn splice_notifications(
&mut self,
notifications: impl IntoIterator<Item = (u64, Option<NotificationEntry>)>,
is_new: bool,
cx: &mut ModelContext<'_, NotificationStore>,
) {
let mut cursor = self.notifications.cursor::<(NotificationId, Count)>();
let mut new_notifications = SumTree::new();
let mut old_range = 0..0;
for (i, (id, new_notification)) in notifications.into_iter().enumerate() {
new_notifications.append(cursor.slice(&NotificationId(id), Bias::Left, &()), &());
if i == 0 {
old_range.start = cursor.start().1 .0;
}
let old_notification = cursor.item();
if let Some(old_notification) = old_notification {
if old_notification.id == id {
cursor.next(&());
if let Some(new_notification) = &new_notification {
if new_notification.is_read {
cx.emit(NotificationEvent::NotificationRead {
entry: new_notification.clone(),
});
}
} else {
cx.emit(NotificationEvent::NotificationRemoved {
entry: old_notification.clone(),
});
}
}
} else if let Some(new_notification) = &new_notification {
if is_new {
cx.emit(NotificationEvent::NewNotification {
entry: new_notification.clone(),
});
}
}
if let Some(notification) = new_notification {
new_notifications.push(notification, &());
}
}
old_range.end = cursor.start().1 .0;
let new_count = new_notifications.summary().count - old_range.start;
new_notifications.append(cursor.suffix(&()), &());
drop(cursor);
self.notifications = new_notifications;
cx.emit(NotificationEvent::NotificationsUpdated {
old_range,
new_count,
});
}
pub fn respond_to_notification(
&mut self,
notification: Notification,
response: bool,
cx: &mut ModelContext<Self>,
) {
match notification {
Notification::ContactRequest { sender_id } => {
self.user_store
.update(cx, |store, cx| {
store.respond_to_contact_request(sender_id, response, cx)
})
.detach();
}
Notification::ChannelInvitation { channel_id, .. } => {
self.channel_store
.update(cx, |store, cx| {
store.respond_to_channel_invite(channel_id, response, cx)
})
.detach();
}
_ => {}
}
}
}
impl EventEmitter<NotificationEvent> for NotificationStore {}
impl sum_tree::Item for NotificationEntry {
type Summary = NotificationSummary;
fn summary(&self) -> Self::Summary {
NotificationSummary {
max_id: self.id,
count: 1,
unread_count: if self.is_read { 0 } else { 1 },
}
}
}
impl sum_tree::Summary for NotificationSummary {
type Context = ();
fn add_summary(&mut self, summary: &Self, _: &()) {
self.max_id = self.max_id.max(summary.max_id);
self.count += summary.count;
self.unread_count += summary.unread_count;
}
}
impl<'a> sum_tree::Dimension<'a, NotificationSummary> for NotificationId {
fn add_summary(&mut self, summary: &NotificationSummary, _: &()) {
debug_assert!(summary.max_id > self.0);
self.0 = summary.max_id;
}
}
impl<'a> sum_tree::Dimension<'a, NotificationSummary> for Count {
fn add_summary(&mut self, summary: &NotificationSummary, _: &()) {
self.0 += summary.count;
}
}
impl<'a> sum_tree::Dimension<'a, NotificationSummary> for UnreadCount {
fn add_summary(&mut self, summary: &NotificationSummary, _: &()) {
self.0 += summary.unread_count;
}
}
struct AddNotificationsOptions {
is_new: bool,
clear_old: bool,
includes_first: bool,
}