fix: make server guid consistent (#4341)

This commit is contained in:
liuyi 2023-10-17 10:34:13 +08:00 committed by GitHub
parent b3dc4dce9c
commit 01987990ee
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 303 additions and 114 deletions

View File

@ -2,6 +2,7 @@ import {
Inject,
Injectable,
Logger,
OnApplicationBootstrap,
OnModuleDestroy,
OnModuleInit,
} from '@nestjs/common';
@ -13,6 +14,7 @@ import { Config } from '../../config';
import { Metrics } from '../../metrics/metrics';
import { PrismaService } from '../../prisma';
import { mergeUpdatesInApplyWay as jwstMergeUpdates } from '../../storage';
import { DocID } from '../../utils/doc';
function compare(yBinary: Buffer, jwstBinary: Buffer, strict = false): boolean {
if (yBinary.equals(jwstBinary)) {
@ -42,7 +44,9 @@ const MAX_SEQ_NUM = 0x3fffffff; // u31
* along side all the updates that have not been applies to that snapshot(timestamp).
*/
@Injectable()
export class DocManager implements OnModuleInit, OnModuleDestroy {
export class DocManager
implements OnModuleInit, OnModuleDestroy, OnApplicationBootstrap
{
protected logger = new Logger(DocManager.name);
private job: NodeJS.Timeout | null = null;
private seqMap = new Map<string, number>();
@ -56,6 +60,12 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
protected readonly metrics: Metrics
) {}
async onApplicationBootstrap() {
if (!this.config.node.test) {
await this.refreshDocGuid();
}
}
onModuleInit() {
if (this.automation) {
this.logger.log('Use Database');
@ -239,7 +249,7 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
/**
* get the snapshot of the doc we've seen.
*/
protected async getSnapshot(workspaceId: string, guid: string) {
async getSnapshot(workspaceId: string, guid: string) {
return this.db.snapshot.findUnique({
where: {
id_workspaceId: {
@ -253,7 +263,7 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
/**
* get pending updates
*/
protected async getUpdates(workspaceId: string, guid: string) {
async getUpdates(workspaceId: string, guid: string) {
return this.db.update.findMany({
where: {
workspaceId,
@ -411,4 +421,46 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
return last + 1;
}
}
/**
* deal with old records that has wrong guid format
* correct guid with `${non-wsId}:${variant}:${subId}` to `${subId}`
*
* @TODO delete in next release
* @deprecated
*/
private async refreshDocGuid() {
let turn = 0;
let lastTurnCount = 100;
while (lastTurnCount === 100) {
const docs = await this.db.snapshot.findMany({
skip: turn * 100,
take: 100,
orderBy: {
createdAt: 'asc',
},
});
lastTurnCount = docs.length;
for (const doc of docs) {
const docId = new DocID(doc.id, doc.workspaceId);
if (docId && !docId.isWorkspace && docId.guid !== doc.id) {
await this.db.snapshot.update({
where: {
id_workspaceId: {
id: doc.id,
workspaceId: doc.workspaceId,
},
},
data: {
id: docId.guid,
},
});
}
}
turn++;
}
}
}

View File

@ -4,6 +4,7 @@ import Redis from 'ioredis';
import { Config } from '../../config';
import { Metrics } from '../../metrics/metrics';
import { PrismaService } from '../../prisma';
import { DocID } from '../../utils/doc';
import { DocManager } from './manager';
function makeKey(prefix: string) {
@ -62,11 +63,9 @@ export class RedisDocManager extends DocManager {
return;
}
const docId = new DocID(pendingDoc);
const updateKey = updates`${pendingDoc}`;
const lockKey = lock`${pendingDoc}`;
const splitAt = pendingDoc.indexOf(':');
const workspaceId = pendingDoc.substring(0, splitAt);
const id = pendingDoc.substring(splitAt + 1);
// acquire the lock
const lockResult = await this.redis
@ -98,18 +97,18 @@ export class RedisDocManager extends DocManager {
}
this.logger.verbose(
`applying ${updates.length} updates for workspace: ${workspaceId}, guid: ${id}`
`applying ${updates.length} updates for workspace: ${docId}`
);
const snapshot = await this.getSnapshot(workspaceId, id);
const snapshot = await this.getSnapshot(docId.workspace, docId.guid);
// merge
const doc = snapshot
? this.applyUpdates(id, snapshot.blob, ...updates)
: this.applyUpdates(id, ...updates);
? this.applyUpdates(docId.full, snapshot.blob, ...updates)
: this.applyUpdates(docId.full, ...updates);
// update snapshot
await this.upsert(workspaceId, id, doc, snapshot?.seq);
await this.upsert(docId.workspace, docId.guid, doc, snapshot?.seq);
// delete merged updates
await this.redis
@ -120,9 +119,9 @@ export class RedisDocManager extends DocManager {
});
} catch (e) {
this.logger.error(
`Failed to merge updates with snapshot for ${pendingDoc}: ${e}`
`Failed to merge updates with snapshot for ${docId}: ${e}`
);
await this.redis.sadd(pending, `${workspaceId}:${id}`).catch(() => null); // safe
await this.redis.sadd(pending, docId.toString()).catch(() => null); // safe
} finally {
await this.redis.del(lockKey);
}

View File

@ -12,7 +12,7 @@ import { Server, Socket } from 'socket.io';
import { encodeStateAsUpdate, encodeStateVector } from 'yjs';
import { Metrics } from '../../../metrics/metrics';
import { trimGuid } from '../../../utils/doc';
import { DocID } from '../../../utils/doc';
import { Auth, CurrentUser } from '../../auth';
import { DocManager } from '../../doc';
import { UserType } from '../../users';
@ -85,7 +85,11 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect {
@SubscribeMessage('client-update')
async handleClientUpdate(
@MessageBody()
message: {
{
workspaceId,
guid,
update,
}: {
workspaceId: string;
guid: string;
update: string;
@ -94,51 +98,54 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect {
) {
this.metric.socketIOEventCounter(1, { event: 'client-update' });
const endTimer = this.metric.socketIOEventTimer({ event: 'client-update' });
if (!client.rooms.has(message.workspaceId)) {
if (!client.rooms.has(workspaceId)) {
this.logger.verbose(
`Client ${client.id} tried to push update to workspace ${message.workspaceId} without joining it first`
`Client ${client.id} tried to push update to workspace ${workspaceId} without joining it first`
);
endTimer();
return;
}
const update = Buffer.from(message.update, 'base64');
client.to(message.workspaceId).emit('server-update', message);
const docId = new DocID(guid, workspaceId);
client
.to(docId.workspace)
.emit('server-update', { workspaceId, guid, update });
const guid = trimGuid(message.workspaceId, message.guid);
await this.docManager.push(message.workspaceId, guid, update);
const buf = Buffer.from(update, 'base64');
await this.docManager.push(docId.workspace, docId.guid, buf);
endTimer();
}
@Auth()
@SubscribeMessage('doc-load')
async loadDoc(
@ConnectedSocket() client: Socket,
@CurrentUser() user: UserType,
@MessageBody()
message: {
{
workspaceId,
guid,
stateVector,
}: {
workspaceId: string;
guid: string;
stateVector?: string;
targetClientId?: number;
},
@ConnectedSocket() client: Socket
}
): Promise<{ missing: string; state?: string } | false> {
this.metric.socketIOEventCounter(1, { event: 'doc-load' });
const endTimer = this.metric.socketIOEventTimer({ event: 'doc-load' });
if (!client.rooms.has(message.workspaceId)) {
const canRead = await this.permissions.tryCheck(
message.workspaceId,
user.id
);
if (!client.rooms.has(workspaceId)) {
const canRead = await this.permissions.tryCheck(workspaceId, user.id);
if (!canRead) {
endTimer();
return false;
}
}
const guid = trimGuid(message.workspaceId, message.guid);
const doc = await this.docManager.get(message.workspaceId, guid);
const docId = new DocID(guid, workspaceId);
const doc = await this.docManager.get(docId.workspace, docId.guid);
if (!doc) {
endTimer();
@ -148,9 +155,7 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect {
const missing = Buffer.from(
encodeStateAsUpdate(
doc,
message.stateVector
? Buffer.from(message.stateVector, 'base64')
: undefined
stateVector ? Buffer.from(stateVector, 'base64') : undefined
)
).toString('base64');
const state = Buffer.from(encodeStateVector(doc)).toString('base64');

View File

@ -13,7 +13,7 @@ import type { Response } from 'express';
import format from 'pretty-time';
import { StorageProvide } from '../../storage';
import { trimGuid } from '../../utils/doc';
import { DocID } from '../../utils/doc';
import { Auth, CurrentUser, Publicable } from '../auth';
import { DocManager } from '../doc';
import { UserType } from '../users';
@ -64,15 +64,19 @@ export class WorkspacesController {
@Res() res: Response
) {
const start = process.hrtime();
const id = trimGuid(ws, guid);
const docId = new DocID(guid, ws);
if (
// if a user has the permission
!(await this.permission.isAccessible(ws, id, user?.id))
!(await this.permission.isAccessible(
docId.workspace,
docId.guid,
user?.id
))
) {
throw new ForbiddenException('Permission denied');
}
const update = await this.docManager.getBinary(ws, id);
const update = await this.docManager.getBinary(docId.workspace, docId.guid);
if (!update) {
throw new NotFoundException('Doc not found');

View File

@ -74,11 +74,7 @@ export class PermissionService {
return true;
} else {
// check if this is a public subpage
// why use `endsWith`?
// because there might have `${wsId}:space:${subpageId}`,
// but subpages only have `${subpageId}`
return subpages.some(subpage => id.endsWith(subpage));
return subpages.some(subpage => id === subpage);
}
}

View File

@ -33,6 +33,7 @@ import { PrismaService } from '../../prisma';
import { StorageProvide } from '../../storage';
import { CloudThrottlerGuard, Throttle } from '../../throttler';
import type { FileUpload } from '../../types';
import { DocID } from '../../utils/doc';
import { Auth, CurrentUser, Public } from '../auth';
import { MailService } from '../auth/mailer';
import { AuthService } from '../auth/service';
@ -662,16 +663,24 @@ export class WorkspaceResolver {
@Args('workspaceId') workspaceId: string,
@Args('pageId') pageId: string
) {
const docId = new DocID(pageId, workspaceId);
if (docId.isWorkspace) {
throw new ForbiddenException('Expect page not to be workspace');
}
const userWorkspace = await this.prisma.userWorkspacePermission.findFirst({
where: {
userId: user.id,
workspaceId,
workspaceId: docId.workspace,
},
});
if (!userWorkspace?.accepted) {
throw new ForbiddenException('Permission denied');
}
return this.permissions.grantPage(workspaceId, pageId);
return this.permissions.grantPage(docId.workspace, docId.guid);
}
@Mutation(() => Boolean)
@ -680,9 +689,15 @@ export class WorkspaceResolver {
@Args('workspaceId') workspaceId: string,
@Args('pageId') pageId: string
) {
await this.permissions.check(workspaceId, user.id, Permission.Admin);
const docId = new DocID(pageId, workspaceId);
return this.permissions.revokePage(workspaceId, pageId);
if (docId.isWorkspace) {
throw new ForbiddenException('Expect page not to be workspace');
}
await this.permissions.check(docId.workspace, user.id, Permission.Admin);
return this.permissions.revokePage(docId.workspace, docId.guid);
}
@Query(() => [String], {

View File

@ -0,0 +1,72 @@
import test from 'ava';
import { DocID, DocVariant } from '../doc';
test('can parse', t => {
// workspace only
let id = new DocID('ws');
t.is(id.workspace, 'ws');
t.assert(id.isWorkspace);
// full id
id = new DocID('ws:space:sub');
t.is(id.workspace, 'ws');
t.is(id.variant, DocVariant.Space);
t.is(id.guid, 'sub');
// variant only
id = new DocID('space:sub', 'ws');
t.is(id.workspace, 'ws');
t.is(id.variant, DocVariant.Space);
t.is(id.guid, 'sub');
// sub id only
id = new DocID('sub', 'ws');
t.is(id.workspace, 'ws');
t.is(id.variant, DocVariant.Unknown);
t.is(id.guid, 'sub');
});
test('fail', t => {
t.throws(() => new DocID('a:b:c:d'), {
message: 'Invalid format of Doc ID: a:b:c:d',
});
t.throws(() => new DocID(':space:sub'), { message: 'Workspace is required' });
t.throws(() => new DocID('space:sub'), { message: 'Workspace is required' });
t.throws(() => new DocID('ws:any:sub'), {
message: 'Invalid ID variant: any',
});
t.throws(() => new DocID('ws:space:'), {
message: 'ID is required for non-workspace doc',
});
t.throws(() => new DocID('ws::space'), {
message: 'Variant is required for non-workspace doc',
});
});
test('fix', t => {
let id = new DocID('ws');
// can't fix because the doc variant is [Workspace]
id.fixWorkspace('ws2');
t.is(id.workspace, 'ws');
t.is(id.toString(), 'ws');
id = new DocID('ws:space:sub');
id.fixWorkspace('ws2');
t.is(id.workspace, 'ws2');
t.is(id.toString(), 'ws2:space:sub');
id = new DocID('space:sub', 'ws');
t.is(id.workspace, 'ws');
t.is(id.toString(), 'ws:space:sub');
id = new DocID('ws2:space:sub', 'ws');
t.is(id.workspace, 'ws');
t.is(id.toString(), 'ws:space:sub');
});

View File

@ -1,7 +1,115 @@
export function trimGuid(ws: string, guid: string) {
if (guid.startsWith(`${ws}:space:`)) {
return guid.substring(ws.length + 1);
import { registerEnumType } from '@nestjs/graphql';
export enum DocVariant {
Workspace = 'workspace',
Space = 'space',
Settings = 'settings',
Unknown = 'unknown',
}
registerEnumType(DocVariant, {
name: 'DocVariant',
});
export class DocID {
raw: string;
workspace: string;
variant: DocVariant;
private sub: string | null;
static parse(raw: string): DocID | null {
try {
return new DocID(raw);
} catch (e) {
return null;
}
}
return guid;
/**
* pure guid for workspace and subdoc without any prefix
*/
get guid(): string {
return this.variant === DocVariant.Workspace
? this.workspace
: // sub is always truthy when variant is not workspace
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this.sub!;
}
get full(): string {
return this.variant === DocVariant.Workspace
? this.workspace
: `${this.workspace}:${this.variant}:${this.sub}`;
}
get isWorkspace(): boolean {
return this.variant === DocVariant.Workspace;
}
constructor(raw: string, workspaceId?: string) {
if (!raw.length) {
throw new Error('Invalid Empty Doc ID');
}
let parts = raw.split(':');
if (parts.length > 3) {
throw new Error(`Invalid format of Doc ID: ${raw}`);
} else if (parts.length === 2) {
// `${variant}:${guid}`
if (!workspaceId) {
throw new Error('Workspace is required');
}
parts.unshift(workspaceId);
} else if (parts.length === 1) {
// ${ws} or ${pageId}
if (workspaceId && parts[0] !== workspaceId) {
parts = [workspaceId, DocVariant.Unknown, parts[0]];
} else {
// parts:[ws] equals [workspaceId]
}
}
let workspace = parts.at(0);
// fix for `${non-workspaceId}:${variant}:${guid}`
if (workspaceId) {
workspace = workspaceId;
}
const variant = parts.at(1);
const docId = parts.at(2);
if (!workspace) {
throw new Error('Workspace is required');
}
if (variant) {
if (!Object.values(DocVariant).includes(variant as any)) {
throw new Error(`Invalid ID variant: ${variant}`);
}
if (!docId) {
throw new Error('ID is required for non-workspace doc');
}
} else if (docId) {
throw new Error('Variant is required for non-workspace doc');
}
this.raw = raw;
this.workspace = workspace;
this.variant = (variant as DocVariant | undefined) ?? DocVariant.Workspace;
this.sub = docId || null;
}
toString() {
return this.full;
}
fixWorkspace(workspaceId: string) {
if (!this.isWorkspace && this.workspace !== workspaceId) {
this.workspace = workspaceId;
}
}
}

View File

@ -183,7 +183,6 @@ test('should have sequential update number', async t => {
await Promise.all(updates.map(update => manager.push('2', '2', update)));
// [1,2,3]
// @ts-expect-error private method
let records = await manager.getUpdates('2', '2');
t.deepEqual(
@ -208,21 +207,18 @@ test('should have sequential update number', async t => {
await Promise.all(updates.map(update => manager.push('2', '2', update)));
// @ts-expect-error private method
records = await manager.getUpdates('2', '2');
// push a new update with new seq num
await manager.push('2', '2', updates[0]);
// let the manager ignore update with the new seq num
// @ts-expect-error private method
const stub = Sinon.stub(manager, 'getUpdates').resolves(records);
// @ts-expect-error private method
await manager.autoSquash();
stub.restore();
// @ts-expect-error private method
records = await manager.getUpdates('2', '2');
// should not merge in one run

View File

@ -415,28 +415,6 @@ mutation sendVerifyChangeEmail($token: String!, $email: String!, $callbackUrl: S
}`,
};
export const setRevokePageMutation = {
id: 'setRevokePageMutation' as const,
operationName: 'setRevokePage',
definitionName: 'revokePage',
containsFile: false,
query: `
mutation setRevokePage($workspaceId: String!, $pageId: String!) {
revokePage(workspaceId: $workspaceId, pageId: $pageId)
}`,
};
export const setSharePageMutation = {
id: 'setSharePageMutation' as const,
operationName: 'setSharePage',
definitionName: 'sharePage',
containsFile: false,
query: `
mutation setSharePage($workspaceId: String!, $pageId: String!) {
sharePage(workspaceId: $workspaceId, pageId: $pageId)
}`,
};
export const setWorkspacePublicByIdMutation = {
id: 'setWorkspacePublicByIdMutation' as const,
operationName: 'setWorkspacePublicById',

View File

@ -1,3 +0,0 @@
mutation setRevokePage($workspaceId: String!, $pageId: String!) {
revokePage(workspaceId: $workspaceId, pageId: $pageId)
}

View File

@ -1,3 +0,0 @@
mutation setSharePage($workspaceId: String!, $pageId: String!) {
sharePage(workspaceId: $workspaceId, pageId: $pageId)
}

View File

@ -376,26 +376,6 @@ export type SendVerifyChangeEmailMutation = {
sendVerifyChangeEmail: boolean;
};
export type SetRevokePageMutationVariables = Exact<{
workspaceId: Scalars['String']['input'];
pageId: Scalars['String']['input'];
}>;
export type SetRevokePageMutation = {
__typename?: 'Mutation';
revokePage: boolean;
};
export type SetSharePageMutationVariables = Exact<{
workspaceId: Scalars['String']['input'];
pageId: Scalars['String']['input'];
}>;
export type SetSharePageMutation = {
__typename?: 'Mutation';
sharePage: boolean;
};
export type SetWorkspacePublicByIdMutationVariables = Exact<{
id: Scalars['ID']['input'];
public: Scalars['Boolean']['input'];
@ -637,16 +617,6 @@ export type Mutations =
variables: SendVerifyChangeEmailMutationVariables;
response: SendVerifyChangeEmailMutation;
}
| {
name: 'setRevokePageMutation';
variables: SetRevokePageMutationVariables;
response: SetRevokePageMutation;
}
| {
name: 'setSharePageMutation';
variables: SetSharePageMutationVariables;
response: SetSharePageMutation;
}
| {
name: 'setWorkspacePublicByIdMutation';
variables: SetWorkspacePublicByIdMutationVariables;