refactor(server): make redis required module (#9121)

This commit is contained in:
forehalo 2024-12-13 06:27:15 +00:00
parent 81c68032e1
commit 0e73737407
No known key found for this signature in database
GPG Key ID: 56709255DC7EC728
40 changed files with 292 additions and 728 deletions

View File

@ -11,6 +11,7 @@ services:
network_mode: service:db
environment:
DATABASE_URL: postgresql://affine:affine@db:5432/affine
REDIS_SERVER_HOST: redis
db:
image: postgres:latest
@ -21,6 +22,10 @@ services:
POSTGRES_PASSWORD: affine
POSTGRES_USER: affine
POSTGRES_DB: affine
redis:
image: redis
ports:
- 6379:6379
volumes:
postgres-data:

4
.docker/dev/.env.example Normal file
View File

@ -0,0 +1,4 @@
DATABASE_LOCATION=./postgres
DB_PASSWORD=affine
DB_USERNAME=affine
DB_DATABASE_NAME=affine

3
.docker/dev/.gitignore vendored Normal file
View File

@ -0,0 +1,3 @@
postgres
.env
compose.yml

View File

@ -0,0 +1,28 @@
name: affine_dev_services
services:
postgres:
env_file:
- .env
image: postgres:16
ports:
- 5432:5432
environment:
POSTGRES_PASSWORD: ${DB_PASSWORD}
POSTGRES_USER: ${DB_USERNAME}
POSTGRES_DB: ${DB_DATABASE_NAME}
volumes:
- ${DATABASE_LOCATION}:/var/lib/postgresql/data
redis:
image: redis:latest
ports:
- 6379:6379
mailhog:
image: mailhog/mailhog:latest
ports:
- 1025:1025
- 8025:8025
networks:
dev:

View File

@ -304,6 +304,7 @@ jobs:
NODE_ENV: test
DISTRIBUTION: web
DATABASE_URL: postgresql://affine:affine@localhost:5432/affine
REDIS_SERVER_HOST: localhost
services:
postgres:
image: postgres
@ -316,6 +317,10 @@ jobs:
--health-retries 5
ports:
- 5432:5432
redis:
image: redis
ports:
- 6379:6379
mailer:
image: mailhog/mailhog
ports:
@ -379,6 +384,7 @@ jobs:
NODE_ENV: test
DISTRIBUTION: web
DATABASE_URL: postgresql://affine:affine@localhost:5432/affine
REDIS_SERVER_HOST: localhost
services:
postgres:
image: postgres
@ -391,6 +397,10 @@ jobs:
--health-retries 5
ports:
- 5432:5432
redis:
image: redis
ports:
- 6379:6379
mailer:
image: mailhog/mailhog
ports:
@ -461,6 +471,7 @@ jobs:
DISTRIBUTION: web
DATABASE_URL: postgresql://affine:affine@localhost:5432/affine
IN_CI_TEST: true
REDIS_SERVER_HOST: localhost
strategy:
fail-fast: false
matrix:
@ -480,6 +491,10 @@ jobs:
--health-retries 5
ports:
- 5432:5432
redis:
image: redis
ports:
- 6379:6379
steps:
- uses: actions/checkout@v4
@ -532,6 +547,7 @@ jobs:
env:
DISTRIBUTION: web
DATABASE_URL: postgresql://affine:affine@localhost:5432/affine
REDIS_SERVER_HOST: localhost
IN_CI_TEST: true
strategy:
fail-fast: false
@ -566,6 +582,10 @@ jobs:
--health-retries 5
ports:
- 5432:5432
redis:
image: redis
ports:
- 6379:6379
mailer:
image: mailhog/mailhog
ports:

View File

@ -42,6 +42,7 @@ jobs:
NODE_ENV: test
DISTRIBUTION: web
DATABASE_URL: postgresql://affine:affine@localhost:5432/affine
REDIS_SERVER_HOST: localhost
services:
postgres:
image: postgres
@ -54,6 +55,10 @@ jobs:
--health-retries 5
ports:
- 5432:5432
redis:
image: redis
ports:
- 6379:6379
mailer:
image: mailhog/mailhog
ports:
@ -100,6 +105,7 @@ jobs:
env:
DISTRIBUTION: web
DATABASE_URL: postgresql://affine:affine@localhost:5432/affine
REDIS_SERVER_HOST: localhost
IN_CI_TEST: true
strategy:
fail-fast: false
@ -120,6 +126,10 @@ jobs:
--health-retries 5
ports:
- 5432:5432
redis:
image: redis
ports:
- 6379:6379
steps:
- uses: actions/checkout@v4

View File

@ -5,81 +5,13 @@ This document explains how to start server (@affine/server) locally with Docker
> This document is not guaranteed to be up-to-date.
> If you find any outdated information, please feel free to open an issue or submit a PR.
## Run postgresql in docker
## Run required dev services in docker compose
```
docker pull postgres
docker run --rm --name affine-postgres -e POSTGRES_PASSWORD=affine -p 5432:5432 -v ~/Documents/postgres:/var/lib/postgresql/data postgres
```
cp ./.docker/dev/compose.yml.example ./.docker/dev/compose.yml
cp ./.docker/dev/.env.example ./.docker/dev/.env
### Optionally, use a dedicated volume
```
docker volume create affine-postgres
docker run --rm --name affine-postgres -e POSTGRES_PASSWORD=affine -p 5432:5432 -v affine-postgres:/var/lib/postgresql/data postgres
```
### mailhog (for local testing)
```
docker run --rm --name mailhog -p 1025:1025 -p 8025:8025 mailhog/mailhog
```
## prepare db
```
docker ps
docker exec -it affine-postgres psql -U postgres ## `affine-postgres` is the container name from the previous step
```
### in the terminal, following the example to user & table
```
psql (15.3 (Debian 15.3-1.pgdg120+1))
Type "help" for help.
postgres=# CREATE USER affine WITH PASSWORD 'affine';
CREATE ROLE
postgres=# ALTER USER affine WITH SUPERUSER;
ALTER ROLE
postgres=# CREATE DATABASE affine;
CREATE DATABASE
postgres=# \du
List of roles
Role name | Attributes | Member of
-----------+------------------------------------------------------------+-----------
affine | Superuser | {}
postgres | Superuser, Create role, Create DB, Replication, Bypass RLS | {}
```
### Set the following config to `packages/backend/server/.env`
In the following setup, we assume you have postgres server running at localhost:5432 and mailhog running at localhost:1025.
When logging in via email, you will see the mail arriving at localhost:8025 in a browser.
```
DATABASE_URL="postgresql://affine:affine@localhost:5432/affine"
MAILER_SENDER="noreply@toeverything.info"
MAILER_USER="auth"
MAILER_PASSWORD="auth"
MAILER_HOST="localhost"
MAILER_PORT="1025"
```
## Prepare prisma
```
yarn workspace @affine/server prisma db push
yarn workspace @affine/server data-migration run
```
Note, you may need to do it again if db schema changed.
### Enable prisma studio
```
yarn workspace @affine/server prisma studio
docker compose -f ./.docker/dev/compose.yml up -d
```
## Build native packages (you need to setup rust toolchain first)
@ -87,27 +19,46 @@ yarn workspace @affine/server prisma studio
```
# build native
yarn workspace @affine/server-native build
yarn workspace @affine/native build
```
## start server
## Prepare dev environment
```
yarn workspace @affine/server dev
cd packages/backend/server
cp .env.example .env
yarn prisma db push
yarn data-migration run
```
when server started, it will created a default user:
email: dev@affine.pro
name: Dev User
password: dev
## start core (web)
## Start server
```
yarn dev
```
when server started, it will created a default user for testing:
- email: dev@affine.pro
- name: Dev User
- password: dev
## Start frontend
```
# at project root
yarn dev
```
## Done
Now you should be able to start developing affine with server enabled.
## Bonus
### Enable prisma studio (Database GUI)
```
# available at http://localhost:5555
yarn prisma studio
```

View File

@ -1,4 +1,9 @@
# AFFINE_SERVER_PORT=3010
# AFFINE_SERVER_HOST=app.affine.pro
# AFFINE_SERVER_HTTPS=true
# DATABASE_URL="postgres://affine:affine@localhost:5432/affine"
# REDIS_SERVER_HOST=localhost
# MAILER_HOST=localhost
# MAILER_PORT=1025
# MAILER_SENDER="noreply@toeverything.info"
# MAILER_USER="noreply@toeverything.info"
# MAILER_PASSWORD="affine"
# MAILER_SECURE=false

View File

@ -19,6 +19,7 @@ import { MailModule } from './base/mailer';
import { MetricsModule } from './base/metrics';
import { MutexModule } from './base/mutex';
import { PrismaModule } from './base/prisma';
import { RedisModule } from './base/redis';
import { RuntimeModule } from './base/runtime';
import { StorageProviderModule } from './base/storage';
import { RateLimiterModule } from './base/throttler';
@ -42,6 +43,7 @@ export const FunctionalityModules = [
ConfigModule.forRoot(),
RuntimeModule,
EventModule,
RedisModule,
CacheModule,
MutexModule,
PrismaModule,

View File

@ -1,4 +1,3 @@
import { Type } from '@nestjs/common';
import { NestFactory } from '@nestjs/core';
import type { NestExpressApplication } from '@nestjs/platform-express';
import cookieParser from 'cookie-parser';
@ -9,7 +8,7 @@ import {
CloudThrottlerGuard,
GlobalExceptionFilter,
} from './base';
import { SocketIoAdapter, SocketIoAdapterImpl } from './base/websocket';
import { SocketIoAdapter } from './base/websocket';
import { AuthGuard } from './core/auth';
import { ENABLED_FEATURES } from './core/config/server-feature';
import { serverTimingAndCache } from './middleware/timing';
@ -44,13 +43,6 @@ export async function createApp() {
app.use(cookieParser());
if (AFFiNE.flavor.sync) {
const SocketIoAdapter = app.get<Type<SocketIoAdapter>>(
SocketIoAdapterImpl,
{
strict: false,
}
);
const adapter = new SocketIoAdapter(app);
app.useWebSocketAdapter(adapter);
}

View File

@ -1,51 +0,0 @@
export interface CacheSetOptions {
/**
* in milliseconds
*/
ttl?: number;
}
// extends if needed
export interface Cache {
// standard operation
get<T = unknown>(key: string): Promise<T | undefined>;
set<T = unknown>(
key: string,
value: T,
opts?: CacheSetOptions
): Promise<boolean>;
setnx<T = unknown>(
key: string,
value: T,
opts?: CacheSetOptions
): Promise<boolean>;
increase(key: string, count?: number): Promise<number>;
decrease(key: string, count?: number): Promise<number>;
delete(key: string): Promise<boolean>;
has(key: string): Promise<boolean>;
ttl(key: string): Promise<number>;
expire(key: string, ttl: number): Promise<boolean>;
// list operations
pushBack<T = unknown>(key: string, ...values: T[]): Promise<number>;
pushFront<T = unknown>(key: string, ...values: T[]): Promise<number>;
len(key: string): Promise<number>;
list<T = unknown>(key: string, start: number, end: number): Promise<T[]>;
popFront<T = unknown>(key: string, count?: number): Promise<T[]>;
popBack<T = unknown>(key: string, count?: number): Promise<T[]>;
// map operations
mapSet<T = unknown>(
map: string,
key: string,
value: T,
opts: CacheSetOptions
): Promise<boolean>;
mapIncrease(map: string, key: string, count?: number): Promise<number>;
mapDecrease(map: string, key: string, count?: number): Promise<number>;
mapGet<T = unknown>(map: string, key: string): Promise<T | undefined>;
mapDelete(map: string, key: string): Promise<boolean>;
mapKeys(map: string): Promise<string[]>;
mapRandomKey(map: string): Promise<string | undefined>;
mapLen(map: string): Promise<number>;
}

View File

@ -1,13 +1,18 @@
import { Injectable } from '@nestjs/common';
import { LocalCache } from './local';
import { CacheRedis, SessionRedis } from '../redis';
import { CacheProvider } from './provider';
@Injectable()
export class Cache extends LocalCache {}
@Injectable()
export class SessionCache extends LocalCache {
constructor() {
super({ namespace: 'session' });
export class Cache extends CacheProvider {
constructor(redis: CacheRedis) {
super(redis);
}
}
@Injectable()
export class SessionCache extends CacheProvider {
constructor(redis: SessionRedis) {
super(redis);
}
}

View File

@ -1,286 +0,0 @@
import Keyv, { KeyvOptions } from 'keyv';
import type { Cache, CacheSetOptions } from './def';
export class LocalCache implements Cache {
private readonly kv: Keyv;
constructor(opts: KeyvOptions = {}) {
this.kv = new Keyv(opts);
}
// standard operation
async get<T = unknown>(key: string): Promise<T | undefined> {
return this.kv.get<T>(key).catch(() => undefined);
}
async set<T = unknown>(
key: string,
value: T,
opts: CacheSetOptions = {}
): Promise<boolean> {
return this.kv
.set(key, value, opts.ttl)
.then(() => true)
.catch(() => false);
}
async setnx<T = unknown>(
key: string,
value: T,
opts?: CacheSetOptions | undefined
): Promise<boolean> {
if (!(await this.has(key))) {
return this.set(key, value, opts);
}
return false;
}
async increase(key: string, count: number = 1): Promise<number> {
const prev = (await this.get(key)) ?? 0;
if (typeof prev !== 'number') {
throw new Error(
`Expect a Number keyed by ${key}, but found ${typeof prev}`
);
}
const curr = prev + count;
return (await this.set(key, curr)) ? curr : prev;
}
async decrease(key: string, count: number = 1): Promise<number> {
return this.increase(key, -count);
}
async delete(key: string): Promise<boolean> {
return this.kv.delete(key).catch(() => false);
}
async has(key: string): Promise<boolean> {
return this.kv.has(key).catch(() => false);
}
async ttl(key: string): Promise<number> {
return this.kv
.get(key, { raw: true })
.then(raw => (raw?.expires ? raw.expires - Date.now() : Infinity))
.catch(() => 0);
}
async expire(key: string, ttl: number): Promise<boolean> {
const value = await this.kv.get(key);
return this.set(key, value, { ttl });
}
// list operations
private async getArray<T = unknown>(key: string) {
const raw = await this.kv.get<T[]>(key, { raw: true });
if (raw && !Array.isArray(raw.value)) {
throw new Error(
`Expect an Array keyed by ${key}, but found ${raw.value}`
);
}
return raw;
}
private async setArray<T = unknown>(
key: string,
value: T[],
opts: CacheSetOptions = {}
) {
return this.set(key, value, opts).then(() => value.length);
}
async pushBack<T = unknown>(key: string, ...values: T[]): Promise<number> {
let list: any[] = [];
let ttl: number | undefined = undefined;
const raw = await this.getArray(key);
if (raw) {
if (raw.value) {
list = raw.value;
}
if (raw.expires) {
ttl = raw.expires - Date.now();
}
}
list = list.concat(values);
return this.setArray(key, list, { ttl });
}
async pushFront<T = unknown>(key: string, ...values: T[]): Promise<number> {
let list: any[] = [];
let ttl: number | undefined = undefined;
const raw = await this.getArray(key);
if (raw) {
if (raw.value) {
list = raw.value;
}
if (raw.expires) {
ttl = raw.expires - Date.now();
}
}
list = values.concat(list);
return this.setArray(key, list, { ttl });
}
async len(key: string): Promise<number> {
return this.getArray<any[]>(key).then(v => v?.value?.length ?? 0);
}
/**
* list array elements with `[start, end]`
* the end indice is inclusive
*/
async list<T = unknown>(
key: string,
start: number,
end: number
): Promise<T[]> {
const raw = await this.getArray<T>(key);
if (raw?.value) {
start = (raw.value.length + start) % raw.value.length;
end = ((raw.value.length + end) % raw.value.length) + 1;
return raw.value.slice(start, end);
} else {
return [];
}
}
private async trim<T = unknown>(key: string, start: number, end: number) {
const raw = await this.getArray<T>(key);
if (raw && raw.value) {
start = (raw.value.length + start) % raw.value.length;
// make negative end index work, and end indice is inclusive
end = ((raw.value.length + end) % raw.value.length) + 1;
const result = raw.value.splice(start, end);
await this.set(key, raw.value, {
ttl: raw.expires ? raw.expires - Date.now() : undefined,
});
return result;
}
return [];
}
async popFront<T = unknown>(key: string, count: number = 1) {
return this.trim<T>(key, 0, count - 1);
}
async popBack<T = unknown>(key: string, count: number = 1) {
return this.trim<T>(key, -count, count - 1);
}
// map operations
private async getMap<T = unknown>(map: string) {
const raw = await this.kv.get<Record<string, T>>(map, { raw: true });
if (raw) {
if (typeof raw.value !== 'object') {
throw new Error(
`Expect an Object keyed by ${map}, but found ${typeof raw}`
);
}
if (Array.isArray(raw.value)) {
throw new Error(`Expect an Object keyed by ${map}, but found an Array`);
}
}
return raw;
}
private async setMap<T = unknown>(
map: string,
value: Record<string, T>,
opts: CacheSetOptions = {}
) {
return this.kv.set(map, value, opts.ttl).then(() => true);
}
async mapGet<T = unknown>(map: string, key: string): Promise<T | undefined> {
const raw = await this.getMap<T>(map);
if (raw?.value) {
return raw.value[key];
}
return undefined;
}
async mapSet<T = unknown>(
map: string,
key: string,
value: T
): Promise<boolean> {
const raw = await this.getMap(map);
const data = raw?.value ?? {};
data[key] = value;
return this.setMap(map, data, {
ttl: raw?.expires ? raw.expires - Date.now() : undefined,
});
}
async mapDelete(map: string, key: string): Promise<boolean> {
const raw = await this.getMap(map);
if (raw?.value) {
delete raw.value[key];
return this.setMap(map, raw.value, {
ttl: raw.expires ? raw.expires - Date.now() : undefined,
});
}
return false;
}
async mapIncrease(
map: string,
key: string,
count: number = 1
): Promise<number> {
const prev = (await this.mapGet(map, key)) ?? 0;
if (typeof prev !== 'number') {
throw new Error(
`Expect a Number keyed by ${key}, but found ${typeof prev}`
);
}
const curr = prev + count;
return (await this.mapSet(map, key, curr)) ? curr : prev;
}
async mapDecrease(
map: string,
key: string,
count: number = 1
): Promise<number> {
return this.mapIncrease(map, key, -count);
}
async mapKeys(map: string): Promise<string[]> {
const raw = await this.getMap(map);
if (raw?.value) {
return Object.keys(raw.value);
}
return [];
}
async mapRandomKey(map: string): Promise<string | undefined> {
const keys = await this.mapKeys(map);
return keys[Math.floor(Math.random() * keys.length)];
}
async mapLen(map: string): Promise<number> {
const raw = await this.getMap(map);
return raw?.value ? Object.keys(raw.value).length : 0;
}
}

View File

@ -1,8 +1,13 @@
import { Redis } from 'ioredis';
import Redis from 'ioredis';
import type { Cache, CacheSetOptions } from '../../base/cache/def';
export interface CacheSetOptions {
/**
* in milliseconds
*/
ttl?: number;
}
export class RedisCache implements Cache {
export class CacheProvider {
constructor(private readonly redis: Redis) {}
// standard operation

View File

@ -20,7 +20,7 @@ export * from './guard';
export { CryptoHelper, URLHelper } from './helpers';
export { MailService } from './mailer';
export { CallMetric, metrics } from './metrics';
export { type ILocker, Lock, Locker, Mutex, RequestMutex } from './mutex';
export { Lock, Locker, Mutex, RequestMutex } from './mutex';
export {
GatewayErrorWrapper,
getOptionalModuleMetadata,

View File

@ -1,6 +1,6 @@
import { Global, Module } from '@nestjs/common';
import { Locker } from './local-lock';
import { Locker } from './locker';
import { Mutex, RequestMutex } from './mutex';
@Global()
@ -11,4 +11,4 @@ import { Mutex, RequestMutex } from './mutex';
export class MutexModule {}
export { Locker, Mutex, RequestMutex };
export { type Locker as ILocker, Lock } from './lock';
export { Lock } from './lock';

View File

@ -1,28 +0,0 @@
import { Injectable } from '@nestjs/common';
import { Cache } from '../cache';
import { Lock, Locker as ILocker } from './lock';
@Injectable()
export class Locker implements ILocker {
constructor(private readonly cache: Cache) {}
async lock(owner: string, key: string): Promise<Lock> {
const lockKey = `MutexLock:${key}`;
const prevOwner = await this.cache.get<string>(lockKey);
if (prevOwner && prevOwner !== owner) {
throw new Error(`Lock for resource [${key}] has been holder by others`);
}
const acquired = await this.cache.set(lockKey, owner);
if (acquired) {
return new Lock(async () => {
await this.cache.delete(lockKey);
});
}
throw new Error(`Failed to acquire lock for resource [${key}]`);
}
}

View File

@ -17,7 +17,3 @@ export class Lock implements AsyncDisposable {
await this.release();
}
}
export interface Locker {
lock(owner: string, key: string): Promise<Lock>;
}

View File

@ -1,8 +1,8 @@
import { Injectable, Logger } from '@nestjs/common';
import { Command } from 'ioredis';
import { ILocker, Lock } from '../../base';
import { SessionRedis } from './instances';
import { SessionRedis } from '../redis';
import { Lock } from './lock';
// === atomic mutex lock ===
// acquire lock
@ -36,8 +36,9 @@ else
end`;
@Injectable()
export class RedisMutexLocker implements ILocker {
private readonly logger = new Logger(RedisMutexLocker.name);
export class Locker {
private readonly logger = new Logger(Locker.name);
constructor(private readonly redis: SessionRedis) {}
async lock(owner: string, key: string): Promise<Lock> {

View File

@ -6,7 +6,7 @@ import type { Request } from 'express';
import { GraphqlContext } from '../graphql';
import { retryable } from '../utils/promise';
import { Locker } from './local-lock';
import { Locker } from './locker';
export const MUTEX_RETRY = 5;
export const MUTEX_WAIT = 100;
@ -26,7 +26,7 @@ export class Mutex {
* ```typescript
* {
* // lock is acquired here
* await using lock = await mutex.lock('resource-key');
* await using lock = await mutex.acquire('resource-key');
* if (lock) {
* // do something
* } else {
@ -38,7 +38,7 @@ export class Mutex {
* @param key resource key
* @returns LockGuard
*/
async lock(key: string, owner: string = 'global') {
async acquire(key: string, owner: string = 'global') {
try {
return await retryable(
() => this.locker.lock(owner, key),
@ -83,7 +83,7 @@ export class RequestMutex extends Mutex {
return id;
}
override lock(key: string) {
return super.lock(key, this.getId());
override acquire(key: string) {
return super.acquire(key, this.getId());
}
}

View File

@ -3,9 +3,9 @@ import { RedisOptions } from 'ioredis';
import { defineStartupConfig, ModuleConfig } from '../../base/config';
declare module '../config' {
interface PluginsConfig {
interface AppConfig {
redis: ModuleConfig<RedisOptions>;
}
}
defineStartupConfig('plugins.redis', {});
defineStartupConfig('redis', {});

View File

@ -0,0 +1,14 @@
import './config';
import { Global, Module } from '@nestjs/common';
import { CacheRedis, SessionRedis, SocketIoRedis } from './instances';
@Global()
@Module({
providers: [CacheRedis, SessionRedis, SocketIoRedis],
exports: [CacheRedis, SessionRedis, SocketIoRedis],
})
export class RedisModule {}
export { CacheRedis, SessionRedis, SocketIoRedis };

View File

@ -0,0 +1,35 @@
import { Injectable, OnModuleDestroy } from '@nestjs/common';
import { Redis as IORedis, RedisOptions } from 'ioredis';
import { Config } from '../../base/config';
class Redis extends IORedis implements OnModuleDestroy {
constructor(opts: RedisOptions) {
super(opts);
}
onModuleDestroy() {
this.disconnect();
}
}
@Injectable()
export class CacheRedis extends Redis {
constructor(config: Config) {
super(config.redis);
}
}
@Injectable()
export class SessionRedis extends Redis {
constructor(config: Config) {
super({ ...config.redis, db: (config.redis.db ?? 0) + 2 });
}
}
@Injectable()
export class SocketIoRedis extends Redis {
constructor(config: Config) {
super({ ...config.redis, db: (config.redis.db ?? 0) + 3 });
}
}

View File

@ -0,0 +1,56 @@
import { INestApplication } from '@nestjs/common';
import { IoAdapter } from '@nestjs/platform-socket.io';
import { createAdapter } from '@socket.io/redis-adapter';
import { Server } from 'socket.io';
import { Config } from '../config';
import { AuthenticationRequired } from '../error';
import { SocketIoRedis } from '../redis';
import { WEBSOCKET_OPTIONS } from './options';
export class SocketIoAdapter extends IoAdapter {
constructor(private readonly app: INestApplication) {
super(app);
}
override createIOServer(port: number, options?: any): Server {
const config = this.app.get(WEBSOCKET_OPTIONS) as Config['websocket'];
const server: Server = super.createIOServer(port, {
...config,
...options,
});
if (config.canActivate) {
server.use((socket, next) => {
// @ts-expect-error checked
config
.canActivate(socket)
.then(pass => {
if (pass) {
next();
} else {
throw new AuthenticationRequired();
}
})
.catch(e => {
next(e);
});
});
}
const pubClient = this.app.get(SocketIoRedis);
pubClient.on('error', err => {
console.error(err);
});
const subClient = pubClient.duplicate();
subClient.on('error', err => {
console.error(err);
});
server.adapter(createAdapter(pubClient, subClient));
return server;
}
}

View File

@ -1,70 +1,14 @@
import './config';
import {
FactoryProvider,
INestApplicationContext,
Module,
Provider,
} from '@nestjs/common';
import { IoAdapter } from '@nestjs/platform-socket.io';
import { Server } from 'socket.io';
import { Module } from '@nestjs/common';
import { Config } from '../config';
import { AuthenticationRequired } from '../error';
export const SocketIoAdapterImpl = Symbol('SocketIoAdapterImpl');
export class SocketIoAdapter extends IoAdapter {
constructor(protected readonly app: INestApplicationContext) {
super(app);
}
override createIOServer(port: number, options?: any): Server {
const config = this.app.get(WEBSOCKET_OPTIONS) as Config['websocket'];
const server: Server = super.createIOServer(port, {
...config,
...options,
});
if (config.canActivate) {
server.use((socket, next) => {
// @ts-expect-error checked
config
.canActivate(socket)
.then(pass => {
if (pass) {
next();
} else {
throw new AuthenticationRequired();
}
})
.catch(e => {
next(e);
});
});
}
return server;
}
}
const SocketIoAdapterImplProvider: Provider = {
provide: SocketIoAdapterImpl,
useValue: SocketIoAdapter,
};
export const WEBSOCKET_OPTIONS = Symbol('WEBSOCKET_OPTIONS');
export const websocketOptionsProvider: FactoryProvider = {
provide: WEBSOCKET_OPTIONS,
useFactory: (config: Config) => {
return config.websocket;
},
inject: [Config],
};
import { WEBSOCKET_OPTIONS, websocketOptionsProvider } from './options';
@Module({
providers: [SocketIoAdapterImplProvider, websocketOptionsProvider],
exports: [SocketIoAdapterImplProvider, websocketOptionsProvider],
providers: [websocketOptionsProvider],
exports: [websocketOptionsProvider],
})
export class WebSocketModule {}
export { WEBSOCKET_OPTIONS };
export { SocketIoAdapter } from './adapter';

View File

@ -0,0 +1,13 @@
import { FactoryProvider } from '@nestjs/common';
import { Config } from '../config';
export const WEBSOCKET_OPTIONS = Symbol('WEBSOCKET_OPTIONS');
export const websocketOptionsProvider: FactoryProvider = {
provide: WEBSOCKET_OPTIONS,
useFactory: (config: Config) => {
return config.websocket;
},
inject: [Config],
};

View File

@ -29,11 +29,11 @@ AFFiNE.ENV_MAP = {
COPILOT_OPENAI_API_KEY: 'plugins.copilot.openai.apiKey',
COPILOT_FAL_API_KEY: 'plugins.copilot.fal.apiKey',
COPILOT_UNSPLASH_API_KEY: 'plugins.copilot.unsplashKey',
REDIS_SERVER_HOST: 'plugins.redis.host',
REDIS_SERVER_PORT: ['plugins.redis.port', 'int'],
REDIS_SERVER_USER: 'plugins.redis.username',
REDIS_SERVER_PASSWORD: 'plugins.redis.password',
REDIS_SERVER_DATABASE: ['plugins.redis.db', 'int'],
REDIS_SERVER_HOST: 'redis.host',
REDIS_SERVER_PORT: ['redis.port', 'int'],
REDIS_SERVER_USER: 'redis.username',
REDIS_SERVER_PASSWORD: 'redis.password',
REDIS_SERVER_DATABASE: ['redis.db', 'int'],
DOC_MERGE_INTERVAL: ['doc.manager.updatePollInterval', 'int'],
STRIPE_API_KEY: 'plugins.payment.stripe.keys.APIKey',
STRIPE_WEBHOOK_KEY: 'plugins.payment.stripe.keys.webhookKey',

View File

@ -58,13 +58,6 @@ AFFiNE.use('copilot', {
apiKey: '',
},
});
AFFiNE.use('redis', {
host: env.REDIS_SERVER_HOST,
db: 0,
port: 6379,
username: env.REDIS_SERVER_USER,
password: env.REDIS_SERVER_PASSWORD,
});
AFFiNE.use('payment', {
stripe: {
keys: {

View File

@ -71,14 +71,6 @@ AFFiNE.server.port = 3010;
// ## Plugins settings ##
// ###############################################################
//
// /* Redis Plugin */
// /* Provide caching and session storing backed by Redis. */
// /* Useful when you deploy AFFiNE server in a cluster. */
// AFFiNE.use('redis', {
// /* override options */
// });
//
//
// /* Payment Plugin */
// AFFiNE.use('payment', {
// stripe: { keys: {}, apiVersion: '2023-10-16' },

View File

@ -175,7 +175,7 @@ export class PgUserspaceDocStorageAdapter extends DocStorageAdapter {
workspaceId: string,
docId: string
) {
const lock = await this.mutex.lock(`userspace:${workspaceId}:${docId}`);
const lock = await this.mutex.acquire(`userspace:${workspaceId}:${docId}`);
if (!lock) {
throw new Error('Too many concurrent writings');

View File

@ -488,7 +488,7 @@ export class PgWorkspaceDocStorageAdapter extends DocStorageAdapter {
workspaceId: string,
docId: string
) {
const lock = await this.mutex.lock(`doc:update:${workspaceId}:${docId}`);
const lock = await this.mutex.acquire(`doc:update:${workspaceId}:${docId}`);
if (!lock) {
throw new Error('Too many concurrent writings');

View File

@ -38,7 +38,7 @@ export class CustomSetupController {
throw new PasswordRequired();
}
await using lock = await this.mutex.lock('createFirstAdmin');
await using lock = await this.mutex.acquire('createFirstAdmin');
if (!lock) {
throw new InternalServerError();

View File

@ -80,7 +80,7 @@ export class TeamWorkspaceResolver {
// lock to prevent concurrent invite
const lockFlag = `invite:${workspaceId}`;
await using lock = await this.mutex.lock(lockFlag);
await using lock = await this.mutex.acquire(lockFlag);
if (!lock) {
return new TooManyRequest();
}
@ -231,7 +231,7 @@ export class TeamWorkspaceResolver {
try {
// lock to prevent concurrent invite and grant
const lockFlag = `invite:${workspaceId}`;
await using lock = await this.mutex.lock(lockFlag);
await using lock = await this.mutex.acquire(lockFlag);
if (!lock) {
return new TooManyRequest();
}
@ -281,7 +281,7 @@ export class TeamWorkspaceResolver {
try {
// lock to prevent concurrent invite and grant
const lockFlag = `invite:${workspaceId}`;
await using lock = await this.mutex.lock(lockFlag);
await using lock = await this.mutex.acquire(lockFlag);
if (!lock) {
return new TooManyRequest();
}

View File

@ -399,7 +399,7 @@ export class WorkspaceResolver {
try {
// lock to prevent concurrent invite and grant
const lockFlag = `invite:${workspaceId}`;
await using lock = await this.mutex.lock(lockFlag);
await using lock = await this.mutex.acquire(lockFlag);
if (!lock) {
return new TooManyRequest();
}
@ -524,7 +524,7 @@ export class WorkspaceResolver {
@Args('sendAcceptMail', { nullable: true }) sendAcceptMail: boolean
) {
const lockFlag = `invite:${workspaceId}`;
await using lock = await this.mutex.lock(lockFlag);
await using lock = await this.mutex.acquire(lockFlag);
if (!lock) {
return new TooManyRequest();
}

View File

@ -359,7 +359,7 @@ export class CopilotResolver {
user.id
);
const lockFlag = `${COPILOT_LOCKER}:session:${user.id}:${options.workspaceId}`;
await using lock = await this.mutex.lock(lockFlag);
await using lock = await this.mutex.acquire(lockFlag);
if (!lock) {
return new TooManyRequest('Server is busy');
}
@ -387,7 +387,7 @@ export class CopilotResolver {
user.id
);
const lockFlag = `${COPILOT_LOCKER}:session:${user.id}:${options.workspaceId}`;
await using lock = await this.mutex.lock(lockFlag);
await using lock = await this.mutex.acquire(lockFlag);
if (!lock) {
return new TooManyRequest('Server is busy');
}
@ -418,7 +418,7 @@ export class CopilotResolver {
return new NotFoundException('Session not found');
}
const lockFlag = `${COPILOT_LOCKER}:session:${user.id}:${options.workspaceId}`;
await using lock = await this.mutex.lock(lockFlag);
await using lock = await this.mutex.acquire(lockFlag);
if (!lock) {
return new TooManyRequest('Server is busy');
}
@ -439,7 +439,7 @@ export class CopilotResolver {
options: CreateChatMessageInput
) {
const lockFlag = `${COPILOT_LOCKER}:message:${user?.id}:${options.sessionId}`;
await using lock = await this.mutex.lock(lockFlag);
await using lock = await this.mutex.acquire(lockFlag);
if (!lock) {
return new TooManyRequest('Server is busy');
}

View File

@ -3,7 +3,6 @@ import './copilot';
import './gcloud';
import './oauth';
import './payment';
import './redis';
import './storage';
export {

View File

@ -555,7 +555,7 @@ export class SubscriptionService implements OnApplicationBootstrap {
return;
}
await using lock = await this.mutex.lock('init stripe prices');
await using lock = await this.mutex.acquire('init stripe prices');
if (!lock) {
return;

View File

@ -1,67 +0,0 @@
import './config';
import { Global, Provider, Type } from '@nestjs/common';
import { Redis } from 'ioredis';
import { ThrottlerStorageRedisService } from 'nestjs-throttler-storage-redis';
import { Cache, Locker, SessionCache } from '../../base';
import { ThrottlerStorage } from '../../base/throttler';
import { SocketIoAdapterImpl } from '../../base/websocket';
import { Plugin } from '../registry';
import { RedisCache } from './cache';
import { CacheRedis, SessionRedis, SocketIoRedis } from './instances';
import { RedisMutexLocker } from './mutex';
import { createSockerIoAdapterImpl } from './ws-adapter';
function makeProvider(token: Type, impl: Type<Redis>): Provider {
return {
provide: token,
useFactory: (redis: Redis) => {
return new RedisCache(redis);
},
inject: [impl],
};
}
// cache
const cacheProvider = makeProvider(Cache, CacheRedis);
const sessionCacheProvider = makeProvider(SessionCache, SessionRedis);
// throttler
const throttlerStorageProvider: Provider = {
provide: ThrottlerStorage,
useFactory: (redis: Redis) => {
return new ThrottlerStorageRedisService(redis);
},
inject: [SessionRedis],
};
// socket io
const socketIoRedisAdapterProvider: Provider = {
provide: SocketIoAdapterImpl,
useFactory: (redis: Redis) => {
return createSockerIoAdapterImpl(redis);
},
inject: [SocketIoRedis],
};
// mutex
const mutexRedisAdapterProvider: Provider = {
provide: Locker,
useClass: RedisMutexLocker,
};
@Global()
@Plugin({
name: 'redis',
providers: [CacheRedis, SessionRedis, SocketIoRedis],
overrides: [
cacheProvider,
sessionCacheProvider,
socketIoRedisAdapterProvider,
throttlerStorageProvider,
mutexRedisAdapterProvider,
],
requires: ['plugins.redis.host'],
})
export class RedisModule {}

View File

@ -1,49 +0,0 @@
import {
Injectable,
Logger,
OnModuleDestroy,
OnModuleInit,
} from '@nestjs/common';
import { Redis as IORedis, RedisOptions } from 'ioredis';
import { Config } from '../../base/config';
class Redis extends IORedis implements OnModuleDestroy, OnModuleInit {
logger = new Logger(Redis.name);
constructor(opts: RedisOptions) {
super({
...opts,
lazyConnect: true,
});
}
async onModuleInit() {
await this.connect().catch(() => {
this.logger.error('Failed to connect to Redis server.');
});
}
onModuleDestroy() {
this.disconnect();
}
}
@Injectable()
export class CacheRedis extends Redis {
constructor(config: Config) {
super(config.plugins.redis);
}
}
@Injectable()
export class SessionRedis extends Redis {
constructor(config: Config) {
super({ ...config.plugins.redis, db: (config.plugins.redis.db ?? 0) + 2 });
}
}
@Injectable()
export class SocketIoRedis extends Redis {
constructor(config: Config) {
super({ ...config.plugins.redis, db: (config.plugins.redis.db ?? 0) + 3 });
}
}

View File

@ -1,28 +0,0 @@
import { createAdapter } from '@socket.io/redis-adapter';
import { Redis } from 'ioredis';
import { Server, ServerOptions } from 'socket.io';
import { SocketIoAdapter } from '../../base/websocket';
export function createSockerIoAdapterImpl(
redis: Redis
): typeof SocketIoAdapter {
class RedisIoAdapter extends SocketIoAdapter {
override createIOServer(port: number, options?: ServerOptions): Server {
const pubClient = redis;
pubClient.on('error', err => {
console.error(err);
});
const subClient = pubClient.duplicate();
subClient.on('error', err => {
console.error(err);
});
const server = super.createIOServer(port, options);
server.adapter(createAdapter(pubClient, subClient));
return server;
}
}
return RedisIoAdapter;
}