diff --git a/packages/backend/server/migrations/20240815064332_userspace/migration.sql b/packages/backend/server/migrations/20240815064332_userspace/migration.sql new file mode 100644 index 0000000000..331e79427a --- /dev/null +++ b/packages/backend/server/migrations/20240815064332_userspace/migration.sql @@ -0,0 +1,13 @@ +-- CreateTable +CREATE TABLE "user_snapshots" ( + "user_id" VARCHAR NOT NULL, + "id" VARCHAR NOT NULL, + "blob" BYTEA NOT NULL, + "created_at" TIMESTAMPTZ(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updated_at" TIMESTAMPTZ(3) NOT NULL, + + CONSTRAINT "user_snapshots_pkey" PRIMARY KEY ("user_id","id") +); + +-- AddForeignKey +ALTER TABLE "user_snapshots" ADD CONSTRAINT "user_snapshots_user_id_fkey" FOREIGN KEY ("user_id") REFERENCES "users"("id") ON DELETE CASCADE ON UPDATE CASCADE; diff --git a/packages/backend/server/migrations/20240815084618_update_doc_tables/migration.sql b/packages/backend/server/migrations/20240815084618_update_doc_tables/migration.sql new file mode 100644 index 0000000000..4d605374ee --- /dev/null +++ b/packages/backend/server/migrations/20240815084618_update_doc_tables/migration.sql @@ -0,0 +1,14 @@ +/* + Warnings: + + - The primary key for the `updates` table will be changed. If it partially fails, the table could be left without primary key constraint. + +*/ +-- AlterTable +ALTER TABLE "snapshots" ALTER COLUMN "seq" DROP NOT NULL; + +-- AlterTable +ALTER TABLE "updates" DROP CONSTRAINT "updates_pkey", +ALTER COLUMN "created_at" DROP DEFAULT, +ALTER COLUMN "seq" DROP NOT NULL, +ADD CONSTRAINT "updates_pkey" PRIMARY KEY ("workspace_id", "guid", "created_at"); diff --git a/packages/backend/server/schema.prisma b/packages/backend/server/schema.prisma index 0245c1673c..09ff855690 100644 --- a/packages/backend/server/schema.prisma +++ b/packages/backend/server/schema.prisma @@ -32,6 +32,7 @@ model User { sessions UserSession[] aiSessions AiSession[] updatedRuntimeConfigs RuntimeConfig[] + userSnapshots UserSnapshot[] @@index([email]) @@map("users") @@ -235,25 +236,49 @@ model Snapshot { workspaceId String @map("workspace_id") @db.VarChar id String @default(uuid()) @map("guid") @db.VarChar blob Bytes @db.ByteA - seq Int @default(0) @db.Integer state Bytes? @db.ByteA createdAt DateTime @default(now()) @map("created_at") @db.Timestamptz(3) // the `updated_at` field will not record the time of record changed, // but the created time of last seen update that has been merged into snapshot. updatedAt DateTime @map("updated_at") @db.Timestamptz(3) + // @deprecated use updatedAt only + seq Int? @default(0) @db.Integer + + // we need to clear all hanging updates and snapshots before enable the foreign key on workspaceId + // workspace Workspace @relation(fields: [workspaceId], references: [id], onDelete: Cascade) + @@id([id, workspaceId]) @@map("snapshots") } +// user snapshots are special snapshots for user storage like personal app settings, distinguished from workspace snapshots +// basically they share the same structure with workspace snapshots +// but for convenience, we don't fork the updates queue and hisotry for user snapshots, until we have to +// which means all operation on user snapshot will happen in-pace +model UserSnapshot { + userId String @map("user_id") @db.VarChar + id String @map("id") @db.VarChar + blob Bytes @db.ByteA + createdAt DateTime @default(now()) @map("created_at") @db.Timestamptz(3) + updatedAt DateTime @updatedAt @map("updated_at") @db.Timestamptz(3) + + user User @relation(fields: [userId], references: [id], onDelete: Cascade) + + @@id([userId, id]) + @@map("user_snapshots") +} + model Update { workspaceId String @map("workspace_id") @db.VarChar id String @map("guid") @db.VarChar - seq Int @db.Integer blob Bytes @db.ByteA - createdAt DateTime @default(now()) @map("created_at") @db.Timestamptz(3) + createdAt DateTime @map("created_at") @db.Timestamptz(3) - @@id([workspaceId, id, seq]) + // @deprecated use createdAt only + seq Int? @db.Integer + + @@id([workspaceId, id, createdAt]) @@map("updates") } diff --git a/packages/backend/server/src/app.module.ts b/packages/backend/server/src/app.module.ts index 7c00ab56c5..e9a111501e 100644 --- a/packages/backend/server/src/app.module.ts +++ b/packages/backend/server/src/app.module.ts @@ -10,7 +10,7 @@ import { get } from 'lodash-es'; import { AppController } from './app.controller'; import { AuthModule } from './core/auth'; import { ADD_ENABLED_FEATURES, ServerConfigModule } from './core/config'; -import { DocModule } from './core/doc'; +import { DocStorageModule } from './core/doc'; import { FeatureModule } from './core/features'; import { PermissionModule } from './core/permission'; import { QuotaModule } from './core/quota'; @@ -42,6 +42,7 @@ import { ENABLED_PLUGINS } from './plugins/registry'; export const FunctionalityModules = [ ConfigModule.forRoot(), + ScheduleModule.forRoot(), EventModule, CacheModule, MutexModule, @@ -155,7 +156,7 @@ export function buildAppModule() { .use(UserModule, AuthModule, PermissionModule) // business modules - .use(DocModule) + .use(DocStorageModule) // sync server only .useIf(config => config.flavor.sync, SyncModule) @@ -163,7 +164,6 @@ export function buildAppModule() { // graphql server only .useIf( config => config.flavor.graphql, - ScheduleModule.forRoot(), GqlModule, StorageModule, ServerConfigModule, diff --git a/packages/backend/server/src/core/doc/adapters/userspace.ts b/packages/backend/server/src/core/doc/adapters/userspace.ts new file mode 100644 index 0000000000..14165ba5c9 --- /dev/null +++ b/packages/backend/server/src/core/doc/adapters/userspace.ts @@ -0,0 +1,175 @@ +import { Injectable } from '@nestjs/common'; +import { PrismaClient } from '@prisma/client'; + +import { Mutex } from '../../../fundamentals'; +import { DocStorageOptions } from '../options'; +import { DocRecord, DocStorageAdapter } from '../storage'; + +@Injectable() +export class PgUserspaceDocStorageAdapter extends DocStorageAdapter { + constructor( + private readonly db: PrismaClient, + private readonly mutex: Mutex, + options: DocStorageOptions + ) { + super(options); + } + + // no updates queue for userspace, directly merge them inplace + // no history record for userspace + protected async getDocUpdates() { + return []; + } + + protected async markUpdatesMerged() { + return 0; + } + + async listDocHistories() { + return []; + } + + async getDocHistory() { + return null; + } + + protected async createDocHistory() { + return false; + } + + override async getDoc(spaceId: string, docId: string) { + return this.getDocSnapshot(spaceId, docId); + } + + async pushDocUpdates(userId: string, docId: string, updates: Uint8Array[]) { + if (!updates.length) { + return 0; + } + + await using _lock = await this.lockDocForUpdate(userId, docId); + const snapshot = await this.getDocSnapshot(userId, docId); + const now = Date.now(); + const pendings = updates.map((update, i) => ({ + bin: update, + timestamp: now + i, + })); + + const { timestamp, bin } = await this.squash( + snapshot ? [snapshot, ...pendings] : pendings + ); + + await this.setDocSnapshot({ + spaceId: userId, + docId, + bin, + timestamp, + }); + + return timestamp; + } + + async deleteDoc(userId: string, docId: string) { + await this.db.userSnapshot.deleteMany({ + where: { + userId, + id: docId, + }, + }); + } + + async deleteSpace(userId: string) { + await this.db.userSnapshot.deleteMany({ + where: { + userId, + }, + }); + } + + async getSpaceDocTimestamps(userId: string, after?: number) { + const snapshots = await this.db.userSnapshot.findMany({ + select: { + id: true, + updatedAt: true, + }, + where: { + userId, + ...(after + ? { + updatedAt: { + gt: new Date(after), + }, + } + : {}), + }, + }); + + const result: Record = {}; + + snapshots.forEach(s => { + result[s.id] = s.updatedAt.getTime(); + }); + + return result; + } + + protected async getDocSnapshot(userId: string, docId: string) { + const snapshot = await this.db.userSnapshot.findUnique({ + where: { + userId_id: { + userId, + id: docId, + }, + }, + }); + + if (!snapshot) { + return null; + } + + return { + spaceId: userId, + docId, + bin: snapshot.blob, + timestamp: snapshot.updatedAt.getTime(), + }; + } + + protected async setDocSnapshot(snapshot: DocRecord) { + // we always get lock before writing to user snapshot table, + // so a simple upsert without testing on updatedAt is safe + await this.db.snapshot.upsert({ + where: { + id_workspaceId: { + workspaceId: snapshot.spaceId, + id: snapshot.docId, + }, + }, + update: { + blob: Buffer.from(snapshot.bin), + updatedAt: new Date(snapshot.timestamp), + }, + create: { + workspaceId: snapshot.spaceId, + id: snapshot.docId, + blob: Buffer.from(snapshot.bin), + createdAt: new Date(snapshot.timestamp), + updatedAt: new Date(snapshot.timestamp), + }, + }); + + return true; + } + + protected override async lockDocForUpdate( + workspaceId: string, + docId: string + ) { + const lock = await this.mutex.lock(`userspace:${workspaceId}:${docId}`); + + if (!lock) { + throw new Error('Too many concurrent writings'); + } + + return lock; + } +} diff --git a/packages/backend/server/src/core/doc/adapters/workspace.ts b/packages/backend/server/src/core/doc/adapters/workspace.ts new file mode 100644 index 0000000000..2325595c08 --- /dev/null +++ b/packages/backend/server/src/core/doc/adapters/workspace.ts @@ -0,0 +1,594 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { PrismaClient } from '@prisma/client'; +import { chunk } from 'lodash-es'; + +import { + Cache, + DocHistoryNotFound, + DocNotFound, + FailedToSaveUpdates, + FailedToUpsertSnapshot, + metrics, + Mutex, +} from '../../../fundamentals'; +import { retryable } from '../../../fundamentals/utils/promise'; +import { DocStorageOptions } from '../options'; +import { + DocRecord, + DocStorageAdapter, + DocUpdate, + HistoryFilter, +} from '../storage'; + +const UPDATES_QUEUE_CACHE_KEY = 'doc:manager:updates'; + +@Injectable() +export class PgWorkspaceDocStorageAdapter extends DocStorageAdapter { + private readonly logger = new Logger(PgWorkspaceDocStorageAdapter.name); + + constructor( + private readonly db: PrismaClient, + private readonly mutex: Mutex, + private readonly cache: Cache, + protected override readonly options: DocStorageOptions + ) { + super(options); + } + + async pushDocUpdates( + workspaceId: string, + docId: string, + updates: Uint8Array[] + ) { + if (!updates.length) { + return 0; + } + + let pendings = updates; + let done = 0; + let timestamp = Date.now(); + try { + await retryable(async () => { + if (done !== 0) { + pendings = pendings.slice(done); + } + + // TODO(@forehalo): remove in next release + const lastSeq = await this.getUpdateSeq( + workspaceId, + docId, + updates.length + ); + + let turn = 0; + const batchCount = 10; + for (const batch of chunk(pendings, batchCount)) { + const now = Date.now(); + await this.db.update.createMany({ + data: batch.map((update, i) => { + const subSeq = turn * batchCount + i + 1; + // `seq` is the last seq num of the batch + // example for 11 batched updates, start from seq num 20 + // seq for first update in the batch should be: + // 31 - 11 + subSeq(0 * 10 + 0 + 1) = 21 + // ^ last seq num ^ updates.length ^ turn ^ batchCount ^i + const seq = lastSeq - updates.length + subSeq; + const createdAt = now + subSeq; + timestamp = Math.max(timestamp, createdAt); + + return { + workspaceId, + id: docId, + blob: Buffer.from(update), + seq, + createdAt: new Date(createdAt), + }; + }), + }); + turn++; + done += batch.length; + await this.updateCachedUpdatesCount(workspaceId, docId, batch.length); + } + }); + } catch (e) { + this.logger.error('Failed to insert doc updates', e); + metrics.doc.counter('doc_update_insert_failed').add(1); + throw new FailedToSaveUpdates(); + } + + return timestamp; + } + + protected async getDocUpdates(workspaceId: string, docId: string) { + const rows = await this.db.update.findMany({ + where: { + workspaceId, + id: docId, + }, + orderBy: { + createdAt: 'asc', + }, + }); + + return rows.map(row => ({ + bin: row.blob, + timestamp: row.createdAt.getTime(), + })); + } + + async deleteDoc(workspaceId: string, docId: string) { + const ident = { where: { workspaceId, id: docId } }; + await this.db.$transaction([ + this.db.snapshot.deleteMany(ident), + this.db.update.deleteMany(ident), + this.db.snapshotHistory.deleteMany(ident), + ]); + } + + async deleteSpace(workspaceId: string) { + const ident = { where: { workspaceId } }; + await this.db.$transaction([ + this.db.workspace.deleteMany({ + where: { + id: workspaceId, + }, + }), + this.db.snapshot.deleteMany(ident), + this.db.update.deleteMany(ident), + this.db.snapshotHistory.deleteMany(ident), + ]); + } + + async getSpaceDocTimestamps(workspaceId: string, after?: number) { + const snapshots = await this.db.snapshot.findMany({ + select: { + id: true, + updatedAt: true, + }, + where: { + workspaceId, + ...(after + ? { + updatedAt: { + gt: new Date(after), + }, + } + : {}), + }, + }); + + const updates = await this.db.update.groupBy({ + where: { + workspaceId, + ...(after + ? { + createdAt: { + gt: new Date(after), + }, + } + : {}), + }, + by: ['id'], + _max: { + createdAt: true, + }, + }); + + const result: Record = {}; + + snapshots.forEach(s => { + result[s.id] = s.updatedAt.getTime(); + }); + + updates.forEach(u => { + if (u._max.createdAt) { + result[u.id] = u._max.createdAt.getTime(); + } + }); + + return result; + } + + protected async markUpdatesMerged( + workspaceId: string, + docId: string, + updates: DocUpdate[] + ) { + const result = await this.db.update.deleteMany({ + where: { + workspaceId, + id: docId, + createdAt: { + in: updates.map(u => new Date(u.timestamp)), + }, + }, + }); + + await this.updateCachedUpdatesCount(workspaceId, docId, -result.count); + return result.count; + } + + async listDocHistories( + workspaceId: string, + docId: string, + query: HistoryFilter + ) { + const histories = await this.db.snapshotHistory.findMany({ + select: { + timestamp: true, + }, + where: { + workspaceId, + id: docId, + timestamp: { + lt: query.before ? new Date(query.before) : new Date(), + }, + }, + orderBy: { + timestamp: 'desc', + }, + take: query.limit, + }); + + return histories.map(h => h.timestamp.getTime()); + } + + async getDocHistory(workspaceId: string, docId: string, timestamp: number) { + const history = await this.db.snapshotHistory.findUnique({ + where: { + workspaceId_id_timestamp: { + workspaceId, + id: docId, + timestamp: new Date(timestamp), + }, + }, + }); + + if (!history) { + return null; + } + + return { + spaceId: workspaceId, + docId, + bin: history.blob, + timestamp, + }; + } + + override async rollbackDoc( + spaceId: string, + docId: string, + timestamp: number + ): Promise { + await using _lock = await this.lockDocForUpdate(spaceId, docId); + const toSnapshot = await this.getDocHistory(spaceId, docId, timestamp); + if (!toSnapshot) { + throw new DocHistoryNotFound({ spaceId, docId, timestamp }); + } + + const fromSnapshot = await this.getDocSnapshot(spaceId, docId); + + if (!fromSnapshot) { + throw new DocNotFound({ spaceId, docId }); + } + + // force create a new history record after rollback + await this.createDocHistory(fromSnapshot, true); + // WARN: + // we should never do the snapshot updating in recovering, + // which is not the solution in CRDT. + // let user revert in client and update the data in sync system + // const change = this.generateChangeUpdate(fromSnapshot.bin, toSnapshot.bin); + // await this.pushDocUpdates(spaceId, docId, [change]); + + metrics.doc + .counter('history_recovered_counter', { + description: 'How many times history recovered request happened', + }) + .add(1); + } + + protected async createDocHistory(snapshot: DocRecord, force = false) { + const last = await this.lastDocHistory(snapshot.spaceId, snapshot.docId); + + let shouldCreateHistory = false; + + if (!last) { + // never created + shouldCreateHistory = true; + } else { + const lastHistoryTimestamp = last.timestamp.getTime(); + if (lastHistoryTimestamp === snapshot.timestamp) { + // no change + shouldCreateHistory = false; + } else if ( + // force + force || + // last history created before interval in configs + lastHistoryTimestamp < + snapshot.timestamp - this.options.historyMinInterval(snapshot.spaceId) + ) { + shouldCreateHistory = true; + } + } + + if (shouldCreateHistory) { + if (this.isEmptyBin(snapshot.bin)) { + this.logger.debug( + `Doc is empty, skip creating history record for ${snapshot.docId} in workspace ${snapshot.spaceId}` + ); + return false; + } + + await this.db.snapshotHistory + .create({ + select: { + timestamp: true, + }, + data: { + workspaceId: snapshot.spaceId, + id: snapshot.docId, + timestamp: new Date(snapshot.timestamp), + blob: Buffer.from(snapshot.bin), + expiredAt: new Date( + Date.now() + (await this.options.historyMaxAge(snapshot.spaceId)) + ), + }, + }) + .catch(() => { + // safe to ignore + // only happens when duplicated history record created in multi processes + }); + + metrics.doc + .counter('history_created_counter', { + description: 'How many times the snapshot history created', + }) + .add(1); + this.logger.debug( + `History created for ${snapshot.docId} in workspace ${snapshot.spaceId}.` + ); + return true; + } + + return false; + } + + protected async getDocSnapshot(workspaceId: string, docId: string) { + const snapshot = await this.db.snapshot.findUnique({ + where: { + id_workspaceId: { + workspaceId, + id: docId, + }, + }, + }); + + if (!snapshot) { + return null; + } + + return { + spaceId: workspaceId, + docId, + bin: snapshot.blob, + timestamp: snapshot.updatedAt.getTime(), + }; + } + + protected async setDocSnapshot(snapshot: DocRecord) { + const { spaceId, docId, bin, timestamp } = snapshot; + + if (this.isEmptyBin(bin)) { + return false; + } + + const updatedAt = new Date(timestamp); + + // CONCERNS: + // i. Because we save the real user's last seen action time as `updatedAt`, + // it's possible to simply compare the `updatedAt` to determine if the snapshot is older than the one we are going to save. + // + // ii. Prisma doesn't support `upsert` with additional `where` condition along side unique constraint. + // In our case, we need to manually check the `updatedAt` to avoid overriding the newer snapshot. + // where: { id_workspaceId: {}, updatedAt: { lt: updatedAt } } + // ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + try { + const result: { updatedAt: Date }[] = await this.db.$queryRaw` + INSERT INTO "snapshots" ("workspace_id", "guid", "blob", "created_at", "updated_at") + VALUES (${spaceId}, ${docId}, ${bin}, DEFAULT, ${updatedAt}) + ON CONFLICT ("workspace_id", "guid") + DO UPDATE SET "blob" = ${bin}, "updated_at" = ${updatedAt} + WHERE "snapshots"."workspace_id" = ${spaceId} AND "snapshots"."guid" = ${docId} AND "snapshots"."updated_at" <= ${updatedAt} + RETURNING "snapshots"."workspace_id" as "workspaceId", "snapshots"."guid" as "id", "snapshots"."updated_at" as "updatedAt" + `; + + // const result = await this.db.snapshot.upsert({ + // select: { + // updatedAt: true, + // seq: true, + // }, + // where: { + // id_workspaceId: { + // workspaceId, + // id: guid, + // }, + // ⬇️ NOT SUPPORTED BY PRISMA YET + // updatedAt: { + // lt: updatedAt, + // }, + // }, + // update: { + // blob, + // state, + // updatedAt, + // }, + // create: { + // workspaceId, + // id: guid, + // blob, + // state, + // updatedAt, + // seq, + // }, + // }); + + // if the condition `snapshot.updatedAt > updatedAt` is true, by which means the snapshot has already been updated by other process, + // the updates has been applied to current `doc` must have been seen by the other process as well. + // The `updatedSnapshot` will be `undefined` in this case. + const updatedSnapshot = result.at(0); + return !!updatedSnapshot; + } catch (e) { + metrics.doc.counter('snapshot_upsert_failed').add(1); + this.logger.error('Failed to upsert snapshot', e); + throw new FailedToUpsertSnapshot(); + } + } + + protected override async lockDocForUpdate( + workspaceId: string, + docId: string + ) { + const lock = await this.mutex.lock(`doc:update:${workspaceId}:${docId}`); + + if (!lock) { + throw new Error('Too many concurrent writings'); + } + + return lock; + } + + protected async lastDocHistory(workspaceId: string, id: string) { + return this.db.snapshotHistory.findFirst({ + where: { + workspaceId, + id, + }, + select: { + timestamp: true, + state: true, + }, + orderBy: { + timestamp: 'desc', + }, + }); + } + + // for auto merging + async randomDoc() { + const key = await this.cache.mapRandomKey(UPDATES_QUEUE_CACHE_KEY); + + if (key) { + const cachedCount = await this.cache.mapIncrease( + UPDATES_QUEUE_CACHE_KEY, + key, + 0 + ); + + if (cachedCount > 0) { + const [workspaceId, id] = key.split('::'); + const count = await this.db.update.count({ + where: { + workspaceId, + id, + }, + }); + + // FIXME(@forehalo): somehow the update count in cache is not accurate + if (count === 0) { + metrics.doc + .counter('doc_update_count_inconsistent_with_cache') + .add(1); + await this.cache.mapDelete(UPDATES_QUEUE_CACHE_KEY, key); + return null; + } + + return { workspaceId, docId: id }; + } + } + + return null; + } + + private async updateCachedUpdatesCount( + workspaceId: string, + guid: string, + count: number + ) { + const result = await this.cache.mapIncrease( + UPDATES_QUEUE_CACHE_KEY, + `${workspaceId}::${guid}`, + count + ); + + if (result <= 0) { + await this.cache.mapDelete( + UPDATES_QUEUE_CACHE_KEY, + `${workspaceId}::${guid}` + ); + } + } + + /** + * @deprecated + */ + private readonly seqMap = new Map(); + /** + * + * @deprecated updates do not rely on seq number anymore + * + * keep in next release to avoid downtime when upgrading instances + */ + private async getUpdateSeq(workspaceId: string, guid: string, batch = 1) { + const MAX_SEQ_NUM = 0x3fffffff; // u31 + + try { + const { seq } = await this.db.snapshot.update({ + select: { + seq: true, + }, + where: { + id_workspaceId: { + workspaceId, + id: guid, + }, + }, + data: { + seq: { + increment: batch, + }, + }, + }); + + if (!seq) { + return batch; + } + + // reset + if (seq >= MAX_SEQ_NUM) { + await this.db.snapshot.update({ + select: { + seq: true, + }, + where: { + id_workspaceId: { + workspaceId, + id: guid, + }, + }, + data: { + seq: 0, + }, + }); + } + + return seq; + } catch { + // not existing snapshot just count it from 1 + const last = this.seqMap.get(workspaceId + guid) ?? 0; + this.seqMap.set(workspaceId + guid, last + batch); + return last + batch; + } + } +} diff --git a/packages/backend/server/src/core/doc/history.ts b/packages/backend/server/src/core/doc/history.ts deleted file mode 100644 index d0ba7cedc8..0000000000 --- a/packages/backend/server/src/core/doc/history.ts +++ /dev/null @@ -1,253 +0,0 @@ -import { isDeepStrictEqual } from 'node:util'; - -import { Injectable, Logger } from '@nestjs/common'; -import { Cron, CronExpression } from '@nestjs/schedule'; -import { PrismaClient } from '@prisma/client'; - -import type { EventPayload } from '../../fundamentals'; -import { - Config, - DocHistoryNotFound, - DocNotFound, - metrics, - OnEvent, -} from '../../fundamentals'; -import { PermissionService } from '../permission'; -import { QuotaService } from '../quota'; -import { isEmptyBuffer } from './manager'; - -@Injectable() -export class DocHistoryManager { - private readonly logger = new Logger(DocHistoryManager.name); - constructor( - private readonly config: Config, - private readonly db: PrismaClient, - private readonly quota: QuotaService, - private readonly permission: PermissionService - ) {} - - @OnEvent('workspace.deleted') - onWorkspaceDeleted(workspaceId: EventPayload<'workspace.deleted'>) { - return this.db.snapshotHistory.deleteMany({ - where: { - workspaceId, - }, - }); - } - - @OnEvent('snapshot.deleted') - onSnapshotDeleted({ workspaceId, id }: EventPayload<'snapshot.deleted'>) { - return this.db.snapshotHistory.deleteMany({ - where: { - workspaceId, - id, - }, - }); - } - - @OnEvent('snapshot.updated') - async onDocUpdated( - { workspaceId, id, previous }: EventPayload<'snapshot.updated'>, - forceCreate = false - ) { - const last = await this.last(workspaceId, id); - - let shouldCreateHistory = false; - - if (!last) { - // never created - shouldCreateHistory = true; - } else if (last.timestamp === previous.updatedAt) { - // no change - shouldCreateHistory = false; - } else if ( - // force - forceCreate || - // last history created before interval in configs - last.timestamp.getTime() < - previous.updatedAt.getTime() - this.config.doc.history.interval - ) { - shouldCreateHistory = true; - } - - if (shouldCreateHistory) { - // skip the history recording when no actual update on snapshot happended - if (last && isDeepStrictEqual(last.state, previous.state)) { - this.logger.debug( - `State matches, skip creating history record for ${id} in workspace ${workspaceId}` - ); - return; - } - - if (isEmptyBuffer(previous.blob)) { - this.logger.debug( - `Doc is empty, skip creating history record for ${id} in workspace ${workspaceId}` - ); - return; - } - - await this.db.snapshotHistory - .create({ - select: { - timestamp: true, - }, - data: { - workspaceId, - id, - timestamp: previous.updatedAt, - blob: previous.blob, - state: previous.state, - expiredAt: await this.getExpiredDateFromNow(workspaceId), - }, - }) - .catch(() => { - // safe to ignore - // only happens when duplicated history record created in multi processes - }); - metrics.doc - .counter('history_created_counter', { - description: 'How many times the snapshot history created', - }) - .add(1); - this.logger.debug( - `History created for ${id} in workspace ${workspaceId}.` - ); - } - } - - async list( - workspaceId: string, - id: string, - before: Date = new Date(), - take: number = 10 - ) { - return this.db.snapshotHistory.findMany({ - select: { - timestamp: true, - }, - where: { - workspaceId, - id, - timestamp: { - lt: before, - }, - // only include the ones has not expired - expiredAt: { - gt: new Date(), - }, - }, - orderBy: { - timestamp: 'desc', - }, - take, - }); - } - - async count(workspaceId: string, id: string) { - return this.db.snapshotHistory.count({ - where: { - workspaceId, - id, - expiredAt: { - gt: new Date(), - }, - }, - }); - } - - async get(workspaceId: string, id: string, timestamp: Date) { - return this.db.snapshotHistory.findUnique({ - where: { - workspaceId_id_timestamp: { - workspaceId, - id, - timestamp, - }, - expiredAt: { - gt: new Date(), - }, - }, - }); - } - - async last(workspaceId: string, id: string) { - return this.db.snapshotHistory.findFirst({ - where: { - workspaceId, - id, - }, - select: { - timestamp: true, - state: true, - }, - orderBy: { - timestamp: 'desc', - }, - }); - } - - async recover(workspaceId: string, id: string, timestamp: Date) { - const history = await this.db.snapshotHistory.findUnique({ - where: { - workspaceId_id_timestamp: { - workspaceId, - id, - timestamp, - }, - }, - }); - - if (!history) { - throw new DocHistoryNotFound({ - workspaceId, - docId: id, - timestamp: timestamp.getTime(), - }); - } - - const oldSnapshot = await this.db.snapshot.findUnique({ - where: { - id_workspaceId: { - id, - workspaceId, - }, - }, - }); - - if (!oldSnapshot) { - throw new DocNotFound({ workspaceId, docId: id }); - } - - // save old snapshot as one history record - await this.onDocUpdated({ workspaceId, id, previous: oldSnapshot }, true); - // WARN: - // we should never do the snapshot updating in recovering, - // which is not the solution in CRDT. - // let user revert in client and update the data in sync system - // `await this.db.snapshot.update();` - metrics.doc - .counter('history_recovered_counter', { - description: 'How many times history recovered request happened', - }) - .add(1); - - return history.timestamp; - } - - async getExpiredDateFromNow(workspaceId: string) { - const owner = await this.permission.getWorkspaceOwner(workspaceId); - const quota = await this.quota.getUserQuota(owner.id); - return quota.feature.historyPeriodFromNow; - } - - @Cron(CronExpression.EVERY_DAY_AT_MIDNIGHT /* everyday at 12am */) - async cleanupExpiredHistory() { - await this.db.snapshotHistory.deleteMany({ - where: { - expiredAt: { - lte: new Date(), - }, - }, - }); - } -} diff --git a/packages/backend/server/src/core/doc/index.ts b/packages/backend/server/src/core/doc/index.ts index 5c9720c070..55698352ca 100644 --- a/packages/backend/server/src/core/doc/index.ts +++ b/packages/backend/server/src/core/doc/index.ts @@ -4,14 +4,22 @@ import { Module } from '@nestjs/common'; import { PermissionModule } from '../permission'; import { QuotaModule } from '../quota'; -import { DocHistoryManager } from './history'; -import { DocManager } from './manager'; +import { PgUserspaceDocStorageAdapter } from './adapters/userspace'; +import { PgWorkspaceDocStorageAdapter } from './adapters/workspace'; +import { DocStorageCronJob } from './job'; +import { DocStorageOptions } from './options'; @Module({ imports: [QuotaModule, PermissionModule], - providers: [DocManager, DocHistoryManager], - exports: [DocManager, DocHistoryManager], + providers: [ + DocStorageOptions, + PgWorkspaceDocStorageAdapter, + PgUserspaceDocStorageAdapter, + DocStorageCronJob, + ], + exports: [PgWorkspaceDocStorageAdapter, PgUserspaceDocStorageAdapter], }) -export class DocModule {} +export class DocStorageModule {} +export { PgUserspaceDocStorageAdapter, PgWorkspaceDocStorageAdapter }; -export { DocHistoryManager, DocManager }; +export { DocStorageAdapter } from './storage'; diff --git a/packages/backend/server/src/core/doc/job.ts b/packages/backend/server/src/core/doc/job.ts new file mode 100644 index 0000000000..b6ecc2d382 --- /dev/null +++ b/packages/backend/server/src/core/doc/job.ts @@ -0,0 +1,76 @@ +import { Injectable, Logger, OnModuleInit } from '@nestjs/common'; +import { Cron, CronExpression, SchedulerRegistry } from '@nestjs/schedule'; +import { PrismaClient } from '@prisma/client'; + +import { CallTimer, Config, metrics } from '../../fundamentals'; +import { PgWorkspaceDocStorageAdapter } from './adapters/workspace'; + +@Injectable() +export class DocStorageCronJob implements OnModuleInit { + private readonly logger = new Logger(DocStorageCronJob.name); + private busy = false; + + constructor( + private readonly registry: SchedulerRegistry, + private readonly config: Config, + private readonly db: PrismaClient, + private readonly workspace: PgWorkspaceDocStorageAdapter + ) {} + + onModuleInit() { + if (this.config.doc.manager.enableUpdateAutoMerging) { + this.registry.addInterval( + this.autoMergePendingDocUpdates.name, + // scheduler registry will clean up the interval when the app is stopped + setInterval(() => { + if (this.busy) { + return; + } + this.busy = true; + this.autoMergePendingDocUpdates() + .catch(() => { + /* never fail */ + }) + .finally(() => { + this.busy = false; + }); + }, this.config.doc.manager.updatePollInterval) + ); + + this.logger.log('Updates pending queue auto merging cron started'); + } + } + + @CallTimer('doc', 'auto_merge_pending_doc_updates') + async autoMergePendingDocUpdates() { + try { + const randomDoc = await this.workspace.randomDoc(); + if (!randomDoc) { + return; + } + + await this.workspace.getDoc(randomDoc.workspaceId, randomDoc.docId); + } catch (e) { + metrics.doc.counter('auto_merge_pending_doc_updates_error').add(1); + this.logger.error('Failed to auto merge pending doc updates', e); + } + } + + @Cron(CronExpression.EVERY_DAY_AT_MIDNIGHT /* everyday at 12am */) + async cleanupExpiredHistory() { + await this.db.snapshotHistory.deleteMany({ + where: { + expiredAt: { + lte: new Date(), + }, + }, + }); + } + + @Cron(CronExpression.EVERY_MINUTE) + async reportUpdatesQueueCount() { + metrics.doc + .gauge('updates_queue_count') + .record(await this.db.update.count()); + } +} diff --git a/packages/backend/server/src/core/doc/manager.ts b/packages/backend/server/src/core/doc/manager.ts deleted file mode 100644 index 7701944cb3..0000000000 --- a/packages/backend/server/src/core/doc/manager.ts +++ /dev/null @@ -1,853 +0,0 @@ -import { - Injectable, - Logger, - OnModuleDestroy, - OnModuleInit, -} from '@nestjs/common'; -import { Cron, CronExpression } from '@nestjs/schedule'; -import { PrismaClient, Snapshot, Update } from '@prisma/client'; -import { chunk } from 'lodash-es'; -import { defer, retry } from 'rxjs'; -import { - applyUpdate, - Doc, - encodeStateAsUpdate, - encodeStateVector, - transact, -} from 'yjs'; - -import type { EventPayload } from '../../fundamentals'; -import { - Cache, - CallTimer, - Config, - EventEmitter, - mergeUpdatesInApplyWay as jwstMergeUpdates, - metrics, - OnEvent, -} from '../../fundamentals'; - -function compare(yBinary: Buffer, jwstBinary: Buffer, strict = false): boolean { - if (yBinary.equals(jwstBinary)) { - return true; - } - - if (strict) { - return false; - } - - const doc = new Doc(); - applyUpdate(doc, jwstBinary); - - const yBinary2 = Buffer.from(encodeStateAsUpdate(doc)); - - return compare(yBinary, yBinary2, true); -} - -export function isEmptyBuffer(buf: Buffer): boolean { - return ( - buf.length === 0 || - // 0x0000 - (buf.length === 2 && buf[0] === 0 && buf[1] === 0) - ); -} - -const MAX_SEQ_NUM = 0x3fffffff; // u31 -const UPDATES_QUEUE_CACHE_KEY = 'doc:manager:updates'; - -interface DocResponse { - doc: Doc; - timestamp: number; -} - -interface BinaryResponse { - binary: Buffer; - timestamp: number; -} - -/** - * Since we can't directly save all client updates into database, in which way the database will overload, - * we need to buffer the updates and merge them to reduce db write. - * - * And also, if a new client join, it would be nice to see the latest doc asap, - * so we need to at least store a snapshot of the doc and return quickly, - * along side all the updates that have not been applies to that snapshot(timestamp). - */ -@Injectable() -export class DocManager implements OnModuleInit, OnModuleDestroy { - private readonly logger = new Logger(DocManager.name); - private job: NodeJS.Timeout | null = null; - private readonly seqMap = new Map(); - private busy = false; - - constructor( - private readonly db: PrismaClient, - private readonly config: Config, - private readonly cache: Cache, - private readonly event: EventEmitter - ) {} - - onModuleInit() { - if (this.config.doc.manager.enableUpdateAutoMerging) { - this.logger.log('Use Database'); - this.setup(); - } - } - - onModuleDestroy() { - this.destroy(); - } - - @CallTimer('doc', 'yjs_recover_updates_to_doc') - private recoverDoc(...updates: Buffer[]): Promise { - const doc = new Doc(); - const chunks = chunk(updates, 10); - - return new Promise(resolve => { - const next = () => { - const updates = chunks.shift(); - if (updates?.length) { - transact(doc, () => { - updates.forEach(u => { - try { - applyUpdate(doc, u); - } catch (e) { - this.logger.error('Failed to apply update', e); - } - }); - }); - - // avoid applying too many updates in single round which will take the whole cpu time like dead lock - setImmediate(() => { - next(); - }); - } else { - resolve(doc); - } - }; - - next(); - }); - } - - private async applyUpdates(guid: string, ...updates: Buffer[]): Promise { - const doc = await this.recoverDoc(...updates); - - const useYocto = await this.config.runtime.fetch( - 'doc/experimentalMergeWithYOcto' - ); - // test jwst codec - if (useYocto) { - metrics.jwst.counter('codec_merge_counter').add(1); - const yjsResult = Buffer.from(encodeStateAsUpdate(doc)); - let log = false; - try { - const jwstResult = jwstMergeUpdates(updates); - if (!compare(yjsResult, jwstResult)) { - metrics.jwst.counter('codec_not_match').add(1); - this.logger.warn( - `jwst codec result doesn't match yjs codec result for: ${guid}` - ); - log = true; - if (this.config.node.dev) { - this.logger.warn(`Expected:\n ${yjsResult.toString('hex')}`); - this.logger.warn(`Result:\n ${jwstResult.toString('hex')}`); - } - } - } catch (e) { - metrics.jwst.counter('codec_fails_counter').add(1); - this.logger.warn(`jwst apply update failed for ${guid}: ${e}`); - log = true; - } finally { - if (log && this.config.node.dev) { - this.logger.warn( - `Updates: ${updates.map(u => u.toString('hex')).join('\n')}` - ); - } - } - } - - return doc; - } - - /** - * setup pending update processing loop - */ - setup() { - this.job = setInterval(() => { - if (!this.busy) { - this.busy = true; - this.autoSquash() - .catch(() => { - /* we handle all errors in work itself */ - }) - .finally(() => { - this.busy = false; - }); - } - }, this.config.doc.manager.updatePollInterval); - - this.logger.log('Automation started'); - } - - /** - * stop pending update processing loop - */ - destroy() { - if (this.job) { - clearInterval(this.job); - this.job = null; - this.logger.log('Automation stopped'); - } - } - - @OnEvent('workspace.deleted') - async onWorkspaceDeleted(workspaceId: string) { - await this.db.snapshot.deleteMany({ - where: { - workspaceId, - }, - }); - await this.db.update.deleteMany({ - where: { - workspaceId, - }, - }); - } - - @OnEvent('snapshot.deleted') - async onSnapshotDeleted({ - id, - workspaceId, - }: EventPayload<'snapshot.deleted'>) { - await this.db.update.deleteMany({ - where: { - id, - workspaceId, - }, - }); - } - - /** - * add update to manager for later processing. - */ - async push( - workspaceId: string, - guid: string, - update: Buffer, - retryTimes = 10 - ) { - const timestamp = await new Promise((resolve, reject) => { - defer(async () => { - const seq = await this.getUpdateSeq(workspaceId, guid); - const { createdAt } = await this.db.update.create({ - select: { - createdAt: true, - }, - data: { - workspaceId, - id: guid, - seq, - blob: update, - }, - }); - - return createdAt.getTime(); - }) - .pipe(retry(retryTimes)) // retry until seq num not conflict - .subscribe({ - next: timestamp => { - this.logger.debug( - `pushed 1 update for ${guid} in workspace ${workspaceId}` - ); - resolve(timestamp); - }, - error: e => { - this.logger.error('Failed to push updates', e); - reject(new Error('Failed to push update')); - }, - }); - }); - - await this.updateCachedUpdatesCount(workspaceId, guid, 1); - - return timestamp; - } - - async batchPush( - workspaceId: string, - guid: string, - updates: Buffer[], - retryTimes = 10 - ) { - const timestamp = await new Promise((resolve, reject) => { - defer(async () => { - const lastSeq = await this.getUpdateSeq( - workspaceId, - guid, - updates.length - ); - const now = Date.now(); - let timestamp = now; - let turn = 0; - const batchCount = 10; - for (const batch of chunk(updates, batchCount)) { - await this.db.update.createMany({ - data: batch.map((update, i) => { - const subSeq = turn * batchCount + i + 1; - // `seq` is the last seq num of the batch - // example for 11 batched updates, start from seq num 20 - // seq for first update in the batch should be: - // 31 - 11 + subSeq(0 * 10 + 0 + 1) = 21 - // ^ last seq num ^ updates.length ^ turn ^ batchCount ^i - const seq = lastSeq - updates.length + subSeq; - const createdAt = now + subSeq; - timestamp = Math.max(timestamp, createdAt); - - return { - workspaceId, - id: guid, - blob: update, - seq, - createdAt: new Date(createdAt), // make sure the updates can be ordered by create time - }; - }), - }); - turn++; - } - - return timestamp; - }) - .pipe(retry(retryTimes)) // retry until seq num not conflict - .subscribe({ - next: timestamp => { - this.logger.debug( - `pushed ${updates.length} updates for ${guid} in workspace ${workspaceId}` - ); - resolve(timestamp); - }, - error: e => { - this.logger.error('Failed to push updates', e); - reject(new Error('Failed to push update')); - }, - }); - }); - await this.updateCachedUpdatesCount(workspaceId, guid, updates.length); - - return timestamp; - } - - /** - * Get latest timestamp of all docs in the workspace. - */ - @CallTimer('doc', 'get_doc_timestamps') - async getDocTimestamps(workspaceId: string, after: number | undefined = 0) { - const snapshots = await this.db.snapshot.findMany({ - where: { - workspaceId, - updatedAt: { - gt: new Date(after), - }, - }, - select: { - id: true, - updatedAt: true, - }, - }); - - const updates = await this.db.update.groupBy({ - where: { - workspaceId, - createdAt: { - gt: new Date(after), - }, - }, - by: ['id'], - _max: { - createdAt: true, - }, - }); - - const result: Record = {}; - - snapshots.forEach(s => { - result[s.id] = s.updatedAt.getTime(); - }); - - updates.forEach(u => { - if (u._max.createdAt) { - result[u.id] = u._max.createdAt.getTime(); - } - }); - - return result; - } - - /** - * get the latest doc with all update applied. - */ - async get(workspaceId: string, guid: string): Promise { - const result = await this._get(workspaceId, guid); - if (result) { - if ('doc' in result) { - return result; - } else { - const doc = await this.recoverDoc(result.binary); - - return { - doc, - timestamp: result.timestamp, - }; - } - } - - return null; - } - - /** - * get the latest doc binary with all update applied. - */ - async getBinary( - workspaceId: string, - guid: string - ): Promise { - const result = await this._get(workspaceId, guid); - if (result) { - if ('doc' in result) { - return { - binary: Buffer.from(encodeStateAsUpdate(result.doc)), - timestamp: result.timestamp, - }; - } else { - return result; - } - } - - return null; - } - - /** - * get the latest doc state vector with all update applied. - */ - async getDocState( - workspaceId: string, - guid: string - ): Promise { - const snapshot = await this.getSnapshot(workspaceId, guid); - const updates = await this.getUpdates(workspaceId, guid); - - if (updates.length) { - const { doc, timestamp } = await this.squash(snapshot, updates); - return { - binary: Buffer.from(encodeStateVector(doc)), - timestamp, - }; - } - - return snapshot?.state - ? { - binary: snapshot.state, - timestamp: snapshot.updatedAt.getTime(), - } - : null; - } - - /** - * get the snapshot of the doc we've seen. - */ - async getSnapshot(workspaceId: string, guid: string) { - return this.db.snapshot.findUnique({ - where: { - id_workspaceId: { - workspaceId, - id: guid, - }, - }, - }); - } - - /** - * get pending updates - */ - async getUpdates(workspaceId: string, guid: string) { - const updates = await this.db.update.findMany({ - where: { - workspaceId, - id: guid, - }, - // take it ease, we don't want to overload db and or cpu - // if we limit the taken number here, - // user will never see the latest doc if there are too many updates pending to be merged. - take: this.config.doc.manager.maxUpdatesPullCount, - }); - - // perf(memory): avoid sorting in db - return updates.sort((a, b) => (a.createdAt < b.createdAt ? -1 : 1)); - } - - /** - * apply pending updates to snapshot - */ - private async autoSquash() { - // find the first update and batch process updates with same id - const candidate = await this.getAutoSquashCandidate(); - - // no pending updates - if (!candidate) { - return; - } - - const { id, workspaceId } = candidate; - - await this.lockUpdatesForAutoSquash(workspaceId, id, async () => { - try { - await this._get(workspaceId, id); - } catch (e) { - this.logger.error( - `Failed to apply updates for workspace: ${workspaceId}, guid: ${id}` - ); - this.logger.error(e); - } - }); - } - - private async getAutoSquashCandidate() { - const cache = await this.getAutoSquashCandidateFromCache(); - - if (cache) { - return cache; - } - - return this.db.update.findFirst({ - select: { - id: true, - workspaceId: true, - }, - }); - } - - /** - * @returns whether the snapshot is updated to the latest, `undefined` means the doc to be upserted is outdated. - */ - @CallTimer('doc', 'upsert') - private async upsert( - workspaceId: string, - guid: string, - doc: Doc, - // we always delay the snapshot update to avoid db overload, - // so the value of auto updated `updatedAt` by db will never be accurate to user's real action time - updatedAt: Date, - seq: number - ) { - const blob = Buffer.from(encodeStateAsUpdate(doc)); - - if (isEmptyBuffer(blob)) { - return undefined; - } - - const state = Buffer.from(encodeStateVector(doc)); - - // CONCERNS: - // i. Because we save the real user's last seen action time as `updatedAt`, - // it's possible to simply compare the `updatedAt` to determine if the snapshot is older than the one we are going to save. - // - // ii. Prisma doesn't support `upsert` with additional `where` condition along side unique constraint. - // In our case, we need to manually check the `updatedAt` to avoid overriding the newer snapshot. - // where: { id_workspaceId: {}, updatedAt: { lt: updatedAt } } - // ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - // - // iii. Only set the seq number when creating the snapshot. - // For updating scenario, the seq number will be updated when updates pushed to db. - try { - const result: { updatedAt: Date }[] = await this.db.$queryRaw` - INSERT INTO "snapshots" ("workspace_id", "guid", "blob", "state", "seq", "created_at", "updated_at") - VALUES (${workspaceId}, ${guid}, ${blob}, ${state}, ${seq}, DEFAULT, ${updatedAt}) - ON CONFLICT ("workspace_id", "guid") - DO UPDATE SET "blob" = ${blob}, "state" = ${state}, "updated_at" = ${updatedAt}, "seq" = ${seq} - WHERE "snapshots"."workspace_id" = ${workspaceId} AND "snapshots"."guid" = ${guid} AND "snapshots"."updated_at" <= ${updatedAt} - RETURNING "snapshots"."workspace_id" as "workspaceId", "snapshots"."guid" as "id", "snapshots"."updated_at" as "updatedAt" - `; - - // const result = await this.db.snapshot.upsert({ - // select: { - // updatedAt: true, - // seq: true, - // }, - // where: { - // id_workspaceId: { - // workspaceId, - // id: guid, - // }, - // ⬇️ NOT SUPPORTED BY PRISMA YET - // updatedAt: { - // lt: updatedAt, - // }, - // }, - // update: { - // blob, - // state, - // updatedAt, - // }, - // create: { - // workspaceId, - // id: guid, - // blob, - // state, - // updatedAt, - // seq, - // }, - // }); - - // if the condition `snapshot.updatedAt > updatedAt` is true, by which means the snapshot has already been updated by other process, - // the updates has been applied to current `doc` must have been seen by the other process as well. - // The `updatedSnapshot` will be `undefined` in this case. - const updatedSnapshot = result.at(0); - - if (!updatedSnapshot) { - return undefined; - } - - return true; - } catch (e) { - this.logger.error('Failed to upsert snapshot', e); - return false; - } - } - - private async _get( - workspaceId: string, - guid: string - ): Promise { - const snapshot = await this.getSnapshot(workspaceId, guid); - const updates = await this.getUpdates(workspaceId, guid); - - if (updates.length) { - return this.squash(snapshot, updates); - } - - return snapshot - ? { binary: snapshot.blob, timestamp: snapshot.updatedAt.getTime() } - : null; - } - - /** - * Squash updates into a single update and save it as snapshot, - * and delete the updates records at the same time. - */ - @CallTimer('doc', 'squash') - private async squash( - snapshot: Snapshot | null, - updates: Update[] - ): Promise { - if (!updates.length) { - throw new Error('No updates to squash'); - } - - const last = updates[updates.length - 1]; - const { id, workspaceId } = last; - - const doc = await this.applyUpdates( - id, - snapshot ? snapshot.blob : Buffer.from([0, 0]), - ...updates.map(u => u.blob) - ); - - const done = await this.upsert( - workspaceId, - id, - doc, - last.createdAt, - last.seq - ); - - if (done) { - if (snapshot) { - this.event.emit('snapshot.updated', { - id, - workspaceId, - previous: { - blob: snapshot.blob, - state: snapshot.state, - updatedAt: snapshot.updatedAt, - }, - }); - } - - this.logger.debug( - `Squashed ${updates.length} updates for ${id} in workspace ${workspaceId}` - ); - } - - // we will keep the updates only if the upsert failed on unknown reason - // `done === undefined` means the updates is outdated(have already been merged by other process), safe to be deleted - // `done === true` means the upsert is successful, safe to be deleted - if (done !== false) { - // always delete updates - // the upsert will return false if the state is not newer, so we don't need to worry about it - const { count } = await this.db.update.deleteMany({ - where: { - id, - workspaceId, - seq: { - in: updates.map(u => u.seq), - }, - }, - }); - - await this.updateCachedUpdatesCount(workspaceId, id, -count); - } - - return { doc, timestamp: last.createdAt.getTime() }; - } - - private async getUpdateSeq(workspaceId: string, guid: string, batch = 1) { - try { - const { seq } = await this.db.snapshot.update({ - select: { - seq: true, - }, - where: { - id_workspaceId: { - workspaceId, - id: guid, - }, - }, - data: { - seq: { - increment: batch, - }, - }, - }); - - // reset - if (seq >= MAX_SEQ_NUM) { - await this.db.snapshot.update({ - select: { - seq: true, - }, - where: { - id_workspaceId: { - workspaceId, - id: guid, - }, - }, - data: { - seq: 0, - }, - }); - } - - return seq; - } catch { - // not existing snapshot just count it from 1 - const last = this.seqMap.get(workspaceId + guid) ?? 0; - this.seqMap.set(workspaceId + guid, last + batch); - return last + batch; - } - } - - private async updateCachedUpdatesCount( - workspaceId: string, - guid: string, - count: number - ) { - const result = await this.cache.mapIncrease( - UPDATES_QUEUE_CACHE_KEY, - `${workspaceId}::${guid}`, - count - ); - - if (result <= 0) { - await this.cache.mapDelete( - UPDATES_QUEUE_CACHE_KEY, - `${workspaceId}::${guid}` - ); - } - } - - private async getAutoSquashCandidateFromCache() { - const key = await this.cache.mapRandomKey(UPDATES_QUEUE_CACHE_KEY); - - if (key) { - const cachedCount = await this.cache.mapIncrease( - UPDATES_QUEUE_CACHE_KEY, - key, - 0 - ); - - if (cachedCount > 0) { - const [workspaceId, id] = key.split('::'); - const count = await this.db.update.count({ - where: { - workspaceId, - id, - }, - }); - - // FIXME(@forehalo): somehow the update count in cache is not accurate - if (count === 0) { - await this.cache.mapDelete(UPDATES_QUEUE_CACHE_KEY, key); - - return null; - } - return { id, workspaceId }; - } - } - - return null; - } - - private async doWithLock( - lockScope: string, - lockResource: string, - job: () => Promise - ) { - const lock = `lock:${lockScope}:${lockResource}`; - const acquired = await this.cache.setnx(lock, 1, { - ttl: 60 * 1000, - }); - metrics.doc.counter('lock').add(1, { scope: lockScope }); - - if (!acquired) { - metrics.doc.counter('lock_failed').add(1, { scope: lockScope }); - return; - } - metrics.doc.counter('lock_required').add(1, { scope: lockScope }); - - try { - return await job(); - } finally { - await this.cache - .delete(lock) - .then(() => { - metrics.doc.counter('lock_released').add(1, { scope: lockScope }); - }) - .catch(e => { - metrics.doc - .counter('lock_release_failed') - .add(1, { scope: lockScope }); - // safe, the lock will be expired when ttl ends - this.logger.error(`Failed to release lock ${lock}`, e); - }); - } - } - - private async lockUpdatesForAutoSquash( - workspaceId: string, - guid: string, - job: () => Promise - ) { - return this.doWithLock( - 'doc:manager:updates', - `${workspaceId}::${guid}`, - job - ); - } - - @Cron(CronExpression.EVERY_MINUTE) - async reportUpdatesQueueCount() { - metrics.doc - .gauge('updates_queue_count') - .record(await this.db.update.count()); - } -} diff --git a/packages/backend/server/src/core/doc/options.ts b/packages/backend/server/src/core/doc/options.ts new file mode 100644 index 0000000000..477007625f --- /dev/null +++ b/packages/backend/server/src/core/doc/options.ts @@ -0,0 +1,130 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { chunk } from 'lodash-es'; +import * as Y from 'yjs'; + +import { + CallTimer, + Config, + mergeUpdatesInApplyWay as yotcoMergeUpdates, + metrics, +} from '../../fundamentals'; +import { PermissionService } from '../permission'; +import { QuotaService } from '../quota'; +import { DocStorageOptions as IDocStorageOptions } from './storage'; + +function compare(yBinary: Buffer, jwstBinary: Buffer, strict = false): boolean { + if (yBinary.equals(jwstBinary)) { + return true; + } + + if (strict) { + return false; + } + + const doc = new Y.Doc(); + Y.applyUpdate(doc, jwstBinary); + + const yBinary2 = Buffer.from(Y.encodeStateAsUpdate(doc)); + + return compare(yBinary, yBinary2, true); +} + +@Injectable() +export class DocStorageOptions implements IDocStorageOptions { + private readonly logger = new Logger('DocStorageOptions'); + + constructor( + private readonly config: Config, + private readonly permission: PermissionService, + private readonly quota: QuotaService + ) {} + + mergeUpdates = async (updates: Uint8Array[]) => { + const useYocto = await this.config.runtime.fetch( + 'doc/experimentalMergeWithYOcto' + ); + + if (useYocto) { + const doc = await this.recoverDoc(updates); + + metrics.jwst.counter('codec_merge_counter').add(1); + const yjsResult = Buffer.from(Y.encodeStateAsUpdate(doc)); + let log = false; + try { + const yocto = yotcoMergeUpdates(updates.map(Buffer.from)); + if (!compare(yjsResult, yocto)) { + metrics.jwst.counter('codec_not_match').add(1); + this.logger.warn(`yocto codec result doesn't match yjs codec result`); + log = true; + if (this.config.node.dev) { + this.logger.warn(`Expected:\n ${yjsResult.toString('hex')}`); + this.logger.warn(`Result:\n ${yocto.toString('hex')}`); + } + } + } catch (e) { + metrics.jwst.counter('codec_fails_counter').add(1); + this.logger.warn(`jwst apply update failed: ${e}`); + log = true; + } + + if (log && this.config.node.dev) { + this.logger.warn( + `Updates: ${updates.map(u => Buffer.from(u).toString('hex')).join('\n')}` + ); + } + + return yjsResult; + } else { + return this.simpleMergeUpdates(updates); + } + }; + + historyMaxAge = async (spaceId: string) => { + const owner = await this.permission.getWorkspaceOwner(spaceId); + const quota = await this.quota.getUserQuota(owner.id); + return quota.feature.historyPeriod; + }; + + historyMinInterval = (_spaceId: string) => { + return this.config.doc.history.interval; + }; + + @CallTimer('doc', 'yjs_merge_updates') + private simpleMergeUpdates(updates: Uint8Array[]) { + return Y.mergeUpdates(updates); + } + + @CallTimer('doc', 'yjs_recover_updates_to_doc') + private recoverDoc(updates: Uint8Array[]): Promise { + const doc = new Y.Doc(); + const chunks = chunk(updates, 10); + let i = 0; + + return new Promise(resolve => { + Y.transact(doc, () => { + const next = () => { + const updates = chunks.at(i++); + + if (updates?.length) { + updates.forEach(u => { + try { + Y.applyUpdate(doc, u); + } catch (e) { + this.logger.error('Failed to apply update', e); + } + }); + + // avoid applying too many updates in single round which will take the whole cpu time like dead lock + setImmediate(() => { + next(); + }); + } else { + resolve(doc); + } + }; + + next(); + }); + }); + } +} diff --git a/packages/backend/server/src/core/permission/service.ts b/packages/backend/server/src/core/permission/service.ts index 31cb7c324e..27ef6e91af 100644 --- a/packages/backend/server/src/core/permission/service.ts +++ b/packages/backend/server/src/core/permission/service.ts @@ -4,8 +4,8 @@ import { PrismaClient } from '@prisma/client'; import { DocAccessDenied, - WorkspaceAccessDenied, - WorkspaceOwnerNotFound, + SpaceAccessDenied, + SpaceOwnerNotFound, } from '../../fundamentals'; import { Permission, PublicPageMode } from './types'; @@ -69,7 +69,7 @@ export class PermissionService { }); if (!owner) { - throw new WorkspaceOwnerNotFound({ workspaceId }); + throw new SpaceOwnerNotFound({ spaceId: workspaceId }); } return owner.user; @@ -157,7 +157,7 @@ export class PermissionService { permission: Permission = Permission.Read ) { if (!(await this.tryCheckWorkspace(ws, user, permission))) { - throw new WorkspaceAccessDenied({ workspaceId: ws }); + throw new SpaceAccessDenied({ spaceId: ws }); } } @@ -340,7 +340,7 @@ export class PermissionService { permission = Permission.Read ) { if (!(await this.tryCheckPage(ws, page, user, permission))) { - throw new DocAccessDenied({ workspaceId: ws, docId: page }); + throw new DocAccessDenied({ spaceId: ws, docId: page }); } } diff --git a/packages/backend/server/src/core/quota/quota.ts b/packages/backend/server/src/core/quota/quota.ts index 752324a976..95321df902 100644 --- a/packages/backend/server/src/core/quota/quota.ts +++ b/packages/backend/server/src/core/quota/quota.ts @@ -71,10 +71,6 @@ export class QuotaConfig { return this.config.configs.historyPeriod; } - get historyPeriodFromNow() { - return new Date(Date.now() + this.historyPeriod); - } - get memberLimit() { return this.config.configs.memberLimit; } diff --git a/packages/backend/server/src/core/sync/events/events.gateway.ts b/packages/backend/server/src/core/sync/events/events.gateway.ts deleted file mode 100644 index fab58df40b..0000000000 --- a/packages/backend/server/src/core/sync/events/events.gateway.ts +++ /dev/null @@ -1,327 +0,0 @@ -import { applyDecorators, Logger } from '@nestjs/common'; -import { - ConnectedSocket, - MessageBody, - OnGatewayConnection, - OnGatewayDisconnect, - SubscribeMessage as RawSubscribeMessage, - WebSocketGateway, - WebSocketServer, -} from '@nestjs/websockets'; -import { Server, Socket } from 'socket.io'; -import { encodeStateAsUpdate, encodeStateVector } from 'yjs'; - -import { - CallTimer, - Config, - DocNotFound, - GatewayErrorWrapper, - metrics, - NotInWorkspace, - VersionRejected, - WorkspaceAccessDenied, -} from '../../../fundamentals'; -import { Auth, CurrentUser } from '../../auth'; -import { DocManager } from '../../doc'; -import { Permission, PermissionService } from '../../permission'; -import { DocID } from '../../utils/doc'; - -const SubscribeMessage = (event: string) => - applyDecorators( - GatewayErrorWrapper(event), - CallTimer('socketio', 'event_duration', { event }), - RawSubscribeMessage(event) - ); - -type EventResponse = Data extends never - ? { - data?: never; - } - : { - data: Data; - }; - -function Sync(workspaceId: string): `${string}:sync` { - return `${workspaceId}:sync`; -} - -function Awareness(workspaceId: string): `${string}:awareness` { - return `${workspaceId}:awareness`; -} - -@WebSocketGateway() -export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect { - protected logger = new Logger(EventsGateway.name); - private connectionCount = 0; - - constructor( - private readonly config: Config, - private readonly docManager: DocManager, - private readonly permissions: PermissionService - ) {} - - @WebSocketServer() - server!: Server; - - handleConnection() { - this.connectionCount++; - metrics.socketio.gauge('realtime_connections').record(this.connectionCount); - } - - handleDisconnect() { - this.connectionCount--; - metrics.socketio.gauge('realtime_connections').record(this.connectionCount); - } - - async assertVersion(client: Socket, version?: string) { - const shouldCheckClientVersion = await this.config.runtime.fetch( - 'flags/syncClientVersionCheck' - ); - if ( - // @todo(@darkskygit): remove this flag after 0.12 goes stable - shouldCheckClientVersion && - version !== AFFiNE.version - ) { - client.emit('server-version-rejected', { - currentVersion: version, - requiredVersion: AFFiNE.version, - reason: `Client version${ - version ? ` ${version}` : '' - } is outdated, please update to ${AFFiNE.version}`, - }); - - throw new VersionRejected({ - version: version || 'unknown', - serverVersion: AFFiNE.version, - }); - } - } - - async joinWorkspace( - client: Socket, - room: `${string}:${'sync' | 'awareness'}` - ) { - await client.join(room); - } - - async leaveWorkspace( - client: Socket, - room: `${string}:${'sync' | 'awareness'}` - ) { - await client.leave(room); - } - - assertInWorkspace(client: Socket, room: `${string}:${'sync' | 'awareness'}`) { - if (!client.rooms.has(room)) { - throw new NotInWorkspace({ workspaceId: room.split(':')[0] }); - } - } - - async assertWorkspaceAccessible( - workspaceId: string, - userId: string, - permission: Permission = Permission.Read - ) { - if ( - !(await this.permissions.isWorkspaceMember( - workspaceId, - userId, - permission - )) - ) { - throw new WorkspaceAccessDenied({ workspaceId }); - } - } - - @Auth() - @SubscribeMessage('client-handshake-sync') - async handleClientHandshakeSync( - @CurrentUser() user: CurrentUser, - @MessageBody('workspaceId') workspaceId: string, - @MessageBody('version') version: string | undefined, - @ConnectedSocket() client: Socket - ): Promise> { - await this.assertVersion(client, version); - await this.assertWorkspaceAccessible( - workspaceId, - user.id, - Permission.Write - ); - - await this.joinWorkspace(client, Sync(workspaceId)); - return { - data: { - clientId: client.id, - }, - }; - } - - @Auth() - @SubscribeMessage('client-handshake-awareness') - async handleClientHandshakeAwareness( - @CurrentUser() user: CurrentUser, - @MessageBody('workspaceId') workspaceId: string, - @MessageBody('version') version: string | undefined, - @ConnectedSocket() client: Socket - ): Promise> { - await this.assertVersion(client, version); - await this.assertWorkspaceAccessible( - workspaceId, - user.id, - Permission.Write - ); - - await this.joinWorkspace(client, Awareness(workspaceId)); - return { - data: { - clientId: client.id, - }, - }; - } - - @SubscribeMessage('client-leave-sync') - async handleLeaveSync( - @MessageBody() workspaceId: string, - @ConnectedSocket() client: Socket - ): Promise { - this.assertInWorkspace(client, Sync(workspaceId)); - await this.leaveWorkspace(client, Sync(workspaceId)); - return {}; - } - - @SubscribeMessage('client-leave-awareness') - async handleLeaveAwareness( - @MessageBody() workspaceId: string, - @ConnectedSocket() client: Socket - ): Promise { - this.assertInWorkspace(client, Awareness(workspaceId)); - await this.leaveWorkspace(client, Awareness(workspaceId)); - return {}; - } - - @SubscribeMessage('client-pre-sync') - async loadDocStats( - @ConnectedSocket() client: Socket, - @MessageBody() - { workspaceId, timestamp }: { workspaceId: string; timestamp?: number } - ): Promise>> { - this.assertInWorkspace(client, Sync(workspaceId)); - - const stats = await this.docManager.getDocTimestamps( - workspaceId, - timestamp - ); - - return { - data: stats, - }; - } - - @SubscribeMessage('client-update-v2') - async handleClientUpdateV2( - @MessageBody() - { - workspaceId, - guid, - updates, - }: { - workspaceId: string; - guid: string; - updates: string[]; - }, - @ConnectedSocket() client: Socket - ): Promise> { - this.assertInWorkspace(client, Sync(workspaceId)); - - const docId = new DocID(guid, workspaceId); - const buffers = updates.map(update => Buffer.from(update, 'base64')); - const timestamp = await this.docManager.batchPush( - docId.workspace, - docId.guid, - buffers - ); - - client - .to(Sync(workspaceId)) - .emit('server-updates', { workspaceId, guid, updates, timestamp }); - - return { - data: { - accepted: true, - timestamp, - }, - }; - } - - @SubscribeMessage('doc-load-v2') - async loadDocV2( - @ConnectedSocket() client: Socket, - @MessageBody() - { - workspaceId, - guid, - stateVector, - }: { - workspaceId: string; - guid: string; - stateVector?: string; - } - ): Promise< - EventResponse<{ missing: string; state?: string; timestamp: number }> - > { - this.assertInWorkspace(client, Sync(workspaceId)); - - const docId = new DocID(guid, workspaceId); - const res = await this.docManager.get(docId.workspace, docId.guid); - - if (!res) { - throw new DocNotFound({ workspaceId, docId: docId.guid }); - } - - const missing = Buffer.from( - encodeStateAsUpdate( - res.doc, - stateVector ? Buffer.from(stateVector, 'base64') : undefined - ) - ).toString('base64'); - const state = Buffer.from(encodeStateVector(res.doc)).toString('base64'); - - return { - data: { - missing, - state, - timestamp: res.timestamp, - }, - }; - } - - @SubscribeMessage('awareness-init') - async handleInitAwareness( - @MessageBody() workspaceId: string, - @ConnectedSocket() client: Socket - ): Promise> { - this.assertInWorkspace(client, Awareness(workspaceId)); - client.to(Awareness(workspaceId)).emit('new-client-awareness-init'); - return { - data: { - clientId: client.id, - }, - }; - } - - @SubscribeMessage('awareness-update') - async handleHelpGatheringAwareness( - @MessageBody() - { - workspaceId, - awarenessUpdate, - }: { workspaceId: string; awarenessUpdate: string }, - @ConnectedSocket() client: Socket - ): Promise { - this.assertInWorkspace(client, Awareness(workspaceId)); - client - .to(Awareness(workspaceId)) - .emit('server-awareness-broadcast', { workspaceId, awarenessUpdate }); - return {}; - } -} diff --git a/packages/backend/server/src/core/sync/events/events.module.ts b/packages/backend/server/src/core/sync/events/events.module.ts deleted file mode 100644 index ca2eaa888e..0000000000 --- a/packages/backend/server/src/core/sync/events/events.module.ts +++ /dev/null @@ -1,11 +0,0 @@ -import { Module } from '@nestjs/common'; - -import { DocModule } from '../../doc'; -import { PermissionModule } from '../../permission'; -import { EventsGateway } from './events.gateway'; - -@Module({ - imports: [DocModule, PermissionModule], - providers: [EventsGateway], -}) -export class EventsModule {} diff --git a/packages/backend/server/src/core/sync/gateway.ts b/packages/backend/server/src/core/sync/gateway.ts new file mode 100644 index 0000000000..9f22099449 --- /dev/null +++ b/packages/backend/server/src/core/sync/gateway.ts @@ -0,0 +1,666 @@ +import { applyDecorators, Logger } from '@nestjs/common'; +import { + ConnectedSocket, + MessageBody, + OnGatewayConnection, + OnGatewayDisconnect, + SubscribeMessage as RawSubscribeMessage, + WebSocketGateway, +} from '@nestjs/websockets'; +import { Socket } from 'socket.io'; +import { diffUpdate, encodeStateVectorFromUpdate } from 'yjs'; + +import { + CallTimer, + Config, + DocNotFound, + GatewayErrorWrapper, + metrics, + NotInSpace, + SpaceAccessDenied, + VersionRejected, +} from '../../fundamentals'; +import { Auth, CurrentUser } from '../auth'; +import { + DocStorageAdapter, + PgUserspaceDocStorageAdapter, + PgWorkspaceDocStorageAdapter, +} from '../doc'; +import { Permission, PermissionService } from '../permission'; +import { DocID } from '../utils/doc'; + +const SubscribeMessage = (event: string) => + applyDecorators( + GatewayErrorWrapper(event), + CallTimer('socketio', 'event_duration', { event }), + RawSubscribeMessage(event) + ); + +type EventResponse = Data extends never + ? { + data?: never; + } + : { + data: Data; + }; + +type RoomType = 'sync' | `${string}:awareness`; + +function Room( + spaceId: string, + type: RoomType = 'sync' +): `${string}:${RoomType}` { + return `${spaceId}:${type}`; +} + +enum SpaceType { + Workspace = 'workspace', + Userspace = 'userspace', +} + +interface JoinSpaceMessage { + spaceType: SpaceType; + spaceId: string; + clientVersion: string; +} + +interface JoinSpaceAwarenessMessage { + spaceType: SpaceType; + spaceId: string; + docId: string; + clientVersion: string; +} + +interface LeaveSpaceMessage { + spaceType: SpaceType; + spaceId: string; +} + +interface LeaveSpaceAwarenessMessage { + spaceType: SpaceType; + spaceId: string; + docId: string; +} + +interface PushDocUpdatesMessage { + spaceType: SpaceType; + spaceId: string; + docId: string; + updates: string[]; +} + +interface LoadDocMessage { + spaceType: SpaceType; + spaceId: string; + docId: string; + stateVector?: string; +} + +interface LoadDocTimestampsMessage { + spaceType: SpaceType; + spaceId: string; + timestamp?: number; +} + +interface LoadSpaceAwarenessesMessage { + spaceType: SpaceType; + spaceId: string; + docId: string; +} +interface UpdateAwarenessMessage { + spaceType: SpaceType; + spaceId: string; + docId: string; + awarenessUpdate: string; +} +@WebSocketGateway() +export class SpaceSyncGateway + implements OnGatewayConnection, OnGatewayDisconnect +{ + protected logger = new Logger(SpaceSyncGateway.name); + + private connectionCount = 0; + + constructor( + private readonly config: Config, + private readonly permissions: PermissionService, + private readonly workspace: PgWorkspaceDocStorageAdapter, + private readonly userspace: PgUserspaceDocStorageAdapter + ) {} + + handleConnection() { + this.connectionCount++; + metrics.socketio.gauge('realtime_connections').record(this.connectionCount); + } + + handleDisconnect() { + this.connectionCount--; + metrics.socketio.gauge('realtime_connections').record(this.connectionCount); + } + + selectAdapter(client: Socket, spaceType: SpaceType): SyncSocketAdapter { + let adapters: Record = (client as any) + .affineSyncAdapters; + + if (!adapters) { + const workspace = new WorkspaceSyncAdapter( + client, + this.workspace, + this.permissions + ); + const userspace = new UserspaceSyncAdapter(client, this.userspace); + + adapters = { workspace, userspace }; + (client as any).affineSyncAdapters = adapters; + } + + return adapters[spaceType]; + } + + async assertVersion(client: Socket, version?: string) { + const shouldCheckClientVersion = await this.config.runtime.fetch( + 'flags/syncClientVersionCheck' + ); + if ( + // @todo(@darkskygit): remove this flag after 0.12 goes stable + shouldCheckClientVersion && + version !== AFFiNE.version + ) { + client.emit('server-version-rejected', { + currentVersion: version, + requiredVersion: AFFiNE.version, + reason: `Client version${ + version ? ` ${version}` : '' + } is outdated, please update to ${AFFiNE.version}`, + }); + + throw new VersionRejected({ + version: version || 'unknown', + serverVersion: AFFiNE.version, + }); + } + } + + async joinWorkspace( + client: Socket, + room: `${string}:${'sync' | 'awareness'}` + ) { + await client.join(room); + } + + async leaveWorkspace( + client: Socket, + room: `${string}:${'sync' | 'awareness'}` + ) { + await client.leave(room); + } + + assertInWorkspace(client: Socket, room: `${string}:${'sync' | 'awareness'}`) { + if (!client.rooms.has(room)) { + throw new NotInSpace({ spaceId: room.split(':')[0] }); + } + } + + // v3 + @Auth() + @SubscribeMessage('space:join') + async onJoinSpace( + @CurrentUser() user: CurrentUser, + @ConnectedSocket() client: Socket, + @MessageBody() + { spaceType, spaceId, clientVersion }: JoinSpaceMessage + ): Promise> { + await this.assertVersion(client, clientVersion); + + await this.selectAdapter(client, spaceType).join(user.id, spaceId); + + return { data: { clientId: client.id, success: true } }; + } + + @SubscribeMessage('space:leave') + async onLeaveSpace( + @ConnectedSocket() client: Socket, + @MessageBody() { spaceType, spaceId }: LeaveSpaceMessage + ): Promise> { + await this.selectAdapter(client, spaceType).leave(spaceId); + + return { data: { clientId: client.id, success: true } }; + } + + @SubscribeMessage('space:load-doc') + async onLoadSpaceDoc( + @ConnectedSocket() client: Socket, + @MessageBody() + { spaceType, spaceId, docId, stateVector }: LoadDocMessage + ): Promise< + EventResponse<{ missing: string; state?: string; timestamp: number }> + > { + const adapter = this.selectAdapter(client, spaceType); + adapter.assertIn(spaceId); + + const doc = await adapter.get(spaceId, docId); + + if (!doc) { + throw new DocNotFound({ spaceId, docId }); + } + + const missing = Buffer.from( + stateVector + ? diffUpdate(doc.bin, Buffer.from(stateVector, 'base64')) + : doc.bin + ).toString('base64'); + + const state = Buffer.from(encodeStateVectorFromUpdate(doc.bin)).toString( + 'base64' + ); + + return { + data: { + missing, + state, + timestamp: doc.timestamp, + }, + }; + } + + @SubscribeMessage('space:push-doc-updates') + async onReceiveDocUpdates( + @ConnectedSocket() client: Socket, + @MessageBody() + message: PushDocUpdatesMessage + ): Promise> { + const { spaceType, spaceId, docId, updates } = message; + const adapter = this.selectAdapter(client, spaceType); + + // TODO(@forehalo): we might need to check write permission before push updates + const timestamp = await adapter.push( + spaceId, + docId, + updates.map(update => Buffer.from(update, 'base64')) + ); + + // could be put in [adapter.push] + // but the event should be kept away from adapter + // so + client + .to(adapter.room(spaceId)) + .emit('space:broadcast-doc-updates', { ...message, timestamp }); + + // TODO(@forehalo): remove backward compatibility + if (spaceType === SpaceType.Workspace) { + const id = new DocID(docId, spaceId); + client.to(adapter.room(spaceId)).emit('server-updates', { + workspaceId: spaceId, + guid: id.guid, + updates, + timestamp, + }); + } + + return { + data: { + accepted: true, + timestamp, + }, + }; + } + + @SubscribeMessage('space:load-doc-timestamps') + async onLoadDocTimestamps( + @ConnectedSocket() client: Socket, + @MessageBody() + { spaceType, spaceId, timestamp }: LoadDocTimestampsMessage + ): Promise>> { + const adapter = this.selectAdapter(client, spaceType); + + const stats = await adapter.getTimestamps(spaceId, timestamp); + + return { + data: stats ?? {}, + }; + } + + @Auth() + @SubscribeMessage('space:join-awareness') + async onJoinAwareness( + @ConnectedSocket() client: Socket, + @CurrentUser() user: CurrentUser, + @MessageBody() + { spaceType, spaceId, docId, clientVersion }: JoinSpaceAwarenessMessage + ) { + await this.assertVersion(client, clientVersion); + + await this.selectAdapter(client, spaceType).join( + user.id, + spaceId, + `${docId}:awareness` + ); + + return { data: { clientId: client.id, success: true } }; + } + + @SubscribeMessage('space:leave-awareness') + async onLeaveAwareness( + @ConnectedSocket() client: Socket, + @MessageBody() + { spaceType, spaceId, docId }: LeaveSpaceAwarenessMessage + ) { + await this.selectAdapter(client, spaceType).leave( + spaceId, + `${docId}:awareness` + ); + + return { data: { clientId: client.id, success: true } }; + } + + @SubscribeMessage('space:load-awarenesses') + async onLoadAwareness( + @ConnectedSocket() client: Socket, + @MessageBody() + { spaceType, spaceId, docId }: LoadSpaceAwarenessesMessage + ) { + const adapter = this.selectAdapter(client, spaceType); + + const roomType = `${docId}:awareness` as const; + adapter.assertIn(spaceId, roomType); + client + .to(adapter.room(spaceId, roomType)) + .emit('space:collect-awareness', { spaceType, spaceId, docId }); + + // TODO(@forehalo): remove backward compatibility + if (spaceType === SpaceType.Workspace) { + client + .to(adapter.room(spaceId, roomType)) + .emit('new-client-awareness-init'); + } + + return { data: { clientId: client.id } }; + } + + @SubscribeMessage('space:update-awareness') + async onUpdateAwareness( + @ConnectedSocket() client: Socket, + @MessageBody() message: UpdateAwarenessMessage + ) { + const { spaceType, spaceId, docId } = message; + const adapter = this.selectAdapter(client, spaceType); + + const roomType = `${docId}:awareness` as const; + adapter.assertIn(spaceId, roomType); + client + .to(adapter.room(spaceId, roomType)) + .emit('space:broadcast-awareness-update', message); + + // TODO(@forehalo): remove backward compatibility + if (spaceType === SpaceType.Workspace) { + client + .to(adapter.room(spaceId, roomType)) + .emit('server-awareness-broadcast', { + workspaceId: spaceId, + awarenessUpdate: message.awarenessUpdate, + }); + } + + return {}; + } + + // TODO(@forehalo): remove + // deprecated section + @Auth() + @SubscribeMessage('client-handshake-sync') + async handleClientHandshakeSync( + @CurrentUser() user: CurrentUser, + @MessageBody('workspaceId') workspaceId: string, + @MessageBody('version') version: string, + @ConnectedSocket() client: Socket + ): Promise> { + await this.assertVersion(client, version); + + return this.onJoinSpace(user, client, { + spaceType: SpaceType.Workspace, + spaceId: workspaceId, + clientVersion: version, + }); + } + + @SubscribeMessage('client-leave-sync') + async handleLeaveSync( + @MessageBody() workspaceId: string, + @ConnectedSocket() client: Socket + ): Promise { + return this.onLeaveSpace(client, { + spaceType: SpaceType.Workspace, + spaceId: workspaceId, + }); + } + + @SubscribeMessage('client-pre-sync') + async loadDocStats( + @ConnectedSocket() client: Socket, + @MessageBody() + { workspaceId, timestamp }: { workspaceId: string; timestamp?: number } + ): Promise>> { + return this.onLoadDocTimestamps(client, { + spaceType: SpaceType.Workspace, + spaceId: workspaceId, + timestamp, + }); + } + + @SubscribeMessage('client-update-v2') + async handleClientUpdateV2( + @MessageBody() + { + workspaceId, + guid, + updates, + }: { + workspaceId: string; + guid: string; + updates: string[]; + }, + @ConnectedSocket() client: Socket + ): Promise> { + return this.onReceiveDocUpdates(client, { + spaceType: SpaceType.Workspace, + spaceId: workspaceId, + docId: guid, + updates, + }); + } + + @SubscribeMessage('doc-load-v2') + async loadDocV2( + @ConnectedSocket() client: Socket, + @MessageBody() + { + workspaceId, + guid, + stateVector, + }: { + workspaceId: string; + guid: string; + stateVector?: string; + } + ): Promise< + EventResponse<{ missing: string; state?: string; timestamp: number }> + > { + return this.onLoadSpaceDoc(client, { + spaceType: SpaceType.Workspace, + spaceId: workspaceId, + docId: guid, + stateVector, + }); + } + + @Auth() + @SubscribeMessage('client-handshake-awareness') + async handleClientHandshakeAwareness( + @ConnectedSocket() client: Socket, + @CurrentUser() user: CurrentUser, + @MessageBody('workspaceId') workspaceId: string, + @MessageBody('version') version: string + ): Promise> { + return this.onJoinAwareness(client, user, { + spaceType: SpaceType.Workspace, + spaceId: workspaceId, + docId: workspaceId, + clientVersion: version, + }); + } + + @SubscribeMessage('client-leave-awareness') + async handleLeaveAwareness( + @MessageBody() workspaceId: string, + @ConnectedSocket() client: Socket + ): Promise { + return this.onLeaveAwareness(client, { + spaceType: SpaceType.Workspace, + spaceId: workspaceId, + docId: workspaceId, + }); + } + + @SubscribeMessage('awareness-init') + async handleInitAwareness( + @MessageBody() workspaceId: string, + @ConnectedSocket() client: Socket + ): Promise> { + return this.onLoadAwareness(client, { + spaceType: SpaceType.Workspace, + spaceId: workspaceId, + docId: workspaceId, + }); + } + + @SubscribeMessage('awareness-update') + async handleHelpGatheringAwareness( + @MessageBody() + { + workspaceId, + awarenessUpdate, + }: { workspaceId: string; awarenessUpdate: string }, + @ConnectedSocket() client: Socket + ): Promise { + return this.onUpdateAwareness(client, { + spaceType: SpaceType.Workspace, + spaceId: workspaceId, + docId: workspaceId, + awarenessUpdate, + }); + } +} + +abstract class SyncSocketAdapter { + constructor( + private readonly spaceType: SpaceType, + public readonly client: Socket, + public readonly storage: DocStorageAdapter + ) {} + + room(spaceId: string, roomType: RoomType = 'sync') { + return `${this.spaceType}:${Room(spaceId, roomType)}`; + } + + async join(userId: string, spaceId: string, roomType: RoomType = 'sync') { + this.assertNotIn(spaceId, roomType); + await this.assertAccessible(spaceId, userId, Permission.Read); + return this.client.join(this.room(spaceId, roomType)); + } + + async leave(spaceId: string, roomType: RoomType = 'sync') { + this.assertIn(spaceId, roomType); + return this.client.leave(this.room(spaceId, roomType)); + } + + in(spaceId: string, roomType: RoomType = 'sync') { + return this.client.rooms.has(this.room(spaceId, roomType)); + } + + assertNotIn(spaceId: string, roomType: RoomType = 'sync') { + if (this.client.rooms.has(this.room(spaceId, roomType))) { + // TODO(@forehalo): use new AlreadyInSpace({ spaceId }) instead + throw new NotInSpace({ spaceId }); + } + } + + assertIn(spaceId: string, roomType: RoomType = 'sync') { + if (!this.client.rooms.has(this.room(spaceId, roomType))) { + throw new NotInSpace({ spaceId }); + } + } + + abstract assertAccessible( + spaceId: string, + userId: string, + permission?: Permission + ): Promise; + + push(spaceId: string, docId: string, updates: Buffer[]) { + this.assertIn(spaceId); + return this.storage.pushDocUpdates(spaceId, docId, updates); + } + + get(spaceId: string, docId: string) { + this.assertIn(spaceId); + return this.storage.getDoc(spaceId, docId); + } + + getTimestamps(spaceId: string, timestamp?: number) { + this.assertIn(spaceId); + return this.storage.getSpaceDocTimestamps(spaceId, timestamp); + } +} + +class WorkspaceSyncAdapter extends SyncSocketAdapter { + constructor( + client: Socket, + storage: DocStorageAdapter, + private readonly permission: PermissionService + ) { + super(SpaceType.Workspace, client, storage); + } + + // backward compatibility + override room(spaceId: string, roomType: RoomType = 'sync') { + return Room(spaceId, roomType); + } + + override push(spaceId: string, docId: string, updates: Buffer[]) { + const id = new DocID(docId, spaceId); + return super.push(spaceId, id.guid, updates); + } + + override get(spaceId: string, docId: string) { + const id = new DocID(docId, spaceId); + return this.storage.getDoc(spaceId, id.guid); + } + + async assertAccessible( + spaceId: string, + userId: string, + permission: Permission = Permission.Read + ) { + if ( + !(await this.permission.isWorkspaceMember(spaceId, userId, permission)) + ) { + throw new SpaceAccessDenied({ spaceId }); + } + } +} + +class UserspaceSyncAdapter extends SyncSocketAdapter { + constructor(client: Socket, storage: DocStorageAdapter) { + super(SpaceType.Userspace, client, storage); + } + + async assertAccessible( + spaceId: string, + userId: string, + _permission: Permission = Permission.Read + ) { + if (spaceId !== userId) { + throw new SpaceAccessDenied({ spaceId }); + } + } +} diff --git a/packages/backend/server/src/core/sync/index.ts b/packages/backend/server/src/core/sync/index.ts index 78553c2b69..2e9a3a17db 100644 --- a/packages/backend/server/src/core/sync/index.ts +++ b/packages/backend/server/src/core/sync/index.ts @@ -1,8 +1,11 @@ import { Module } from '@nestjs/common'; -import { EventsModule } from './events/events.module'; +import { DocStorageModule } from '../doc'; +import { PermissionModule } from '../permission'; +import { SpaceSyncGateway } from './gateway'; @Module({ - imports: [EventsModule], + imports: [DocStorageModule, PermissionModule], + providers: [SpaceSyncGateway], }) export class SyncModule {} diff --git a/packages/backend/server/src/core/workspaces/controller.ts b/packages/backend/server/src/core/workspaces/controller.ts index 4d6903eb98..ee859c0d53 100644 --- a/packages/backend/server/src/core/workspaces/controller.ts +++ b/packages/backend/server/src/core/workspaces/controller.ts @@ -12,7 +12,7 @@ import { InvalidHistoryTimestamp, } from '../../fundamentals'; import { CurrentUser, Public } from '../auth'; -import { DocHistoryManager, DocManager } from '../doc'; +import { PgWorkspaceDocStorageAdapter } from '../doc'; import { Permission, PermissionService, PublicPageMode } from '../permission'; import { WorkspaceBlobStorage } from '../storage'; import { DocID } from '../utils/doc'; @@ -23,8 +23,7 @@ export class WorkspacesController { constructor( private readonly storage: WorkspaceBlobStorage, private readonly permission: PermissionService, - private readonly docManager: DocManager, - private readonly historyManager: DocHistoryManager, + private readonly workspace: PgWorkspaceDocStorageAdapter, private readonly prisma: PrismaClient ) {} @@ -56,7 +55,7 @@ export class WorkspacesController { if (!body) { throw new BlobNotFound({ - workspaceId, + spaceId: workspaceId, blobId: name, }); } @@ -96,14 +95,14 @@ export class WorkspacesController { throw new AccessDenied(); } - const binResponse = await this.docManager.getBinary( + const binResponse = await this.workspace.getDoc( docId.workspace, docId.guid ); if (!binResponse) { throw new DocNotFound({ - workspaceId: docId.workspace, + spaceId: docId.workspace, docId: docId.guid, }); } @@ -125,7 +124,7 @@ export class WorkspacesController { } res.setHeader('content-type', 'application/octet-stream'); - res.send(binResponse.binary); + res.send(binResponse.bin); } @Get('/:id/docs/:guid/histories/:timestamp') @@ -152,19 +151,19 @@ export class WorkspacesController { Permission.Write ); - const history = await this.historyManager.get( + const history = await this.workspace.getDocHistory( docId.workspace, docId.guid, - ts + ts.getTime() ); if (history) { res.setHeader('content-type', 'application/octet-stream'); res.setHeader('cache-control', 'private, max-age=2592000, immutable'); - res.send(history.blob); + res.send(history.bin); } else { throw new DocHistoryNotFound({ - workspaceId: docId.workspace, + spaceId: docId.workspace, docId: guid, timestamp: ts.getTime(), }); diff --git a/packages/backend/server/src/core/workspaces/index.ts b/packages/backend/server/src/core/workspaces/index.ts index bac47e148a..e673148d8c 100644 --- a/packages/backend/server/src/core/workspaces/index.ts +++ b/packages/backend/server/src/core/workspaces/index.ts @@ -1,6 +1,6 @@ import { Module } from '@nestjs/common'; -import { DocModule } from '../doc'; +import { DocStorageModule } from '../doc'; import { FeatureModule } from '../features'; import { PermissionModule } from '../permission'; import { QuotaModule } from '../quota'; @@ -17,7 +17,7 @@ import { @Module({ imports: [ - DocModule, + DocStorageModule, FeatureModule, QuotaModule, StorageModule, diff --git a/packages/backend/server/src/core/workspaces/resolvers/history.ts b/packages/backend/server/src/core/workspaces/resolvers/history.ts index cdeb9ee812..d11d2b6f03 100644 --- a/packages/backend/server/src/core/workspaces/resolvers/history.ts +++ b/packages/backend/server/src/core/workspaces/resolvers/history.ts @@ -12,7 +12,7 @@ import { import type { SnapshotHistory } from '@prisma/client'; import { CurrentUser } from '../../auth'; -import { DocHistoryManager } from '../../doc'; +import { PgWorkspaceDocStorageAdapter } from '../../doc'; import { Permission, PermissionService } from '../../permission'; import { DocID } from '../../utils/doc'; import { WorkspaceType } from '../types'; @@ -32,7 +32,7 @@ class DocHistoryType implements Partial { @Resolver(() => WorkspaceType) export class DocHistoryResolver { constructor( - private readonly historyManager: DocHistoryManager, + private readonly workspace: PgWorkspaceDocStorageAdapter, private readonly permission: PermissionService ) {} @@ -47,17 +47,19 @@ export class DocHistoryResolver { ): Promise { const docId = new DocID(guid, workspace.id); - return this.historyManager - .list(workspace.id, docId.guid, timestamp, take) - .then(rows => - rows.map(({ timestamp }) => { - return { - workspaceId: workspace.id, - id: docId.guid, - timestamp, - }; - }) - ); + const timestamps = await this.workspace.listDocHistories( + workspace.id, + docId.guid, + { before: timestamp.getTime(), limit: take } + ); + + return timestamps.map(timestamp => { + return { + workspaceId: workspace.id, + id: docId.guid, + timestamp: new Date(timestamp), + }; + }); } @Mutation(() => Date) @@ -76,6 +78,12 @@ export class DocHistoryResolver { Permission.Write ); - return this.historyManager.recover(docId.workspace, docId.guid, timestamp); + await this.workspace.rollbackDoc( + docId.workspace, + docId.guid, + timestamp.getTime() + ); + + return timestamp; } } diff --git a/packages/backend/server/src/core/workspaces/resolvers/workspace.ts b/packages/backend/server/src/core/workspaces/resolvers/workspace.ts index a749514508..76c84407fa 100644 --- a/packages/backend/server/src/core/workspaces/resolvers/workspace.ts +++ b/packages/backend/server/src/core/workspaces/resolvers/workspace.ts @@ -15,17 +15,17 @@ import { applyUpdate, Doc } from 'yjs'; import type { FileUpload } from '../../../fundamentals'; import { - CantChangeWorkspaceOwner, + CantChangeSpaceOwner, EventEmitter, InternalServerError, MailService, MemberQuotaExceeded, RequestMutex, + SpaceAccessDenied, + SpaceNotFound, Throttle, TooManyRequest, UserNotFound, - WorkspaceAccessDenied, - WorkspaceNotFound, } from '../../../fundamentals'; import { CurrentUser, Public } from '../../auth'; import { Permission, PermissionService } from '../../permission'; @@ -76,7 +76,7 @@ export class WorkspaceResolver { const permission = await this.permissions.get(workspace.id, user.id); if (!permission) { - throw new WorkspaceAccessDenied({ workspaceId: workspace.id }); + throw new SpaceAccessDenied({ spaceId: workspace.id }); } return permission; @@ -193,7 +193,7 @@ export class WorkspaceResolver { const workspace = await this.prisma.workspace.findUnique({ where: { id } }); if (!workspace) { - throw new WorkspaceNotFound({ workspaceId: id }); + throw new SpaceNotFound({ spaceId: id }); } return workspace; @@ -304,7 +304,7 @@ export class WorkspaceResolver { ); if (permission === Permission.Owner) { - throw new CantChangeWorkspaceOwner(); + throw new CantChangeSpaceOwner(); } try { diff --git a/packages/backend/server/src/fundamentals/error/def.ts b/packages/backend/server/src/fundamentals/error/def.ts index d65950dd99..2b4b4c964b 100644 --- a/packages/backend/server/src/fundamentals/error/def.ts +++ b/packages/backend/server/src/fundamentals/error/def.ts @@ -306,45 +306,49 @@ export const USER_FRIENDLY_ERRORS = { message: 'You must verify your email before accessing this resource.', }, - // Workspace & Doc & Sync errors - workspace_not_found: { + // Workspace & Userspace & Doc & Sync errors + space_not_found: { type: 'resource_not_found', - args: { workspaceId: 'string' }, - message: ({ workspaceId }) => `Workspace ${workspaceId} not found.`, + args: { spaceId: 'string' }, + message: ({ spaceId }) => `Space ${spaceId} not found.`, }, - not_in_workspace: { + not_in_space: { type: 'action_forbidden', - args: { workspaceId: 'string' }, - message: ({ workspaceId }) => - `You should join in workspace ${workspaceId} before broadcasting messages.`, + args: { spaceId: 'string' }, + message: ({ spaceId }) => + `You should join in Space ${spaceId} before broadcasting messages.`, }, - workspace_access_denied: { + already_in_space: { + type: 'action_forbidden', + args: { spaceId: 'string' }, + message: ({ spaceId }) => `You have already joined in Space ${spaceId}.`, + }, + space_access_denied: { type: 'no_permission', - args: { workspaceId: 'string' }, - message: ({ workspaceId }) => - `You do not have permission to access workspace ${workspaceId}.`, + args: { spaceId: 'string' }, + message: ({ spaceId }) => + `You do not have permission to access Space ${spaceId}.`, }, - workspace_owner_not_found: { + space_owner_not_found: { type: 'internal_server_error', - args: { workspaceId: 'string' }, - message: ({ workspaceId }) => - `Owner of workspace ${workspaceId} not found.`, + args: { spaceId: 'string' }, + message: ({ spaceId }) => `Owner of Space ${spaceId} not found.`, }, - cant_change_workspace_owner: { + cant_change_space_owner: { type: 'action_forbidden', - message: 'You are not allowed to change the owner of a workspace.', + message: 'You are not allowed to change the owner of a Space.', }, doc_not_found: { type: 'resource_not_found', - args: { workspaceId: 'string', docId: 'string' }, - message: ({ workspaceId, docId }) => - `Doc ${docId} under workspace ${workspaceId} not found.`, + args: { spaceId: 'string', docId: 'string' }, + message: ({ spaceId, docId }) => + `Doc ${docId} under Space ${spaceId} not found.`, }, doc_access_denied: { type: 'no_permission', - args: { workspaceId: 'string', docId: 'string' }, - message: ({ workspaceId, docId }) => - `You do not have permission to access doc ${docId} under workspace ${workspaceId}.`, + args: { spaceId: 'string', docId: 'string' }, + message: ({ spaceId, docId }) => + `You do not have permission to access doc ${docId} under Space ${spaceId}.`, }, version_rejected: { type: 'action_forbidden', @@ -359,28 +363,36 @@ export const USER_FRIENDLY_ERRORS = { }, doc_history_not_found: { type: 'resource_not_found', - args: { workspaceId: 'string', docId: 'string', timestamp: 'number' }, - message: ({ workspaceId, docId, timestamp }) => - `History of ${docId} at ${timestamp} under workspace ${workspaceId}.`, + args: { spaceId: 'string', docId: 'string', timestamp: 'number' }, + message: ({ spaceId, docId, timestamp }) => + `History of ${docId} at ${timestamp} under Space ${spaceId}.`, }, blob_not_found: { type: 'resource_not_found', - args: { workspaceId: 'string', blobId: 'string' }, - message: ({ workspaceId, blobId }) => - `Blob ${blobId} not found in workspace ${workspaceId}.`, + args: { spaceId: 'string', blobId: 'string' }, + message: ({ spaceId, blobId }) => + `Blob ${blobId} not found in Space ${spaceId}.`, }, expect_to_publish_page: { type: 'invalid_input', - message: 'Expected to publish a page, not a workspace.', + message: 'Expected to publish a page, not a Space.', }, expect_to_revoke_public_page: { type: 'invalid_input', - message: 'Expected to revoke a public page, not a workspace.', + message: 'Expected to revoke a public page, not a Space.', }, page_is_not_public: { type: 'bad_request', message: 'Page is not public.', }, + failed_to_save_updates: { + type: 'internal_server_error', + message: 'Failed to store doc updates.', + }, + failed_to_upsert_snapshot: { + type: 'internal_server_error', + message: 'Failed to store doc snapshot.', + }, // Subscription Errors failed_to_checkout: { diff --git a/packages/backend/server/src/fundamentals/error/errors.gen.ts b/packages/backend/server/src/fundamentals/error/errors.gen.ts index 4066ed57d4..b63a85506f 100644 --- a/packages/backend/server/src/fundamentals/error/errors.gen.ts +++ b/packages/backend/server/src/fundamentals/error/errors.gen.ts @@ -173,54 +173,64 @@ export class EmailVerificationRequired extends UserFriendlyError { } } @ObjectType() -class WorkspaceNotFoundDataType { - @Field() workspaceId!: string +class SpaceNotFoundDataType { + @Field() spaceId!: string } -export class WorkspaceNotFound extends UserFriendlyError { - constructor(args: WorkspaceNotFoundDataType, message?: string | ((args: WorkspaceNotFoundDataType) => string)) { - super('resource_not_found', 'workspace_not_found', message, args); +export class SpaceNotFound extends UserFriendlyError { + constructor(args: SpaceNotFoundDataType, message?: string | ((args: SpaceNotFoundDataType) => string)) { + super('resource_not_found', 'space_not_found', message, args); } } @ObjectType() -class NotInWorkspaceDataType { - @Field() workspaceId!: string +class NotInSpaceDataType { + @Field() spaceId!: string } -export class NotInWorkspace extends UserFriendlyError { - constructor(args: NotInWorkspaceDataType, message?: string | ((args: NotInWorkspaceDataType) => string)) { - super('action_forbidden', 'not_in_workspace', message, args); +export class NotInSpace extends UserFriendlyError { + constructor(args: NotInSpaceDataType, message?: string | ((args: NotInSpaceDataType) => string)) { + super('action_forbidden', 'not_in_space', message, args); } } @ObjectType() -class WorkspaceAccessDeniedDataType { - @Field() workspaceId!: string +class AlreadyInSpaceDataType { + @Field() spaceId!: string } -export class WorkspaceAccessDenied extends UserFriendlyError { - constructor(args: WorkspaceAccessDeniedDataType, message?: string | ((args: WorkspaceAccessDeniedDataType) => string)) { - super('no_permission', 'workspace_access_denied', message, args); +export class AlreadyInSpace extends UserFriendlyError { + constructor(args: AlreadyInSpaceDataType, message?: string | ((args: AlreadyInSpaceDataType) => string)) { + super('action_forbidden', 'already_in_space', message, args); } } @ObjectType() -class WorkspaceOwnerNotFoundDataType { - @Field() workspaceId!: string +class SpaceAccessDeniedDataType { + @Field() spaceId!: string } -export class WorkspaceOwnerNotFound extends UserFriendlyError { - constructor(args: WorkspaceOwnerNotFoundDataType, message?: string | ((args: WorkspaceOwnerNotFoundDataType) => string)) { - super('internal_server_error', 'workspace_owner_not_found', message, args); +export class SpaceAccessDenied extends UserFriendlyError { + constructor(args: SpaceAccessDeniedDataType, message?: string | ((args: SpaceAccessDeniedDataType) => string)) { + super('no_permission', 'space_access_denied', message, args); + } +} +@ObjectType() +class SpaceOwnerNotFoundDataType { + @Field() spaceId!: string +} + +export class SpaceOwnerNotFound extends UserFriendlyError { + constructor(args: SpaceOwnerNotFoundDataType, message?: string | ((args: SpaceOwnerNotFoundDataType) => string)) { + super('internal_server_error', 'space_owner_not_found', message, args); } } -export class CantChangeWorkspaceOwner extends UserFriendlyError { +export class CantChangeSpaceOwner extends UserFriendlyError { constructor(message?: string) { - super('action_forbidden', 'cant_change_workspace_owner', message); + super('action_forbidden', 'cant_change_space_owner', message); } } @ObjectType() class DocNotFoundDataType { - @Field() workspaceId!: string + @Field() spaceId!: string @Field() docId!: string } @@ -231,7 +241,7 @@ export class DocNotFound extends UserFriendlyError { } @ObjectType() class DocAccessDeniedDataType { - @Field() workspaceId!: string + @Field() spaceId!: string @Field() docId!: string } @@ -263,7 +273,7 @@ export class InvalidHistoryTimestamp extends UserFriendlyError { } @ObjectType() class DocHistoryNotFoundDataType { - @Field() workspaceId!: string + @Field() spaceId!: string @Field() docId!: string @Field() timestamp!: number } @@ -275,7 +285,7 @@ export class DocHistoryNotFound extends UserFriendlyError { } @ObjectType() class BlobNotFoundDataType { - @Field() workspaceId!: string + @Field() spaceId!: string @Field() blobId!: string } @@ -303,6 +313,18 @@ export class PageIsNotPublic extends UserFriendlyError { } } +export class FailedToSaveUpdates extends UserFriendlyError { + constructor(message?: string) { + super('internal_server_error', 'failed_to_save_updates', message); + } +} + +export class FailedToUpsertSnapshot extends UserFriendlyError { + constructor(message?: string) { + super('internal_server_error', 'failed_to_upsert_snapshot', message); + } +} + export class FailedToCheckout extends UserFriendlyError { constructor(message?: string) { super('internal_server_error', 'failed_to_checkout', message); @@ -538,11 +560,12 @@ export enum ErrorNames { ACTION_FORBIDDEN, ACCESS_DENIED, EMAIL_VERIFICATION_REQUIRED, - WORKSPACE_NOT_FOUND, - NOT_IN_WORKSPACE, - WORKSPACE_ACCESS_DENIED, - WORKSPACE_OWNER_NOT_FOUND, - CANT_CHANGE_WORKSPACE_OWNER, + SPACE_NOT_FOUND, + NOT_IN_SPACE, + ALREADY_IN_SPACE, + SPACE_ACCESS_DENIED, + SPACE_OWNER_NOT_FOUND, + CANT_CHANGE_SPACE_OWNER, DOC_NOT_FOUND, DOC_ACCESS_DENIED, VERSION_REJECTED, @@ -552,6 +575,8 @@ export enum ErrorNames { EXPECT_TO_PUBLISH_PAGE, EXPECT_TO_REVOKE_PUBLIC_PAGE, PAGE_IS_NOT_PUBLIC, + FAILED_TO_SAVE_UPDATES, + FAILED_TO_UPSERT_SNAPSHOT, FAILED_TO_CHECKOUT, SUBSCRIPTION_ALREADY_EXISTS, SUBSCRIPTION_NOT_EXISTS, @@ -588,5 +613,5 @@ registerEnumType(ErrorNames, { export const ErrorDataUnionType = createUnionType({ name: 'ErrorDataUnion', types: () => - [UnknownOauthProviderDataType, MissingOauthQueryParameterDataType, InvalidPasswordLengthDataType, WorkspaceNotFoundDataType, NotInWorkspaceDataType, WorkspaceAccessDeniedDataType, WorkspaceOwnerNotFoundDataType, DocNotFoundDataType, DocAccessDeniedDataType, VersionRejectedDataType, InvalidHistoryTimestampDataType, DocHistoryNotFoundDataType, BlobNotFoundDataType, SubscriptionAlreadyExistsDataType, SubscriptionNotExistsDataType, SameSubscriptionRecurringDataType, SubscriptionPlanNotFoundDataType, CopilotMessageNotFoundDataType, CopilotPromptNotFoundDataType, CopilotProviderSideErrorDataType, RuntimeConfigNotFoundDataType, InvalidRuntimeConfigTypeDataType] as const, + [UnknownOauthProviderDataType, MissingOauthQueryParameterDataType, InvalidPasswordLengthDataType, SpaceNotFoundDataType, NotInSpaceDataType, AlreadyInSpaceDataType, SpaceAccessDeniedDataType, SpaceOwnerNotFoundDataType, DocNotFoundDataType, DocAccessDeniedDataType, VersionRejectedDataType, InvalidHistoryTimestampDataType, DocHistoryNotFoundDataType, BlobNotFoundDataType, SubscriptionAlreadyExistsDataType, SubscriptionNotExistsDataType, SameSubscriptionRecurringDataType, SubscriptionPlanNotFoundDataType, CopilotMessageNotFoundDataType, CopilotPromptNotFoundDataType, CopilotProviderSideErrorDataType, RuntimeConfigNotFoundDataType, InvalidRuntimeConfigTypeDataType] as const, }); diff --git a/packages/backend/server/src/plugins/copilot/controller.ts b/packages/backend/server/src/plugins/copilot/controller.ts index 9fed213f3b..e2c2bf0720 100644 --- a/packages/backend/server/src/plugins/copilot/controller.ts +++ b/packages/backend/server/src/plugins/copilot/controller.ts @@ -472,7 +472,7 @@ export class CopilotController { if (!body) { throw new BlobNotFound({ - workspaceId, + spaceId: workspaceId, blobId: key, }); } diff --git a/packages/backend/server/src/schema.gql b/packages/backend/server/src/schema.gql index e816313fef..625a391927 100644 --- a/packages/backend/server/src/schema.gql +++ b/packages/backend/server/src/schema.gql @@ -2,9 +2,13 @@ # THIS FILE WAS AUTOMATICALLY GENERATED (DO NOT MODIFY) # ------------------------------------------------------ +type AlreadyInSpaceDataType { + spaceId: String! +} + type BlobNotFoundDataType { blobId: String! - workspaceId: String! + spaceId: String! } enum ChatHistoryOrder { @@ -175,13 +179,13 @@ input DeleteSessionInput { type DocAccessDeniedDataType { docId: String! - workspaceId: String! + spaceId: String! } type DocHistoryNotFoundDataType { docId: String! + spaceId: String! timestamp: Int! - workspaceId: String! } type DocHistoryType { @@ -192,20 +196,21 @@ type DocHistoryType { type DocNotFoundDataType { docId: String! - workspaceId: String! + spaceId: String! } -union ErrorDataUnion = BlobNotFoundDataType | CopilotMessageNotFoundDataType | CopilotPromptNotFoundDataType | CopilotProviderSideErrorDataType | DocAccessDeniedDataType | DocHistoryNotFoundDataType | DocNotFoundDataType | InvalidHistoryTimestampDataType | InvalidPasswordLengthDataType | InvalidRuntimeConfigTypeDataType | MissingOauthQueryParameterDataType | NotInWorkspaceDataType | RuntimeConfigNotFoundDataType | SameSubscriptionRecurringDataType | SubscriptionAlreadyExistsDataType | SubscriptionNotExistsDataType | SubscriptionPlanNotFoundDataType | UnknownOauthProviderDataType | VersionRejectedDataType | WorkspaceAccessDeniedDataType | WorkspaceNotFoundDataType | WorkspaceOwnerNotFoundDataType +union ErrorDataUnion = AlreadyInSpaceDataType | BlobNotFoundDataType | CopilotMessageNotFoundDataType | CopilotPromptNotFoundDataType | CopilotProviderSideErrorDataType | DocAccessDeniedDataType | DocHistoryNotFoundDataType | DocNotFoundDataType | InvalidHistoryTimestampDataType | InvalidPasswordLengthDataType | InvalidRuntimeConfigTypeDataType | MissingOauthQueryParameterDataType | NotInSpaceDataType | RuntimeConfigNotFoundDataType | SameSubscriptionRecurringDataType | SpaceAccessDeniedDataType | SpaceNotFoundDataType | SpaceOwnerNotFoundDataType | SubscriptionAlreadyExistsDataType | SubscriptionNotExistsDataType | SubscriptionPlanNotFoundDataType | UnknownOauthProviderDataType | VersionRejectedDataType enum ErrorNames { ACCESS_DENIED ACTION_FORBIDDEN + ALREADY_IN_SPACE AUTHENTICATION_REQUIRED BLOB_NOT_FOUND BLOB_QUOTA_EXCEEDED CANNOT_DELETE_ALL_ADMIN_ACCOUNT CANNOT_DELETE_OWN_ACCOUNT - CANT_CHANGE_WORKSPACE_OWNER + CANT_CHANGE_SPACE_OWNER CANT_UPDATE_LIFETIME_SUBSCRIPTION COPILOT_ACTION_TAKEN COPILOT_FAILED_TO_CREATE_MESSAGE @@ -228,6 +233,8 @@ enum ErrorNames { EXPECT_TO_PUBLISH_PAGE EXPECT_TO_REVOKE_PUBLIC_PAGE FAILED_TO_CHECKOUT + FAILED_TO_SAVE_UPDATES + FAILED_TO_UPSERT_SNAPSHOT INTERNAL_SERVER_ERROR INVALID_EMAIL INVALID_EMAIL_TOKEN @@ -240,7 +247,7 @@ enum ErrorNames { MEMBER_QUOTA_EXCEEDED MISSING_OAUTH_QUERY_PARAMETER NOT_FOUND - NOT_IN_WORKSPACE + NOT_IN_SPACE NO_COPILOT_PROVIDER_AVAILABLE OAUTH_ACCOUNT_ALREADY_CONNECTED OAUTH_STATE_EXPIRED @@ -250,6 +257,9 @@ enum ErrorNames { SAME_EMAIL_PROVIDED SAME_SUBSCRIPTION_RECURRING SIGN_UP_FORBIDDEN + SPACE_ACCESS_DENIED + SPACE_NOT_FOUND + SPACE_OWNER_NOT_FOUND SUBSCRIPTION_ALREADY_EXISTS SUBSCRIPTION_EXPIRED SUBSCRIPTION_HAS_BEEN_CANCELED @@ -261,9 +271,6 @@ enum ErrorNames { USER_AVATAR_NOT_FOUND USER_NOT_FOUND VERSION_REJECTED - WORKSPACE_ACCESS_DENIED - WORKSPACE_NOT_FOUND - WORKSPACE_OWNER_NOT_FOUND WRONG_SIGN_IN_CREDENTIALS WRONG_SIGN_IN_METHOD } @@ -494,8 +501,8 @@ type Mutation { verifyEmail(token: String!): Boolean! } -type NotInWorkspaceDataType { - workspaceId: String! +type NotInSpaceDataType { + spaceId: String! } enum OAuthProviderType { @@ -687,6 +694,18 @@ type ServerServiceConfig { name: String! } +type SpaceAccessDeniedDataType { + spaceId: String! +} + +type SpaceNotFoundDataType { + spaceId: String! +} + +type SpaceOwnerNotFoundDataType { + spaceId: String! +} + type SubscriptionAlreadyExistsDataType { plan: String! } @@ -845,22 +864,10 @@ type VersionRejectedDataType { version: String! } -type WorkspaceAccessDeniedDataType { - workspaceId: String! -} - type WorkspaceBlobSizes { size: SafeInt! } -type WorkspaceNotFoundDataType { - workspaceId: String! -} - -type WorkspaceOwnerNotFoundDataType { - workspaceId: String! -} - type WorkspacePage { id: String! mode: PublicPageMode! diff --git a/packages/backend/server/tests/doc.spec.ts b/packages/backend/server/tests/doc.spec.ts deleted file mode 100644 index aaa4201cd4..0000000000 --- a/packages/backend/server/tests/doc.spec.ts +++ /dev/null @@ -1,394 +0,0 @@ -import { mock } from 'node:test'; - -import { TestingModule } from '@nestjs/testing'; -import { PrismaClient } from '@prisma/client'; -import test from 'ava'; -import * as Sinon from 'sinon'; -import { applyUpdate, Doc as YDoc, encodeStateAsUpdate } from 'yjs'; - -import { DocManager, DocModule } from '../src/core/doc'; -import { QuotaModule } from '../src/core/quota'; -import { StorageModule } from '../src/core/storage'; -import { Config } from '../src/fundamentals/config'; -import { createTestingModule } from './utils'; - -const createModule = () => { - return createTestingModule( - { - imports: [QuotaModule, StorageModule, DocModule], - }, - false - ); -}; - -let m: TestingModule; -let timer: Sinon.SinonFakeTimers; - -// cleanup database before each test -test.beforeEach(async () => { - timer = Sinon.useFakeTimers({ - toFake: ['setInterval'], - }); - m = await createModule(); - await m.init(); -}); - -test.afterEach.always(async () => { - await m.close(); - timer.restore(); -}); - -test('should setup update poll interval', async t => { - const m = await createModule(); - const manager = m.get(DocManager); - const fake = mock.method(manager, 'setup'); - - await m.init(); - - t.is(fake.mock.callCount(), 1); - // @ts-expect-error private member - t.truthy(manager.job); - m.close(); -}); - -test('should be able to stop poll', async t => { - const manager = m.get(DocManager); - const fake = mock.method(manager, 'destroy'); - - await m.close(); - - t.is(fake.mock.callCount(), 1); - // @ts-expect-error private member - t.is(manager.job, null); -}); - -test('should poll when intervel due', async t => { - const manager = m.get(DocManager); - const interval = m.get(Config).doc.manager.updatePollInterval; - - let resolve: any; - // @ts-expect-error private method - const fake = mock.method(manager, 'autoSquash', () => { - return new Promise(_resolve => { - resolve = _resolve; - }); - }); - - timer.tick(interval); - t.is(fake.mock.callCount(), 1); - - // busy - timer.tick(interval); - // @ts-expect-error private member - t.is(manager.busy, true); - t.is(fake.mock.callCount(), 1); - - resolve(); - await timer.tickAsync(1); - - // @ts-expect-error private member - t.is(manager.busy, false); - timer.tick(interval); - t.is(fake.mock.callCount(), 2); -}); - -test('should merge update when intervel due', async t => { - const db = m.get(PrismaClient); - const manager = m.get(DocManager); - - const doc = new YDoc(); - const text = doc.getText('content'); - text.insert(0, 'hello'); - const update = encodeStateAsUpdate(doc); - - const ws = await db.workspace.create({ - data: { - id: '1', - public: false, - }, - }); - - await db.update.createMany({ - data: [ - { - id: '1', - workspaceId: '1', - blob: Buffer.from([0, 0]), - seq: 1, - }, - { - id: '1', - workspaceId: '1', - blob: Buffer.from(update), - seq: 2, - }, - ], - }); - - // @ts-expect-error private method - await manager.autoSquash(); - - t.deepEqual( - (await manager.getBinary(ws.id, '1'))?.binary.toString('hex'), - Buffer.from(update.buffer).toString('hex') - ); - - let appendUpdate = Buffer.from([]); - doc.on('update', update => { - appendUpdate = Buffer.from(update); - }); - text.insert(5, 'world'); - - await db.update.create({ - data: { - workspaceId: ws.id, - id: '1', - blob: appendUpdate, - seq: 3, - }, - }); - - // @ts-expect-error private method - await manager.autoSquash(); - - t.deepEqual( - (await manager.getBinary(ws.id, '1'))?.binary.toString('hex'), - Buffer.from(encodeStateAsUpdate(doc)).toString('hex') - ); -}); - -test('should have sequential update number', async t => { - const db = m.get(PrismaClient); - const manager = m.get(DocManager); - const doc = new YDoc(); - const text = doc.getText('content'); - const updates: Buffer[] = []; - - doc.on('update', update => { - updates.push(Buffer.from(update)); - }); - - text.insert(0, 'hello'); - text.insert(5, 'world'); - text.insert(5, ' '); - - await Promise.all(updates.map(update => manager.push('2', '2', update))); - - // [1,2,3] - let records = await manager.getUpdates('2', '2'); - - t.deepEqual( - records.map(({ seq }) => seq), - [1, 2, 3] - ); - - // @ts-expect-error private method - await manager.autoSquash(); - - await db.snapshot.update({ - where: { - id_workspaceId: { - id: '2', - workspaceId: '2', - }, - }, - data: { - seq: 0x3ffffffe, - }, - }); - - await Promise.all(updates.map(update => manager.push('2', '2', update))); - - records = await manager.getUpdates('2', '2'); - - // push a new update with new seq num - await manager.push('2', '2', updates[0]); - - // let the manager ignore update with the new seq num - const stub = Sinon.stub(manager, 'getUpdates').resolves(records); - - // @ts-expect-error private method - await manager.autoSquash(); - stub.restore(); - - records = await manager.getUpdates('2', '2'); - - // should not merge in one run - t.not(records.length, 0); -}); - -test('should have correct sequential update number with batching push', async t => { - const manager = m.get(DocManager); - const doc = new YDoc(); - const text = doc.getText('content'); - const updates: Buffer[] = []; - - doc.on('update', update => { - updates.push(Buffer.from(update)); - }); - - text.insert(0, 'hello'); - text.insert(5, 'world'); - text.insert(5, ' '); - - await manager.batchPush('2', '2', updates); - - // [1,2,3] - const records = await manager.getUpdates('2', '2'); - - t.deepEqual( - records.map(({ seq }) => seq), - [1, 2, 3] - ); -}); - -test('should retry if seq num conflict', async t => { - const manager = m.get(DocManager); - - // @ts-expect-error private method - const stub = Sinon.stub(manager, 'getUpdateSeq'); - - stub.onCall(0).resolves(1); - // seq num conflict - stub.onCall(1).resolves(1); - stub.onCall(2).resolves(2); - await t.notThrowsAsync(() => manager.push('1', '1', Buffer.from([0, 0]))); - await t.notThrowsAsync(() => manager.push('1', '1', Buffer.from([0, 0]))); - - t.is(stub.callCount, 3); -}); - -test('should throw if meet max retry times', async t => { - const manager = m.get(DocManager); - - // @ts-expect-error private method - const stub = Sinon.stub(manager, 'getUpdateSeq'); - - stub.resolves(1); - await t.notThrowsAsync(() => manager.push('1', '1', Buffer.from([0, 0]))); - - await t.throwsAsync( - () => manager.push('1', '1', Buffer.from([0, 0]), 3 /* retry 3 times */), - { message: 'Failed to push update' } - ); - t.is(stub.callCount, 5); -}); - -test('should be able to insert the snapshot if it is new created', async t => { - const manager = m.get(DocManager); - - { - const doc = new YDoc(); - const text = doc.getText('content'); - text.insert(0, 'hello'); - const update = encodeStateAsUpdate(doc); - - await manager.push('1', '1', Buffer.from(update)); - } - const updates = await manager.getUpdates('1', '1'); - t.is(updates.length, 1); - // @ts-expect-error private - const { doc } = await manager.squash(null, updates); - - t.truthy(doc); - t.is(doc.getText('content').toString(), 'hello'); - - const restUpdates = await manager.getUpdates('1', '1'); - - t.is(restUpdates.length, 0); -}); - -test('should be able to merge updates into snapshot', async t => { - const manager = m.get(DocManager); - - const updates: Buffer[] = []; - { - const doc = new YDoc(); - doc.on('update', data => { - updates.push(Buffer.from(data)); - }); - - const text = doc.getText('content'); - text.insert(0, 'hello'); - text.insert(5, 'world'); - text.insert(5, ' '); - text.insert(11, '!'); - } - - { - await manager.batchPush('1', '1', updates.slice(0, 2)); - // do the merge - const { doc } = (await manager.get('1', '1'))!; - - t.is(doc.getText('content').toString(), 'helloworld'); - } - - { - await manager.batchPush('1', '1', updates.slice(2)); - const { doc } = (await manager.get('1', '1'))!; - - t.is(doc.getText('content').toString(), 'hello world!'); - } - - const restUpdates = await manager.getUpdates('1', '1'); - - t.is(restUpdates.length, 0); -}); - -test('should not update snapshot if doc is outdated', async t => { - const manager = m.get(DocManager); - const db = m.get(PrismaClient); - - const updates: Buffer[] = []; - { - const doc = new YDoc(); - doc.on('update', data => { - updates.push(Buffer.from(data)); - }); - - const text = doc.getText('content'); - text.insert(0, 'hello'); - text.insert(5, 'world'); - text.insert(5, ' '); - text.insert(11, '!'); - } - - await manager.batchPush('2', '1', updates.slice(0, 2)); // 'helloworld' - // merge updates into snapshot - await manager.get('2', '1'); - // fake the snapshot is a lot newer - await db.snapshot.update({ - where: { - id_workspaceId: { - workspaceId: '2', - id: '1', - }, - }, - data: { - updatedAt: new Date(Date.now() + 10000), - }, - }); - - { - const snapshot = await manager.getSnapshot('2', '1'); - await manager.batchPush('2', '1', updates.slice(2)); // 'hello world!' - const updateRecords = await manager.getUpdates('2', '1'); - - // @ts-expect-error private - const { doc } = await manager.squash(snapshot, updateRecords); - - // all updated will merged into doc not matter it's timestamp is outdated or not, - // but the snapshot record will not be updated - t.is(doc.getText('content').toString(), 'hello world!'); - } - - { - const doc = new YDoc(); - applyUpdate(doc, (await manager.getSnapshot('2', '1'))!.blob); - // the snapshot will not get touched if the new doc's timestamp is outdated - t.is(doc.getText('content').toString(), 'helloworld'); - - // the updates are known as outdated, so they will be deleted - t.is((await manager.getUpdates('2', '1')).length, 0); - } -}); diff --git a/packages/backend/server/tests/history.spec.ts b/packages/backend/server/tests/doc/history.spec.ts similarity index 56% rename from packages/backend/server/tests/history.spec.ts rename to packages/backend/server/tests/doc/history.spec.ts index b795b3c985..4da253c259 100644 --- a/packages/backend/server/tests/history.spec.ts +++ b/packages/backend/server/tests/doc/history.spec.ts @@ -4,36 +4,42 @@ import { PrismaClient } from '@prisma/client'; import test from 'ava'; import * as Sinon from 'sinon'; -import { DocHistoryManager } from '../src/core/doc'; -import { PermissionModule } from '../src/core/permission'; -import { QuotaModule } from '../src/core/quota'; -import { StorageModule } from '../src/core/storage'; -import type { EventPayload } from '../src/fundamentals/event'; -import { createTestingModule } from './utils'; +import { + DocStorageModule, + PgWorkspaceDocStorageAdapter, +} from '../../src/core/doc'; +import { DocStorageOptions } from '../../src/core/doc/options'; +import { DocRecord } from '../../src/core/doc/storage'; +import { createTestingModule, initTestingDB } from '../utils'; let m: TestingModule; -let manager: DocHistoryManager; +let adapter: PgWorkspaceDocStorageAdapter; let db: PrismaClient; // cleanup database before each test -test.beforeEach(async () => { +test.before(async () => { m = await createTestingModule({ - imports: [StorageModule, QuotaModule, PermissionModule], - providers: [DocHistoryManager], + imports: [DocStorageModule], }); - manager = m.get(DocHistoryManager); - Sinon.stub(manager, 'getExpiredDateFromNow').resolves( - new Date(Date.now() + 1000) - ); + adapter = m.get(PgWorkspaceDocStorageAdapter); db = m.get(PrismaClient); }); -test.afterEach.always(async () => { - await m.close(); +test.beforeEach(async () => { + await initTestingDB(db); + const options = m.get(DocStorageOptions); + Sinon.stub(options, 'historyMaxAge').resolves(1000); +}); + +test.afterEach(async () => { Sinon.restore(); }); +test.after.always(async () => { + await m.close(); +}); + const snapshot: Snapshot = { workspaceId: '1', id: 'doc1', @@ -44,21 +50,22 @@ const snapshot: Snapshot = { createdAt: new Date(), }; -function getEventData( - timestamp: Date = new Date() -): EventPayload<'snapshot.updated'> { +function getSnapshot(timestamp: number = Date.now()): DocRecord { return { - workspaceId: snapshot.workspaceId, - id: snapshot.id, - previous: { ...snapshot, updatedAt: timestamp }, + spaceId: snapshot.workspaceId, + docId: snapshot.id, + bin: snapshot.blob, + timestamp, }; } test('should create doc history if never created before', async t => { - Sinon.stub(manager, 'last').resolves(null); + // @ts-expect-error private method + Sinon.stub(adapter, 'lastDocHistory').resolves(null); - const timestamp = new Date(); - await manager.onDocUpdated(getEventData(timestamp)); + const timestamp = Date.now(); + // @ts-expect-error private method + await adapter.createDocHistory(getSnapshot(timestamp)); const history = await db.snapshotHistory.findFirst({ where: { @@ -68,33 +75,17 @@ test('should create doc history if never created before', async t => { }); t.truthy(history); - t.is(history?.timestamp.getTime(), timestamp.getTime()); + t.is(history?.timestamp.getTime(), timestamp); }); test('should not create history if timestamp equals to last record', async t => { const timestamp = new Date(); - Sinon.stub(manager, 'last').resolves({ timestamp, state: null }); - await manager.onDocUpdated(getEventData(timestamp)); + // @ts-expect-error private method + Sinon.stub(adapter, 'lastDocHistory').resolves({ timestamp, state: null }); - const history = await db.snapshotHistory.findFirst({ - where: { - workspaceId: '1', - id: 'doc1', - }, - }); - - t.falsy(history); -}); - -test('should not create history if state equals to last record', async t => { - const timestamp = new Date(); - Sinon.stub(manager, 'last').resolves({ - timestamp: new Date(timestamp.getTime() - 1), - state: snapshot.state, - }); - - await manager.onDocUpdated(getEventData(timestamp)); + // @ts-expect-error private method + await adapter.createDocHistory(getSnapshot(timestamp)); const history = await db.snapshotHistory.findFirst({ where: { @@ -108,12 +99,15 @@ test('should not create history if state equals to last record', async t => { test('should not create history if time diff is less than interval config', async t => { const timestamp = new Date(); - Sinon.stub(manager, 'last').resolves({ + + // @ts-expect-error private method + Sinon.stub(adapter, 'lastDocHistory').resolves({ timestamp: new Date(timestamp.getTime() - 1000), state: Buffer.from([0, 1]), }); - await manager.onDocUpdated(getEventData(timestamp)); + // @ts-expect-error private method + await adapter.createDocHistory(getSnapshot(timestamp)); const history = await db.snapshotHistory.findFirst({ where: { @@ -127,12 +121,15 @@ test('should not create history if time diff is less than interval config', asyn test('should create history if time diff is larger than interval config and state diff', async t => { const timestamp = new Date(); - Sinon.stub(manager, 'last').resolves({ + + // @ts-expect-error private method + Sinon.stub(adapter, 'lastDocHistory').resolves({ timestamp: new Date(timestamp.getTime() - 1000 * 60 * 20), state: Buffer.from([0, 1]), }); - await manager.onDocUpdated(getEventData(timestamp)); + // @ts-expect-error private method + await adapter.createDocHistory(getSnapshot(timestamp)); const history = await db.snapshotHistory.findFirst({ where: { @@ -146,12 +143,15 @@ test('should create history if time diff is larger than interval config and stat test('should create history with force flag even if time diff in small', async t => { const timestamp = new Date(); - Sinon.stub(manager, 'last').resolves({ + + // @ts-expect-error private method + Sinon.stub(adapter, 'lastDocHistory').resolves({ timestamp: new Date(timestamp.getTime() - 1), state: Buffer.from([0, 1]), }); - await manager.onDocUpdated(getEventData(timestamp), true); + // @ts-expect-error private method + await adapter.createDocHistory(getSnapshot(timestamp), true); const history = await db.snapshotHistory.findFirst({ where: { @@ -194,31 +194,31 @@ test('should correctly list all history records', async t => { })), }); - const list = await manager.list( + const list = await adapter.listDocHistories( snapshot.workspaceId, snapshot.id, - new Date(timestamp + 20), - 8 + { before: timestamp + 20, limit: 8 } ); - const count = await manager.count(snapshot.workspaceId, snapshot.id); + const count = await db.snapshotHistory.count(); t.is(list.length, 8); - t.is(count, 10); + t.is(count, 20); }); test('should be able to get history data', async t => { - const timestamp = new Date(); + const timestamp = Date.now(); - await manager.onDocUpdated(getEventData(timestamp), true); + // @ts-expect-error private method + await adapter.createDocHistory(getSnapshot(timestamp), true); - const history = await manager.get( + const history = await adapter.getDocHistory( snapshot.workspaceId, snapshot.id, timestamp ); t.truthy(history); - t.deepEqual(history?.blob, snapshot.blob); + t.deepEqual(history?.bin, snapshot.blob); }); test('should be able to get last history record', async t => { @@ -238,7 +238,11 @@ test('should be able to get last history record', async t => { })), }); - const history = await manager.last(snapshot.workspaceId, snapshot.id); + // @ts-expect-error private method + const history = await adapter.lastDocHistory( + snapshot.workspaceId, + snapshot.id + ); t.truthy(history); t.is(history?.timestamp.getTime(), timestamp + 9); @@ -253,12 +257,14 @@ test('should be able to recover from history', async t => { }, }); const history1Timestamp = snapshot.updatedAt.getTime() - 10; - await manager.onDocUpdated(getEventData(new Date(history1Timestamp))); - await manager.recover( + // @ts-expect-error private method + await adapter.createDocHistory(getSnapshot(history1Timestamp)); + + await adapter.rollbackDoc( snapshot.workspaceId, snapshot.id, - new Date(history1Timestamp) + history1Timestamp ); const [history1, history2] = await db.snapshotHistory.findMany({ @@ -273,49 +279,4 @@ test('should be able to recover from history', async t => { // new history data force created with snapshot state before recovered t.deepEqual(history2.blob, Buffer.from([1, 1])); - t.deepEqual(history2.state, Buffer.from([1, 1])); -}); - -test('should be able to cleanup expired history', async t => { - const timestamp = Date.now(); - - // insert expired data - await db.snapshotHistory.createMany({ - data: Array.from({ length: 10 }) - .fill(0) - .map((_, i) => ({ - workspaceId: snapshot.workspaceId, - id: snapshot.id, - blob: snapshot.blob, - state: snapshot.state, - timestamp: new Date(timestamp - 10 - i), - expiredAt: new Date(timestamp - 1), - })), - }); - - // insert available data - await db.snapshotHistory.createMany({ - data: Array.from({ length: 10 }) - .fill(0) - .map((_, i) => ({ - workspaceId: snapshot.workspaceId, - id: snapshot.id, - blob: snapshot.blob, - state: snapshot.state, - timestamp: new Date(timestamp + i), - expiredAt: new Date(timestamp + 1000), - })), - }); - - let count = await db.snapshotHistory.count(); - t.is(count, 20); - - await manager.cleanupExpiredHistory(); - - count = await db.snapshotHistory.count(); - t.is(count, 10); - - const example = await db.snapshotHistory.findFirst(); - t.truthy(example); - t.true(example!.expiredAt > new Date()); }); diff --git a/packages/backend/server/tests/workspace.e2e.ts b/packages/backend/server/tests/workspace.e2e.ts index 997d542b7d..59d6b170d0 100644 --- a/packages/backend/server/tests/workspace.e2e.ts +++ b/packages/backend/server/tests/workspace.e2e.ts @@ -116,7 +116,7 @@ test('should share a page', async t => { const msg1 = await publishPage(app, u2.token.token, 'not_exists_ws', 'page2'); t.is( msg1, - 'You do not have permission to access workspace not_exists_ws.', + 'You do not have permission to access Space not_exists_ws.', 'unauthorized user can share page' ); const msg2 = await revokePublicPage( @@ -127,7 +127,7 @@ test('should share a page', async t => { ); t.is( msg2, - 'You do not have permission to access workspace not_exists_ws.', + 'You do not have permission to access Space not_exists_ws.', 'unauthorized user can share page' ); diff --git a/packages/backend/server/tests/workspace/controller.spec.ts b/packages/backend/server/tests/workspace/controller.spec.ts index 4ada30db3a..b24d70863f 100644 --- a/packages/backend/server/tests/workspace/controller.spec.ts +++ b/packages/backend/server/tests/workspace/controller.spec.ts @@ -9,7 +9,7 @@ import request from 'supertest'; import { AppModule } from '../../src/app.module'; import { CurrentUser } from '../../src/core/auth'; import { AuthService } from '../../src/core/auth/service'; -import { DocHistoryManager, DocManager } from '../../src/core/doc'; +import { PgWorkspaceDocStorageAdapter } from '../../src/core/doc'; import { WorkspaceBlobStorage } from '../../src/core/storage'; import { createTestingApp, internalSignIn } from '../utils'; @@ -18,19 +18,17 @@ const test = ava as TestFn<{ db: PrismaClient; app: INestApplication; storage: Sinon.SinonStubbedInstance; - doc: Sinon.SinonStubbedInstance; + workspace: Sinon.SinonStubbedInstance; }>; -test.beforeEach(async t => { +test.before(async t => { const { app } = await createTestingApp({ imports: [AppModule], tapModule: m => { m.overrideProvider(WorkspaceBlobStorage) .useValue(Sinon.createStubInstance(WorkspaceBlobStorage)) - .overrideProvider(DocManager) - .useValue(Sinon.createStubInstance(DocManager)) - .overrideProvider(DocHistoryManager) - .useValue(Sinon.createStubInstance(DocHistoryManager)); + .overrideProvider(PgWorkspaceDocStorageAdapter) + .useValue(Sinon.createStubInstance(PgWorkspaceDocStorageAdapter)); }, }); @@ -41,7 +39,7 @@ test.beforeEach(async t => { t.context.db = db; t.context.app = app; t.context.storage = app.get(WorkspaceBlobStorage); - t.context.doc = app.get(DocManager); + t.context.workspace = app.get(PgWorkspaceDocStorageAdapter); await db.workspacePage.create({ data: { @@ -83,7 +81,7 @@ test.beforeEach(async t => { }); }); -test.afterEach.always(async t => { +test.after.always(async t => { await t.context.app.close(); }); @@ -227,10 +225,12 @@ test('should not be able to get private workspace with private page', async t => }); test('should be able to get doc', async t => { - const { app, doc } = t.context; + const { app, workspace: doc } = t.context; - doc.getBinary.resolves({ - binary: Buffer.from([0, 0]), + doc.getDoc.resolves({ + spaceId: '', + docId: '', + bin: Buffer.from([0, 0]), timestamp: Date.now(), }); @@ -244,10 +244,12 @@ test('should be able to get doc', async t => { }); test('should be able to change page publish mode', async t => { - const { app, doc, db } = t.context; + const { app, workspace: doc, db } = t.context; - doc.getBinary.resolves({ - binary: Buffer.from([0, 0]), + doc.getDoc.resolves({ + spaceId: '', + docId: '', + bin: Buffer.from([0, 0]), timestamp: Date.now(), });