feat(nbstore): add awareness storage&sync&frontend (#9016)

This commit is contained in:
EYHN 2024-12-17 04:37:15 +00:00
parent 36ac79351f
commit ffa0231cf5
No known key found for this signature in database
GPG Key ID: 46C9E26A75AB276C
21 changed files with 572 additions and 26 deletions

View File

@ -20,6 +20,7 @@
"lodash-es": "^4.17.21",
"nanoid": "^5.0.9",
"rxjs": "^7.8.1",
"y-protocols": "^1.0.6",
"yjs": "patch:yjs@npm%3A13.6.18#~/.yarn/patches/yjs-npm-13.6.18-ad0d5f7c43.patch"
},
"devDependencies": {

View File

@ -1,10 +1,14 @@
import 'fake-indexeddb/auto';
import { test, vitest } from 'vitest';
import { expect, test, vitest } from 'vitest';
import { Awareness } from 'y-protocols/awareness.js';
import { Doc as YDoc } from 'yjs';
import { AwarenessFrontend } from '../frontend/awareness';
import { DocFrontend } from '../frontend/doc';
import { BroadcastChannelAwarenessStorage } from '../impls/broadcast-channel/awareness';
import { IndexedDBDocStorage } from '../impls/idb';
import { AwarenessSync } from '../sync/awareness';
import { expectYjsEqual } from './utils';
test('doc', async () => {
@ -48,3 +52,86 @@ test('doc', async () => {
});
});
});
test('awareness', async () => {
const storage1 = new BroadcastChannelAwarenessStorage({
id: 'ws1',
peer: 'a',
type: 'workspace',
});
const storage2 = new BroadcastChannelAwarenessStorage({
id: 'ws1',
peer: 'b',
type: 'workspace',
});
await storage1.connect();
await storage2.connect();
// peer a
const docA = new YDoc({ guid: 'test-doc' });
docA.clientID = 1;
const awarenessA = new Awareness(docA);
// peer b
const docB = new YDoc({ guid: 'test-doc' });
docB.clientID = 2;
const awarenessB = new Awareness(docB);
// peer c
const docC = new YDoc({ guid: 'test-doc' });
docC.clientID = 3;
const awarenessC = new Awareness(docC);
{
const sync = new AwarenessSync(storage1, [storage2]);
const frontend = new AwarenessFrontend(sync);
frontend.connect(awarenessA);
frontend.connect(awarenessB);
}
{
const sync = new AwarenessSync(storage2, [storage1]);
const frontend = new AwarenessFrontend(sync);
frontend.connect(awarenessC);
}
awarenessA.setLocalState({
hello: 'world',
});
await vitest.waitFor(() => {
expect(awarenessB.getStates().get(1)).toEqual({
hello: 'world',
});
expect(awarenessC.getStates().get(1)).toEqual({
hello: 'world',
});
});
awarenessB.setLocalState({
foo: 'bar',
});
await vitest.waitFor(() => {
expect(awarenessA.getStates().get(2)).toEqual({
foo: 'bar',
});
expect(awarenessC.getStates().get(2)).toEqual({
foo: 'bar',
});
});
awarenessC.setLocalState({
baz: 'qux',
});
await vitest.waitFor(() => {
expect(awarenessA.getStates().get(3)).toEqual({
baz: 'qux',
});
expect(awarenessB.getStates().get(3)).toEqual({
baz: 'qux',
});
});
});

View File

