fix(server): del staled update count cache if unmatch (#5674)

This commit is contained in:
liuyi 2024-01-23 08:19:29 +00:00
parent 8300df4a26
commit 62169c59c8
No known key found for this signature in database
GPG Key ID: 56709255DC7EC728
2 changed files with 60 additions and 13 deletions

View File

@ -4,6 +4,7 @@ import {
OnModuleDestroy,
OnModuleInit,
} from '@nestjs/common';
import { Cron, CronExpression } from '@nestjs/schedule';
import { Snapshot, Update } from '@prisma/client';
import { chunk } from 'lodash-es';
import { defer, retry } from 'rxjs';
@ -83,6 +84,7 @@ export function isEmptyBuffer(buf: Buffer): boolean {
}
const MAX_SEQ_NUM = 0x3fffffff; // u31
const UPDATES_QUEUE_CACHE_KEY = 'doc:manager:updates';
/**
* Since we can't directly save all client updates into database, in which way the database will overload,
@ -667,26 +669,44 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
count: number
) {
const result = await this.cache.mapIncrease(
`doc:manager:updates`,
UPDATES_QUEUE_CACHE_KEY,
`${workspaceId}::${guid}`,
count
);
if (result <= 0) {
await this.cache.mapDelete(
`doc:manager:updates`,
UPDATES_QUEUE_CACHE_KEY,
`${workspaceId}::${guid}`
);
}
}
private async getAutoSquashCandidateFromCache() {
const key = await this.cache.mapRandomKey('doc:manager:updates');
const key = await this.cache.mapRandomKey(UPDATES_QUEUE_CACHE_KEY);
if (key) {
const count = await this.cache.mapGet<number>('doc:manager:updates', key);
if (typeof count === 'number' && count > 0) {
const cachedCount = await this.cache.mapIncrease(
UPDATES_QUEUE_CACHE_KEY,
key,
0
);
if (cachedCount > 0) {
const [workspaceId, id] = key.split('::');
const count = await this.db.update.count({
where: {
workspaceId,
id,
},
});
// FIXME(@forehalo): somehow the update count in cache is not accurate
if (count === 0) {
await this.cache.mapDelete(UPDATES_QUEUE_CACHE_KEY, key);
return null;
}
return { id, workspaceId };
}
}
@ -694,22 +714,38 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
return null;
}
private async doWithLock<T>(lock: string, job: () => Promise<T>) {
private async doWithLock<T>(
lockScope: string,
lockResource: string,
job: () => Promise<T>
) {
const lock = `lock:${lockScope}:${lockResource}`;
const acquired = await this.cache.setnx(lock, 1, {
ttl: 60 * 1000,
});
metrics.doc.counter('lock').add(1, { scope: lockScope });
if (!acquired) {
metrics.doc.counter('lock_failed').add(1, { scope: lockScope });
return;
}
metrics.doc.counter('lock_required').add(1, { scope: lockScope });
try {
return await job();
} finally {
await this.cache.delete(lock).catch(e => {
// safe, the lock will be expired when ttl ends
this.logger.error(`Failed to release lock ${lock}`, e);
});
await this.cache
.delete(lock)
.then(() => {
metrics.doc.counter('lock_released').add(1, { scope: lockScope });
})
.catch(e => {
metrics.doc
.counter('lock_release_failed')
.add(1, { scope: lockScope });
// safe, the lock will be expired when ttl ends
this.logger.error(`Failed to release lock ${lock}`, e);
});
}
}
@ -719,7 +755,8 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
job: () => Promise<T>
) {
return this.doWithLock(
`doc:manager:updates-lock:${workspaceId}::${guid}`,
'doc:manager:updates',
`${workspaceId}::${guid}`,
job
);
}
@ -730,8 +767,16 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
job: () => Promise<T>
) {
return this.doWithLock(
`doc:manager:snapshot-lock:${workspaceId}::${guid}`,
'doc:manager:snapshot',
`${workspaceId}::${guid}`,
job
);
}
@Cron(CronExpression.EVERY_MINUTE)
async reportUpdatesQueueCount() {
metrics.doc
.gauge('updates_queue_count')
.record(await this.db.update.count());
}
}

View File

@ -1,5 +1,7 @@
export interface CacheSetOptions {
// in milliseconds
/**
* in milliseconds
*/
ttl?: number;
}