feat(nbstore): add idb implementation (#8809)

This commit is contained in:
forehalo 2024-11-22 03:13:05 +00:00
parent 64656d198c
commit cd30e1a54b
No known key found for this signature in database
GPG Key ID: 56709255DC7EC728
13 changed files with 721 additions and 4 deletions

View File

@ -6,7 +6,9 @@
"sideEffects": false,
"exports": {
".": "./src/index.ts",
"./op": "./src/op/index.ts"
"./op": "./src/op/index.ts",
"./idb": "./src/impls/idb/index.ts",
"./idb/v1": "./src/impls/idb/v1/index.ts"
},
"dependencies": {
"@toeverything/infra": "workspace:*",
@ -14,5 +16,11 @@
"lodash-es": "^4.17.21",
"rxjs": "^7.8.1",
"yjs": "patch:yjs@npm%3A13.6.18#~/.yarn/patches/yjs-npm-13.6.18-ad0d5f7c43.patch"
},
"devDependencies": {
"idb": "^8.0.0"
},
"peerDependencies": {
"idb": "^8.0.0"
}
}

View File

@ -0,0 +1,89 @@
import { share } from '../../connection';
import {
type BlobRecord,
BlobStorage,
type ListedBlobRecord,
} from '../../storage';
import { IDBConnection } from './db';
/**
 * Blob storage backed by IndexedDB.
 *
 * Metadata (mime, size, timestamps) lives in the `blobs` store while the
 * binary payload lives in `blobData`, so listing blobs never loads payloads.
 * Deletion is soft by default (sets `deletedAt`); `release()` purges rows
 * that were previously soft-deleted.
 */
export class IndexedDBBlobStorage extends BlobStorage {
  readonly connection = share(new IDBConnection(this.options));

  get db() {
    return this.connection.inner;
  }

  /** Read one blob; `null` if missing, soft-deleted, or its payload row is gone. */
  override async get(key: string) {
    const tx = this.db.transaction(['blobs', 'blobData'], 'readonly');
    const meta = await tx.objectStore('blobs').get(key);
    const payload = await tx.objectStore('blobData').get(key);

    if (!meta || meta.deletedAt || !payload) {
      return null;
    }

    return {
      ...meta,
      data: payload.data,
    };
  }

  /** Insert or overwrite a blob, writing metadata and payload in one transaction. */
  override async set(blob: BlobRecord) {
    const tx = this.db.transaction(['blobs', 'blobData'], 'readwrite');
    const metaStore = tx.objectStore('blobs');
    const dataStore = tx.objectStore('blobData');

    await metaStore.put({
      key: blob.key,
      mime: blob.mime,
      size: blob.data.byteLength,
      createdAt: new Date(),
      deletedAt: null,
    });
    await dataStore.put({
      key: blob.key,
      data: blob.data,
    });
  }

  /**
   * Remove a blob. Permanent deletion drops both rows; otherwise the blob is
   * only tombstoned via `deletedAt` and can be purged later by `release`.
   */
  override async delete(key: string, permanently: boolean) {
    if (permanently) {
      const tx = this.db.transaction(['blobs', 'blobData'], 'readwrite');
      await tx.objectStore('blobs').delete(key);
      await tx.objectStore('blobData').delete(key);
      return;
    }

    const tx = this.db.transaction('blobs', 'readwrite');
    const meta = await tx.store.get(key);
    if (meta) {
      await tx.store.put({
        ...meta,
        deletedAt: new Date(),
      });
    }
  }

  /** Permanently drop every soft-deleted blob together with its payload. */
  override async release() {
    const tx = this.db.transaction(['blobs', 'blobData'], 'readwrite');

    for await (const cursor of tx.objectStore('blobs').iterate()) {
      if (cursor.value.deletedAt) {
        await cursor.delete();
        await tx.objectStore('blobData').delete(cursor.value.key);
      }
    }
  }

  /** List metadata of all live (not soft-deleted) blobs; payloads are not loaded. */
  override async list() {
    const tx = this.db.transaction('blobs', 'readonly');
    const result: ListedBlobRecord[] = [];

    for await (const cursor of tx.store.iterate()) {
      if (!cursor.value.deletedAt) {
        result.push(cursor.value);
      }
    }

    return result;
  }
}

