diff --git a/packages/frontend/workspace/src/providers/storage/indexeddb/index.ts b/packages/frontend/workspace/src/providers/storage/indexeddb/index.ts index 79a165635c..51a47d352f 100644 --- a/packages/frontend/workspace/src/providers/storage/indexeddb/index.ts +++ b/packages/frontend/workspace/src/providers/storage/indexeddb/index.ts @@ -13,6 +13,12 @@ export const dbVersion = 1; export const DEFAULT_DB_NAME = 'affine-local'; export function mergeUpdates(updates: Uint8Array[]) { + if (updates.length === 0) { + return new Uint8Array(); + } + if (updates.length === 1) { + return updates[0]; + } const doc = new Doc(); doc.transact(() => { updates.forEach(update => { diff --git a/packages/frontend/workspace/src/providers/sync/peer.ts b/packages/frontend/workspace/src/providers/sync/peer.ts index 18f8493b4f..ecca8106a7 100644 --- a/packages/frontend/workspace/src/providers/sync/peer.ts +++ b/packages/frontend/workspace/src/providers/sync/peer.ts @@ -4,7 +4,7 @@ import { isEqual } from '@blocksuite/global/utils'; import type { Doc } from 'yjs'; import { applyUpdate, encodeStateAsUpdate, encodeStateVector } from 'yjs'; -import type { Storage } from '../storage'; +import { mergeUpdates, type Storage } from '../storage'; import { AsyncQueue } from '../utils/async-queue'; import { throwIfAborted } from '../utils/throw-if-aborted'; import { MANUALLY_STOP } from './engine'; @@ -154,7 +154,7 @@ export class SyncPeer { connectedDocs: Map; pushUpdatesQueue: AsyncQueue<{ docId: string; - data: Uint8Array; + data: Uint8Array[]; }>; pushingUpdate: boolean; pullUpdatesQueue: AsyncQueue<{ @@ -262,14 +262,16 @@ export class SyncPeer { this.state.pushingUpdate = true; this.reportSyncStatus(); + const merged = mergeUpdates(data); + // don't push empty data or Uint8Array([0, 0]) if ( !( - data.byteLength === 0 || - (data.byteLength === 2 && data[0] === 0 && data[1] === 0) + merged.byteLength === 0 || + (merged.byteLength === 2 && merged[0] === 0 && merged[1] === 0) ) ) { - await this.storage.push(docId, data); + await this.storage.push(docId, merged); } this.state.pushingUpdate = false; @@ -298,7 +300,7 @@ export class SyncPeer { // diff root doc and in-storage, save updates to pendingUpdates this.state.pushUpdatesQueue.push({ docId: doc.guid, - data: encodeStateAsUpdate(doc, inStorageState), + data: [encodeStateAsUpdate(doc, inStorageState)], }); this.state.connectedDocs.set(doc.guid, doc); @@ -324,10 +326,19 @@ export class SyncPeer { if (origin === this.name) { return; } - this.state.pushUpdatesQueue.push({ - docId: doc.guid, - data: update, - }); + + const exist = this.state.pushUpdatesQueue.find( + ({ docId }) => docId === doc.guid + ); + if (exist) { + exist.data.push(update); + } else { + this.state.pushUpdatesQueue.push({ + docId: doc.guid, + data: [update], + }); + } + this.reportSyncStatus(); }; diff --git a/packages/frontend/workspace/src/providers/utils/async-queue.ts b/packages/frontend/workspace/src/providers/utils/async-queue.ts index 8b146a4f07..db29b8d43e 100644 --- a/packages/frontend/workspace/src/providers/utils/async-queue.ts +++ b/packages/frontend/workspace/src/providers/utils/async-queue.ts @@ -56,6 +56,10 @@ export class AsyncQueue { } } + find(predicate: (update: T) => boolean) { + return this._queue.find(predicate); + } + clear() { this._queue = []; }