mirror of
https://github.com/toeverything/AFFiNE.git
synced 2024-12-12 17:43:02 +03:00
fix(server): wrap read-modify-write apis with distributed lock (#5979)
This commit is contained in:
parent
fd4e4123f5
commit
34f892b05b
@ -28,6 +28,7 @@ import { GqlModule } from './fundamentals/graphql';
|
|||||||
import { HelpersModule } from './fundamentals/helpers';
|
import { HelpersModule } from './fundamentals/helpers';
|
||||||
import { MailModule } from './fundamentals/mailer';
|
import { MailModule } from './fundamentals/mailer';
|
||||||
import { MetricsModule } from './fundamentals/metrics';
|
import { MetricsModule } from './fundamentals/metrics';
|
||||||
|
import { MutexModule } from './fundamentals/mutex';
|
||||||
import { PrismaModule } from './fundamentals/prisma';
|
import { PrismaModule } from './fundamentals/prisma';
|
||||||
import { StorageProviderModule } from './fundamentals/storage';
|
import { StorageProviderModule } from './fundamentals/storage';
|
||||||
import { RateLimiterModule } from './fundamentals/throttler';
|
import { RateLimiterModule } from './fundamentals/throttler';
|
||||||
@ -39,6 +40,7 @@ export const FunctionalityModules = [
|
|||||||
ScheduleModule.forRoot(),
|
ScheduleModule.forRoot(),
|
||||||
EventModule,
|
EventModule,
|
||||||
CacheModule,
|
CacheModule,
|
||||||
|
MutexModule,
|
||||||
PrismaModule,
|
PrismaModule,
|
||||||
MetricsModule,
|
MetricsModule,
|
||||||
RateLimiterModule,
|
RateLimiterModule,
|
||||||
|
@ -1,5 +1,4 @@
|
|||||||
import { PrismaClient } from '@prisma/client';
|
import { PrismaTransaction } from '../../fundamentals';
|
||||||
|
|
||||||
import { Feature, FeatureSchema, FeatureType } from './types';
|
import { Feature, FeatureSchema, FeatureType } from './types';
|
||||||
|
|
||||||
class FeatureConfig {
|
class FeatureConfig {
|
||||||
@ -67,7 +66,7 @@ export type FeatureConfigType<F extends FeatureType> = InstanceType<
|
|||||||
|
|
||||||
const FeatureCache = new Map<number, FeatureConfigType<FeatureType>>();
|
const FeatureCache = new Map<number, FeatureConfigType<FeatureType>>();
|
||||||
|
|
||||||
export async function getFeature(prisma: PrismaClient, featureId: number) {
|
export async function getFeature(prisma: PrismaTransaction, featureId: number) {
|
||||||
const cachedQuota = FeatureCache.get(featureId);
|
const cachedQuota = FeatureCache.get(featureId);
|
||||||
|
|
||||||
if (cachedQuota) {
|
if (cachedQuota) {
|
||||||
|
@ -1,5 +1,4 @@
|
|||||||
import { PrismaClient } from '@prisma/client';
|
import { PrismaTransaction } from '../../fundamentals';
|
||||||
|
|
||||||
import { formatDate, formatSize, Quota, QuotaSchema } from './types';
|
import { formatDate, formatSize, Quota, QuotaSchema } from './types';
|
||||||
|
|
||||||
const QuotaCache = new Map<number, QuotaConfig>();
|
const QuotaCache = new Map<number, QuotaConfig>();
|
||||||
@ -7,14 +6,14 @@ const QuotaCache = new Map<number, QuotaConfig>();
|
|||||||
export class QuotaConfig {
|
export class QuotaConfig {
|
||||||
readonly config: Quota;
|
readonly config: Quota;
|
||||||
|
|
||||||
static async get(prisma: PrismaClient, featureId: number) {
|
static async get(tx: PrismaTransaction, featureId: number) {
|
||||||
const cachedQuota = QuotaCache.get(featureId);
|
const cachedQuota = QuotaCache.get(featureId);
|
||||||
|
|
||||||
if (cachedQuota) {
|
if (cachedQuota) {
|
||||||
return cachedQuota;
|
return cachedQuota;
|
||||||
}
|
}
|
||||||
|
|
||||||
const quota = await prisma.features.findFirst({
|
const quota = await tx.features.findFirst({
|
||||||
where: {
|
where: {
|
||||||
id: featureId,
|
id: featureId,
|
||||||
},
|
},
|
||||||
|
@ -1,13 +1,15 @@
|
|||||||
import { Injectable } from '@nestjs/common';
|
import { Injectable } from '@nestjs/common';
|
||||||
import { PrismaClient } from '@prisma/client';
|
import { PrismaClient } from '@prisma/client';
|
||||||
|
|
||||||
import { type EventPayload, OnEvent } from '../../fundamentals';
|
import {
|
||||||
|
type EventPayload,
|
||||||
|
OnEvent,
|
||||||
|
PrismaTransaction,
|
||||||
|
} from '../../fundamentals';
|
||||||
import { FeatureKind } from '../features';
|
import { FeatureKind } from '../features';
|
||||||
import { QuotaConfig } from './quota';
|
import { QuotaConfig } from './quota';
|
||||||
import { QuotaType } from './types';
|
import { QuotaType } from './types';
|
||||||
|
|
||||||
type Transaction = Parameters<Parameters<PrismaClient['$transaction']>[0]>[0];
|
|
||||||
|
|
||||||
@Injectable()
|
@Injectable()
|
||||||
export class QuotaService {
|
export class QuotaService {
|
||||||
constructor(private readonly prisma: PrismaClient) {}
|
constructor(private readonly prisma: PrismaClient) {}
|
||||||
@ -140,8 +142,8 @@ export class QuotaService {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
async hasQuota(userId: string, quota: QuotaType, transaction?: Transaction) {
|
async hasQuota(userId: string, quota: QuotaType, tx?: PrismaTransaction) {
|
||||||
const executor = transaction ?? this.prisma;
|
const executor = tx ?? this.prisma;
|
||||||
|
|
||||||
return executor.userFeatures
|
return executor.userFeatures
|
||||||
.count({
|
.count({
|
||||||
|
@ -54,7 +54,7 @@ export class UserService {
|
|||||||
|
|
||||||
return this.createUser({
|
return this.createUser({
|
||||||
email,
|
email,
|
||||||
name: 'Unnamed',
|
name: email.split('@')[0],
|
||||||
...data,
|
...data,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
@ -25,7 +25,9 @@ import {
|
|||||||
EventEmitter,
|
EventEmitter,
|
||||||
type FileUpload,
|
type FileUpload,
|
||||||
MailService,
|
MailService,
|
||||||
|
MutexService,
|
||||||
Throttle,
|
Throttle,
|
||||||
|
TooManyRequestsException,
|
||||||
} from '../../../fundamentals';
|
} from '../../../fundamentals';
|
||||||
import { CurrentUser, Public } from '../../auth';
|
import { CurrentUser, Public } from '../../auth';
|
||||||
import { QuotaManagementService, QuotaQueryType } from '../../quota';
|
import { QuotaManagementService, QuotaQueryType } from '../../quota';
|
||||||
@ -58,7 +60,8 @@ export class WorkspaceResolver {
|
|||||||
private readonly quota: QuotaManagementService,
|
private readonly quota: QuotaManagementService,
|
||||||
private readonly users: UserService,
|
private readonly users: UserService,
|
||||||
private readonly event: EventEmitter,
|
private readonly event: EventEmitter,
|
||||||
private readonly blobStorage: WorkspaceBlobStorage
|
private readonly blobStorage: WorkspaceBlobStorage,
|
||||||
|
private readonly mutex: MutexService
|
||||||
) {}
|
) {}
|
||||||
|
|
||||||
@ResolveField(() => Permission, {
|
@ResolveField(() => Permission, {
|
||||||
@ -336,6 +339,14 @@ export class WorkspaceResolver {
|
|||||||
throw new ForbiddenException('Cannot change owner');
|
throw new ForbiddenException('Cannot change owner');
|
||||||
}
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
// lock to prevent concurrent invite
|
||||||
|
const lockFlag = `invite:${workspaceId}`;
|
||||||
|
await using lock = await this.mutex.lock(lockFlag);
|
||||||
|
if (!lock) {
|
||||||
|
return new TooManyRequestsException('Server is busy');
|
||||||
|
}
|
||||||
|
|
||||||
// member limit check
|
// member limit check
|
||||||
const [memberCount, quota] = await Promise.all([
|
const [memberCount, quota] = await Promise.all([
|
||||||
this.prisma.workspaceUserPermission.count({
|
this.prisma.workspaceUserPermission.count({
|
||||||
@ -344,12 +355,13 @@ export class WorkspaceResolver {
|
|||||||
this.quota.getWorkspaceUsage(workspaceId),
|
this.quota.getWorkspaceUsage(workspaceId),
|
||||||
]);
|
]);
|
||||||
if (memberCount >= quota.memberLimit) {
|
if (memberCount >= quota.memberLimit) {
|
||||||
throw new PayloadTooLargeException('Workspace member limit reached.');
|
return new PayloadTooLargeException('Workspace member limit reached.');
|
||||||
}
|
}
|
||||||
|
|
||||||
let target = await this.users.findUserByEmail(email);
|
let target = await this.users.findUserByEmail(email);
|
||||||
if (target) {
|
if (target) {
|
||||||
const originRecord = await this.prisma.workspaceUserPermission.findFirst({
|
const originRecord =
|
||||||
|
await this.prisma.workspaceUserPermission.findFirst({
|
||||||
where: {
|
where: {
|
||||||
workspaceId,
|
workspaceId,
|
||||||
userId: target.id,
|
userId: target.id,
|
||||||
@ -404,6 +416,10 @@ export class WorkspaceResolver {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
return inviteId;
|
return inviteId;
|
||||||
|
} catch (e) {
|
||||||
|
this.logger.error('failed to invite user', e);
|
||||||
|
return new TooManyRequestsException('Server is busy');
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Throttle({
|
@Throttle({
|
||||||
|
@ -1 +1,2 @@
|
|||||||
export * from './payment-required';
|
export * from './payment-required';
|
||||||
|
export * from './too-many-requests';
|
||||||
|
@ -0,0 +1,14 @@
|
|||||||
|
import { HttpException, HttpStatus } from '@nestjs/common';
|
||||||
|
|
||||||
|
export class TooManyRequestsException extends HttpException {
|
||||||
|
constructor(desc?: string, code: string = 'Too Many Requests') {
|
||||||
|
super(
|
||||||
|
HttpException.createBody(
|
||||||
|
desc ?? code,
|
||||||
|
code,
|
||||||
|
HttpStatus.TOO_MANY_REQUESTS
|
||||||
|
),
|
||||||
|
HttpStatus.TOO_MANY_REQUESTS
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
@ -11,6 +11,12 @@ import { GraphQLError } from 'graphql';
|
|||||||
import { Config } from '../config';
|
import { Config } from '../config';
|
||||||
import { GQLLoggerPlugin } from './logger-plugin';
|
import { GQLLoggerPlugin } from './logger-plugin';
|
||||||
|
|
||||||
|
export type GraphqlContext = {
|
||||||
|
req: Request;
|
||||||
|
res: Response;
|
||||||
|
isAdminQuery: boolean;
|
||||||
|
};
|
||||||
|
|
||||||
@Global()
|
@Global()
|
||||||
@Module({
|
@Module({
|
||||||
imports: [
|
imports: [
|
||||||
@ -30,7 +36,13 @@ import { GQLLoggerPlugin } from './logger-plugin';
|
|||||||
: '../../../schema.gql'
|
: '../../../schema.gql'
|
||||||
),
|
),
|
||||||
sortSchema: true,
|
sortSchema: true,
|
||||||
context: ({ req, res }: { req: Request; res: Response }) => ({
|
context: ({
|
||||||
|
req,
|
||||||
|
res,
|
||||||
|
}: {
|
||||||
|
req: Request;
|
||||||
|
res: Response;
|
||||||
|
}): GraphqlContext => ({
|
||||||
req,
|
req,
|
||||||
res,
|
res,
|
||||||
isAdminQuery: false,
|
isAdminQuery: false,
|
||||||
|
@ -14,14 +14,23 @@ export {
|
|||||||
} from './config';
|
} from './config';
|
||||||
export * from './error';
|
export * from './error';
|
||||||
export { EventEmitter, type EventPayload, OnEvent } from './event';
|
export { EventEmitter, type EventPayload, OnEvent } from './event';
|
||||||
|
export type { GraphqlContext } from './graphql';
|
||||||
export { CryptoHelper, URLHelper } from './helpers';
|
export { CryptoHelper, URLHelper } from './helpers';
|
||||||
export { MailService } from './mailer';
|
export { MailService } from './mailer';
|
||||||
export { CallCounter, CallTimer, metrics } from './metrics';
|
export { CallCounter, CallTimer, metrics } from './metrics';
|
||||||
|
export {
|
||||||
|
BucketService,
|
||||||
|
LockGuard,
|
||||||
|
MUTEX_RETRY,
|
||||||
|
MUTEX_WAIT,
|
||||||
|
MutexService,
|
||||||
|
} from './mutex';
|
||||||
export {
|
export {
|
||||||
getOptionalModuleMetadata,
|
getOptionalModuleMetadata,
|
||||||
GlobalExceptionFilter,
|
GlobalExceptionFilter,
|
||||||
OptionalModule,
|
OptionalModule,
|
||||||
} from './nestjs';
|
} from './nestjs';
|
||||||
|
export type { PrismaTransaction } from './prisma';
|
||||||
export * from './storage';
|
export * from './storage';
|
||||||
export { type StorageProvider, StorageProviderFactory } from './storage';
|
export { type StorageProvider, StorageProviderFactory } from './storage';
|
||||||
export { AuthThrottlerGuard, CloudThrottlerGuard, Throttle } from './throttler';
|
export { AuthThrottlerGuard, CloudThrottlerGuard, Throttle } from './throttler';
|
||||||
|
15
packages/backend/server/src/fundamentals/mutex/bucket.ts
Normal file
15
packages/backend/server/src/fundamentals/mutex/bucket.ts
Normal file
@ -0,0 +1,15 @@
|
|||||||
|
export class BucketService {
|
||||||
|
private readonly bucket = new Map<string, string>();
|
||||||
|
|
||||||
|
get(key: string) {
|
||||||
|
return this.bucket.get(key);
|
||||||
|
}
|
||||||
|
|
||||||
|
set(key: string, value: string) {
|
||||||
|
this.bucket.set(key, value);
|
||||||
|
}
|
||||||
|
|
||||||
|
delete(key: string) {
|
||||||
|
this.bucket.delete(key);
|
||||||
|
}
|
||||||
|
}
|
14
packages/backend/server/src/fundamentals/mutex/index.ts
Normal file
14
packages/backend/server/src/fundamentals/mutex/index.ts
Normal file
@ -0,0 +1,14 @@
|
|||||||
|
import { Global, Module } from '@nestjs/common';
|
||||||
|
|
||||||
|
import { BucketService } from './bucket';
|
||||||
|
import { MutexService } from './mutex';
|
||||||
|
|
||||||
|
@Global()
|
||||||
|
@Module({
|
||||||
|
providers: [BucketService, MutexService],
|
||||||
|
exports: [BucketService, MutexService],
|
||||||
|
})
|
||||||
|
export class MutexModule {}
|
||||||
|
|
||||||
|
export { BucketService, MutexService };
|
||||||
|
export { LockGuard, MUTEX_RETRY, MUTEX_WAIT } from './mutex';
|
96
packages/backend/server/src/fundamentals/mutex/mutex.ts
Normal file
96
packages/backend/server/src/fundamentals/mutex/mutex.ts
Normal file
@ -0,0 +1,96 @@
|
|||||||
|
import { randomUUID } from 'node:crypto';
|
||||||
|
import { setTimeout } from 'node:timers/promises';
|
||||||
|
|
||||||
|
import { Inject, Injectable, Logger, Scope } from '@nestjs/common';
|
||||||
|
import { CONTEXT } from '@nestjs/graphql';
|
||||||
|
|
||||||
|
import type { GraphqlContext } from '../graphql';
|
||||||
|
import { BucketService } from './bucket';
|
||||||
|
|
||||||
|
export class LockGuard<M extends MutexService = MutexService>
|
||||||
|
implements AsyncDisposable
|
||||||
|
{
|
||||||
|
constructor(
|
||||||
|
private readonly mutex: M,
|
||||||
|
private readonly key: string
|
||||||
|
) {}
|
||||||
|
|
||||||
|
async [Symbol.asyncDispose]() {
|
||||||
|
return this.mutex.unlock(this.key);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export const MUTEX_RETRY = 5;
|
||||||
|
export const MUTEX_WAIT = 100;
|
||||||
|
|
||||||
|
@Injectable({ scope: Scope.REQUEST })
|
||||||
|
export class MutexService {
|
||||||
|
protected logger = new Logger(MutexService.name);
|
||||||
|
|
||||||
|
constructor(
|
||||||
|
@Inject(CONTEXT) private readonly context: GraphqlContext,
|
||||||
|
private readonly bucket: BucketService
|
||||||
|
) {}
|
||||||
|
|
||||||
|
protected getId() {
|
||||||
|
let id = this.context.req.headers['x-transaction-id'] as string;
|
||||||
|
|
||||||
|
if (!id) {
|
||||||
|
id = randomUUID();
|
||||||
|
this.context.req.headers['x-transaction-id'] = id;
|
||||||
|
}
|
||||||
|
|
||||||
|
return id;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* lock an resource and return a lock guard, which will release the lock when disposed
|
||||||
|
*
|
||||||
|
* if the lock is not available, it will retry for [MUTEX_RETRY] times
|
||||||
|
*
|
||||||
|
* usage:
|
||||||
|
* ```typescript
|
||||||
|
* {
|
||||||
|
* // lock is acquired here
|
||||||
|
* await using lock = await mutex.lock('resource-key');
|
||||||
|
* if (lock) {
|
||||||
|
* // do something
|
||||||
|
* } else {
|
||||||
|
* // failed to lock
|
||||||
|
* }
|
||||||
|
* }
|
||||||
|
* // lock is released here
|
||||||
|
* ```
|
||||||
|
* @param key resource key
|
||||||
|
* @returns LockGuard
|
||||||
|
*/
|
||||||
|
async lock(key: string): Promise<LockGuard | undefined> {
|
||||||
|
const id = this.getId();
|
||||||
|
const fetchLock = async (retry: number): Promise<LockGuard | undefined> => {
|
||||||
|
if (retry === 0) {
|
||||||
|
this.logger.error(
|
||||||
|
`Failed to fetch lock ${key} after ${MUTEX_RETRY} retry`
|
||||||
|
);
|
||||||
|
return undefined;
|
||||||
|
}
|
||||||
|
const current = this.bucket.get(key);
|
||||||
|
if (current && current !== id) {
|
||||||
|
this.logger.warn(
|
||||||
|
`Failed to fetch lock ${key}, retrying in ${MUTEX_WAIT} ms`
|
||||||
|
);
|
||||||
|
await setTimeout(MUTEX_WAIT * (MUTEX_RETRY - retry + 1));
|
||||||
|
return fetchLock(retry - 1);
|
||||||
|
}
|
||||||
|
this.bucket.set(key, id);
|
||||||
|
return new LockGuard(this, key);
|
||||||
|
};
|
||||||
|
|
||||||
|
return fetchLock(MUTEX_RETRY);
|
||||||
|
}
|
||||||
|
|
||||||
|
async unlock(key: string): Promise<void> {
|
||||||
|
if (this.bucket.get(key) === this.getId()) {
|
||||||
|
this.bucket.delete(key);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -16,3 +16,7 @@ const clientProvider: Provider = {
|
|||||||
})
|
})
|
||||||
export class PrismaModule {}
|
export class PrismaModule {}
|
||||||
export { PrismaService } from './service';
|
export { PrismaService } from './service';
|
||||||
|
|
||||||
|
export type PrismaTransaction = Parameters<
|
||||||
|
Parameters<PrismaClient['$transaction']>[0]
|
||||||
|
>[0];
|
||||||
|
@ -1,18 +1,27 @@
|
|||||||
import { Global, Provider, Type } from '@nestjs/common';
|
import { Global, Provider, Type } from '@nestjs/common';
|
||||||
|
import { CONTEXT } from '@nestjs/graphql';
|
||||||
import { Redis, type RedisOptions } from 'ioredis';
|
import { Redis, type RedisOptions } from 'ioredis';
|
||||||
import { ThrottlerStorageRedisService } from 'nestjs-throttler-storage-redis';
|
import { ThrottlerStorageRedisService } from 'nestjs-throttler-storage-redis';
|
||||||
|
|
||||||
import { Cache, SessionCache } from '../../fundamentals';
|
import {
|
||||||
|
BucketService,
|
||||||
|
Cache,
|
||||||
|
type GraphqlContext,
|
||||||
|
MutexService,
|
||||||
|
SessionCache,
|
||||||
|
} from '../../fundamentals';
|
||||||
import { ThrottlerStorage } from '../../fundamentals/throttler';
|
import { ThrottlerStorage } from '../../fundamentals/throttler';
|
||||||
import { SocketIoAdapterImpl } from '../../fundamentals/websocket';
|
import { SocketIoAdapterImpl } from '../../fundamentals/websocket';
|
||||||
import { Plugin } from '../registry';
|
import { Plugin } from '../registry';
|
||||||
import { RedisCache } from './cache';
|
import { RedisCache } from './cache';
|
||||||
import {
|
import {
|
||||||
CacheRedis,
|
CacheRedis,
|
||||||
|
MutexRedis,
|
||||||
SessionRedis,
|
SessionRedis,
|
||||||
SocketIoRedis,
|
SocketIoRedis,
|
||||||
ThrottlerRedis,
|
ThrottlerRedis,
|
||||||
} from './instances';
|
} from './instances';
|
||||||
|
import { MutexRedisService } from './mutex';
|
||||||
import { createSockerIoAdapterImpl } from './ws-adapter';
|
import { createSockerIoAdapterImpl } from './ws-adapter';
|
||||||
|
|
||||||
function makeProvider(token: Type, impl: Type<Redis>): Provider {
|
function makeProvider(token: Type, impl: Type<Redis>): Provider {
|
||||||
@ -47,15 +56,31 @@ const socketIoRedisAdapterProvider: Provider = {
|
|||||||
inject: [SocketIoRedis],
|
inject: [SocketIoRedis],
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// mutex
|
||||||
|
const mutexRedisAdapterProvider: Provider = {
|
||||||
|
provide: MutexService,
|
||||||
|
useFactory: (redis: Redis, ctx: GraphqlContext, bucket: BucketService) => {
|
||||||
|
return new MutexRedisService(redis, ctx, bucket);
|
||||||
|
},
|
||||||
|
inject: [MutexRedis, CONTEXT, BucketService],
|
||||||
|
};
|
||||||
|
|
||||||
@Global()
|
@Global()
|
||||||
@Plugin({
|
@Plugin({
|
||||||
name: 'redis',
|
name: 'redis',
|
||||||
providers: [CacheRedis, SessionRedis, ThrottlerRedis, SocketIoRedis],
|
providers: [
|
||||||
|
CacheRedis,
|
||||||
|
SessionRedis,
|
||||||
|
ThrottlerRedis,
|
||||||
|
SocketIoRedis,
|
||||||
|
MutexRedis,
|
||||||
|
],
|
||||||
overrides: [
|
overrides: [
|
||||||
cacheProvider,
|
cacheProvider,
|
||||||
sessionCacheProvider,
|
sessionCacheProvider,
|
||||||
socketIoRedisAdapterProvider,
|
socketIoRedisAdapterProvider,
|
||||||
throttlerStorageProvider,
|
throttlerStorageProvider,
|
||||||
|
mutexRedisAdapterProvider,
|
||||||
],
|
],
|
||||||
requires: ['plugins.redis.host'],
|
requires: ['plugins.redis.host'],
|
||||||
})
|
})
|
||||||
|
@ -54,3 +54,10 @@ export class SocketIoRedis extends Redis {
|
|||||||
super({ ...config.plugins.redis, db: (config.plugins.redis?.db ?? 0) + 3 });
|
super({ ...config.plugins.redis, db: (config.plugins.redis?.db ?? 0) + 3 });
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Injectable()
|
||||||
|
export class MutexRedis extends Redis {
|
||||||
|
constructor(config: Config) {
|
||||||
|
super({ ...config.plugins.redis, db: (config.plugins.redis?.db ?? 0) + 4 });
|
||||||
|
}
|
||||||
|
}
|
||||||
|
96
packages/backend/server/src/plugins/redis/mutex.ts
Normal file
96
packages/backend/server/src/plugins/redis/mutex.ts
Normal file
@ -0,0 +1,96 @@
|
|||||||
|
import { setTimeout } from 'node:timers/promises';
|
||||||
|
|
||||||
|
import { Injectable, Logger } from '@nestjs/common';
|
||||||
|
import Redis, { Command } from 'ioredis';
|
||||||
|
|
||||||
|
import {
|
||||||
|
BucketService,
|
||||||
|
type GraphqlContext,
|
||||||
|
LockGuard,
|
||||||
|
MUTEX_RETRY,
|
||||||
|
MUTEX_WAIT,
|
||||||
|
MutexService,
|
||||||
|
} from '../../fundamentals';
|
||||||
|
|
||||||
|
const lockScript = `local key = KEYS[1]
|
||||||
|
local clientId = ARGV[1]
|
||||||
|
local releaseTime = ARGV[2]
|
||||||
|
|
||||||
|
if redis.call("get", key) == clientId or redis.call("set", key, clientId, "NX", "PX", releaseTime) then
|
||||||
|
return 1
|
||||||
|
else
|
||||||
|
return 0
|
||||||
|
end`;
|
||||||
|
const unlockScript = `local key = KEYS[1]
|
||||||
|
local clientId = ARGV[1]
|
||||||
|
|
||||||
|
if redis.call("get", key) == clientId then
|
||||||
|
return redis.call("del", key)
|
||||||
|
else
|
||||||
|
return 0
|
||||||
|
end`;
|
||||||
|
|
||||||
|
@Injectable()
|
||||||
|
export class MutexRedisService extends MutexService {
|
||||||
|
constructor(
|
||||||
|
private readonly redis: Redis,
|
||||||
|
context: GraphqlContext,
|
||||||
|
bucket: BucketService
|
||||||
|
) {
|
||||||
|
super(context, bucket);
|
||||||
|
this.logger = new Logger(MutexRedisService.name);
|
||||||
|
}
|
||||||
|
|
||||||
|
override async lock(
|
||||||
|
key: string,
|
||||||
|
releaseTimeInMS: number = 200
|
||||||
|
): Promise<LockGuard | undefined> {
|
||||||
|
const clientId = this.getId();
|
||||||
|
this.logger.debug(`Client ${clientId} lock try to lock ${key}`);
|
||||||
|
const releaseTime = releaseTimeInMS.toString();
|
||||||
|
|
||||||
|
const fetchLock = async (retry: number): Promise<LockGuard | undefined> => {
|
||||||
|
if (retry === 0) {
|
||||||
|
this.logger.error(
|
||||||
|
`Failed to fetch lock ${key} after ${MUTEX_RETRY} retry`
|
||||||
|
);
|
||||||
|
return undefined;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
const success = await this.redis.sendCommand(
|
||||||
|
new Command('EVAL', [lockScript, '1', key, clientId, releaseTime])
|
||||||
|
);
|
||||||
|
if (success === 1) {
|
||||||
|
return new LockGuard(this, key);
|
||||||
|
} else {
|
||||||
|
this.logger.warn(
|
||||||
|
`Failed to fetch lock ${key}, retrying in ${MUTEX_WAIT} ms`
|
||||||
|
);
|
||||||
|
await setTimeout(MUTEX_WAIT * (MUTEX_RETRY - retry + 1));
|
||||||
|
return fetchLock(retry - 1);
|
||||||
|
}
|
||||||
|
} catch (error: any) {
|
||||||
|
this.logger.error(
|
||||||
|
`Unexpected error when fetch lock ${key}: ${error.message}`
|
||||||
|
);
|
||||||
|
return undefined;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
return fetchLock(MUTEX_RETRY);
|
||||||
|
}
|
||||||
|
|
||||||
|
override async unlock(key: string, ignoreUnlockFail = false): Promise<void> {
|
||||||
|
const clientId = this.getId();
|
||||||
|
const result = await this.redis.sendCommand(
|
||||||
|
new Command('EVAL', [unlockScript, '1', key, clientId])
|
||||||
|
);
|
||||||
|
if (result === 0) {
|
||||||
|
if (!ignoreUnlockFail) {
|
||||||
|
throw new Error(`Failed to release lock ${key}`);
|
||||||
|
} else {
|
||||||
|
this.logger.warn(`Failed to release lock ${key}`);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -2,7 +2,7 @@
|
|||||||
"extends": "../../../../tsconfig.json",
|
"extends": "../../../../tsconfig.json",
|
||||||
"compilerOptions": {
|
"compilerOptions": {
|
||||||
"composite": true,
|
"composite": true,
|
||||||
"target": "ESNext",
|
"target": "ES2022",
|
||||||
"emitDecoratorMetadata": true,
|
"emitDecoratorMetadata": true,
|
||||||
"experimentalDecorators": true,
|
"experimentalDecorators": true,
|
||||||
"rootDir": ".",
|
"rootDir": ".",
|
||||||
|
@ -104,7 +104,7 @@ test('should create user if not exist', async t => {
|
|||||||
|
|
||||||
const user = await auth.getUserByEmail('u2@affine.pro');
|
const user = await auth.getUserByEmail('u2@affine.pro');
|
||||||
t.not(user, undefined, 'failed to create user');
|
t.not(user, undefined, 'failed to create user');
|
||||||
t.is(user?.name, 'Unnamed', 'failed to create user');
|
t.is(user?.name, 'u2', 'failed to create user');
|
||||||
});
|
});
|
||||||
|
|
||||||
test('should invite a user by link', async t => {
|
test('should invite a user by link', async t => {
|
||||||
@ -255,3 +255,25 @@ test('should support pagination for member', async t => {
|
|||||||
);
|
);
|
||||||
t.is(secondPageWorkspace.members.length, 1, 'failed to check invite id');
|
t.is(secondPageWorkspace.members.length, 1, 'failed to check invite id');
|
||||||
});
|
});
|
||||||
|
|
||||||
|
test('should limit member count correctly', async t => {
|
||||||
|
const { app } = t.context;
|
||||||
|
const u1 = await signUp(app, 'u1', 'u1@affine.pro', '1');
|
||||||
|
for (let i = 0; i < 10; i++) {
|
||||||
|
const workspace = await createWorkspace(app, u1.token.token);
|
||||||
|
await Promise.allSettled(
|
||||||
|
Array.from({ length: 10 }).map(async (_, i) =>
|
||||||
|
inviteUser(
|
||||||
|
app,
|
||||||
|
u1.token.token,
|
||||||
|
workspace.id,
|
||||||
|
`u${i}@affine.pro`,
|
||||||
|
'Admin'
|
||||||
|
)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
const ws = await getWorkspace(app, u1.token.token, workspace.id);
|
||||||
|
t.assert(ws.members.length <= 3, 'failed to check member list');
|
||||||
|
}
|
||||||
|
});
|
||||||
|
@ -2,7 +2,7 @@
|
|||||||
"extends": "../../../tsconfig.json",
|
"extends": "../../../tsconfig.json",
|
||||||
"compilerOptions": {
|
"compilerOptions": {
|
||||||
"composite": true,
|
"composite": true,
|
||||||
"target": "ESNext",
|
"target": "ES2022",
|
||||||
"module": "ESNext",
|
"module": "ESNext",
|
||||||
"emitDecoratorMetadata": true,
|
"emitDecoratorMetadata": true,
|
||||||
"experimentalDecorators": true,
|
"experimentalDecorators": true,
|
||||||
|
Loading…
Reference in New Issue
Block a user