View File

@ -0,0 +1,43 @@
import { type IDBPDatabase, openDB } from 'idb';
import { Connection } from '../../connection';
import type { StorageOptions } from '../../storage';
import { type DocStorageSchema, migrator } from './schema';
/**
 * Shared IndexedDB connection for one space, keyed by `peer:type:id`.
 * Schema upgrades run through `migrator`; the connection closes itself
 * when another tab needs to upgrade the database to a newer version.
 */
export class IDBConnection extends Connection<IDBPDatabase<DocStorageSchema>> {
private readonly dbName = `${this.opts.peer}:${this.opts.type}:${this.opts.id}`;
// identity used by `share()` so storages over the same db reuse one connection;
// includes the schema version so different versions never share a handle
override get shareId() {
return `idb(${migrator.version}):${this.dbName}`;
}
constructor(private readonly opts: StorageOptions) {
super();
}
override async doConnect() {
return openDB<DocStorageSchema>(this.dbName, migrator.version, {
upgrade: migrator.migrate,
blocking: () => {
// if, for example, a tab with a newer version is opened, this function will be called.
// we should close the current connection to allow the new version to upgrade the db.
this.close(
new Error('Blocking a new version. Closing the connection.')
);
},
blocked: () => {
// another tab still holds an older connection; mark errored so the
// caller's retry logic can attempt to reconnect — TODO confirm retry exists
this.setStatus('error', new Error('Blocked by other tabs.'));
},
});
}
override async doDisconnect() {
this.close();
}
// close the underlying db handle (if open) and record the final status
private close(error?: Error) {
this.maybeConnection?.close();
this.setStatus('closed', error);
}
}

View File

@ -0,0 +1,118 @@
import { share } from '../../connection';
import {
type DocClocks,
type DocRecord,
DocStorage,
type DocUpdate,
} from '../../storage';
import { IDBConnection } from './db';
/**
 * Doc storage backed by IndexedDB.
 *
 * Incremental updates go to `updates` (keyed by [docId, createdAt]), merged
 * snapshots to `snapshots`, and the latest per-doc timestamp to `clocks`.
 */
export class IndexedDBDocStorage extends DocStorage {
readonly connection = share(new IDBConnection(this.options));
get db() {
return this.connection.inner;
}
// Append one update and advance the doc clock in a single transaction.
// NOTE(review): `add` (not `put`) throws on a duplicate [docId, createdAt]
// key — confirm two pushes for one doc cannot share the same millisecond.
override async pushDocUpdate(update: DocUpdate) {
const trx = this.db.transaction(['updates', 'clocks'], 'readwrite');
const timestamp = new Date();
await trx.objectStore('updates').add({
...update,
createdAt: timestamp,
});
await trx.objectStore('clocks').put({ docId: update.docId, timestamp });
return { docId: update.docId, timestamp };
}
// Read the merged snapshot; null when the doc has never been flattened.
protected override async getDocSnapshot(docId: string) {
const trx = this.db.transaction('snapshots', 'readonly');
const record = await trx.store.get(docId);
if (!record) {
return null;
}
return {
docId,
bin: record.bin,
// the snapshot's logical clock is its last update time
timestamp: record.updatedAt,
};
}
// Remove every trace of a doc: its pending updates, snapshot, and clock.
override async deleteDoc(docId: string) {
const trx = this.db.transaction(
['snapshots', 'updates', 'clocks'],
'readwrite'
);
const idx = trx.objectStore('updates').index('docId');
const iter = idx.iterate(IDBKeyRange.only(docId));
// delete by primary key [docId, createdAt] while walking the index
for await (const { value } of iter) {
await trx.objectStore('updates').delete([value.docId, value.createdAt]);
}
await trx.objectStore('snapshots').delete(docId);
await trx.objectStore('clocks').delete(docId);
}
// Map of docId -> latest clock, restricted to clocks strictly after `after`.
override async getDocTimestamps(after: Date = new Date(0)) {
const trx = this.db.transaction('clocks', 'readonly');
const clocks = await trx.store.getAll();
return clocks.reduce((ret, cur) => {
if (cur.timestamp > after) {
ret[cur.docId] = cur.timestamp;
}
return ret;
}, {} as DocClocks);
}
// Persist a snapshot unless the stored one is already newer.
// NOTE(review): returns true even when the write is skipped because the
// existing snapshot is newer — confirm this matches the base-class contract.
protected override async setDocSnapshot(
snapshot: DocRecord
): Promise<boolean> {
const trx = this.db.transaction('snapshots', 'readwrite');
const record = await trx.store.get(snapshot.docId);
if (!record || record.updatedAt < snapshot.timestamp) {
await trx.store.put({
docId: snapshot.docId,
bin: snapshot.bin,
// preserve the original creation time across snapshot refreshes
createdAt: record?.createdAt ?? snapshot.timestamp,
updatedAt: snapshot.timestamp,
});
}
trx.commit();
return true;
}
// All pending updates for a doc, in index order (primary key includes
// createdAt, so this is effectively oldest-first).
protected override async getDocUpdates(docId: string): Promise<DocRecord[]> {
const trx = this.db.transaction('updates', 'readonly');
const updates = await trx.store.index('docId').getAll(docId);
return updates.map(update => ({
docId,
bin: update.bin,
timestamp: update.createdAt,
}));
}
// Drop updates that have been folded into a snapshot; returns the count.
protected override async markUpdatesMerged(
docId: string,
updates: DocRecord[]
): Promise<number> {
const trx = this.db.transaction('updates', 'readwrite');
await Promise.all(
updates.map(update => trx.store.delete([docId, update.timestamp]))
);
trx.commit();
return updates.length;
}
}

