fix(core): merge updates before push to storage (#4986)

This commit is contained in:
EYHN 2023-11-20 23:26:19 +08:00 committed by GitHub
parent 9370110cdc
commit 90c130cf15
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 31 additions and 10 deletions

View File

@ -13,6 +13,12 @@ export const dbVersion = 1;
export const DEFAULT_DB_NAME = 'affine-local';
export function mergeUpdates(updates: Uint8Array[]) {
if (updates.length === 0) {
return new Uint8Array();
}
if (updates.length === 1) {
return updates[0];
}
const doc = new Doc();
doc.transact(() => {
updates.forEach(update => {

View File

@ -4,7 +4,7 @@ import { isEqual } from '@blocksuite/global/utils';
import type { Doc } from 'yjs';
import { applyUpdate, encodeStateAsUpdate, encodeStateVector } from 'yjs';
import type { Storage } from '../storage';
import { mergeUpdates, type Storage } from '../storage';
import { AsyncQueue } from '../utils/async-queue';
import { throwIfAborted } from '../utils/throw-if-aborted';
import { MANUALLY_STOP } from './engine';
@ -154,7 +154,7 @@ export class SyncPeer {
connectedDocs: Map<string, Doc>;
pushUpdatesQueue: AsyncQueue<{
docId: string;
data: Uint8Array;
data: Uint8Array[];
}>;
pushingUpdate: boolean;
pullUpdatesQueue: AsyncQueue<{
@ -262,14 +262,16 @@ export class SyncPeer {
this.state.pushingUpdate = true;
this.reportSyncStatus();
const merged = mergeUpdates(data);
// don't push empty data or Uint8Array([0, 0])
if (
!(
data.byteLength === 0 ||
(data.byteLength === 2 && data[0] === 0 && data[1] === 0)
merged.byteLength === 0 ||
(merged.byteLength === 2 && merged[0] === 0 && merged[1] === 0)
)
) {
await this.storage.push(docId, data);
await this.storage.push(docId, merged);
}
this.state.pushingUpdate = false;
@ -298,7 +300,7 @@ export class SyncPeer {
// diff root doc and in-storage, save updates to pendingUpdates
this.state.pushUpdatesQueue.push({
docId: doc.guid,
data: encodeStateAsUpdate(doc, inStorageState),
data: [encodeStateAsUpdate(doc, inStorageState)],
});
this.state.connectedDocs.set(doc.guid, doc);
@ -324,10 +326,19 @@ export class SyncPeer {
if (origin === this.name) {
return;
}
this.state.pushUpdatesQueue.push({
docId: doc.guid,
data: update,
});
const exist = this.state.pushUpdatesQueue.find(
({ docId }) => docId === doc.guid
);
if (exist) {
exist.data.push(update);
} else {
this.state.pushUpdatesQueue.push({
docId: doc.guid,
data: [update],
});
}
this.reportSyncStatus();
};

View File

@ -56,6 +56,10 @@ export class AsyncQueue<T> {
}
}
find(predicate: (update: T) => boolean) {
return this._queue.find(predicate);
}
clear() {
this._queue = [];
}