From 2aceed8824f8a50a36e3242b527dd766ce5fff2f Mon Sep 17 00:00:00 2001 From: EYHN Date: Mon, 11 Nov 2024 02:32:29 +0000 Subject: [PATCH] fix(infra): retry search and aggregate on indexeddb (#8767) --- .../src/sync/indexer/impl/indexeddb/index.ts | 28 +++++++++++++------ 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/packages/common/infra/src/sync/indexer/impl/indexeddb/index.ts b/packages/common/infra/src/sync/indexer/impl/indexeddb/index.ts index 2cbca87a5b..ea1153a32f 100644 --- a/packages/common/infra/src/sync/indexer/impl/indexeddb/index.ts +++ b/packages/common/infra/src/sync/indexer/impl/indexeddb/index.ts @@ -1,6 +1,8 @@ +import { DebugLogger } from '@affine/debug'; import type { Observable } from 'rxjs'; -import { from, merge, of, Subject, throttleTime } from 'rxjs'; +import { merge, of, Subject, throttleTime } from 'rxjs'; +import { backoffRetry, fromPromise } from '../../../../livedata'; import { exhaustMapWithTrailing } from '../../../../utils/'; import { type AggregateOptions, @@ -16,6 +18,8 @@ import { } from '../../'; import { DataStruct, type DataStructRWTransaction } from './data-struct'; +const logger = new DebugLogger('IndexedDBIndex'); + export class IndexedDBIndex implements Index { data: DataStruct = new DataStruct(this.databaseName, this.schema); broadcast$ = new Subject(); @@ -63,12 +67,15 @@ export class IndexedDBIndex implements Index { return merge(of(1), this.broadcast$).pipe( throttleTime(3000, undefined, { leading: true, trailing: true }), exhaustMapWithTrailing(() => { - return from( - (async () => { + return fromPromise(async () => { + try { const trx = await this.data.readonly(); return this.data.search(trx, query, options); - })() - ); + } catch (error) { + logger.error('search error', error); + throw error; + } + }).pipe(backoffRetry()); }) ); } @@ -90,12 +97,15 @@ export class IndexedDBIndex implements Index { return merge(of(1), this.broadcast$).pipe( throttleTime(3000, undefined, { leading: true, trailing: true }), exhaustMapWithTrailing(() => { - return from( - (async () => { + return fromPromise(async () => { + try { const trx = await this.data.readonly(); return this.data.aggregate(trx, query, field, options); - })() - ); + } catch (error) { + logger.error('aggregate error', error); + throw error; + } + }).pipe(backoffRetry()); }) ); }