View File

@ -0,0 +1,3 @@
export * from './blob';
export * from './doc';
export * from './sync';

View File

@ -0,0 +1,174 @@
import { type DBSchema, type OpenDBCallbacks } from 'idb';
/**
IndexedDB
> DB(workspace:${workspaceId})
> Table(Snapshots)
> Table(Updates)
> Table(...)
Table(Snapshots)
| docId | bin | createdAt | updatedAt |
|-------|-----|-----------|-----------|
| str   | bin | Date      | Date      |
Table(Updates) — keyed by [docId, createdAt]
| docId | bin | createdAt |
|-------|-----|-----------|
| str   | bin | Date      |
Table(Clocks)
| docId | timestamp |
|-------|-----------|
| str   | Date      |
Table(Blobs)
| key | mime | size | createdAt | deletedAt |
|-----|------|------|-----------|-----------|
| str | str  | num  | Date      | Date      |
Table(BlobData)
| key | data |
|-----|------|
| str | bin  |
Table(PeerClocks) — keyed by [peer, docId]
| peer | docId | clock | pushedClock |
|------|-------|-------|-------------|
| str  | str   | Date  | Date        |
*/
/** Typed idb schema for the per-space IndexedDB database. */
export interface DocStorageSchema extends DBSchema {
// merged doc snapshots, keyed by docId
snapshots: {
key: string;
value: {
docId: string;
bin: Uint8Array;
createdAt: Date;
updatedAt: Date;
};
indexes: {
updatedAt: Date;
};
};
// incremental doc updates, keyed by [docId, createdAt]
updates: {
key: [string, Date];
value: {
docId: string;
bin: Uint8Array;
createdAt: Date;
};
indexes: {
docId: string;
};
};
// latest known clock per doc, keyed by docId
clocks: {
key: string;
value: {
docId: string;
timestamp: Date;
};
indexes: {
timestamp: Date;
};
};
// blob metadata; `deletedAt` is the soft-delete tombstone
blobs: {
key: string;
value: {
key: string;
mime: string;
size: number;
createdAt: Date;
deletedAt: Date | null;
};
};
// blob payloads, stored separately so listing never loads binary data
blobData: {
key: string;
value: {
key: string;
data: Uint8Array;
};
};
// per-(peer, doc) sync clocks: last received and last pushed
peerClocks: {
key: [string, string];
value: {
peer: string;
docId: string;
clock: Date;
pushedClock: Date;
};
indexes: {
peer: string;
};
};
}
/**
 * idb `upgrade` hook: replays every migration step from the version stored
 * on disk up to the latest one, in registration order.
 */