@ -9,7 +9,7 @@ import {
IndexedDBSyncStorage,
} from '../impls/idb';
import { SpaceStorage } from '../storage';
import { SyncEngine } from '../sync';
import { Sync } from '../sync';
import { expectYjsEqual } from './utils';
test('doc', async () => {
@ -53,7 +53,7 @@ test('doc', async () => {
bin: update,
});
const sync = new SyncEngine(peerA, [peerB, peerC]);
const sync = new Sync(peerA, [peerB, peerC]);
sync.start();
await new Promise(resolve => setTimeout(resolve, 1000));
@ -143,7 +143,7 @@ test('blob', async () => {
await peerB.connect();
await peerC.connect();
const sync = new SyncEngine(peerA, [peerB, peerC]);
const sync = new Sync(peerA, [peerB, peerC]);
sync.start();
await new Promise(resolve => setTimeout(resolve, 1000));

View File

@ -0,0 +1,70 @@
import { nanoid } from 'nanoid';
import {
applyAwarenessUpdate,
type Awareness,
encodeAwarenessUpdate,
} from 'y-protocols/awareness.js';
import type { AwarenessRecord } from '../storage/awareness';
import type { AwarenessSync } from '../sync/awareness';
type AwarenessChanges = Record<'added' | 'updated' | 'removed', number[]>;
export class AwarenessFrontend {
constructor(private readonly sync: AwarenessSync) {}
connect(awareness: Awareness) {
const uniqueId = nanoid();
const handleAwarenessUpdate = (
changes: AwarenessChanges,
origin: string
) => {
if (origin === uniqueId) {
return;
}
const changedClients = Object.values(changes).reduce((res, cur) =>
res.concat(cur)
);
const update = encodeAwarenessUpdate(awareness, changedClients);
this.sync
.update(
{
docId: awareness.doc.guid,
bin: update,
},
uniqueId
)
.catch(error => {
console.error('update awareness error', error);
});
};
awareness.on('update', handleAwarenessUpdate);
const handleSyncUpdate = (update: AwarenessRecord, origin?: string) => {
if (origin === uniqueId) {
// skip self update
return;
}
applyAwarenessUpdate(awareness, update.bin, origin);
};
const handleSyncCollect = () => {
return {
docId: awareness.doc.guid,
bin: encodeAwarenessUpdate(awareness, [awareness.clientID]),
};
};
const unsubscribe = this.sync.subscribeUpdate(
awareness.doc.guid,
handleSyncUpdate,
handleSyncCollect
);
return () => {
awareness.off('update', handleAwarenessUpdate);
unsubscribe();
};
}
}

View File

@ -1,10 +1,10 @@
import type { BlobRecord, BlobStorage } from '../storage';
import type { BlobSyncEngine } from '../sync/blob';
import type { BlobSync } from '../sync/blob';
export class BlobFrontend {
constructor(
readonly storage: BlobStorage,
readonly sync?: BlobSyncEngine
readonly sync?: BlobSync
) {}
get(blobId: string) {

View File

@ -9,7 +9,7 @@ import {
} from 'yjs';
import type { DocRecord, DocStorage } from '../storage';
import type { DocSyncEngine } from '../sync/doc';
import type { DocSync } from '../sync/doc';
import { AsyncPriorityQueue } from '../utils/async-priority-queue';
import { isEmptyUpdate } from '../utils/is-empty-update';
import { throwIfAborted } from '../utils/throw-if-aborted';
@ -56,7 +56,7 @@ export class DocFrontend {
constructor(
private readonly storage: DocStorage,
private readonly sync: DocSyncEngine | null,
private readonly sync: DocSync | null,
readonly options: DocFrontendOptions = {}
) {}

View File

@ -0,0 +1,128 @@
import { nanoid } from 'nanoid';
import {
type AwarenessRecord,
AwarenessStorage,
} from '../../storage/awareness';
import { BroadcastChannelConnection } from './channel';
type ChannelMessage =
| {
type: 'awareness-update';
docId: string;
bin: Uint8Array;
origin?: string;
}
| {
type: 'awareness-collect';
docId: string;
collectId: string;
}
| {
type: 'awareness-collect-fallback';
docId: string;
bin: Uint8Array;
collectId: string;
};
export class BroadcastChannelAwarenessStorage extends AwarenessStorage {
override readonly storageType = 'awareness';
override readonly connection = new BroadcastChannelConnection(this.options);
get channel() {
return this.connection.inner;
}
private readonly subscriptions = new Map<
string,
Set<{
onUpdate: (update: AwarenessRecord, origin?: string) => void;
onCollect: () => AwarenessRecord;
}>
>();
override update(record: AwarenessRecord, origin?: string): Promise<void> {
const subscribers = this.subscriptions.get(record.docId);
if (subscribers) {
subscribers.forEach(subscriber => subscriber.onUpdate(record, origin));
}
this.channel.postMessage({
type: 'awareness-update',
docId: record.docId,
bin: record.bin,
origin,
} satisfies ChannelMessage);
return Promise.resolve();
}
override subscribeUpdate(
id: string,
onUpdate: (update: AwarenessRecord, origin?: string) => void,
onCollect: () => AwarenessRecord
): () => void {
const subscribers = this.subscriptions.get(id) ?? new Set();
subscribers.forEach(subscriber => {
const fallback = subscriber.onCollect();
onUpdate(fallback);
});
const collectUniqueId = nanoid();
const onChannelMessage = (message: MessageEvent<ChannelMessage>) => {
if (
message.data.type === 'awareness-update' &&
message.data.docId === id
) {
onUpdate(
{
docId: message.data.docId,
bin: message.data.bin,
},
message.data.origin
);
}
if (
message.data.type === 'awareness-collect' &&
message.data.docId === id
) {
const fallback = onCollect();
if (fallback) {
this.channel.postMessage({
type: 'awareness-collect-fallback',
docId: message.data.docId,
bin: fallback.bin,
collectId: collectUniqueId,
} satisfies ChannelMessage);
}
}
if (
message.data.type === 'awareness-collect-fallback' &&
message.data.docId === id &&
message.data.collectId === collectUniqueId
) {
onUpdate({
docId: message.data.docId,
bin: message.data.bin,
});
}
};
this.channel.addEventListener('message', onChannelMessage);
this.channel.postMessage({
type: 'awareness-collect',
docId: id,
collectId: collectUniqueId,
} satisfies ChannelMessage);
const subscriber = {
onUpdate,
onCollect,
};
subscribers.add(subscriber);
this.subscriptions.set(id, subscribers);
return () => {
subscribers.delete(subscriber);
this.channel.removeEventListener('message', onChannelMessage);
};
}
}

View File

@ -0,0 +1,23 @@
import { Connection } from '../../connection';
import type { StorageOptions } from '../../storage';
export class BroadcastChannelConnection extends Connection<BroadcastChannel> {
readonly channelName = `channel:${this.opts.peer}:${this.opts.type}:${this.opts.id}`;
constructor(private readonly opts: StorageOptions) {
super();
}
override async doConnect() {
return new BroadcastChannel(this.channelName);
}
override async doDisconnect() {
this.close();
}
private close(error?: Error) {
this.maybeConnection?.close();
this.setStatus('closed', error);
}
}

View File

@ -0,0 +1,148 @@
import type { SocketOptions } from 'socket.io-client';
import { share } from '../../connection';
import {
type AwarenessRecord,
AwarenessStorage,
type AwarenessStorageOptions,
} from '../../storage/awareness';
import {
base64ToUint8Array,
SocketConnection,
uint8ArrayToBase64,
} from './socket';
interface CloudAwarenessStorageOptions extends AwarenessStorageOptions {
socketOptions: SocketOptions;
}
export class CloudAwarenessStorage extends AwarenessStorage<CloudAwarenessStorageOptions> {
connection = share(
new SocketConnection(this.peer, this.options.socketOptions)
);
private get socket() {
return this.connection.inner;
}
override async connect(): Promise<void> {
await super.connect();
}
override async update(record: AwarenessRecord): Promise<void> {
const encodedUpdate = await uint8ArrayToBase64(record.bin);
this.socket.emit('space:update-awareness', {
spaceType: this.spaceType,
spaceId: this.spaceId,
docId: record.docId,
awarenessUpdate: encodedUpdate,
});
}
override subscribeUpdate(
id: string,
onUpdate: (update: AwarenessRecord, origin?: string) => void,
onCollect: () => AwarenessRecord
): () => void {
// leave awareness
const leave = () => {
this.socket.emit('space:leave-awareness', {
spaceType: this.spaceType,
spaceId: this.spaceId,
docId: id,
});
};
// join awareness, and collect awareness from others
const joinAndCollect = async () => {
await this.socket.emitWithAck('space:join-awareness', {
spaceType: this.spaceType,
spaceId: this.spaceId,
docId: id,
clientVersion: BUILD_CONFIG.appVersion,
});
this.socket.emit('space:load-awarenesses', {
spaceType: this.spaceType,
spaceId: this.spaceId,
docId: id,
});
};
joinAndCollect().catch(err => console.error('awareness join failed', err));
const unsubscribeConnectionStatusChanged = this.connection.onStatusChanged(
status => {
if (status === 'connected') {
joinAndCollect().catch(err =>
console.error('awareness join failed', err)
);
}
}
);
const handleCollectAwareness = ({
spaceId,
spaceType,
docId,
}: {
spaceId: string;
spaceType: string;
docId: string;
}) => {
if (
spaceId === this.spaceId &&
spaceType === this.spaceType &&
docId === id
) {
(async () => {
const record = onCollect();
const encodedUpdate = await uint8ArrayToBase64(record.bin);
this.socket.emit('space:update-awareness', {
spaceType: this.spaceType,
spaceId: this.spaceId,
docId: record.docId,
awarenessUpdate: encodedUpdate,
});
})().catch(err => console.error('awareness upload failed', err));
}
};
const handleBroadcastAwarenessUpdate = ({
spaceType,
spaceId,
docId,
awarenessUpdate,
}: {
spaceType: string;
spaceId: string;
docId: string;
awarenessUpdate: string;
}) => {
if (
spaceId === this.spaceId &&
spaceType === this.spaceType &&
docId === id
) {
onUpdate({
bin: base64ToUint8Array(awarenessUpdate),
docId: id,
});
}
};
this.socket.on('space:collect-awareness', handleCollectAwareness);
this.socket.on(
'space:broadcast-awareness-update',
handleBroadcastAwarenessUpdate
);
return () => {
leave();
this.socket.off('space:collect-awareness', handleCollectAwareness);
this.socket.off(
'space:broadcast-awareness-update',
handleBroadcastAwarenessUpdate
);
unsubscribeConnectionStatusChanged();
};
}
}

View File

@ -1,4 +1,3 @@
import { noop } from 'lodash-es';
import type { SocketOptions } from 'socket.io-client';
import { share } from '../../connection';
@ -33,7 +32,9 @@ export class CloudDocStorage extends DocStorage<CloudDocStorageOptions> {
await super.connect();
this.connection.onStatusChanged(status => {
if (status === 'connected') {
this.join().catch(noop);
this.join().catch(err => {
console.error('doc storage join failed', err);
});
this.socket.on('space:broadcast-doc-update', this.onServerUpdate);
}
});

View File

@ -1,2 +1,3 @@
export * from './awareness';
export * from './blob';
export * from './doc';

View File

@ -29,6 +29,19 @@ interface ServerEvents {
timestamp: number;
editor: string;
};
'space:collect-awareness': {
spaceType: string;
spaceId: string;
docId: string;
};
'space:broadcast-awareness-update': {
spaceType: string;
spaceId: string;
docId: string;
awarenessUpdate: string;
};
}
interface ClientEvents {
@ -52,6 +65,19 @@ interface ClientEvents {
docId: string;
};
'space:update-awareness': {
spaceType: string;
spaceId: string;
docId: string;
awarenessUpdate: string;
};
'space:load-awarenesses': {
spaceType: string;
spaceId: string;
docId: string;
};
'space:push-doc-update': [
{ spaceType: string; spaceId: string; docId: string; updates: string },
{ timestamp: number },

View File

@ -34,7 +34,7 @@ export class IDBConnection extends Connection<{
this.setStatus('error', new Error('Blocked by other tabs.'));
},
}),
channel: new BroadcastChannel(this.dbName),
channel: new BroadcastChannel('idb:' + this.dbName),
};
}

View File

@ -0,0 +1,27 @@
import { Storage, type StorageOptions } from './storage';
export interface AwarenessStorageOptions extends StorageOptions {}
export type AwarenessRecord = {
docId: string;
bin: Uint8Array;
};
export abstract class AwarenessStorage<
Options extends AwarenessStorageOptions = AwarenessStorageOptions,
> extends Storage<Options> {
override readonly storageType = 'awareness';
/**
* Update the awareness record.
*
* @param origin - Internal identifier to recognize the source in the "update" event. Will not be stored or transferred.
*/
abstract update(record: AwarenessRecord, origin?: string): Promise<void>;
abstract subscribeUpdate(
id: string,
onUpdate: (update: AwarenessRecord, origin?: string) => void,
onCollect: () => AwarenessRecord
): () => void;
}

View File

@ -3,7 +3,7 @@ import EventEmitter2 from 'eventemitter2';
import type { ConnectionStatus } from '../connection';
import type { BlobStorage } from './blob';
import type { DocStorage } from './doc';
import { type Storage, type StorageType } from './storage';
import type { Storage, StorageType } from './storage';
import type { SyncStorage } from './sync';
type Storages = DocStorage | BlobStorage | SyncStorage;
@ -22,7 +22,10 @@ export class SpaceStorage {
tryGet<T extends StorageType>(
type: T
): Extract<Storages, { storageType: T }> | undefined {
return this.storages.get(type) as Extract<Storages, { storageType: T }>;
return this.storages.get(type) as unknown as Extract<
Storages,
{ storageType: T }
>;
}
get<T extends StorageType>(type: T): Extract<Storages, { storageType: T }> {

View File

@ -1,7 +1,7 @@
import type { Connection } from '../connection';
export type SpaceType = 'workspace' | 'userspace';
export type StorageType = 'blob' | 'doc' | 'sync';
export type StorageType = 'blob' | 'doc' | 'sync' | 'awareness';
export interface StorageOptions {
peer: string;

View File

@ -0,0 +1,30 @@
import type {
AwarenessRecord,
AwarenessStorage,
} from '../../storage/awareness';
export class AwarenessSync {
constructor(
readonly local: AwarenessStorage,
readonly remotes: AwarenessStorage[]
) {}
async update(record: AwarenessRecord, origin?: string) {
await Promise.all(
[this.local, ...this.remotes].map(peer => peer.update(record, origin))
);
}
subscribeUpdate(
id: string,
onUpdate: (update: AwarenessRecord, origin?: string) => void,
onCollect: () => AwarenessRecord
): () => void {
const unsubscribes = [this.local, ...this.remotes].map(peer =>
peer.subscribeUpdate(id, onUpdate, onCollect)
);
return () => {
unsubscribes.forEach(unsubscribe => unsubscribe());
};
}
}

View File

@ -3,7 +3,7 @@ import { difference } from 'lodash-es';
import type { BlobRecord, BlobStorage } from '../../storage';
import { MANUALLY_STOP, throwIfAborted } from '../../utils/throw-if-aborted';
export class BlobSyncEngine {
export class BlobSync {
private abort: AbortController | null = null;
constructor(

View File

@ -1,7 +1,7 @@
import type { DocStorage, SyncStorage } from '../../storage';
import { DocSyncPeer } from './peer';
export class DocSyncEngine {
export class DocSync {
private readonly peers: DocSyncPeer[];
private abort: AbortController | null = null;

View File

@ -1,10 +1,10 @@
import type { BlobStorage, DocStorage, SpaceStorage } from '../storage';
import { BlobSyncEngine } from './blob';
import { DocSyncEngine } from './doc';
import { BlobSync } from './blob';
import { DocSync } from './doc';
export class SyncEngine {
private readonly doc: DocSyncEngine | null;
private readonly blob: BlobSyncEngine | null;
export class Sync {
private readonly doc: DocSync | null;
private readonly blob: BlobSync | null;
constructor(
readonly local: SpaceStorage,
@ -16,7 +16,7 @@ export class SyncEngine {
this.doc =
doc && sync
? new DocSyncEngine(
? new DocSync(
doc,
sync,
peers
@ -25,7 +25,7 @@ export class SyncEngine {
)
: null;
this.blob = blob
? new BlobSyncEngine(
? new BlobSync(
blob,
peers
.map(peer => peer.tryGet('blob'))

View File

@ -747,6 +747,7 @@ __metadata:
rxjs: "npm:^7.8.1"
socket.io-client: "npm:^4.8.1"
vitest: "npm:2.1.8"
y-protocols: "npm:^1.0.6"
yjs: "patch:yjs@npm%3A13.6.18#~/.yarn/patches/yjs-npm-13.6.18-ad0d5f7c43.patch"
peerDependencies:
"@affine/electron-api": "workspace:*"
@ -15060,9 +15061,9 @@ __metadata:
linkType: hard
"@ungap/structured-clone@npm:^1.0.0":
version: 1.2.1
resolution: "@ungap/structured-clone@npm:1.2.1"
checksum: 10/6770f71e8183311b2871601ddb02d62a26373be7cf2950cb546a345a2305c75b502e36ce80166120aa2f5f1ea1562141684651ebbfcc711c58acd32035d3e545
version: 1.2.0
resolution: "@ungap/structured-clone@npm:1.2.0"
checksum: 10/c6fe89a505e513a7592e1438280db1c075764793a2397877ff1351721fe8792a966a5359769e30242b3cd023f2efb9e63ca2ca88019d73b564488cc20e3eab12
languageName: node
linkType: hard