mirror of
https://github.com/toeverything/AFFiNE.git
synced 2024-12-22 23:01:35 +03:00
fix(infra): retry search and aggregate on indexeddb (#8767)
This commit is contained in:
parent
e2f281ac18
commit
2aceed8824
@ -1,6 +1,8 @@
|
|||||||
|
import { DebugLogger } from '@affine/debug';
|
||||||
import type { Observable } from 'rxjs';
|
import type { Observable } from 'rxjs';
|
||||||
import { from, merge, of, Subject, throttleTime } from 'rxjs';
|
import { merge, of, Subject, throttleTime } from 'rxjs';
|
||||||
|
|
||||||
|
import { backoffRetry, fromPromise } from '../../../../livedata';
|
||||||
import { exhaustMapWithTrailing } from '../../../../utils/';
|
import { exhaustMapWithTrailing } from '../../../../utils/';
|
||||||
import {
|
import {
|
||||||
type AggregateOptions,
|
type AggregateOptions,
|
||||||
@ -16,6 +18,8 @@ import {
|
|||||||
} from '../../';
|
} from '../../';
|
||||||
import { DataStruct, type DataStructRWTransaction } from './data-struct';
|
import { DataStruct, type DataStructRWTransaction } from './data-struct';
|
||||||
|
|
||||||
|
const logger = new DebugLogger('IndexedDBIndex');
|
||||||
|
|
||||||
export class IndexedDBIndex<S extends Schema> implements Index<S> {
|
export class IndexedDBIndex<S extends Schema> implements Index<S> {
|
||||||
data: DataStruct = new DataStruct(this.databaseName, this.schema);
|
data: DataStruct = new DataStruct(this.databaseName, this.schema);
|
||||||
broadcast$ = new Subject();
|
broadcast$ = new Subject();
|
||||||
@ -63,12 +67,15 @@ export class IndexedDBIndex<S extends Schema> implements Index<S> {
|
|||||||
return merge(of(1), this.broadcast$).pipe(
|
return merge(of(1), this.broadcast$).pipe(
|
||||||
throttleTime(3000, undefined, { leading: true, trailing: true }),
|
throttleTime(3000, undefined, { leading: true, trailing: true }),
|
||||||
exhaustMapWithTrailing(() => {
|
exhaustMapWithTrailing(() => {
|
||||||
return from(
|
return fromPromise(async () => {
|
||||||
(async () => {
|
try {
|
||||||
const trx = await this.data.readonly();
|
const trx = await this.data.readonly();
|
||||||
return this.data.search(trx, query, options);
|
return this.data.search(trx, query, options);
|
||||||
})()
|
} catch (error) {
|
||||||
);
|
logger.error('search error', error);
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
}).pipe(backoffRetry());
|
||||||
})
|
})
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
@ -90,12 +97,15 @@ export class IndexedDBIndex<S extends Schema> implements Index<S> {
|
|||||||
return merge(of(1), this.broadcast$).pipe(
|
return merge(of(1), this.broadcast$).pipe(
|
||||||
throttleTime(3000, undefined, { leading: true, trailing: true }),
|
throttleTime(3000, undefined, { leading: true, trailing: true }),
|
||||||
exhaustMapWithTrailing(() => {
|
exhaustMapWithTrailing(() => {
|
||||||
return from(
|
return fromPromise(async () => {
|
||||||
(async () => {
|
try {
|
||||||
const trx = await this.data.readonly();
|
const trx = await this.data.readonly();
|
||||||
return this.data.aggregate(trx, query, field, options);
|
return this.data.aggregate(trx, query, field, options);
|
||||||
})()
|
} catch (error) {
|
||||||
);
|
logger.error('aggregate error', error);
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
}).pipe(backoffRetry());
|
||||||
})
|
})
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user