const migrate: OpenDBCallbacks<DocStorageSchema>['upgrade'] = (
  db,
  oldVersion,
  _newVersion,
  trx
) => {
  // a freshly created database reports a falsy version; treat it as 0
  const from = oldVersion || 0;
  for (const step of migrations.slice(from)) {
    step(db, trx);
  }
};

type MigrateParameters = Parameters<
  NonNullable<OpenDBCallbacks<DocStorageSchema>['upgrade']>
>;
/** One migration step: receives the db handle and the upgrade transaction. */
type Migrate = (db: MigrateParameters[0], trx: MigrateParameters[3]) => void;
// START REGION: migrations
/** v1 (initial) schema: create every object store and secondary index. */
const init: Migrate = db => {
  db.createObjectStore('snapshots', {
    keyPath: 'docId',
    autoIncrement: false,
  }).createIndex('updatedAt', 'updatedAt', { unique: false });

  db.createObjectStore('updates', {
    keyPath: ['docId', 'createdAt'],
    autoIncrement: false,
  }).createIndex('docId', 'docId', { unique: false });

  db.createObjectStore('clocks', {
    keyPath: 'docId',
    autoIncrement: false,
  }).createIndex('timestamp', 'timestamp', { unique: false });

  db.createObjectStore('peerClocks', {
    keyPath: ['peer', 'docId'],
    autoIncrement: false,
  }).createIndex('peer', 'peer', { unique: false });

  // blob stores carry no secondary indexes
  db.createObjectStore('blobs', {
    keyPath: 'key',
    autoIncrement: false,
  });
  db.createObjectStore('blobData', {
    keyPath: 'key',
    autoIncrement: false,
  });
};
// END REGION
// 1. every schema change must be added as a new entry in `migrations`
// 2. order matters: entry at index i runs when upgrading from version i
const migrations: Migrate[] = [init];
// the db version is derived from the number of registered migrations
export const migrator = {
version: migrations.length,
migrate,
};

View File

@ -0,0 +1,66 @@
import { share } from '../../connection';
import { type DocClock, type DocClocks, SyncStorage } from '../../storage';
import { IDBConnection } from './db';
/**
 * Sync bookkeeping backed by IndexedDB: for every (peer, doc) pair it tracks
 * the latest clock received from the peer and the latest clock pushed to it.
 * Clocks only ever move forward — stale writes are ignored.
 */
export class IndexedDBSyncStorage extends SyncStorage {
  readonly connection = share(new IDBConnection(this.options));

  get db() {
    return this.connection.inner;
  }

  /** Map of docId -> latest clock received from `peer`. */
  override async getPeerClocks(peer: string) {
    const tx = this.db.transaction('peerClocks', 'readonly');
    const rows = await tx.store.index('peer').getAll(peer);
    const clocks: DocClocks = {};
    for (const { docId, clock } of rows) {
      clocks[docId] = clock;
    }
    return clocks;
  }

  /** Advance the received clock for one doc; the pushed clock is untouched. */
  override async setPeerClock(peer: string, clock: DocClock) {
    const tx = this.db.transaction('peerClocks', 'readwrite');
    const existing = await tx.store.get([peer, clock.docId]);
    // only move forward; ignore stale or duplicate clocks
    if (!existing || existing.clock < clock.timestamp) {
      await tx.store.put({
        peer,
        docId: clock.docId,
        clock: clock.timestamp,
        pushedClock: existing?.pushedClock ?? new Date(0),
      });
    }
  }

  /** Map of docId -> latest clock we pushed to `peer`. */
  override async getPeerPushedClocks(peer: string) {
    const tx = this.db.transaction('peerClocks', 'readonly');
    const rows = await tx.store.index('peer').getAll(peer);
    const clocks: DocClocks = {};
    for (const { docId, pushedClock } of rows) {
      clocks[docId] = pushedClock;
    }
    return clocks;
  }

