AFFiNE/packages/backend/server/tests/doc.spec.ts

395 lines
9.5 KiB
TypeScript

import { mock } from 'node:test';
import { TestingModule } from '@nestjs/testing';
import { PrismaClient } from '@prisma/client';
import test from 'ava';
import * as Sinon from 'sinon';
import { applyUpdate, Doc as YDoc, encodeStateAsUpdate } from 'yjs';
import { DocManager, DocModule } from '../src/core/doc';
import { QuotaModule } from '../src/core/quota';
import { StorageModule } from '../src/core/storage';
import { Config } from '../src/fundamentals/config';
import { createTestingModule } from './utils';
const createModule = () => {
return createTestingModule(
{
imports: [QuotaModule, StorageModule, DocModule],
},
false
);
};
let m: TestingModule;
let timer: Sinon.SinonFakeTimers;
// cleanup database before each test
test.beforeEach(async () => {
timer = Sinon.useFakeTimers({
toFake: ['setInterval'],
});
m = await createModule();
await m.init();
});
test.afterEach.always(async () => {
await m.close();
timer.restore();
});
test('should setup update poll interval', async t => {
const m = await createModule();
const manager = m.get(DocManager);
const fake = mock.method(manager, 'setup');
await m.init();
t.is(fake.mock.callCount(), 1);
// @ts-expect-error private member
t.truthy(manager.job);
m.close();
});
test('should be able to stop poll', async t => {
const manager = m.get(DocManager);
const fake = mock.method(manager, 'destroy');
await m.close();
t.is(fake.mock.callCount(), 1);
// @ts-expect-error private member
t.is(manager.job, null);
});
test('should poll when intervel due', async t => {
const manager = m.get(DocManager);
const interval = m.get(Config).doc.manager.updatePollInterval;
let resolve: any;
// @ts-expect-error private method
const fake = mock.method(manager, 'autoSquash', () => {
return new Promise(_resolve => {
resolve = _resolve;
});
});
timer.tick(interval);
t.is(fake.mock.callCount(), 1);
// busy
timer.tick(interval);
// @ts-expect-error private member
t.is(manager.busy, true);
t.is(fake.mock.callCount(), 1);
resolve();
await timer.tickAsync(1);
// @ts-expect-error private member
t.is(manager.busy, false);
timer.tick(interval);
t.is(fake.mock.callCount(), 2);
});
test('should merge update when intervel due', async t => {
const db = m.get(PrismaClient);
const manager = m.get(DocManager);
const doc = new YDoc();
const text = doc.getText('content');
text.insert(0, 'hello');
const update = encodeStateAsUpdate(doc);
const ws = await db.workspace.create({
data: {
id: '1',
public: false,
},
});
await db.update.createMany({
data: [
{
id: '1',
workspaceId: '1',
blob: Buffer.from([0, 0]),
seq: 1,
},
{
id: '1',
workspaceId: '1',
blob: Buffer.from(update),
seq: 2,
},
],
});
// @ts-expect-error private method
await manager.autoSquash();
t.deepEqual(
(await manager.getBinary(ws.id, '1'))?.binary.toString('hex'),
Buffer.from(update.buffer).toString('hex')
);
let appendUpdate = Buffer.from([]);
doc.on('update', update => {
appendUpdate = Buffer.from(update);
});
text.insert(5, 'world');
await db.update.create({
data: {
workspaceId: ws.id,
id: '1',
blob: appendUpdate,
seq: 3,
},
});
// @ts-expect-error private method
await manager.autoSquash();
t.deepEqual(
(await manager.getBinary(ws.id, '1'))?.binary.toString('hex'),
Buffer.from(encodeStateAsUpdate(doc)).toString('hex')
);
});
test('should have sequential update number', async t => {
const db = m.get(PrismaClient);
const manager = m.get(DocManager);
const doc = new YDoc();
const text = doc.getText('content');
const updates: Buffer[] = [];
doc.on('update', update => {
updates.push(Buffer.from(update));
});
text.insert(0, 'hello');
text.insert(5, 'world');
text.insert(5, ' ');
await Promise.all(updates.map(update => manager.push('2', '2', update)));
// [1,2,3]
let records = await manager.getUpdates('2', '2');
t.deepEqual(
records.map(({ seq }) => seq),
[1, 2, 3]
);
// @ts-expect-error private method
await manager.autoSquash();
await db.snapshot.update({
where: {
id_workspaceId: {
id: '2',
workspaceId: '2',
},
},
data: {
seq: 0x3ffffffe,
},
});
await Promise.all(updates.map(update => manager.push('2', '2', update)));
records = await manager.getUpdates('2', '2');
// push a new update with new seq num
await manager.push('2', '2', updates[0]);
// let the manager ignore update with the new seq num
const stub = Sinon.stub(manager, 'getUpdates').resolves(records);
// @ts-expect-error private method
await manager.autoSquash();
stub.restore();
records = await manager.getUpdates('2', '2');
// should not merge in one run
t.not(records.length, 0);
});
test('should have correct sequential update number with batching push', async t => {
const manager = m.get(DocManager);
const doc = new YDoc();
const text = doc.getText('content');
const updates: Buffer[] = [];
doc.on('update', update => {
updates.push(Buffer.from(update));
});
text.insert(0, 'hello');
text.insert(5, 'world');
text.insert(5, ' ');
await manager.batchPush('2', '2', updates);
// [1,2,3]
const records = await manager.getUpdates('2', '2');
t.deepEqual(
records.map(({ seq }) => seq),
[1, 2, 3]
);
});
test('should retry if seq num conflict', async t => {
const manager = m.get(DocManager);
// @ts-expect-error private method
const stub = Sinon.stub(manager, 'getUpdateSeq');
stub.onCall(0).resolves(1);
// seq num conflict
stub.onCall(1).resolves(1);
stub.onCall(2).resolves(2);
await t.notThrowsAsync(() => manager.push('1', '1', Buffer.from([0, 0])));
await t.notThrowsAsync(() => manager.push('1', '1', Buffer.from([0, 0])));
t.is(stub.callCount, 3);
});
test('should throw if meet max retry times', async t => {
const manager = m.get(DocManager);
// @ts-expect-error private method
const stub = Sinon.stub(manager, 'getUpdateSeq');
stub.resolves(1);
await t.notThrowsAsync(() => manager.push('1', '1', Buffer.from([0, 0])));
await t.throwsAsync(
() => manager.push('1', '1', Buffer.from([0, 0]), 3 /* retry 3 times */),
{ message: 'Failed to push update' }
);
t.is(stub.callCount, 5);
});
test('should be able to insert the snapshot if it is new created', async t => {
const manager = m.get(DocManager);
{
const doc = new YDoc();
const text = doc.getText('content');
text.insert(0, 'hello');
const update = encodeStateAsUpdate(doc);
await manager.push('1', '1', Buffer.from(update));
}
const updates = await manager.getUpdates('1', '1');
t.is(updates.length, 1);
// @ts-expect-error private
const { doc } = await manager.squash(null, updates);
t.truthy(doc);
t.is(doc.getText('content').toString(), 'hello');
const restUpdates = await manager.getUpdates('1', '1');
t.is(restUpdates.length, 0);
});
test('should be able to merge updates into snapshot', async t => {
const manager = m.get(DocManager);
const updates: Buffer[] = [];
{
const doc = new YDoc();
doc.on('update', data => {
updates.push(Buffer.from(data));
});
const text = doc.getText('content');
text.insert(0, 'hello');
text.insert(5, 'world');
text.insert(5, ' ');
text.insert(11, '!');
}
{
await manager.batchPush('1', '1', updates.slice(0, 2));
// do the merge
const { doc } = (await manager.get('1', '1'))!;
t.is(doc.getText('content').toString(), 'helloworld');
}
{
await manager.batchPush('1', '1', updates.slice(2));
const { doc } = (await manager.get('1', '1'))!;
t.is(doc.getText('content').toString(), 'hello world!');
}
const restUpdates = await manager.getUpdates('1', '1');
t.is(restUpdates.length, 0);
});
test('should not update snapshot if doc is outdated', async t => {
const manager = m.get(DocManager);
const db = m.get(PrismaClient);
const updates: Buffer[] = [];
{
const doc = new YDoc();
doc.on('update', data => {
updates.push(Buffer.from(data));
});
const text = doc.getText('content');
text.insert(0, 'hello');
text.insert(5, 'world');
text.insert(5, ' ');
text.insert(11, '!');
}
await manager.batchPush('2', '1', updates.slice(0, 2)); // 'helloworld'
// merge updates into snapshot
await manager.get('2', '1');
// fake the snapshot is a lot newer
await db.snapshot.update({
where: {
id_workspaceId: {
workspaceId: '2',
id: '1',
},
},
data: {
updatedAt: new Date(Date.now() + 10000),
},
});
{
const snapshot = await manager.getSnapshot('2', '1');
await manager.batchPush('2', '1', updates.slice(2)); // 'hello world!'
const updateRecords = await manager.getUpdates('2', '1');
// @ts-expect-error private
const { doc } = await manager.squash(snapshot, updateRecords);
// all updated will merged into doc not matter it's timestamp is outdated or not,
// but the snapshot record will not be updated
t.is(doc.getText('content').toString(), 'hello world!');
}
{
const doc = new YDoc();
applyUpdate(doc, (await manager.getSnapshot('2', '1'))!.blob);
// the snapshot will not get touched if the new doc's timestamp is outdated
t.is(doc.getText('content').toString(), 'helloworld');
// the updates are known as outdated, so they will be deleted
t.is((await manager.getUpdates('2', '1')).length, 0);
}
});