fix(workspace): fix sync stuck (#5762)

* remove MultipleBatchSyncSender
* add timeout (30 seconds) on socket.emit
EYHN 2024-02-01 06:58:09 +00:00
parent aa33bf60d6
commit 799fa9cfa6
4 changed files with 185 additions and 321 deletions
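Review note: the root cause of the stuck sync was that requests sent via socket.emit with an ack callback could wait forever when the server never acknowledged them. The rewritten storage below instead uses socket.io v4's timeout() together with emitWithAck(), so an unacknowledged request rejects after 30 seconds. A minimal sketch of that pattern, assuming socket.io-client v4; the loadDoc helper and its error handling are illustrative, not part of the diff:

import { io } from 'socket.io-client';

const socket = io('/');
const SEND_TIMEOUT = 30000; // 30 seconds, matching the value added in this commit

// Hypothetical helper: a doc-load request that is not acknowledged within
// SEND_TIMEOUT now rejects instead of leaving the caller awaiting forever.
async function loadDoc(workspaceId: string, guid: string) {
  try {
    return await socket
      .timeout(SEND_TIMEOUT)
      .emitWithAck('doc-load-v2', { workspaceId, guid });
  } catch (err) {
    // timeout or transport error; the caller can retry or surface it
    throw new Error(`doc-load-v2 failed or timed out: ${String(err)}`);
  }
}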


@@ -32,12 +32,13 @@ export class AffineCloudAwarenessProvider implements AwarenessProvider {
window.addEventListener('beforeunload', this.windowBeforeUnloadHandler);
this.socket.connect();
this.socket.on('connect', () => this.handleConnect());
this.socket.emit('client-handshake-awareness', this.workspaceId);
this.socket.emit('awareness-init', this.workspaceId);
if (this.socket.connected) {
this.handleConnect();
} else {
this.socket.connect();
}
}
disconnect(): void {
removeAwarenessStates(


@@ -0,0 +1,180 @@
import { DebugLogger } from '@affine/debug';
import { fetchWithTraceReport } from '@affine/graphql';
import { type SyncStorage } from '@toeverything/infra';
import type { CleanupService } from '@toeverything/infra/lifecycle';
import { getIoManager } from '../utils/affine-io';
import { base64ToUint8Array, uint8ArrayToBase64 } from '../utils/base64';
const logger = new DebugLogger('affine:storage:socketio');
export class AffineSyncStorage implements SyncStorage {
name = 'affine-cloud';
SEND_TIMEOUT = 30000;
socket = getIoManager().socket('/');
constructor(
private readonly workspaceId: string,
cleanupService: CleanupService
) {
this.socket.on('connect', this.handleConnect);
if (this.socket.connected) {
this.socket.emit('client-handshake-sync', this.workspaceId);
} else {
this.socket.connect();
}
cleanupService.add(() => {
this.cleanup();
});
}
handleConnect = () => {
this.socket.emit('client-handshake-sync', this.workspaceId);
};
async pull(
docId: string,
state: Uint8Array
): Promise<{ data: Uint8Array; state?: Uint8Array } | null> {
const stateVector = state ? await uint8ArrayToBase64(state) : undefined;
logger.debug('doc-load-v2', {
workspaceId: this.workspaceId,
guid: docId,
stateVector,
});
const response:
| { error: any }
| { data: { missing: string; state: string } } = await this.socket
.timeout(this.SEND_TIMEOUT)
.emitWithAck('doc-load-v2', {
workspaceId: this.workspaceId,
guid: docId,
stateVector,
});
logger.debug('doc-load callback', {
workspaceId: this.workspaceId,
guid: docId,
stateVector,
response,
});
if ('error' in response) {
// TODO: reuse `EventError` with server
if (response.error.code === 'DOC_NOT_FOUND') {
return null;
} else {
throw new Error(response.error.message);
}
} else {
return {
data: base64ToUint8Array(response.data.missing),
state: response.data.state
? base64ToUint8Array(response.data.state)
: undefined,
};
}
}
async push(docId: string, update: Uint8Array) {
logger.debug('client-update-v2', {
workspaceId: this.workspaceId,
guid: docId,
update,
});
const payload = await uint8ArrayToBase64(update);
const response: {
// TODO: reuse `EventError` with server
error?: any;
data: any;
} = await this.socket
.timeout(this.SEND_TIMEOUT)
.emitWithAck('client-update-v2', {
workspaceId: this.workspaceId,
guid: docId,
updates: [payload],
});
// TODO: raise error with different code to users
if (response.error) {
logger.error('client-update-v2 error', {
workspaceId: this.workspaceId,
guid: docId,
response,
});
throw new Error(response.error);
}
}
async subscribe(
cb: (docId: string, data: Uint8Array) => void,
disconnect: (reason: string) => void
) {
const handleUpdate = async (message: {
workspaceId: string;
guid: string;
updates: string[];
}) => {
if (message.workspaceId === this.workspaceId) {
message.updates.forEach(update => {
cb(message.guid, base64ToUint8Array(update));
});
}
};
const handleDisconnect = (reason: string) => {
this.socket.off('server-updates', handleUpdate);
disconnect(reason);
};
this.socket.on('server-updates', handleUpdate);
this.socket.on('disconnect', handleDisconnect);
return () => {
this.socket.off('server-updates', handleUpdate);
this.socket.off('disconnect', handleDisconnect);
};
}
cleanup() {
this.socket.emit('client-leave-sync', this.workspaceId);
this.socket.off('connect', this.handleConnect);
}
}
export class AffineStaticSyncStorage implements SyncStorage {
name = 'affine-cloud-static';
constructor(private readonly workspaceId: string) {}
async pull(
docId: string
): Promise<{ data: Uint8Array; state?: Uint8Array | undefined } | null> {
const response = await fetchWithTraceReport(
`/api/workspaces/${this.workspaceId}/docs/${docId}`,
{
priority: 'high',
}
);
if (response.ok) {
const arrayBuffer = await response.arrayBuffer();
return { data: new Uint8Array(arrayBuffer) };
}
return null;
}
push(): Promise<void> {
throw new Error('Method not implemented.');
}
subscribe(): Promise<() => void> {
throw new Error('Method not implemented.');
}
}
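Because pull() and push() can now reject after SEND_TIMEOUT instead of hanging, the consumer of SyncStorage gets a chance to recover. A hypothetical caller-side sketch of what that enables; pushWithRetry and its backoff values are illustrative, not part of this commit:

// Retry a push that failed or timed out, with a simple linear backoff.
async function pushWithRetry(
  storage: { push(docId: string, update: Uint8Array): Promise<void> },
  docId: string,
  update: Uint8Array,
  attempts = 3
): Promise<void> {
  for (let i = 0; i < attempts; i++) {
    try {
      return await storage.push(docId, update);
    } catch (err) {
      if (i === attempts - 1) throw err;
      // wait briefly before retrying the timed-out or failed send
      await new Promise(resolve => setTimeout(resolve, 1000 * (i + 1)));
    }
  }
}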


@@ -1,107 +0,0 @@
interface SyncUpdateSender {
(
guid: string,
updates: Uint8Array[]
): Promise<{
accepted: boolean;
retry: boolean;
}>;
}
/**
* BatchSyncSender is a simple wrapper around the vanilla update sync with several advanced features:
* - ACK mechanism: updates are sent sequentially, and the next request is only sent after the previous one has been acknowledged
* - update batching: while waiting for the previous ACK, new updates are buffered and sent together in a single sync request
* - retries: a failed sync request is retried when the response sets the retry flag to true
*/
export class BatchSyncSender {
private readonly buffered: Uint8Array[] = [];
private job: Promise<void> | null = null;
private started = true;
constructor(
private readonly guid: string,
private readonly rawSender: SyncUpdateSender
) {}
send(update: Uint8Array) {
this.buffered.push(update);
this.next();
return Promise.resolve();
}
stop() {
this.started = false;
}
start() {
this.started = true;
this.next();
}
private next() {
if (!this.started || this.job || !this.buffered.length) {
return;
}
const lastIndex = Math.min(
this.buffered.length - 1,
99 /* max batch updates size */
);
const updates = this.buffered.slice(0, lastIndex + 1);
if (updates.length) {
this.job = this.rawSender(this.guid, updates)
.then(({ accepted, retry }) => {
// remove pending updates if updates are accepted
if (accepted) {
this.buffered.splice(0, lastIndex + 1);
}
// stop when previous sending failed and non-recoverable
if (accepted || retry) {
// avoid call stack overflow
setTimeout(() => {
this.next();
}, 0);
} else {
this.stop();
}
})
.catch(() => {
this.stop();
})
.finally(() => {
this.job = null;
});
}
}
}
export class MultipleBatchSyncSender {
private senders: Record<string, BatchSyncSender> = {};
constructor(private readonly rawSender: SyncUpdateSender) {}
async send(guid: string, update: Uint8Array) {
return this.getSender(guid).send(update);
}
private getSender(guid: string) {
let sender = this.senders[guid];
if (!sender) {
sender = new BatchSyncSender(guid, this.rawSender);
this.senders[guid] = sender;
}
return sender;
}
start() {
Object.values(this.senders).forEach(sender => sender.start());
}
stop() {
Object.values(this.senders).forEach(sender => sender.stop());
}
}
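For context on why the sender was removed rather than patched: its in-flight job promise was settled only from the server's ack callback, and the underlying socket.emit had no timeout, so a single lost ack left the job pending and every later update for that doc stayed buffered. A stripped-down illustration of that failure mode; the names here are hypothetical:

type Ack = (response: { error?: unknown }) => void;

// Simulates an ack dropped by a flaky connection: the callback is never invoked.
function emitWithoutTimeout(_event: string, _payload: unknown, _ack: Ack): void {}

let job: Promise<void> | null = new Promise<void>(resolve => {
  emitWithoutTimeout('client-update-v2', { updates: [] }, () => resolve());
}).finally(() => {
  job = null; // never reached in this scenario, so the sender stays "busy"
});

// Later sends would check `if (job) return;` and bail out, which is the stuck sync.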


@@ -1,210 +0,0 @@
import { DebugLogger } from '@affine/debug';
import { fetchWithTraceReport } from '@affine/graphql';
import { type SyncStorage } from '@toeverything/infra';
import type { CleanupService } from '@toeverything/infra/lifecycle';
import { getIoManager } from '../../utils/affine-io';
import { base64ToUint8Array, uint8ArrayToBase64 } from '../../utils/base64';
import { MultipleBatchSyncSender } from './batch-sync-sender';
const logger = new DebugLogger('affine:storage:socketio');
export class AffineSyncStorage implements SyncStorage {
name = 'affine-cloud';
socket = getIoManager().socket('/');
syncSender = new MultipleBatchSyncSender(async (guid, updates) => {
const payload = await Promise.all(
updates.map(update => uint8ArrayToBase64(update))
);
return new Promise(resolve => {
this.socket.emit(
'client-update-v2',
{
workspaceId: this.workspaceId,
guid,
updates: payload,
},
(response: {
// TODO: reuse `EventError` with server
error?: any;
data: any;
}) => {
// TODO: raise error with different code to users
if (response.error) {
logger.error('client-update-v2 error', {
workspaceId: this.workspaceId,
guid,
response,
});
}
resolve({
accepted: !response.error,
// TODO: reuse `EventError` with server
retry: response.error?.code === 'INTERNAL',
});
}
);
});
});
constructor(
private readonly workspaceId: string,
cleanupService: CleanupService
) {
this.socket.on('connect', this.handleConnect);
this.socket.connect();
this.socket.emit(
'client-handshake-sync',
this.workspaceId,
(response: { error?: any }) => {
if (!response.error) {
this.syncSender.start();
}
}
);
cleanupService.add(() => {
this.cleanup();
});
}
handleConnect = () => {
this.socket.emit(
'client-handshake-sync',
this.workspaceId,
(response: { error?: any }) => {
if (!response.error) {
this.syncSender.start();
}
}
);
};
async pull(
docId: string,
state: Uint8Array
): Promise<{ data: Uint8Array; state?: Uint8Array } | null> {
const stateVector = state ? await uint8ArrayToBase64(state) : undefined;
return new Promise((resolve, reject) => {
logger.debug('doc-load-v2', {
workspaceId: this.workspaceId,
guid: docId,
stateVector,
});
this.socket.emit(
'doc-load-v2',
{
workspaceId: this.workspaceId,
guid: docId,
stateVector,
},
(
response: // TODO: reuse `EventError` with server
{ error: any } | { data: { missing: string; state: string } }
) => {
logger.debug('doc-load callback', {
workspaceId: this.workspaceId,
guid: docId,
stateVector,
response,
});
if ('error' in response) {
// TODO: reuse `EventError` with server
if (response.error.code === 'DOC_NOT_FOUND') {
resolve(null);
} else {
reject(new Error(response.error.message));
}
} else {
resolve({
data: base64ToUint8Array(response.data.missing),
state: response.data.state
? base64ToUint8Array(response.data.state)
: undefined,
});
}
}
);
});
}
async push(docId: string, update: Uint8Array) {
logger.debug('client-update-v2', {
workspaceId: this.workspaceId,
guid: docId,
update,
});
await this.syncSender.send(docId, update);
}
async subscribe(
cb: (docId: string, data: Uint8Array) => void,
disconnect: (reason: string) => void
) {
const handleUpdate = async (message: {
workspaceId: string;
guid: string;
updates: string[];
}) => {
if (message.workspaceId === this.workspaceId) {
message.updates.forEach(update => {
cb(message.guid, base64ToUint8Array(update));
});
}
};
this.socket.on('server-updates', handleUpdate);
this.socket.on('disconnect', reason => {
this.socket.off('server-updates', handleUpdate);
disconnect(reason);
});
return () => {
this.socket.off('server-updates', handleUpdate);
};
}
cleanup() {
this.syncSender.stop();
this.socket.emit('client-leave-sync', this.workspaceId);
this.socket.off('connect', this.handleConnect);
}
}
export class AffineStaticSyncStorage implements SyncStorage {
name = 'affine-cloud-static';
constructor(private readonly workspaceId: string) {}
async pull(
docId: string
): Promise<{ data: Uint8Array; state?: Uint8Array | undefined } | null> {
const response = await fetchWithTraceReport(
`/api/workspaces/${this.workspaceId}/docs/${docId}`,
{
priority: 'high',
}
);
if (response.ok) {
const arrayBuffer = await response.arrayBuffer();
return { data: new Uint8Array(arrayBuffer) };
}
return null;
}
push(): Promise<void> {
throw new Error('Method not implemented.');
}
subscribe(): Promise<() => void> {
throw new Error('Method not implemented.');
}
}