  /** Advance the pushed clock for one doc; the received clock is untouched. */
  override async setPeerPushedClock(peer: string, clock: DocClock) {
    const tx = this.db.transaction('peerClocks', 'readwrite');
    const existing = await tx.store.get([peer, clock.docId]);
    if (!existing || existing.pushedClock < clock.timestamp) {
      await tx.store.put({
        peer,
        docId: clock.docId,
        clock: existing?.clock ?? new Date(0),
        pushedClock: clock.timestamp,
      });
    }
  }

  /** Drop all per-peer clock bookkeeping. */
  override async clearClocks() {
    const tx = this.db.transaction('peerClocks', 'readwrite');
    await tx.store.clear();
  }
}

View File

@ -0,0 +1,62 @@
import { share } from '../../../connection';
import { BlobStorage, type ListedBlobRecord } from '../../../storage';
import { BlobIDBConnection } from './db';
/**
 * Read-only adapter over the legacy (v1) per-workspace blob database.
 * Write paths are intentionally no-ops; data is only read back for migration.
 *
 * @deprecated readonly
 */
export class IndexedDBV1BlobStorage extends BlobStorage {
  readonly connection = share(new BlobIDBConnection(this.spaceId));

  get db() {
    return this.connection.inner;
  }

  /** Read a raw blob; v1 stored bare ArrayBuffers, so mime is unknown. */
  override async get(key: string) {
    const tx = this.db.transaction('blob', 'readonly');
    const buffer = await tx.store.get(key);

    if (!buffer) {
      return null;
    }

    return {
      key,
      // v1 never recorded mime type or creation time
      mime: '',
      createdAt: new Date(),
      data: new Uint8Array(buffer),
    };
  }

  /** Only permanent deletion is supported; v1 has no soft-delete tombstones. */
  override async delete(key: string, permanently: boolean) {
    if (!permanently) {
      return;
    }
    const tx = this.db.transaction('blob', 'readwrite');
    await tx.store.delete(key);
  }

  /** Enumerate all v1 blobs with synthesized metadata. */
  override async list() {
    const tx = this.db.transaction('blob', 'readonly');
    const records: ListedBlobRecord[] = [];

    for await (const { key, value } of tx.store.iterate()) {
      records.push({
        key,
        mime: '',
        size: value.byteLength,
        createdAt: new Date(),
      });
    }

    return records;
  }

  /** No-op: the legacy database is read-only. */
  override async set() {
    // no more writes
  }

  /** No-op: the legacy database is read-only. */
  override async release() {
    // no more writes
  }
}

View File

@ -0,0 +1,63 @@
import { type DBSchema, type IDBPDatabase, openDB } from 'idb';
import { Connection } from '../../../connection';
// Legacy (v1) doc database schema: one row per workspace doc, holding every
// update blob alongside its numeric (epoch-ms) timestamp.
export interface DocDBSchema extends DBSchema {
workspace: {
key: string;
value: {
id: string;
updates: {
timestamp: number;
update: Uint8Array;
}[];
};
};
}
// Connection to the legacy `affine-local` database used by v1 doc storage.
export class DocIDBConnection extends Connection<IDBPDatabase<DocDBSchema>> {
// stable id so every v1 doc storage shares a single connection
override get shareId() {
return 'idb(old):affine-local';
}
override async doConnect() {
return openDB<DocDBSchema>('affine-local', 1, {
upgrade: db => {
db.createObjectStore('workspace', { keyPath: 'id' });
},
});
}
override async doDisconnect(conn: IDBPDatabase<DocDBSchema>) {
conn.close();
}
}
// Legacy (v1) blob database schema: raw ArrayBuffers keyed by blob key.
export interface BlobDBSchema extends DBSchema {
blob: {
key: string;
value: ArrayBuffer;
};
}
// Connection to a legacy per-workspace blob database (`${workspaceId}_blob`).
export class BlobIDBConnection extends Connection<IDBPDatabase<BlobDBSchema>> {
constructor(private readonly workspaceId: string) {
super();
}
// scoped per workspace so each space shares one blob connection
override get shareId() {
return `idb(old-blob):${this.workspaceId}`;
}
override async doConnect() {
return openDB<BlobDBSchema>(`${this.workspaceId}_blob`, 1, {
upgrade: db => {
// keys are supplied explicitly (out-of-line), hence no keyPath
db.createObjectStore('blob');
},
});
}
override async doDisconnect(conn: IDBPDatabase<BlobDBSchema>) {
conn.close();
}
}

