diff --git a/Cargo.lock b/Cargo.lock index 52a4bae21f..ded64052c8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1748,6 +1748,7 @@ dependencies = [ "lsp2", "nanoid", "node_runtime", + "notifications2", "parking_lot 0.11.2", "pretty_assertions", "project2", @@ -5484,6 +5485,26 @@ dependencies = [ "util", ] +[[package]] +name = "notifications2" +version = "0.1.0" +dependencies = [ + "anyhow", + "channel2", + "client2", + "clock", + "collections", + "db2", + "feature_flags2", + "gpui2", + "rpc2", + "settings2", + "sum_tree", + "text2", + "time", + "util", +] + [[package]] name = "ntapi" version = "0.3.7" diff --git a/Cargo.toml b/Cargo.toml index 5f4bebfe49..1b8081d066 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -67,6 +67,7 @@ members = [ "crates/multi_buffer2", "crates/node_runtime", "crates/notifications", + "crates/notifications2", "crates/outline", "crates/picker", "crates/picker2", diff --git a/crates/collab2/Cargo.toml b/crates/collab2/Cargo.toml index fe050a2aa8..4ce0a843f5 100644 --- a/crates/collab2/Cargo.toml +++ b/crates/collab2/Cargo.toml @@ -72,10 +72,8 @@ fs = { package = "fs2", path = "../fs2", features = ["test-support"] } git = { package = "git3", path = "../git3", features = ["test-support"] } live_kit_client = { package = "live_kit_client2", path = "../live_kit_client2", features = ["test-support"] } lsp = { package = "lsp2", path = "../lsp2", features = ["test-support"] } - node_runtime = { path = "../node_runtime" } -#todo!(notifications) -#notifications = { path = "../notifications", features = ["test-support"] } +notifications = { package = "notifications2", path = "../notifications2", features = ["test-support"] } project = { package = "project2", path = "../project2", features = ["test-support"] } rpc = { package = "rpc2", path = "../rpc2", features = ["test-support"] } diff --git a/crates/collab2/src/tests/channel_message_tests.rs b/crates/collab2/src/tests/channel_message_tests.rs index 4d030dd679..f5da0e3ee6 100644 --- a/crates/collab2/src/tests/channel_message_tests.rs +++ b/crates/collab2/src/tests/channel_message_tests.rs @@ -1,115 +1,115 @@ use crate::{rpc::RECONNECT_TIMEOUT, tests::TestServer}; -use channel::{ChannelChat, ChannelMessageId}; +use channel::{ChannelChat, ChannelMessageId, MessageParams}; use gpui::{BackgroundExecutor, Model, TestAppContext}; +use rpc::Notification; -// todo!(notifications) -// #[gpui::test] -// async fn test_basic_channel_messages( -// executor: BackgroundExecutor, -// mut cx_a: &mut TestAppContext, -// mut cx_b: &mut TestAppContext, -// mut cx_c: &mut TestAppContext, -// ) { -// let mut server = TestServer::start(executor.clone()).await; -// let client_a = server.create_client(cx_a, "user_a").await; -// let client_b = server.create_client(cx_b, "user_b").await; -// let client_c = server.create_client(cx_c, "user_c").await; +#[gpui::test] +async fn test_basic_channel_messages( + executor: BackgroundExecutor, + mut cx_a: &mut TestAppContext, + mut cx_b: &mut TestAppContext, + mut cx_c: &mut TestAppContext, +) { + let mut server = TestServer::start(executor.clone()).await; + let client_a = server.create_client(cx_a, "user_a").await; + let client_b = server.create_client(cx_b, "user_b").await; + let client_c = server.create_client(cx_c, "user_c").await; -// let channel_id = server -// .make_channel( -// "the-channel", -// None, -// (&client_a, cx_a), -// &mut [(&client_b, cx_b), (&client_c, cx_c)], -// ) -// .await; + let channel_id = server + .make_channel( + "the-channel", + None, + (&client_a, cx_a), + &mut [(&client_b, cx_b), (&client_c, cx_c)], + ) + .await; -// let channel_chat_a = client_a -// .channel_store() -// .update(cx_a, |store, cx| store.open_channel_chat(channel_id, cx)) -// .await -// .unwrap(); -// let channel_chat_b = client_b -// .channel_store() -// .update(cx_b, |store, cx| store.open_channel_chat(channel_id, cx)) -// .await -// .unwrap(); + let channel_chat_a = client_a + .channel_store() + .update(cx_a, |store, cx| store.open_channel_chat(channel_id, cx)) + .await + .unwrap(); + let channel_chat_b = client_b + .channel_store() + .update(cx_b, |store, cx| store.open_channel_chat(channel_id, cx)) + .await + .unwrap(); -// let message_id = channel_chat_a -// .update(cx_a, |c, cx| { -// c.send_message( -// MessageParams { -// text: "hi @user_c!".into(), -// mentions: vec![(3..10, client_c.id())], -// }, -// cx, -// ) -// .unwrap() -// }) -// .await -// .unwrap(); -// channel_chat_a -// .update(cx_a, |c, cx| c.send_message("two".into(), cx).unwrap()) -// .await -// .unwrap(); + let message_id = channel_chat_a + .update(cx_a, |c, cx| { + c.send_message( + MessageParams { + text: "hi @user_c!".into(), + mentions: vec![(3..10, client_c.id())], + }, + cx, + ) + .unwrap() + }) + .await + .unwrap(); + channel_chat_a + .update(cx_a, |c, cx| c.send_message("two".into(), cx).unwrap()) + .await + .unwrap(); -// executor.run_until_parked(); -// channel_chat_b -// .update(cx_b, |c, cx| c.send_message("three".into(), cx).unwrap()) -// .await -// .unwrap(); + executor.run_until_parked(); + channel_chat_b + .update(cx_b, |c, cx| c.send_message("three".into(), cx).unwrap()) + .await + .unwrap(); -// executor.run_until_parked(); + executor.run_until_parked(); -// let channel_chat_c = client_c -// .channel_store() -// .update(cx_c, |store, cx| store.open_channel_chat(channel_id, cx)) -// .await -// .unwrap(); + let channel_chat_c = client_c + .channel_store() + .update(cx_c, |store, cx| store.open_channel_chat(channel_id, cx)) + .await + .unwrap(); -// for (chat, cx) in [ -// (&channel_chat_a, &mut cx_a), -// (&channel_chat_b, &mut cx_b), -// (&channel_chat_c, &mut cx_c), -// ] { -// chat.update(*cx, |c, _| { -// assert_eq!( -// c.messages() -// .iter() -// .map(|m| (m.body.as_str(), m.mentions.as_slice())) -// .collect::>(), -// vec![ -// ("hi @user_c!", [(3..10, client_c.id())].as_slice()), -// ("two", &[]), -// ("three", &[]) -// ], -// "results for user {}", -// c.client().id(), -// ); -// }); -// } + for (chat, cx) in [ + (&channel_chat_a, &mut cx_a), + (&channel_chat_b, &mut cx_b), + (&channel_chat_c, &mut cx_c), + ] { + chat.update(*cx, |c, _| { + assert_eq!( + c.messages() + .iter() + .map(|m| (m.body.as_str(), m.mentions.as_slice())) + .collect::>(), + vec![ + ("hi @user_c!", [(3..10, client_c.id())].as_slice()), + ("two", &[]), + ("three", &[]) + ], + "results for user {}", + c.client().id(), + ); + }); + } -// client_c.notification_store().update(cx_c, |store, _| { -// assert_eq!(store.notification_count(), 2); -// assert_eq!(store.unread_notification_count(), 1); -// assert_eq!( -// store.notification_at(0).unwrap().notification, -// Notification::ChannelMessageMention { -// message_id, -// sender_id: client_a.id(), -// channel_id, -// } -// ); -// assert_eq!( -// store.notification_at(1).unwrap().notification, -// Notification::ChannelInvitation { -// channel_id, -// channel_name: "the-channel".to_string(), -// inviter_id: client_a.id() -// } -// ); -// }); -// } + client_c.notification_store().update(cx_c, |store, _| { + assert_eq!(store.notification_count(), 2); + assert_eq!(store.unread_notification_count(), 1); + assert_eq!( + store.notification_at(0).unwrap().notification, + Notification::ChannelMessageMention { + message_id, + sender_id: client_a.id(), + channel_id, + } + ); + assert_eq!( + store.notification_at(1).unwrap().notification, + Notification::ChannelInvitation { + channel_id, + channel_name: "the-channel".to_string(), + inviter_id: client_a.id() + } + ); + }); +} #[gpui::test] async fn test_rejoin_channel_chat( diff --git a/crates/collab2/src/tests/channel_tests.rs b/crates/collab2/src/tests/channel_tests.rs index 31c092bd08..8ce5d99b80 100644 --- a/crates/collab2/src/tests/channel_tests.rs +++ b/crates/collab2/src/tests/channel_tests.rs @@ -1128,6 +1128,8 @@ async fn test_channel_link_notifications( .await .unwrap(); + executor.run_until_parked(); + // the members-only channel is still shown for c, but hidden for b assert_channels_list_shape( client_b.channel_store(), diff --git a/crates/collab2/src/tests/notification_tests.rs b/crates/collab2/src/tests/notification_tests.rs index 021591ee09..f6066e6409 100644 --- a/crates/collab2/src/tests/notification_tests.rs +++ b/crates/collab2/src/tests/notification_tests.rs @@ -1,160 +1,160 @@ -//todo!(notifications) -// use crate::tests::TestServer; -// use gpui::{executor::Deterministic, TestAppContext}; -// use notifications::NotificationEvent; -// use parking_lot::Mutex; -// use rpc::{proto, Notification}; -// use std::sync::Arc; +use std::sync::Arc; -// #[gpui::test] -// async fn test_notifications( -// deterministic: Arc, -// cx_a: &mut TestAppContext, -// cx_b: &mut TestAppContext, -// ) { -// deterministic.forbid_parking(); -// let mut server = TestServer::start(&deterministic).await; -// let client_a = server.create_client(cx_a, "user_a").await; -// let client_b = server.create_client(cx_b, "user_b").await; +use gpui::{BackgroundExecutor, TestAppContext}; +use notifications::NotificationEvent; +use parking_lot::Mutex; +use rpc::{proto, Notification}; -// let notification_events_a = Arc::new(Mutex::new(Vec::new())); -// let notification_events_b = Arc::new(Mutex::new(Vec::new())); -// client_a.notification_store().update(cx_a, |_, cx| { -// let events = notification_events_a.clone(); -// cx.subscribe(&cx.handle(), move |_, _, event, _| { -// events.lock().push(event.clone()); -// }) -// .detach() -// }); -// client_b.notification_store().update(cx_b, |_, cx| { -// let events = notification_events_b.clone(); -// cx.subscribe(&cx.handle(), move |_, _, event, _| { -// events.lock().push(event.clone()); -// }) -// .detach() -// }); +use crate::tests::TestServer; -// // Client A sends a contact request to client B. -// client_a -// .user_store() -// .update(cx_a, |store, cx| store.request_contact(client_b.id(), cx)) -// .await -// .unwrap(); +#[gpui::test] +async fn test_notifications( + executor: BackgroundExecutor, + cx_a: &mut TestAppContext, + cx_b: &mut TestAppContext, +) { + let mut server = TestServer::start(executor.clone()).await; + let client_a = server.create_client(cx_a, "user_a").await; + let client_b = server.create_client(cx_b, "user_b").await; -// // Client B receives a contact request notification and responds to the -// // request, accepting it. -// deterministic.run_until_parked(); -// client_b.notification_store().update(cx_b, |store, cx| { -// assert_eq!(store.notification_count(), 1); -// assert_eq!(store.unread_notification_count(), 1); + let notification_events_a = Arc::new(Mutex::new(Vec::new())); + let notification_events_b = Arc::new(Mutex::new(Vec::new())); + client_a.notification_store().update(cx_a, |_, cx| { + let events = notification_events_a.clone(); + cx.subscribe(&cx.handle(), move |_, _, event, _| { + events.lock().push(event.clone()); + }) + .detach() + }); + client_b.notification_store().update(cx_b, |_, cx| { + let events = notification_events_b.clone(); + cx.subscribe(&cx.handle(), move |_, _, event, _| { + events.lock().push(event.clone()); + }) + .detach() + }); -// let entry = store.notification_at(0).unwrap(); -// assert_eq!( -// entry.notification, -// Notification::ContactRequest { -// sender_id: client_a.id() -// } -// ); -// assert!(!entry.is_read); -// assert_eq!( -// ¬ification_events_b.lock()[0..], -// &[ -// NotificationEvent::NewNotification { -// entry: entry.clone(), -// }, -// NotificationEvent::NotificationsUpdated { -// old_range: 0..0, -// new_count: 1 -// } -// ] -// ); + // Client A sends a contact request to client B. + client_a + .user_store() + .update(cx_a, |store, cx| store.request_contact(client_b.id(), cx)) + .await + .unwrap(); -// store.respond_to_notification(entry.notification.clone(), true, cx); -// }); + // Client B receives a contact request notification and responds to the + // request, accepting it. + executor.run_until_parked(); + client_b.notification_store().update(cx_b, |store, cx| { + assert_eq!(store.notification_count(), 1); + assert_eq!(store.unread_notification_count(), 1); -// // Client B sees the notification is now read, and that they responded. -// deterministic.run_until_parked(); -// client_b.notification_store().read_with(cx_b, |store, _| { -// assert_eq!(store.notification_count(), 1); -// assert_eq!(store.unread_notification_count(), 0); + let entry = store.notification_at(0).unwrap(); + assert_eq!( + entry.notification, + Notification::ContactRequest { + sender_id: client_a.id() + } + ); + assert!(!entry.is_read); + assert_eq!( + ¬ification_events_b.lock()[0..], + &[ + NotificationEvent::NewNotification { + entry: entry.clone(), + }, + NotificationEvent::NotificationsUpdated { + old_range: 0..0, + new_count: 1 + } + ] + ); -// let entry = store.notification_at(0).unwrap(); -// assert!(entry.is_read); -// assert_eq!(entry.response, Some(true)); -// assert_eq!( -// ¬ification_events_b.lock()[2..], -// &[ -// NotificationEvent::NotificationRead { -// entry: entry.clone(), -// }, -// NotificationEvent::NotificationsUpdated { -// old_range: 0..1, -// new_count: 1 -// } -// ] -// ); -// }); + store.respond_to_notification(entry.notification.clone(), true, cx); + }); -// // Client A receives a notification that client B accepted their request. -// client_a.notification_store().read_with(cx_a, |store, _| { -// assert_eq!(store.notification_count(), 1); -// assert_eq!(store.unread_notification_count(), 1); + // Client B sees the notification is now read, and that they responded. + executor.run_until_parked(); + client_b.notification_store().read_with(cx_b, |store, _| { + assert_eq!(store.notification_count(), 1); + assert_eq!(store.unread_notification_count(), 0); -// let entry = store.notification_at(0).unwrap(); -// assert_eq!( -// entry.notification, -// Notification::ContactRequestAccepted { -// responder_id: client_b.id() -// } -// ); -// assert!(!entry.is_read); -// }); + let entry = store.notification_at(0).unwrap(); + assert!(entry.is_read); + assert_eq!(entry.response, Some(true)); + assert_eq!( + ¬ification_events_b.lock()[2..], + &[ + NotificationEvent::NotificationRead { + entry: entry.clone(), + }, + NotificationEvent::NotificationsUpdated { + old_range: 0..1, + new_count: 1 + } + ] + ); + }); -// // Client A creates a channel and invites client B to be a member. -// let channel_id = client_a -// .channel_store() -// .update(cx_a, |store, cx| { -// store.create_channel("the-channel", None, cx) -// }) -// .await -// .unwrap(); -// client_a -// .channel_store() -// .update(cx_a, |store, cx| { -// store.invite_member(channel_id, client_b.id(), proto::ChannelRole::Member, cx) -// }) -// .await -// .unwrap(); + // Client A receives a notification that client B accepted their request. + client_a.notification_store().read_with(cx_a, |store, _| { + assert_eq!(store.notification_count(), 1); + assert_eq!(store.unread_notification_count(), 1); -// // Client B receives a channel invitation notification and responds to the -// // invitation, accepting it. -// deterministic.run_until_parked(); -// client_b.notification_store().update(cx_b, |store, cx| { -// assert_eq!(store.notification_count(), 2); -// assert_eq!(store.unread_notification_count(), 1); + let entry = store.notification_at(0).unwrap(); + assert_eq!( + entry.notification, + Notification::ContactRequestAccepted { + responder_id: client_b.id() + } + ); + assert!(!entry.is_read); + }); -// let entry = store.notification_at(0).unwrap(); -// assert_eq!( -// entry.notification, -// Notification::ChannelInvitation { -// channel_id, -// channel_name: "the-channel".to_string(), -// inviter_id: client_a.id() -// } -// ); -// assert!(!entry.is_read); + // Client A creates a channel and invites client B to be a member. + let channel_id = client_a + .channel_store() + .update(cx_a, |store, cx| { + store.create_channel("the-channel", None, cx) + }) + .await + .unwrap(); + client_a + .channel_store() + .update(cx_a, |store, cx| { + store.invite_member(channel_id, client_b.id(), proto::ChannelRole::Member, cx) + }) + .await + .unwrap(); -// store.respond_to_notification(entry.notification.clone(), true, cx); -// }); + // Client B receives a channel invitation notification and responds to the + // invitation, accepting it. + executor.run_until_parked(); + client_b.notification_store().update(cx_b, |store, cx| { + assert_eq!(store.notification_count(), 2); + assert_eq!(store.unread_notification_count(), 1); -// // Client B sees the notification is now read, and that they responded. -// deterministic.run_until_parked(); -// client_b.notification_store().read_with(cx_b, |store, _| { -// assert_eq!(store.notification_count(), 2); -// assert_eq!(store.unread_notification_count(), 0); + let entry = store.notification_at(0).unwrap(); + assert_eq!( + entry.notification, + Notification::ChannelInvitation { + channel_id, + channel_name: "the-channel".to_string(), + inviter_id: client_a.id() + } + ); + assert!(!entry.is_read); -// let entry = store.notification_at(0).unwrap(); -// assert!(entry.is_read); -// assert_eq!(entry.response, Some(true)); -// }); -// } + store.respond_to_notification(entry.notification.clone(), true, cx); + }); + + // Client B sees the notification is now read, and that they responded. + executor.run_until_parked(); + client_b.notification_store().read_with(cx_b, |store, _| { + assert_eq!(store.notification_count(), 2); + assert_eq!(store.unread_notification_count(), 0); + + let entry = store.notification_at(0).unwrap(); + assert!(entry.is_read); + assert_eq!(entry.response, Some(true)); + }); +} diff --git a/crates/collab2/src/tests/random_channel_buffer_tests.rs b/crates/collab2/src/tests/random_channel_buffer_tests.rs index 01f8daa5d2..14b5da0287 100644 --- a/crates/collab2/src/tests/random_channel_buffer_tests.rs +++ b/crates/collab2/src/tests/random_channel_buffer_tests.rs @@ -220,14 +220,6 @@ impl RandomizedTest for RandomChannelBufferTest { Ok(()) } - async fn on_client_added(client: &Rc, cx: &mut TestAppContext) { - let channel_store = client.channel_store(); - while channel_store.read_with(cx, |store, _| store.channel_count() == 0) { - // todo!(notifications) - // channel_store.next_notification(cx).await; - } - } - async fn on_quiesce(server: &mut TestServer, clients: &mut [(Rc, TestAppContext)]) { let channels = server.app_state.db.all_channels().await.unwrap(); diff --git a/crates/collab2/src/tests/randomized_test_helpers.rs b/crates/collab2/src/tests/randomized_test_helpers.rs index ac63738a36..91bd9cf6f6 100644 --- a/crates/collab2/src/tests/randomized_test_helpers.rs +++ b/crates/collab2/src/tests/randomized_test_helpers.rs @@ -115,7 +115,7 @@ pub trait RandomizedTest: 'static + Sized { async fn initialize(server: &mut TestServer, users: &[UserTestPlan]); - async fn on_client_added(client: &Rc, cx: &mut TestAppContext); + async fn on_client_added(_client: &Rc, _cx: &mut TestAppContext) {} async fn on_quiesce(server: &mut TestServer, client: &mut [(Rc, TestAppContext)]); } diff --git a/crates/collab2/src/tests/test_server.rs b/crates/collab2/src/tests/test_server.rs index 76a587ffde..d0ab917d68 100644 --- a/crates/collab2/src/tests/test_server.rs +++ b/crates/collab2/src/tests/test_server.rs @@ -17,6 +17,7 @@ use gpui::{BackgroundExecutor, Context, Model, TestAppContext, WindowHandle}; use language::LanguageRegistry; use node_runtime::FakeNodeRuntime; +use notifications::NotificationStore; use parking_lot::Mutex; use project::{Project, WorktreeId}; use rpc::{proto::ChannelRole, RECEIVE_TIMEOUT}; @@ -47,8 +48,7 @@ pub struct TestClient { pub username: String, pub app_state: Arc, channel_store: Model, - // todo!(notifications) - // notification_store: Model, + notification_store: Model, state: RefCell, } @@ -234,8 +234,7 @@ impl TestServer { audio::init((), cx); call::init(client.clone(), user_store.clone(), cx); channel::init(&client, user_store.clone(), cx); - //todo(notifications) - // notifications::init(client.clone(), user_store, cx); + notifications::init(client.clone(), user_store, cx); }); client @@ -247,8 +246,7 @@ impl TestServer { app_state, username: name.to_string(), channel_store: cx.read(ChannelStore::global).clone(), - // todo!(notifications) - // notification_store: cx.read(NotificationStore::global).clone(), + notification_store: cx.read(NotificationStore::global).clone(), state: Default::default(), }; client.wait_for_current_user(cx).await; @@ -456,10 +454,9 @@ impl TestClient { &self.channel_store } - // todo!(notifications) - // pub fn notification_store(&self) -> &Model { - // &self.notification_store - // } + pub fn notification_store(&self) -> &Model { + &self.notification_store + } pub fn user_store(&self) -> &Model { &self.app_state.user_store diff --git a/crates/notifications2/Cargo.toml b/crates/notifications2/Cargo.toml new file mode 100644 index 0000000000..0720772a61 --- /dev/null +++ b/crates/notifications2/Cargo.toml @@ -0,0 +1,42 @@ +[package] +name = "notifications2" +version = "0.1.0" +edition = "2021" +publish = false + +[lib] +path = "src/notification_store2.rs" +doctest = false + +[features] +test-support = [ + "channel/test-support", + "collections/test-support", + "gpui/test-support", + "rpc/test-support", +] + +[dependencies] +channel = { package = "channel2", path = "../channel2" } +client = { package = "client2", path = "../client2" } +clock = { path = "../clock" } +collections = { path = "../collections" } +db = { package = "db2", path = "../db2" } +feature_flags = { package = "feature_flags2", path = "../feature_flags2" } +gpui = { package = "gpui2", path = "../gpui2" } +rpc = { package = "rpc2", path = "../rpc2" } +settings = { package = "settings2", path = "../settings2" } +sum_tree = { path = "../sum_tree" } +text = { package = "text2", path = "../text2" } +util = { path = "../util" } + +anyhow.workspace = true +time.workspace = true + +[dev-dependencies] +client = { package = "client2", path = "../client2", features = ["test-support"] } +collections = { path = "../collections", features = ["test-support"] } +gpui = { package = "gpui2", path = "../gpui2", features = ["test-support"] } +rpc = { package = "rpc2", path = "../rpc2", features = ["test-support"] } +settings = { package = "settings2", path = "../settings2", features = ["test-support"] } +util = { path = "../util", features = ["test-support"] } diff --git a/crates/notifications2/src/notification_store2.rs b/crates/notifications2/src/notification_store2.rs new file mode 100644 index 0000000000..ca474cd0c4 --- /dev/null +++ b/crates/notifications2/src/notification_store2.rs @@ -0,0 +1,466 @@ +use anyhow::{Context, Result}; +use channel::{ChannelMessage, ChannelMessageId, ChannelStore}; +use client::{Client, UserStore}; +use collections::HashMap; +use db::smol::stream::StreamExt; +use gpui::{AppContext, AsyncAppContext, Context as _, EventEmitter, Model, ModelContext, Task}; +use rpc::{proto, Notification, TypedEnvelope}; +use std::{ops::Range, sync::Arc}; +use sum_tree::{Bias, SumTree}; +use time::OffsetDateTime; +use util::ResultExt; + +pub fn init(client: Arc, user_store: Model, cx: &mut AppContext) { + let notification_store = cx.build_model(|cx| NotificationStore::new(client, user_store, cx)); + cx.set_global(notification_store); +} + +pub struct NotificationStore { + client: Arc, + user_store: Model, + channel_messages: HashMap, + channel_store: Model, + notifications: SumTree, + loaded_all_notifications: bool, + _watch_connection_status: Task>, + _subscriptions: Vec, +} + +#[derive(Clone, PartialEq, Eq, Debug)] +pub enum NotificationEvent { + NotificationsUpdated { + old_range: Range, + new_count: usize, + }, + NewNotification { + entry: NotificationEntry, + }, + NotificationRemoved { + entry: NotificationEntry, + }, + NotificationRead { + entry: NotificationEntry, + }, +} + +#[derive(Debug, PartialEq, Eq, Clone)] +pub struct NotificationEntry { + pub id: u64, + pub notification: Notification, + pub timestamp: OffsetDateTime, + pub is_read: bool, + pub response: Option, +} + +#[derive(Clone, Debug, Default)] +pub struct NotificationSummary { + max_id: u64, + count: usize, + unread_count: usize, +} + +#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord)] +struct Count(usize); + +#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord)] +struct UnreadCount(usize); + +#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord)] +struct NotificationId(u64); + +impl NotificationStore { + pub fn global(cx: &AppContext) -> Model { + cx.global::>().clone() + } + + pub fn new( + client: Arc, + user_store: Model, + cx: &mut ModelContext, + ) -> Self { + let mut connection_status = client.status(); + let watch_connection_status = cx.spawn(|this, mut cx| async move { + while let Some(status) = connection_status.next().await { + let this = this.upgrade()?; + match status { + client::Status::Connected { .. } => { + if let Some(task) = this + .update(&mut cx, |this, cx| this.handle_connect(cx)) + .log_err()? + { + task.await.log_err()?; + } + } + _ => this + .update(&mut cx, |this, cx| this.handle_disconnect(cx)) + .log_err()?, + } + } + Some(()) + }); + + Self { + channel_store: ChannelStore::global(cx), + notifications: Default::default(), + loaded_all_notifications: false, + channel_messages: Default::default(), + _watch_connection_status: watch_connection_status, + _subscriptions: vec![ + client.add_message_handler(cx.weak_model(), Self::handle_new_notification), + client.add_message_handler(cx.weak_model(), Self::handle_delete_notification), + ], + user_store, + client, + } + } + + pub fn notification_count(&self) -> usize { + self.notifications.summary().count + } + + pub fn unread_notification_count(&self) -> usize { + self.notifications.summary().unread_count + } + + pub fn channel_message_for_id(&self, id: u64) -> Option<&ChannelMessage> { + self.channel_messages.get(&id) + } + + // Get the nth newest notification. + pub fn notification_at(&self, ix: usize) -> Option<&NotificationEntry> { + let count = self.notifications.summary().count; + if ix >= count { + return None; + } + let ix = count - 1 - ix; + let mut cursor = self.notifications.cursor::(); + cursor.seek(&Count(ix), Bias::Right, &()); + cursor.item() + } + + pub fn notification_for_id(&self, id: u64) -> Option<&NotificationEntry> { + let mut cursor = self.notifications.cursor::(); + cursor.seek(&NotificationId(id), Bias::Left, &()); + if let Some(item) = cursor.item() { + if item.id == id { + return Some(item); + } + } + None + } + + pub fn load_more_notifications( + &self, + clear_old: bool, + cx: &mut ModelContext, + ) -> Option>> { + if self.loaded_all_notifications && !clear_old { + return None; + } + + let before_id = if clear_old { + None + } else { + self.notifications.first().map(|entry| entry.id) + }; + let request = self.client.request(proto::GetNotifications { before_id }); + Some(cx.spawn(|this, mut cx| async move { + let this = this + .upgrade() + .context("Notification store was dropped while loading notifications")?; + + let response = request.await?; + this.update(&mut cx, |this, _| { + this.loaded_all_notifications = response.done + })?; + Self::add_notifications( + this, + response.notifications, + AddNotificationsOptions { + is_new: false, + clear_old, + includes_first: response.done, + }, + cx, + ) + .await?; + Ok(()) + })) + } + + fn handle_connect(&mut self, cx: &mut ModelContext) -> Option>> { + self.notifications = Default::default(); + self.channel_messages = Default::default(); + cx.notify(); + self.load_more_notifications(true, cx) + } + + fn handle_disconnect(&mut self, cx: &mut ModelContext) { + cx.notify() + } + + async fn handle_new_notification( + this: Model, + envelope: TypedEnvelope, + _: Arc, + cx: AsyncAppContext, + ) -> Result<()> { + Self::add_notifications( + this, + envelope.payload.notification.into_iter().collect(), + AddNotificationsOptions { + is_new: true, + clear_old: false, + includes_first: false, + }, + cx, + ) + .await + } + + async fn handle_delete_notification( + this: Model, + envelope: TypedEnvelope, + _: Arc, + mut cx: AsyncAppContext, + ) -> Result<()> { + this.update(&mut cx, |this, cx| { + this.splice_notifications([(envelope.payload.notification_id, None)], false, cx); + Ok(()) + })? + } + + async fn add_notifications( + this: Model, + notifications: Vec, + options: AddNotificationsOptions, + mut cx: AsyncAppContext, + ) -> Result<()> { + let mut user_ids = Vec::new(); + let mut message_ids = Vec::new(); + + let notifications = notifications + .into_iter() + .filter_map(|message| { + Some(NotificationEntry { + id: message.id, + is_read: message.is_read, + timestamp: OffsetDateTime::from_unix_timestamp(message.timestamp as i64) + .ok()?, + notification: Notification::from_proto(&message)?, + response: message.response, + }) + }) + .collect::>(); + if notifications.is_empty() { + return Ok(()); + } + + for entry in ¬ifications { + match entry.notification { + Notification::ChannelInvitation { inviter_id, .. } => { + user_ids.push(inviter_id); + } + Notification::ContactRequest { + sender_id: requester_id, + } => { + user_ids.push(requester_id); + } + Notification::ContactRequestAccepted { + responder_id: contact_id, + } => { + user_ids.push(contact_id); + } + Notification::ChannelMessageMention { + sender_id, + message_id, + .. + } => { + user_ids.push(sender_id); + message_ids.push(message_id); + } + } + } + + let (user_store, channel_store) = this.read_with(&cx, |this, _| { + (this.user_store.clone(), this.channel_store.clone()) + })?; + + user_store + .update(&mut cx, |store, cx| store.get_users(user_ids, cx))? + .await?; + let messages = channel_store + .update(&mut cx, |store, cx| { + store.fetch_channel_messages(message_ids, cx) + })? + .await?; + this.update(&mut cx, |this, cx| { + if options.clear_old { + cx.emit(NotificationEvent::NotificationsUpdated { + old_range: 0..this.notifications.summary().count, + new_count: 0, + }); + this.notifications = SumTree::default(); + this.channel_messages.clear(); + this.loaded_all_notifications = false; + } + + if options.includes_first { + this.loaded_all_notifications = true; + } + + this.channel_messages + .extend(messages.into_iter().filter_map(|message| { + if let ChannelMessageId::Saved(id) = message.id { + Some((id, message)) + } else { + None + } + })); + + this.splice_notifications( + notifications + .into_iter() + .map(|notification| (notification.id, Some(notification))), + options.is_new, + cx, + ); + }) + .log_err(); + + Ok(()) + } + + fn splice_notifications( + &mut self, + notifications: impl IntoIterator)>, + is_new: bool, + cx: &mut ModelContext<'_, NotificationStore>, + ) { + let mut cursor = self.notifications.cursor::<(NotificationId, Count)>(); + let mut new_notifications = SumTree::new(); + let mut old_range = 0..0; + + for (i, (id, new_notification)) in notifications.into_iter().enumerate() { + new_notifications.append(cursor.slice(&NotificationId(id), Bias::Left, &()), &()); + + if i == 0 { + old_range.start = cursor.start().1 .0; + } + + let old_notification = cursor.item(); + if let Some(old_notification) = old_notification { + if old_notification.id == id { + cursor.next(&()); + + if let Some(new_notification) = &new_notification { + if new_notification.is_read { + cx.emit(NotificationEvent::NotificationRead { + entry: new_notification.clone(), + }); + } + } else { + cx.emit(NotificationEvent::NotificationRemoved { + entry: old_notification.clone(), + }); + } + } + } else if let Some(new_notification) = &new_notification { + if is_new { + cx.emit(NotificationEvent::NewNotification { + entry: new_notification.clone(), + }); + } + } + + if let Some(notification) = new_notification { + new_notifications.push(notification, &()); + } + } + + old_range.end = cursor.start().1 .0; + let new_count = new_notifications.summary().count - old_range.start; + new_notifications.append(cursor.suffix(&()), &()); + drop(cursor); + + self.notifications = new_notifications; + cx.emit(NotificationEvent::NotificationsUpdated { + old_range, + new_count, + }); + } + + pub fn respond_to_notification( + &mut self, + notification: Notification, + response: bool, + cx: &mut ModelContext, + ) { + match notification { + Notification::ContactRequest { sender_id } => { + self.user_store + .update(cx, |store, cx| { + store.respond_to_contact_request(sender_id, response, cx) + }) + .detach(); + } + Notification::ChannelInvitation { channel_id, .. } => { + self.channel_store + .update(cx, |store, cx| { + store.respond_to_channel_invite(channel_id, response, cx) + }) + .detach(); + } + _ => {} + } + } +} + +impl EventEmitter for NotificationStore {} + +impl sum_tree::Item for NotificationEntry { + type Summary = NotificationSummary; + + fn summary(&self) -> Self::Summary { + NotificationSummary { + max_id: self.id, + count: 1, + unread_count: if self.is_read { 0 } else { 1 }, + } + } +} + +impl sum_tree::Summary for NotificationSummary { + type Context = (); + + fn add_summary(&mut self, summary: &Self, _: &()) { + self.max_id = self.max_id.max(summary.max_id); + self.count += summary.count; + self.unread_count += summary.unread_count; + } +} + +impl<'a> sum_tree::Dimension<'a, NotificationSummary> for NotificationId { + fn add_summary(&mut self, summary: &NotificationSummary, _: &()) { + debug_assert!(summary.max_id > self.0); + self.0 = summary.max_id; + } +} + +impl<'a> sum_tree::Dimension<'a, NotificationSummary> for Count { + fn add_summary(&mut self, summary: &NotificationSummary, _: &()) { + self.0 += summary.count; + } +} + +impl<'a> sum_tree::Dimension<'a, NotificationSummary> for UnreadCount { + fn add_summary(&mut self, summary: &NotificationSummary, _: &()) { + self.0 += summary.unread_count; + } +} + +struct AddNotificationsOptions { + is_new: bool, + clear_old: bool, + includes_first: bool, +}