Make notification db representation more flexible

This commit is contained in:
Max Brunsfeld 2023-10-12 17:17:45 -07:00
parent fed3ffb681
commit 3241128840
17 changed files with 197 additions and 175 deletions

1
Cargo.lock generated
View File

@ -6423,6 +6423,7 @@ dependencies = [
"rsa 0.4.0",
"serde",
"serde_derive",
"serde_json",
"smol",
"smol-timeout",
"strum",

View File

@ -314,7 +314,7 @@ CREATE TABLE IF NOT EXISTS "observed_channel_messages" (
CREATE UNIQUE INDEX "index_observed_channel_messages_user_and_channel_id" ON "observed_channel_messages" ("user_id", "channel_id");
CREATE TABLE "notification_kinds" (
"id" INTEGER PRIMARY KEY NOT NULL,
"id" INTEGER PRIMARY KEY AUTOINCREMENT,
"name" VARCHAR NOT NULL
);
@ -322,13 +322,12 @@ CREATE UNIQUE INDEX "index_notification_kinds_on_name" ON "notification_kinds" (
CREATE TABLE "notifications" (
"id" INTEGER PRIMARY KEY AUTOINCREMENT,
"is_read" BOOLEAN NOT NULL DEFAULT FALSE,
"created_at" TIMESTAMP NOT NULL default CURRENT_TIMESTAMP,
"recipient_id" INTEGER NOT NULL REFERENCES users (id) ON DELETE CASCADE,
"actor_id" INTEGER REFERENCES users (id) ON DELETE CASCADE,
"kind" INTEGER NOT NULL REFERENCES notification_kinds (id),
"is_read" BOOLEAN NOT NULL DEFAULT FALSE,
"entity_id_1" INTEGER,
"entity_id_2" INTEGER,
"entity_id_3" INTEGER
"content" TEXT
);
CREATE INDEX "index_notifications_on_recipient_id" ON "notifications" ("recipient_id");

View File

@ -1,5 +1,5 @@
CREATE TABLE "notification_kinds" (
"id" INTEGER PRIMARY KEY NOT NULL,
"id" SERIAL PRIMARY KEY,
"name" VARCHAR NOT NULL
);
@ -7,13 +7,12 @@ CREATE UNIQUE INDEX "index_notification_kinds_on_name" ON "notification_kinds" (
CREATE TABLE notifications (
"id" SERIAL PRIMARY KEY,
"is_read" BOOLEAN NOT NULL DEFAULT FALSE,
"created_at" TIMESTAMP NOT NULL DEFAULT now(),
"recipient_id" INTEGER NOT NULL REFERENCES users (id) ON DELETE CASCADE,
"actor_id" INTEGER REFERENCES users (id) ON DELETE CASCADE,
"kind" INTEGER NOT NULL REFERENCES notification_kinds (id),
"is_read" BOOLEAN NOT NULL DEFAULT FALSE,
"entity_id_1" INTEGER,
"entity_id_2" INTEGER,
"entity_id_3" INTEGER
"content" TEXT
);
CREATE INDEX "index_notifications_on_recipient_id" ON "notifications" ("recipient_id");

View File

@ -55,6 +55,8 @@ pub struct Database {
rooms: DashMap<RoomId, Arc<Mutex<()>>>,
rng: Mutex<StdRng>,
executor: Executor,
notification_kinds_by_id: HashMap<NotificationKindId, &'static str>,
notification_kinds_by_name: HashMap<String, NotificationKindId>,
#[cfg(test)]
runtime: Option<tokio::runtime::Runtime>,
}
@ -69,6 +71,8 @@ impl Database {
pool: sea_orm::Database::connect(options).await?,
rooms: DashMap::with_capacity(16384),
rng: Mutex::new(StdRng::seed_from_u64(0)),
notification_kinds_by_id: HashMap::default(),
notification_kinds_by_name: HashMap::default(),
executor,
#[cfg(test)]
runtime: None,
@ -121,6 +125,11 @@ impl Database {
Ok(new_migrations)
}
pub async fn initialize_static_data(&mut self) -> Result<()> {
self.initialize_notification_enum().await?;
Ok(())
}
pub async fn transaction<F, Fut, T>(&self, f: F) -> Result<T>
where
F: Send + Fn(TransactionHandle) -> Fut,

View File

@ -81,3 +81,4 @@ id_type!(UserId);
id_type!(ChannelBufferCollaboratorId);
id_type!(FlagId);
id_type!(NotificationId);
id_type!(NotificationKindId);

View File

@ -165,18 +165,18 @@ impl Database {
.exec_without_returning(&*tx)
.await?;
if rows_affected == 1 {
self.create_notification(
receiver_id,
rpc::Notification::ContactRequest {
requester_id: sender_id.to_proto(),
},
&*tx,
)
.await
} else {
Err(anyhow!("contact already requested"))?
if rows_affected == 0 {
Err(anyhow!("contact already requested"))?;
}
self.create_notification(
receiver_id,
rpc::Notification::ContactRequest {
actor_id: sender_id.to_proto(),
},
&*tx,
)
.await
})
.await
}
@ -260,7 +260,7 @@ impl Database {
responder_id: UserId,
requester_id: UserId,
accept: bool,
) -> Result<()> {
) -> Result<proto::Notification> {
self.transaction(|tx| async move {
let (id_a, id_b, a_to_b) = if responder_id < requester_id {
(responder_id, requester_id, false)
@ -298,11 +298,18 @@ impl Database {
result.rows_affected
};
if rows_affected == 1 {
Ok(())
} else {
if rows_affected == 0 {
Err(anyhow!("no such contact request"))?
}
self.create_notification(
requester_id,
rpc::Notification::ContactRequestAccepted {
actor_id: responder_id.to_proto(),
},
&*tx,
)
.await
})
.await
}

View File

@ -1,21 +1,25 @@
use super::*;
use rpc::{Notification, NotificationKind};
use rpc::Notification;
impl Database {
pub async fn ensure_notification_kinds(&self) -> Result<()> {
self.transaction(|tx| async move {
notification_kind::Entity::insert_many(NotificationKind::all().map(|kind| {
notification_kind::ActiveModel {
id: ActiveValue::Set(kind as i32),
name: ActiveValue::Set(kind.to_string()),
}
}))
.on_conflict(OnConflict::new().do_nothing().to_owned())
.exec(&*tx)
.await?;
Ok(())
})
.await
pub async fn initialize_notification_enum(&mut self) -> Result<()> {
notification_kind::Entity::insert_many(Notification::all_kinds().iter().map(|kind| {
notification_kind::ActiveModel {
name: ActiveValue::Set(kind.to_string()),
..Default::default()
}
}))
.on_conflict(OnConflict::new().do_nothing().to_owned())
.exec_without_returning(&self.pool)
.await?;
let mut rows = notification_kind::Entity::find().stream(&self.pool).await?;
while let Some(row) = rows.next().await {
let row = row?;
self.notification_kinds_by_name.insert(row.name, row.id);
}
Ok(())
}
pub async fn get_notifications(
@ -33,14 +37,16 @@ impl Database {
.await?;
while let Some(row) = rows.next().await {
let row = row?;
let Some(kind) = self.notification_kinds_by_id.get(&row.kind) else {
continue;
};
result.notifications.push(proto::Notification {
id: row.id.to_proto(),
kind: row.kind as u32,
kind: kind.to_string(),
timestamp: row.created_at.assume_utc().unix_timestamp() as u64,
is_read: row.is_read,
entity_id_1: row.entity_id_1.map(|id| id as u64),
entity_id_2: row.entity_id_2.map(|id| id as u64),
entity_id_3: row.entity_id_3.map(|id| id as u64),
content: row.content,
actor_id: row.actor_id.map(|id| id.to_proto()),
});
}
result.notifications.reverse();
@ -55,26 +61,31 @@ impl Database {
notification: Notification,
tx: &DatabaseTransaction,
) -> Result<proto::Notification> {
let (kind, associated_entities) = notification.to_parts();
let notification = notification.to_any();
let kind = *self
.notification_kinds_by_name
.get(notification.kind.as_ref())
.ok_or_else(|| anyhow!("invalid notification kind {:?}", notification.kind))?;
let model = notification::ActiveModel {
recipient_id: ActiveValue::Set(recipient_id),
kind: ActiveValue::Set(kind as i32),
entity_id_1: ActiveValue::Set(associated_entities[0].map(|id| id as i32)),
entity_id_2: ActiveValue::Set(associated_entities[1].map(|id| id as i32)),
entity_id_3: ActiveValue::Set(associated_entities[2].map(|id| id as i32)),
..Default::default()
kind: ActiveValue::Set(kind),
content: ActiveValue::Set(notification.content.clone()),
actor_id: ActiveValue::Set(notification.actor_id.map(|id| UserId::from_proto(id))),
is_read: ActiveValue::NotSet,
created_at: ActiveValue::NotSet,
id: ActiveValue::NotSet,
}
.save(&*tx)
.await?;
Ok(proto::Notification {
id: model.id.as_ref().to_proto(),
kind: *model.kind.as_ref() as u32,
kind: notification.kind.to_string(),
timestamp: model.created_at.as_ref().assume_utc().unix_timestamp() as u64,
is_read: false,
entity_id_1: model.entity_id_1.as_ref().map(|id| id as u64),
entity_id_2: model.entity_id_2.as_ref().map(|id| id as u64),
entity_id_3: model.entity_id_3.as_ref().map(|id| id as u64),
content: notification.content,
actor_id: notification.actor_id,
})
}
}

View File

@ -1,4 +1,4 @@
use crate::db::{NotificationId, UserId};
use crate::db::{NotificationId, NotificationKindId, UserId};
use sea_orm::entity::prelude::*;
use time::PrimitiveDateTime;
@ -7,13 +7,12 @@ use time::PrimitiveDateTime;
pub struct Model {
#[sea_orm(primary_key)]
pub id: NotificationId,
pub recipient_id: UserId,
pub kind: i32,
pub is_read: bool,
pub created_at: PrimitiveDateTime,
pub entity_id_1: Option<i32>,
pub entity_id_2: Option<i32>,
pub entity_id_3: Option<i32>,
pub recipient_id: UserId,
pub actor_id: Option<UserId>,
pub kind: NotificationKindId,
pub content: String,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]

View File

@ -1,10 +1,11 @@
use crate::db::NotificationKindId;
use sea_orm::entity::prelude::*;
#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel)]
#[sea_orm(table_name = "notification_kinds")]
pub struct Model {
#[sea_orm(primary_key)]
pub id: i32,
pub id: NotificationKindId,
pub name: String,
}

View File

@ -31,7 +31,7 @@ impl TestDb {
let mut db = runtime.block_on(async {
let mut options = ConnectOptions::new(url);
options.max_connections(5);
let db = Database::new(options, Executor::Deterministic(background))
let mut db = Database::new(options, Executor::Deterministic(background))
.await
.unwrap();
let sql = include_str!(concat!(
@ -45,6 +45,7 @@ impl TestDb {
))
.await
.unwrap();
db.initialize_notification_enum().await.unwrap();
db
});
@ -79,11 +80,12 @@ impl TestDb {
options
.max_connections(5)
.idle_timeout(Duration::from_secs(0));
let db = Database::new(options, Executor::Deterministic(background))
let mut db = Database::new(options, Executor::Deterministic(background))
.await
.unwrap();
let migrations_path = concat!(env!("CARGO_MANIFEST_DIR"), "/migrations");
db.migrate(Path::new(migrations_path), false).await.unwrap();
db.initialize_notification_enum().await.unwrap();
db
});

View File

@ -119,7 +119,9 @@ impl AppState {
pub async fn new(config: Config) -> Result<Arc<Self>> {
let mut db_options = db::ConnectOptions::new(config.database_url.clone());
db_options.max_connections(config.database_max_connections);
let db = Database::new(db_options, Executor::Production).await?;
let mut db = Database::new(db_options, Executor::Production).await?;
db.initialize_notification_enum().await?;
let live_kit_client = if let Some(((server, key), secret)) = config
.live_kit_server
.as_ref()

View File

@ -291,8 +291,6 @@ impl Server {
let pool = self.connection_pool.clone();
let live_kit_client = self.app_state.live_kit_client.clone();
self.app_state.db.ensure_notification_kinds().await?;
let span = info_span!("start server");
self.executor.spawn_detached(
async move {

View File

@ -185,18 +185,22 @@ impl NotificationPanel {
let text;
let actor;
match entry.notification {
Notification::ContactRequest { requester_id } => {
Notification::ContactRequest {
actor_id: requester_id,
} => {
actor = user_store.get_cached_user(requester_id)?;
icon = "icons/plus.svg";
text = format!("{} wants to add you as a contact", actor.github_login);
}
Notification::ContactRequestAccepted { contact_id } => {
Notification::ContactRequestAccepted {
actor_id: contact_id,
} => {
actor = user_store.get_cached_user(contact_id)?;
icon = "icons/plus.svg";
text = format!("{} accepted your contact invite", actor.github_login);
}
Notification::ChannelInvitation {
inviter_id,
actor_id: inviter_id,
channel_id,
} => {
actor = user_store.get_cached_user(inviter_id)?;
@ -209,7 +213,7 @@ impl NotificationPanel {
);
}
Notification::ChannelMessageMention {
sender_id,
actor_id: sender_id,
channel_id,
message_id,
} => {

View File

@ -3,7 +3,7 @@ use channel::{ChannelMessage, ChannelMessageId, ChannelStore};
use client::{Client, UserStore};
use collections::HashMap;
use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle};
use rpc::{proto, Notification, NotificationKind, TypedEnvelope};
use rpc::{proto, AnyNotification, Notification, TypedEnvelope};
use std::{ops::Range, sync::Arc};
use sum_tree::{Bias, SumTree};
use time::OffsetDateTime;
@ -112,14 +112,11 @@ impl NotificationStore {
is_read: message.is_read,
timestamp: OffsetDateTime::from_unix_timestamp(message.timestamp as i64)
.ok()?,
notification: Notification::from_parts(
NotificationKind::from_i32(message.kind as i32)?,
[
message.entity_id_1,
message.entity_id_2,
message.entity_id_3,
],
)?,
notification: Notification::from_any(&AnyNotification {
actor_id: message.actor_id,
kind: message.kind.into(),
content: message.content,
})?,
})
})
.collect::<Vec<_>>();
@ -129,17 +126,24 @@ impl NotificationStore {
for entry in &notifications {
match entry.notification {
Notification::ChannelInvitation { inviter_id, .. } => {
Notification::ChannelInvitation {
actor_id: inviter_id,
..
} => {
user_ids.push(inviter_id);
}
Notification::ContactRequest { requester_id } => {
Notification::ContactRequest {
actor_id: requester_id,
} => {
user_ids.push(requester_id);
}
Notification::ContactRequestAccepted { contact_id } => {
Notification::ContactRequestAccepted {
actor_id: contact_id,
} => {
user_ids.push(contact_id);
}
Notification::ChannelMessageMention {
sender_id,
actor_id: sender_id,
message_id,
..
} => {

View File

@ -17,6 +17,7 @@ clock = { path = "../clock" }
collections = { path = "../collections" }
gpui = { path = "../gpui", optional = true }
util = { path = "../util" }
anyhow.workspace = true
async-lock = "2.4"
async-tungstenite = "0.16"
@ -27,6 +28,7 @@ prost.workspace = true
rand.workspace = true
rsa = "0.4"
serde.workspace = true
serde_json.workspace = true
serde_derive.workspace = true
smol-timeout = "0.6"
strum.workspace = true

View File

@ -1571,10 +1571,9 @@ message AddNotifications {
message Notification {
uint64 id = 1;
uint32 kind = 2;
uint64 timestamp = 3;
bool is_read = 4;
optional uint64 entity_id_1 = 5;
optional uint64 entity_id_2 = 6;
optional uint64 entity_id_3 = 7;
uint64 timestamp = 2;
bool is_read = 3;
string kind = 4;
string content = 5;
optional uint64 actor_id = 6;
}

View File

@ -1,110 +1,94 @@
use strum::{Display, EnumIter, EnumString, IntoEnumIterator};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::borrow::Cow;
use strum::{EnumVariantNames, IntoStaticStr, VariantNames as _};
// An integer indicating a type of notification. The variants' numerical
// values are stored in the database, so they should never be removed
// or changed.
#[repr(i32)]
#[derive(Copy, Clone, Debug, EnumIter, EnumString, Display)]
pub enum NotificationKind {
ContactRequest = 0,
ContactRequestAccepted = 1,
ChannelInvitation = 2,
ChannelMessageMention = 3,
}
const KIND: &'static str = "kind";
const ACTOR_ID: &'static str = "actor_id";
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq, EnumVariantNames, IntoStaticStr, Serialize, Deserialize)]
#[serde(tag = "kind")]
pub enum Notification {
ContactRequest {
requester_id: u64,
actor_id: u64,
},
ContactRequestAccepted {
contact_id: u64,
actor_id: u64,
},
ChannelInvitation {
inviter_id: u64,
actor_id: u64,
channel_id: u64,
},
ChannelMessageMention {
sender_id: u64,
actor_id: u64,
channel_id: u64,
message_id: u64,
},
}
#[derive(Debug)]
pub struct AnyNotification {
pub kind: Cow<'static, str>,
pub actor_id: Option<u64>,
pub content: String,
}
impl Notification {
/// Load this notification from its generic representation, which is
/// used to represent it in the database, and in the wire protocol.
///
/// The order in which a given notification type's fields are listed must
/// match the order they're listed in the `to_parts` method, and it must
/// not change, because they're stored in that order in the database.
pub fn from_parts(kind: NotificationKind, entity_ids: [Option<u64>; 3]) -> Option<Self> {
use NotificationKind::*;
Some(match kind {
ContactRequest => Self::ContactRequest {
requester_id: entity_ids[0]?,
},
ContactRequestAccepted => Self::ContactRequest {
requester_id: entity_ids[0]?,
},
ChannelInvitation => Self::ChannelInvitation {
inviter_id: entity_ids[0]?,
channel_id: entity_ids[1]?,
},
ChannelMessageMention => Self::ChannelMessageMention {
sender_id: entity_ids[0]?,
channel_id: entity_ids[1]?,
message_id: entity_ids[2]?,
},
})
}
/// Convert this notification into its generic representation, which is
/// used to represent it in the database, and in the wire protocol.
///
/// The order in which a given notification type's fields are listed must
/// match the order they're listed in the `from_parts` method, and it must
/// not change, because they're stored in that order in the database.
pub fn to_parts(&self) -> (NotificationKind, [Option<u64>; 3]) {
use NotificationKind::*;
match self {
Self::ContactRequest { requester_id } => {
(ContactRequest, [Some(*requester_id), None, None])
}
Self::ContactRequestAccepted { contact_id } => {
(ContactRequest, [Some(*contact_id), None, None])
}
Self::ChannelInvitation {
inviter_id,
channel_id,
} => (
ChannelInvitation,
[Some(*inviter_id), Some(*channel_id), None],
),
Self::ChannelMessageMention {
sender_id,
channel_id,
message_id,
} => (
ChannelMessageMention,
[Some(*sender_id), Some(*channel_id), Some(*message_id)],
),
pub fn to_any(&self) -> AnyNotification {
let kind: &'static str = self.into();
let mut value = serde_json::to_value(self).unwrap();
let mut actor_id = None;
if let Some(value) = value.as_object_mut() {
value.remove("kind");
actor_id = value
.remove("actor_id")
.and_then(|value| Some(value.as_i64()? as u64));
}
AnyNotification {
kind: Cow::Borrowed(kind),
actor_id,
content: serde_json::to_string(&value).unwrap(),
}
}
}
impl NotificationKind {
pub fn all() -> impl Iterator<Item = Self> {
Self::iter()
pub fn from_any(notification: &AnyNotification) -> Option<Self> {
let mut value = serde_json::from_str::<Value>(&notification.content).ok()?;
let object = value.as_object_mut()?;
object.insert(KIND.into(), notification.kind.to_string().into());
if let Some(actor_id) = notification.actor_id {
object.insert(ACTOR_ID.into(), actor_id.into());
}
serde_json::from_value(value).ok()
}
pub fn from_i32(i: i32) -> Option<Self> {
Self::iter().find(|kind| *kind as i32 == i)
pub fn all_kinds() -> &'static [&'static str] {
Self::VARIANTS
}
}
#[test]
fn test_notification() {
// Notifications can be serialized and deserialized.
for notification in [
Notification::ContactRequest { actor_id: 1 },
Notification::ContactRequestAccepted { actor_id: 2 },
Notification::ChannelInvitation {
actor_id: 0,
channel_id: 100,
},
Notification::ChannelMessageMention {
actor_id: 200,
channel_id: 30,
message_id: 1,
},
] {
let serialized = notification.to_any();
let deserialized = Notification::from_any(&serialized).unwrap();
assert_eq!(deserialized, notification);
}
// When notifications are serialized, redundant data is not stored
// in the JSON.
let notification = Notification::ContactRequest { actor_id: 1 };
assert_eq!(notification.to_any().content, "{}");
}