View File

@ -0,0 +1,71 @@
import { share } from '../../../connection';
import { type DocRecord, DocStorage, type DocUpdate } from '../../../storage';
import { DocIDBConnection } from './db';
/**
 * Read-only adapter over the legacy (v1) doc database (`affine-local`).
 * Reads and deletions are supported; every write path is a no-op so new data
 * lands exclusively in the current storage.
 *
 * @deprecated readonly
 */
export class IndexedDBV1DocStorage extends DocStorage {
  readonly connection = share(new DocIDBConnection());

  get db() {
    return this.connection.inner;
  }

  get name() {
    return 'idb(old)';
  }

  /** Load a doc by merging its stored v1 updates; `null` when absent or empty. */
  override async getDoc(docId: string) {
    const tx = this.db.transaction('workspace', 'readonly');
    const record = await tx.store.get(docId);
    const updates = record?.updates;

    if (!updates?.length) {
      return null;
    }

    // single update: return it directly, no merge needed
    if (updates.length === 1) {
      const [only] = updates;
      return {
        docId,
        bin: only.update,
        timestamp: new Date(only.timestamp),
      };
    }

    return {
      docId,
      bin: await this.mergeUpdates(updates.map(u => u.update)),
      timestamp: new Date(updates.at(-1)?.timestamp ?? Date.now()),
    };
  }

  /** v1 has no snapshot table. */
  protected override async getDocSnapshot() {
    return null;
  }

  /** No-op: the legacy database accepts no new updates. */
  override async pushDocUpdate(update: DocUpdate) {
    // no more writes to old db
    return { docId: update.docId, timestamp: new Date() };
  }

  /** Deletion remains allowed so migrated docs can be cleaned up. */
  override async deleteDoc(docId: string) {
    const tx = this.db.transaction('workspace', 'readwrite');
    await tx.store.delete(docId);
  }

  override async getDocTimestamps() {
    return {};
  }

  protected override async setDocSnapshot(): Promise<boolean> {
    return false;
  }

  protected override async getDocUpdates(): Promise<DocRecord[]> {
    return [];
  }

  protected override async markUpdatesMerged(): Promise<number> {
    return 0;
  }
}

View File

@ -0,0 +1,2 @@
export * from './blob';
export * from './doc';

View File

@ -1,11 +1,26 @@
import type { Storage } from '../storage';
import {
IndexedDBBlobStorage,
IndexedDBDocStorage,
IndexedDBSyncStorage,
} from './idb';
import { IndexedDBV1BlobStorage, IndexedDBV1DocStorage } from './idb/v1';
type StorageConstructor = new (...args: any[]) => Storage;
export const storages: StorageConstructor[] = [];
const idb: StorageConstructor[] = [
IndexedDBDocStorage,
IndexedDBBlobStorage,
IndexedDBSyncStorage,
];
const idbv1: StorageConstructor[] = [
IndexedDBV1DocStorage,
IndexedDBV1BlobStorage,
];
export const storages: StorageConstructor[] = [...idbv1, ...idb];
// in next pr
// eslint-disable-next-line sonarjs/no-empty-collection
const AvailableStorageImplementations = storages.reduce(
(acc, curr) => {
acc[curr.name] = curr;

View File

@ -729,9 +729,12 @@ __metadata:
dependencies:
"@toeverything/infra": "workspace:*"
eventemitter2: "npm:^6.4.9"
idb: "npm:^8.0.0"
lodash-es: "npm:^4.17.21"
rxjs: "npm:^7.8.1"
yjs: "patch:yjs@npm%3A13.6.18#~/.yarn/patches/yjs-npm-13.6.18-ad0d5f7c43.patch"
peerDependencies:
idb: ^8.0.0
languageName: unknown
linkType: soft