From b5f24fbd5d73bd4e8789805d722dbe41c3bf6128 Mon Sep 17 00:00:00 2001 From: Andrey Sobolev Date: Thu, 21 Nov 2024 14:38:14 +0700 Subject: [PATCH] UBERF-8592: Fix live query performance (#7189) Signed-off-by: Andrey Sobolev --- dev/docker-compose.yaml | 3 +- dev/tool/package.json | 7 +- dev/tool/src/__start.ts | 9 + dev/tool/src/index.ts | 18 +- packages/core/src/memdb.ts | 2 +- packages/core/src/query.ts | 20 +- packages/presentation/src/utils.ts | 11 + packages/query/src/__tests__/query.test.ts | 45 ++ packages/query/src/index.ts | 559 ++++++++---------- packages/query/src/refs.ts | 91 +++ packages/query/src/results.ts | 99 ++++ packages/query/src/types.ts | 17 + .../src/inboxNotificationsClient.ts | 8 +- plugins/view-resources/src/actions.ts | 16 +- .../src/components/filter/ObjectFilter.svelte | 66 ++- .../src/components/filter/ValueFilter.svelte | 48 +- qms-tests/docker-compose.yaml | 14 +- qms-tests/tool-local.sh | 1 + qms-tests/tool.sh | 1 + server/account/src/operations.ts | 4 +- server/mongo/src/storage.ts | 6 + server/postgres/src/storage.ts | 6 + tests/tool-local.sh | 1 + tests/tool-pg.sh | 1 + tests/tool.sh | 1 + 25 files changed, 674 insertions(+), 380 deletions(-) create mode 100644 packages/query/src/refs.ts create mode 100644 packages/query/src/results.ts create mode 100644 packages/query/src/types.ts diff --git a/dev/docker-compose.yaml b/dev/docker-compose.yaml index 90db31d1bb..c83b0f3c48 100644 --- a/dev/docker-compose.yaml +++ b/dev/docker-compose.yaml @@ -36,7 +36,8 @@ services: - "8089:8080" command: start-single-node --insecure volumes: - - cockroach_db:/cockroach/cockroach-data" + - cockroach_db:/cockroach/cockroach-data + restart: unless-stopped minio: image: 'minio/minio' command: server /data --address ":9000" --console-address ":9001" diff --git a/dev/tool/package.json b/dev/tool/package.json index 05cd1c86fb..c92223a0f5 100644 --- a/dev/tool/package.json +++ b/dev/tool/package.json @@ -19,9 +19,10 @@ "docker:tbuild": "docker build -t hardcoreeng/tool . --platform=linux/amd64 && ../../common/scripts/docker_tag_push.sh hardcoreeng/tool", "docker:staging": "../../common/scripts/docker_tag.sh hardcoreeng/tool staging", "docker:push": "../../common/scripts/docker_tag.sh hardcoreeng/tool", - "run-local": "rush bundle --to @hcengineering/tool >/dev/null && cross-env SERVER_SECRET=secret ACCOUNTS_URL=http://localhost:3000 TRANSACTOR_URL=ws://localhost:3333 MINIO_ACCESS_KEY=minioadmin MINIO_SECRET_KEY=minioadmin MINIO_ENDPOINT=localhost MONGO_URL=mongodb://localhost:27017 DB_URL=mongodb://localhost:27017 TELEGRAM_DATABASE=telegram-service ELASTIC_URL=http://localhost:9200 REKONI_URL=http://localhost:4004 MODEL_VERSION=$(node ../../common/scripts/show_version.js) GIT_REVISION=$(git describe --all --long) node --expose-gc --max-old-space-size=18000 ./bundle/bundle.js", - "run-local-pg": "rush bundle --to @hcengineering/tool >/dev/null && cross-env SERVER_SECRET=secret ACCOUNTS_URL=http://localhost:3000 TRANSACTOR_URL=ws://localhost:3333 MINIO_ACCESS_KEY=minioadmin MINIO_SECRET_KEY=minioadmin MINIO_ENDPOINT=localhost MONGO_URL=mongodb://localhost:27017 DB_URL=postgresql://postgres:example@localhost:5432 TELEGRAM_DATABASE=telegram-service ELASTIC_URL=http://localhost:9200 REKONI_URL=http://localhost:4004 MODEL_VERSION=$(node ../../common/scripts/show_version.js) GIT_REVISION=$(git describe --all --long) node --expose-gc --max-old-space-size=18000 ./bundle/bundle.js", - "run-local-brk": "rush bundle --to @hcengineering/tool >/dev/null && cross-env SERVER_SECRET=secret ACCOUNTS_URL=http://localhost:3000 TRANSACTOR_URL=ws://localhost:3333 MINIO_ACCESS_KEY=minioadmin MINIO_SECRET_KEY=minioadmin MINIO_ENDPOINT=localhost MONGO_URL=mongodb://localhost:27017 DB_URL=mongodb://localhost:27017 TELEGRAM_DATABASE=telegram-service ELASTIC_URL=http://localhost:9200 REKONI_URL=http://localhost:4004 MODEL_VERSION=$(node ../../common/scripts/show_version.js) GIT_REVISION=$(git describe --all --long) node --inspect-brk --enable-source-maps --max-old-space-size=18000 ./bundle/bundle.js", + "run-local": "rush bundle --to @hcengineering/tool >/dev/null && cross-env SERVER_SECRET=secret ACCOUNTS_URL=http://localhost:3000 TRANSACTOR_URL=ws://localhost:3333 MINIO_ACCESS_KEY=minioadmin MINIO_SECRET_KEY=minioadmin MINIO_ENDPOINT=localhost ACCOUNT_DB_URL=mongodb://localhost:27017 DB_URL=mongodb://localhost:27017 TELEGRAM_DATABASE=telegram-service ELASTIC_URL=http://localhost:9200 REKONI_URL=http://localhost:4004 MODEL_VERSION=$(node ../../common/scripts/show_version.js) GIT_REVISION=$(git describe --all --long) node --expose-gc --max-old-space-size=18000 ./bundle/bundle.js", + "run-local-pg": "rush bundle --to @hcengineering/tool >/dev/null && cross-env SERVER_SECRET=secret ACCOUNTS_URL=http://localhost:3000 TRANSACTOR_URL=ws://localhost:3333 MINIO_ACCESS_KEY=minioadmin MINIO_SECRET_KEY=minioadmin MINIO_ENDPOINT=localhost ACCOUNT_DB_URL=mongodb://localhost:27017 DB_URL=postgresql://postgres:example@localhost:5432 TELEGRAM_DATABASE=telegram-service ELASTIC_URL=http://localhost:9200 REKONI_URL=http://localhost:4004 MODEL_VERSION=$(node ../../common/scripts/show_version.js) GIT_REVISION=$(git describe --all --long) node --expose-gc --max-old-space-size=18000 ./bundle/bundle.js", + "run-local-cr": "rush bundle --to @hcengineering/tool >/dev/null && cross-env SERVER_SECRET=secret ACCOUNTS_URL=http://localhost:3000 TRANSACTOR_URL=ws://localhost:3332 MINIO_ACCESS_KEY=minioadmin MINIO_SECRET_KEY=minioadmin MINIO_ENDPOINT=localhost ACCOUNT_DB_URL=mongodb://localhost:27017 DB_URL=postgresql://root@host.docker.internal:26257/defaultdb?sslmode=disable TELEGRAM_DATABASE=telegram-service ELASTIC_URL=http://localhost:9200 REKONI_URL=http://localhost:4004 MODEL_VERSION=$(node ../../common/scripts/show_version.js) GIT_REVISION=$(git describe --all --long) node --expose-gc --max-old-space-size=18000 ./bundle/bundle.js", + "run-local-brk": "rush bundle --to @hcengineering/tool >/dev/null && cross-env SERVER_SECRET=secret ACCOUNTS_URL=http://localhost:3000 TRANSACTOR_URL=ws://localhost:3333 MINIO_ACCESS_KEY=minioadmin MINIO_SECRET_KEY=minioadmin MINIO_ENDPOINT=localhost ACCOUNT_DB_URL=mongodb://localhost:27017 DB_URL=mongodb://localhost:27017 TELEGRAM_DATABASE=telegram-service ELASTIC_URL=http://localhost:9200 REKONI_URL=http://localhost:4004 MODEL_VERSION=$(node ../../common/scripts/show_version.js) GIT_REVISION=$(git describe --all --long) node --inspect-brk --enable-source-maps --max-old-space-size=18000 ./bundle/bundle.js", "run": "rush bundle --to @hcengineering/tool >/dev/null && cross-env node --max-old-space-size=8000 ./bundle/bundle.js", "upgrade": "rushx run-local upgrade", "format": "format src", diff --git a/dev/tool/src/__start.ts b/dev/tool/src/__start.ts index 293126ca5c..a0c33c3d39 100644 --- a/dev/tool/src/__start.ts +++ b/dev/tool/src/__start.ts @@ -92,4 +92,13 @@ export function getMongoDBUrl (): string { return url } +export function getAccountDBUrl (): string { + const url = process.env.ACCOUNT_DB_URL + if (url === undefined) { + console.error('please provide mongo ACCOUNT_DB_URL') + process.exit(1) + } + return url +} + devTool(prepareTools) diff --git a/dev/tool/src/index.ts b/dev/tool/src/index.ts index 12673060a4..78824be277 100644 --- a/dev/tool/src/index.ts +++ b/dev/tool/src/index.ts @@ -100,7 +100,7 @@ import { backupDownload } from '@hcengineering/server-backup/src/backup' import type { PipelineFactory, StorageAdapter, StorageAdapterEx } from '@hcengineering/server-core' import { deepEqual } from 'fast-equals' import { createWriteStream, readFileSync } from 'fs' -import { getMongoDBUrl } from './__start' +import { getAccountDBUrl, getMongoDBUrl } from './__start' import { benchmark, benchmarkWorker, @@ -429,8 +429,10 @@ export function devTool ( .option('-f|--force [force]', 'Force update', true) .option('-i|--indexes [indexes]', 'Force indexes rebuild', false) .action(async (workspace, cmd: { force: boolean, indexes: boolean }) => { - const { dbUrl, version, txes, migrateOperations } = prepareTools() - await withDatabase(dbUrl, async (db) => { + const { version, txes, migrateOperations } = prepareTools() + + const accountUrl = getAccountDBUrl() + await withDatabase(accountUrl, async (db) => { const info = await getWorkspaceById(db, workspace) if (info === null) { throw new Error(`workspace ${workspace} not found`) @@ -468,16 +470,18 @@ export function devTool ( .description('upgrade') .option('-l|--logs ', 'Default logs folder', './logs') .option('-i|--ignore [ignore]', 'Ignore workspaces', '') + .option('-r|--region [region]', 'Region of workspaces', '') .option( '-c|--console', 'Display all information into console(default will create logs folder with {workspace}.log files', false ) .option('-f|--force [force]', 'Force update', false) - .action(async (cmd: { logs: string, force: boolean, console: boolean, ignore: string }) => { - const { dbUrl, version, txes, migrateOperations } = prepareTools() - await withDatabase(dbUrl, async (db) => { - const workspaces = (await listWorkspacesRaw(db)).filter((ws) => !cmd.ignore.includes(ws.workspace)) + .action(async (cmd: { logs: string, force: boolean, console: boolean, ignore: string, region: string }) => { + const { version, txes, migrateOperations } = prepareTools() + const accountUrl = getAccountDBUrl() + await withDatabase(accountUrl, async (db) => { + const workspaces = (await listWorkspacesRaw(db, cmd.region)).filter((ws) => !cmd.ignore.includes(ws.workspace)) workspaces.sort((a, b) => b.lastVisit - a.lastVisit) const measureCtx = new MeasureMetricsContext('upgrade', {}) diff --git a/packages/core/src/memdb.ts b/packages/core/src/memdb.ts index 471e940869..82b9fd9a10 100644 --- a/packages/core/src/memdb.ts +++ b/packages/core/src/memdb.ts @@ -183,7 +183,7 @@ export abstract class MemDb extends TxProcessor implements Storage { result = matchQuery(result, query, _class, this.hierarchy) } - if (options?.sort !== undefined) await resultSort(result, options?.sort, _class, this.hierarchy, this) + if (options?.sort !== undefined) resultSort(result, options?.sort, _class, this.hierarchy, this) const total = result.length result = result.slice(0, options?.limit) const tresult = this.hierarchy.clone(result) as WithLookup[] diff --git a/packages/core/src/query.ts b/packages/core/src/query.ts index 703e8e9a24..ae0824e5df 100644 --- a/packages/core/src/query.ts +++ b/packages/core/src/query.ts @@ -1,10 +1,10 @@ -import { DocumentQuery } from '.' +import { DocumentQuery, type MemDb } from '.' import { Class, Doc, Enum, EnumOf, Ref } from './classes' import core from './component' import { Hierarchy } from './hierarchy' import { getObjectValue } from './objvalue' import { createPredicates, isPredicate } from './predicate' -import { SortQuerySelector, SortingOrder, SortingQuery, SortingRules, Storage } from './storage' +import { SortQuerySelector, SortingOrder, SortingQuery, SortingRules } from './storage' /** * @public @@ -47,14 +47,14 @@ function getEnumValue ( /** * @public */ -export async function resultSort ( +export function resultSort ( result: T[], sortOptions: SortingQuery, _class: Ref>, hierarchy: Hierarchy, - modelDb: Storage -): Promise { - const enums = await getEnums(_class, sortOptions, hierarchy, modelDb) + modelDb: MemDb +): void { + const enums = getEnums(_class, sortOptions, hierarchy, modelDb) const sortFunc = (a: any, b: any): number => { for (const key in sortOptions) { const _enum = enums[key] @@ -116,12 +116,12 @@ function getSortingResult (aValue: any, bValue: any, order: SortingOrder | Sorti return res * orderOrder } -async function getEnums ( +function getEnums ( _class: Ref>, sortOptions: SortingQuery, hierarchy: Hierarchy, - modelDb: Storage -): Promise> { + modelDb: MemDb +): Record { const res: Record = {} for (const key in sortOptions) { const attr = hierarchy.findAttribute(_class, key) @@ -129,7 +129,7 @@ async function getEnums ( if (attr !== undefined) { if (attr.type._class === core.class.EnumOf) { const ref = (attr.type as EnumOf).of - const enu = await modelDb.findAll(core.class.Enum, { _id: ref }) + const enu = modelDb.findAllSync(core.class.Enum, { _id: ref }) res[key] = enu[0] } } diff --git a/packages/presentation/src/utils.ts b/packages/presentation/src/utils.ts index b94076d6d3..b92c77d889 100644 --- a/packages/presentation/src/utils.ts +++ b/packages/presentation/src/utils.ts @@ -240,6 +240,14 @@ export function getClient (): TxOperations & Client & OptimisticTxes { let txQueue: Tx[] = [] +export type RefreshListener = () => void + +const refreshListeners = new Set() + +export function addRefreshListener (r: RefreshListener): void { + refreshListeners.add(r) +} + /** * @public */ @@ -292,6 +300,9 @@ export async function refreshClient (clean: boolean): Promise { for (const q of globalQueries) { q.refreshClient() } + for (const listener of refreshListeners.values()) { + listener() + } } } diff --git a/packages/query/src/__tests__/query.test.ts b/packages/query/src/__tests__/query.test.ts index f8bd46df54..517dee02c6 100644 --- a/packages/query/src/__tests__/query.test.ts +++ b/packages/query/src/__tests__/query.test.ts @@ -18,6 +18,7 @@ import core, { createClient, Doc, generateId, + MeasureMetricsContext, Ref, SortingOrder, Space, @@ -757,6 +758,10 @@ describe('query', () => { ) }) + await new Promise((resolve) => { + setTimeout(resolve, 1) + }) + await factory.updateDoc(core.class.Space, core.space.Model, futureSpace, { name: '1' }) @@ -974,4 +979,44 @@ describe('query', () => { projects = await liveQuery.queryFind(test.mixin.TestProjectMixin, {}, { projection: { _id: 1 } }) expect(projects.length).toEqual(1) }) + + jest.setTimeout(25000) + it('test clone ops', async () => { + const { liveQuery, factory } = await getClient() + + const counter = 10000 + const ctx = new MeasureMetricsContext('tool', {}) + let data: Space[] = [] + const pp = new Promise((resolve) => { + liveQuery.query( + test.class.TestProject, + { private: false }, + (result) => { + data = result + if (data.length % 1000 === 0) { + console.info(data.length) + } + if (data.length === counter) { + resolve(null) + } + }, + {} + ) + }) + + for (let i = 0; i < counter; i++) { + await ctx.with('create-doc', {}, () => + factory.createDoc(test.class.TestProject, core.space.Space, { + archived: false, + description: '', + members: [], + private: false, + prjName: 'test project', + name: 'qwe' + }) + ) + } + expect(data.length).toBe(counter) + await pp + }) }) diff --git a/packages/query/src/index.ts b/packages/query/src/index.ts index c88bfe84c5..6977f58a0c 100644 --- a/packages/query/src/index.ts +++ b/packages/query/src/index.ts @@ -35,7 +35,6 @@ import core, { SearchResult, SortingQuery, Space, - Timestamp, Tx, TxCreateDoc, TxMixin, @@ -53,50 +52,29 @@ import core, { getObjectValue, matchQuery, reduceCalls, - resultSort, toFindResult } from '@hcengineering/core' import { PlatformError } from '@hcengineering/platform' import { deepEqual } from 'fast-equals' +import { Refs } from './refs' +import { ResultArray } from './results' +import { Callback, Query, type QueryId } from './types' -const CACHE_SIZE = 100 - -type Callback = (result: FindResult) => void - -type QueryId = number - -interface Query { - id: QueryId // uniq query identifier. - _class: Ref> - query: DocumentQuery - result: Doc[] | Promise - options?: FindOptions - total: number - callbacks: Map - - refresh: () => Promise -} - -interface DocumentRef { - doc: Doc - queries: QueryId[] - lastUsed: Timestamp -} +const CACHE_SIZE = 125 /** * @public */ export class LiveQuery implements WithTx, Client { private readonly client: Client - private readonly queries: Map>, Query[]> = new Map>, Query[]>() - private readonly queue: Query[] = [] + private readonly queries = new Map>, Map>() + private readonly queue = new Map() private queryCounter: number = 0 private closed: boolean = false - private readonly queriesToUpdate = new Map() + private readonly queriesToUpdate = new Map() - // A map of _class to documents. - private readonly documentRefs = new Map, DocumentRef>>() + private readonly refs = new Refs(() => this.getHierarchy()) constructor (client: Client) { this.client = client @@ -121,13 +99,13 @@ export class LiveQuery implements WithTx, Client { // Perform refresh of content since connection established. async refreshConnect (clean: boolean): Promise { - for (const q of [...this.queue]) { + for (const q of [...this.queue.values()]) { if (!this.removeFromQueue(q)) { try { if (clean) { this.cleanQuery(q) } - void this.refresh(q) + // No need to refresh, since it will be on next for } catch (err: any) { if (err instanceof PlatformError) { if (err.message === 'connection closed') { @@ -137,10 +115,13 @@ export class LiveQuery implements WithTx, Client { Analytics.handleError(err) console.error(err) } + } else { + // No callbacks, let's remove it on conenct + this.removeQueue(q) } } for (const v of this.queries.values()) { - for (const q of v) { + for (const q of v.values()) { try { if (clean) { this.cleanQuery(q) @@ -163,7 +144,7 @@ export class LiveQuery implements WithTx, Client { q.callbacks.forEach((callback) => { callback(toFindResult([], 0)) }) - q.result = [] + q.result = new ResultArray([], this.getHierarchy()) q.total = -1 } @@ -193,51 +174,14 @@ export class LiveQuery implements WithTx, Client { query: DocumentQuery, options?: FindOptions ): Query { - const callback: () => void = () => {} - const callbackId = generateId() - const q = this.createQuery(_class, query, { callback, callbackId }, options) - q.callbacks.delete(callbackId) - if (q.callbacks.size === 0) { - this.queue.push(q) + const q = this.createQuery(_class, query, undefined, options) + this.queue.set(q.id, { ...q, lastUsed: Date.now() }) + if (!(q.result instanceof Promise)) { + q.result.clean() } return q } - findFromDocs( - _class: Ref>, - query: DocumentQuery, - options?: FindOptions - ): FindResult | null { - const classKey = _class + ':' + JSON.stringify(options?.lookup ?? {}) - if (typeof query._id === 'string') { - // One document query - const doc = this.documentRefs.get(classKey)?.get(query._id)?.doc - if (doc !== undefined) { - const q = matchQuery([doc], query, _class, this.getHierarchy()) - if (q.length > 0) { - return toFindResult(this.clone([doc]), 1) as FindResult - } - } - } - if ( - options?.limit === 1 && - options.total !== true && - options?.sort === undefined && - options?.projection === undefined - ) { - const docs = this.documentRefs.get(classKey) - if (docs !== undefined) { - const _docs = Array.from(docs.values()).map((it) => it.doc) - - const q = matchQuery(_docs, query, _class, this.getHierarchy()) - if (q.length > 0) { - return toFindResult(this.clone([q[0]]), 1) as FindResult - } - } - } - return null - } - async findAll( _class: Ref>, query: DocumentQuery, @@ -246,9 +190,10 @@ export class LiveQuery implements WithTx, Client { if (this.client.getHierarchy().getDomain(_class) === DOMAIN_MODEL) { return await this.client.findAll(_class, query, options) } - if (options?.projection !== undefined) { - options.projection = { - ...options.projection, + const opt = { ...(options ?? {}) } + if (opt.projection !== undefined) { + opt.projection = { + ...opt.projection, _class: 1, space: 1, modifiedOn: 1 @@ -256,19 +201,20 @@ export class LiveQuery implements WithTx, Client { } // Perform one document queries if applicable. - const d = this.findFromDocs(_class, query, options) + const d = this.refs.findFromDocs(_class, query, opt) if (d !== null) { return d } - const q = this.findQuery(_class, query, options) ?? this.createDumpQuery(_class, query, options) + const q = this.findQuery(_class, query, opt) ?? this.createDumpQuery(_class, query, opt) if (q.result instanceof Promise) { q.result = await q.result } if (this.removeFromQueue(q, false)) { - this.queue.push(q) + this.queue.set(q.id, { ...q, lastUsed: Date.now() }) + q.result.clean() } - return toFindResult(this.clone(q.result), q.total) as FindResult + return toFindResult(q.result.getClone(), q.total) } searchFulltext (query: SearchQuery, options: SearchOptions): Promise { @@ -297,7 +243,7 @@ export class LiveQuery implements WithTx, Client { } options.limit = 1 - const d = this.findFromDocs(_class, query, options) + const d = this.refs.findFromDocs(_class, query, options) if (d !== null) { return d[0] } @@ -307,9 +253,10 @@ export class LiveQuery implements WithTx, Client { q.result = await q.result } if (this.removeFromQueue(q, false)) { - this.queue.push(q) + this.queue.set(q.id, { ...q, lastUsed: Date.now() }) + q.result.clean() } - return this.clone(q.result)[0] as WithLookup + return q.result.getClone>().shift() } private optionsCompare (opt1?: FindOptions, opt2?: FindOptions): boolean { @@ -318,28 +265,34 @@ export class LiveQuery implements WithTx, Client { return deepEqual(_opt1, _opt2) } + private queryCompare (q1: DocumentQuery, q2: DocumentQuery): boolean { + if (Object.keys(q1).length !== Object.keys(q2).length) { + return false + } + return deepEqual(q1, q2) + } + private findQuery( _class: Ref>, query: DocumentQuery, options?: FindOptions ): Query | undefined { - const queries = this.queries.get(_class) + const queries = this.getQueueMap(_class) if (queries === undefined) return - for (const q of queries) { - if (!deepEqual(query, q.query) || !this.optionsCompare(options, q.options)) continue + for (const q of queries.values()) { + if (!this.queryCompare(query, q.query) || !this.optionsCompare(options, q.options)) continue return q } } private removeFromQueue (q: Query, update = true): boolean { if (q.callbacks.size === 0) { - const queueIndex = this.queue.indexOf(q) - if (queueIndex !== -1) { - this.queue.splice(queueIndex, 1) + const removed = this.queue.delete(q.id) + if (removed) { if (update) { if (!(q.result instanceof Promise)) { - this.updateDocuments(q, q.result, true) + this.refs.updateDocuments(q, q.result.getDocs(), true) } } return true @@ -361,7 +314,7 @@ export class LiveQuery implements WithTx, Client { if (q.result instanceof Promise) { q.result = await q.result } - callback.callback(toFindResult(this.clone(q.result), q.total)) + callback.callback(toFindResult(q.result.getResult(callback.callbackId), q.total)) } }, 0) } @@ -384,27 +337,37 @@ export class LiveQuery implements WithTx, Client { } } + private getQueueMap (_class: Ref>): Map { + let cq = this.queries.get(_class) + if (cq === undefined) { + cq = new Map() + this.queries.set(_class, cq) + } + return cq + } + private createQuery( _class: Ref>, query: DocumentQuery, - callback: { callback: (result: FindResult) => void, callbackId: string }, + callback: { callback: (result: FindResult) => void, callbackId: string } | undefined, options?: FindOptions ): Query { - const queries = this.queries.get(_class) ?? [] - const localResult = this.findFromDocs(_class, query, options) + const localResult = this.refs.findFromDocs(_class, query, options) const result = localResult != null ? Promise.resolve(localResult) : this.client.findAll(_class, query, options) const q: Query = { id: ++this.queryCounter, _class, query, - result, + result: result.then((docs) => new ResultArray(docs, this.getHierarchy())), total: 0, options: options as FindOptions, callbacks: new Map(), refresh: reduceCalls(() => this.doRefresh(q)) } - q.callbacks.set(callback.callbackId, callback.callback as unknown as Callback) - queries.push(q) + if (callback !== undefined) { + q.callbacks.set(callback.callbackId, callback.callback as unknown as Callback) + } + this.getQueueMap(_class).set(q.id, q) result .then(async (result) => { q.total = result.total @@ -415,26 +378,29 @@ export class LiveQuery implements WithTx, Client { console.log('failed to update Live Query: ', err) }) - this.queries.set(_class, queries) - while (this.queue.length > CACHE_SIZE) { + if (this.queue.size > CACHE_SIZE) { this.remove() } return q } private remove (): void { - const q = this.queue.shift() - if (q === undefined) return - const queries = this.queries.get(q._class) - const pos = queries?.indexOf(q) ?? -1 - if (pos >= 0 && queries !== undefined) { - queries.splice(pos, 1) - if (!(q.result instanceof Promise)) { - this.updateDocuments(q, q.result, true) - } + const used = Array.from(this.queue.values()).sort((a, b) => a.lastUsed - b.lastUsed) + for (let i = 0; i < CACHE_SIZE / 10; i++) { + const q = used.shift() + if (q === undefined) return + this.removeQueue(q) } - if (queries?.length === 0) { - this.queries.delete(q._class) + } + + removeQueue (q: Query): void { + const queries = this.getQueueMap(q._class) + const removed = queries.delete(q.id) + this.queue.delete(q.id) + if (removed) { + if (!(q.result instanceof Promise)) { + this.refs.updateDocuments(q, q.result.getDocs(), true) + } } } @@ -460,7 +426,10 @@ export class LiveQuery implements WithTx, Client { return () => { q.callbacks.delete(callbackId) if (q.callbacks.size === 0) { - this.queue.push(q) + if (!(q.result instanceof Promise)) { + q.result.clean() + } + this.queue.set(q.id, { ...q, lastUsed: Date.now() }) } } } @@ -483,23 +452,18 @@ export class LiveQuery implements WithTx, Client { const q = this.createQuery( _class, query, - { - callback: () => { - // do nothing - }, - callbackId: generateId() - }, + undefined, // No need of callback options ) if (q.result instanceof Promise) { q.result = await q.result } - return toFindResult(this.clone(q.result), q.total) as FindResult + return toFindResult(q.result.getClone(), q.total) } if (current.result instanceof Promise) { current.result = await current.result } - return toFindResult(this.clone((current?.result as T[]) ?? []), current.total) + return toFindResult(current.result.getClone(), current.total) } private async checkSearch (q: Query, _id: Ref): Promise { @@ -512,20 +476,19 @@ export class LiveQuery implements WithTx, Client { await this.refresh(q) return true } else { - const pos = q.result.findIndex((p) => p._id === _id) - if (pos !== -1) { - const doc = q.result.splice(pos, 1) - this.updateDocuments(q, doc, true) + const doc = q.result.delete(_id) + if (doc !== undefined) { + this.refs.updateDocuments(q, [doc], true) if (q.options?.total === true) { q.total-- } } } } else { - const pos = q.result.findIndex((p) => p._id === _id) - if (pos !== -1) { - q.result[pos] = match - this.updateDocuments(q, [match]) + const doc = q.result.findDoc(_id) + if (doc !== undefined) { + q.result.updateDoc(match, false) + this.refs.updateDocuments(q, [match]) } } return false @@ -563,20 +526,21 @@ export class LiveQuery implements WithTx, Client { q.result = await q.result } - const pos = q.result.findIndex((p) => p._id === _id) + const pos = q.result.findDoc(_id) if (current !== undefined && this.match(q, current)) { - q.result[pos] = current - this.updateDocuments(q, [current]) + q.result.updateDoc(current) + this.refs.updateDocuments(q, [current]) } else { if (q.options?.limit === q.result.length) { await this.refresh(q) return true - } else if (pos !== -1) { - const doc = q.result.splice(pos, 1) - this.updateDocuments(q, doc, true) + } else if (pos !== undefined) { + q.result.delete(_id) + this.refs.updateDocuments(q, [pos], true) if (q.options?.total === true) { q.total-- } + return true } } return false @@ -593,25 +557,21 @@ export class LiveQuery implements WithTx, Client { await this.__updateLookup(q, updatedDoc, ops) } - private async checkUpdatedDocMatch (q: Query, updatedDoc: WithLookup): Promise { - if (q.result instanceof Promise) { - q.result = await q.result - } - const pos = q.result.findIndex((p) => p._id === updatedDoc._id) + private checkUpdatedDocMatch (q: Query, result: ResultArray, updatedDoc: WithLookup): boolean { if (!this.match(q, updatedDoc)) { - if (q.options?.limit === q.result.length) { - await this.refresh(q) + if (q.options?.limit === result.length) { + void this.refresh(q) return true } else { - const doc = q.result.splice(pos, 1) - this.updateDocuments(q, doc, true) + result.delete(updatedDoc._id) + this.refs.updateDocuments(q, [updatedDoc], true) if (q.options?.total === true) { q.total-- } } } else { - q.result[pos] = updatedDoc - this.updateDocuments(q, [updatedDoc]) + result.updateDoc(updatedDoc, false) + this.refs.updateDocuments(q, [updatedDoc]) } return false } @@ -619,20 +579,23 @@ export class LiveQuery implements WithTx, Client { protected async txMixin (tx: TxMixin, docCache: Map): Promise { const hierarchy = this.client.getHierarchy() - for (const queries of this.queries) { + for (const queries of this.queries.entries()) { const isTx = hierarchy.isDerived(queries[0], core.class.Tx) - for (const q of queries[1]) { + for (const q of queries[1].values()) { if (isTx) { // handle add since Txes are immutable - await this.handleDocAdd(q, tx, true, docCache) + if (this.match(q, tx, q.options?.lookup !== undefined)) { + await this.handleDocAdd(q, tx, true, docCache) + } + await this.handleDocAddLookup(q, tx) continue } if (q.result instanceof Promise) { q.result = await q.result } - const pos = q.result.findIndex((p) => p._id === tx.objectId) - if (pos !== -1) { + const updatedDoc = q.result.findDoc(tx.objectId) + if (updatedDoc !== undefined) { // If query contains search we must check use fulltext if (q.query.$search != null && q.query.$search.length > 0) { const searchRefresh = await this.checkSearch(q, tx.objectId) @@ -640,10 +603,9 @@ export class LiveQuery implements WithTx, Client { continue } } else { - const updatedDoc = q.result[pos] if (updatedDoc.modifiedOn < tx.modifiedOn) { await this.__updateMixinDoc(q, updatedDoc, tx) - const updateRefresh = await this.checkUpdatedDocMatch(q, updatedDoc) + const updateRefresh = this.checkUpdatedDocMatch(q, q.result, updatedDoc) if (updateRefresh) { continue } @@ -655,13 +617,16 @@ export class LiveQuery implements WithTx, Client { } } await this.sort(q, tx) - const udoc = q.result.find((p) => p._id === tx.objectId) - await this.updatedDocCallback(udoc, q) + const udoc = q.result.findDoc(tx.objectId) + await this.updatedDocCallback(q, q.result, udoc) } else if (queries[0] === tx.mixin) { // Mixin potentially added to object we doesn't have in out results const doc = await this.client.findOne(q._class, { ...q.query, _id: tx.objectId }, q.options) if (doc !== undefined) { - await this.handleDocAdd(q, doc, false, docCache) + if (this.match(q, doc, q.options?.lookup !== undefined)) { + await this.handleDocAdd(q, doc, false, docCache) + } + await this.handleDocAddLookup(q, doc) } } await this.handleDocUpdateLookup(q, tx) @@ -671,12 +636,16 @@ export class LiveQuery implements WithTx, Client { } async txUpdateDoc (tx: TxUpdateDoc, docCache: Map): Promise { - for (const queries of this.queries) { + for (const queries of this.queries.entries()) { const isTx = this.client.getHierarchy().isDerived(queries[0], core.class.Tx) - for (const q of queries[1]) { + for (const q of queries[1].values()) { if (isTx) { // handle add since Txes are immutable - await this.handleDocAdd(q, tx, true, docCache) + // await this.handleDocAdd(q, tx, true, docCache) + if (this.match(q, tx, q.options?.lookup !== undefined)) { + await this.handleDocAdd(q, tx, true, docCache) + } + await this.handleDocAddLookup(q, tx) continue } await this.handleDocUpdate(q, tx, docCache) @@ -689,30 +658,33 @@ export class LiveQuery implements WithTx, Client { if (q.result instanceof Promise) { q.result = await q.result } - const pos = q.result.findIndex((p) => p._id === tx.objectId) - if (pos !== -1) { + const updatedDoc = q.result.findDoc(tx.objectId) + if (updatedDoc !== undefined) { // If query contains search we must check use fulltext if (q.query.$search != null && q.query.$search.length > 0) { const searchRefresh = await this.checkSearch(q, tx.objectId) if (searchRefresh) return } else { - const updatedDoc = q.result[pos] if (updatedDoc.modifiedOn < tx.modifiedOn) { await this.__updateDoc(q, updatedDoc, tx) - const updateRefresh = await this.checkUpdatedDocMatch(q, updatedDoc) - if (updateRefresh) return + const updateRefresh = this.checkUpdatedDocMatch(q, q.result, updatedDoc) + if (updateRefresh) { + return + } } else { const currentRefresh = await this.getCurrentDoc(q, updatedDoc._id, updatedDoc.space, docCache) - if (currentRefresh) return + if (currentRefresh) { + return + } } } await this.sort(q, tx) - const udoc = q.result.find((p) => p._id === tx.objectId) - await this.updatedDocCallback(udoc, q) - } else if (await this.matchQuery(q, tx, docCache)) { + const udoc = q.result.findDoc(tx.objectId) + await this.updatedDocCallback(q, q.result, udoc) + } else if (this.matchQuerySync(q, tx) && (await this.matchQuery(q, tx, docCache))) { await this.sort(q, tx) - const udoc = q.result.find((p) => p._id === tx.objectId) - await this.updatedDocCallback(udoc, q) + const udoc = q.result.findDoc(tx.objectId) + await this.updatedDocCallback(q, q.result, udoc) } else if ( this.client.getHierarchy().isDerived(tx.objectClass, q._class) && q.options?.total === true && @@ -736,14 +708,14 @@ export class LiveQuery implements WithTx, Client { if (needCallback) { if (q.options?.sort !== undefined) { - await resultSort(q.result, q.options?.sort, q._class, this.getHierarchy(), this.client.getModel()) + q.result.sort(q._class, q.options.sort, this.getHierarchy(), this.client.getModel()) } await this.callback(q, true) } } private async processLookupUpdateDoc ( - docs: Doc[], + docs: ResultArray, lookup: Lookup, tx: TxUpdateDoc | TxMixin ): Promise { @@ -751,7 +723,7 @@ export class LiveQuery implements WithTx, Client { const lookupWays = this.getLookupWays(lookup, tx.objectClass) for (const lookupWay of lookupWays) { const [objWay, key, reverseLookupKey] = lookupWay - for (const resDoc of docs) { + for (const resDoc of docs.getDocs()) { const obj = getObjectValue(objWay, resDoc) if (obj === undefined) continue const value = getObjectValue('$lookup.' + key, obj) @@ -768,6 +740,7 @@ export class LiveQuery implements WithTx, Client { value.splice(index, 1) index = -1 needCallback = true + docs.updateDoc(resDoc, false) } else if (index === -1 && reverseLookupValue === obj._id) { const doc = await this.findOne(tx.objectClass, { _id: tx.objectId }) if (doc !== undefined) { @@ -775,6 +748,7 @@ export class LiveQuery implements WithTx, Client { index = value.length - 1 } needCallback = true + docs.updateDoc(resDoc, false) } } } @@ -785,6 +759,7 @@ export class LiveQuery implements WithTx, Client { TxProcessor.updateDoc2Doc(value[index], tx as TxUpdateDoc) } needCallback = true + docs.updateDoc(resDoc, false) } } else { if (obj[key] === tx.objectId) { @@ -795,6 +770,7 @@ export class LiveQuery implements WithTx, Client { TxProcessor.updateDoc2Doc(obj.$lookup[key], tx as TxUpdateDoc) } needCallback = true + docs.updateDoc(resDoc, false) } } } @@ -803,13 +779,6 @@ export class LiveQuery implements WithTx, Client { return needCallback } - /** - * Clone document with respect to mixin inner document cloning. - */ - private clone(results: T[]): T[] { - return this.getHierarchy().clone(results) as T[] - } - private async refresh (q: Query): Promise { this.queriesToUpdate.delete(q.id) await q.refresh() @@ -818,19 +787,22 @@ export class LiveQuery implements WithTx, Client { private async doRefresh (q: Query): Promise { const res = await this.client.findAll(q._class, q.query, q.options) if (!deepEqual(res, q.result) || (res.total !== q.total && q.options?.total === true)) { - q.result = res + q.result = new ResultArray(res, this.getHierarchy()) q.total = res.total await this.callback(q) } } - // Check if query is partially matched. - private async matchQuery (q: Query, tx: TxUpdateDoc, docCache: Map): Promise { + private matchQuerySync (q: Query, tx: TxUpdateDoc): boolean { const clazz = this.getHierarchy().isMixin(q._class) ? this.getHierarchy().getBaseClass(q._class) : q._class if (!this.client.getHierarchy().isDerived(tx.objectClass, clazz)) { return false } + return true + } + // Check if query is partially matched. + private async matchQuery (q: Query, tx: TxUpdateDoc, docCache: Map): Promise { const doc: Doc = { _id: tx.objectId, _class: tx.objectClass, @@ -881,10 +853,10 @@ export class LiveQuery implements WithTx, Client { q.result = await q.result } const doc = res[0] - const pos = q.result.findIndex((el) => el._id === doc._id) - if (pos !== -1) { - q.result[pos] = doc - this.updateDocuments(q, [doc]) + const pos = q.result.findDoc(doc._id) + if (pos !== undefined) { + q.result.updateDoc(doc) + this.refs.updateDocuments(q, [doc]) } else { q.result.push(doc) if (q.options?.total === true) { @@ -960,61 +932,62 @@ export class LiveQuery implements WithTx, Client { protected async txCreateDoc (tx: TxCreateDoc, docCache: Map): Promise { const docTx = TxProcessor.createDoc2Doc(tx) - for (const queries of this.queries) { + for (const queries of this.queries.entries()) { const doc = this.client.getHierarchy().isDerived(queries[0], core.class.Tx) ? tx : docTx - for (const q of queries[1]) { - await this.handleDocAdd(q, doc, true, docCache) + for (const q of queries[1].values()) { + // await this.handleDocAdd(q, doc, true, docCache) + if (this.match(q, doc, q.options?.lookup !== undefined)) { + await this.handleDocAdd(q, doc, true, docCache) + } + + await this.handleDocAddLookup(q, doc) } } return {} } private async handleDocAdd (q: Query, doc: Doc, handleLookup = true, docCache: Map): Promise { - if (this.match(q, doc, q.options?.lookup !== undefined)) { - let needPush = true - if (q.result instanceof Promise) { - q.result = await q.result - } - if (q.options?.lookup !== undefined && handleLookup) { - await this.lookup(q._class, doc, q.options.lookup) - const matched = this.match(q, doc) - if (!matched) needPush = false - } - if (needPush) { - // We could already have document inside results, if query is created during processing of document create transaction and not yet handled on client. - const pos = q.result.findIndex((p) => p._id === doc._id) - if (pos >= 0) { - // No need to update, document already in results. - needPush = false - } - } - if (needPush) { - // If query contains search we must check use fulltext - if (q.query.$search != null && q.query.$search.length > 0) { - const match = await this.client.findOne(q._class, { $search: q.query.$search, _id: doc._id }, q.options) - if (match === undefined) return - } - - q.result.push(doc) - if (q.options?.total === true) { - q.total++ - } - - if (q.options?.sort !== undefined) { - await resultSort(q.result, q.options?.sort, q._class, this.getHierarchy(), this.client.getModel()) - } - - if (q.options?.limit !== undefined && q.result.length > q.options.limit) { - if (q.result.pop()?._id !== doc._id || q.options?.total === true) { - await this.callback(q, true) - } - } else { - await this.callback(q, true) - } + let needPush = true + if (q.result instanceof Promise) { + q.result = await q.result + } + if (q.options?.lookup !== undefined && handleLookup) { + await this.lookup(q._class, doc, q.options.lookup) + const matched = this.match(q, doc) + if (!matched) needPush = false + } + if (needPush) { + // We could already have document inside results, if query is created during processing of document create transaction and not yet handled on client. + const pos = q.result.findDoc(doc._id) + if (pos !== undefined) { + // No need to update, document already in results. + needPush = false } } + if (needPush) { + // If query contains search we must check use fulltext + if (q.query.$search != null && q.query.$search.length > 0) { + const match = await this.client.findOne(q._class, { $search: q.query.$search, _id: doc._id }, q.options) + if (match === undefined) return + } - await this.handleDocAddLookup(q, doc) + q.result.push(doc) + if (q.options?.total === true) { + q.total++ + } + + if (q.options?.sort !== undefined) { + q.result.sort(q._class, q.options.sort, this.getHierarchy(), this.client.getModel()) + } + + if (q.options?.limit !== undefined && q.result.length > q.options.limit) { + if (q.result.pop()?._id !== doc._id || q.options?.total === true) { + await this.callback(q, true) + } + } else { + await this.callback(q, true) + } + } } private async callback (q: Query, bulkUpdate = false): Promise { @@ -1022,45 +995,14 @@ export class LiveQuery implements WithTx, Client { q.result = await q.result } - this.updateDocuments(q, q.result) - const result = q.result if (bulkUpdate) { - this.queriesToUpdate.set(q.id, [q, result]) + this.queriesToUpdate.set(q.id, q) } else { this.queriesToUpdate.delete(q.id) - Array.from(q.callbacks.values()).forEach((callback) => { - callback(toFindResult(this.clone(result), q.total)) - }) - } - } - - private updateDocuments (q: Query, docs: Doc[], clean: boolean = false): void { - if (q.options?.projection !== undefined) { - return - } - for (const d of docs) { - const classKey = Hierarchy.mixinOrClass(d) + ':' + JSON.stringify(q.options?.lookup ?? {}) - let docMap = this.documentRefs.get(classKey) - if (docMap === undefined) { - if (clean) { - continue - } - docMap = new Map() - this.documentRefs.set(classKey, docMap) - } - const queries = (docMap.get(d._id)?.queries ?? []).filter((it) => it !== q.id) - if (!clean) { - queries.push(q.id) - } - if (queries.length === 0) { - docMap.delete(d._id) - } else { - const q = docMap.get(d._id) - if ((q?.lastUsed ?? 0) < d.modifiedOn) { - docMap.set(d._id, { ...(q ?? {}), doc: d, queries, lastUsed: d.modifiedOn }) - } + for (const [id, callback] of q.callbacks.entries()) { + callback(toFindResult(result.getResult(id), q.total)) } } } @@ -1076,18 +1018,18 @@ export class LiveQuery implements WithTx, Client { if (needCallback) { if (q.options?.sort !== undefined) { - await resultSort(q.result, q.options?.sort, q._class, this.getHierarchy(), this.getModel()) + q.result.sort(q._class, q.options.sort, this.getHierarchy(), this.client.getModel()) } await this.callback(q, true) } } - private proccesLookupAddDoc (docs: Doc[], lookup: Lookup, doc: Doc): boolean { + private proccesLookupAddDoc (docs: ResultArray, lookup: Lookup, doc: Doc): boolean { let needCallback = false const lookupWays = this.getLookupWays(lookup, doc._class) for (const lookupWay of lookupWays) { const [objWay, key, reverseLookupKey] = lookupWay - for (const resDoc of docs) { + for (const resDoc of docs.getDocs()) { const obj = getObjectValue(objWay, resDoc) if (obj === undefined) continue let value = getObjectValue('$lookup.' + key, obj) @@ -1095,6 +1037,8 @@ export class LiveQuery implements WithTx, Client { if (value == null && reverseCheck) { value = [] obj.$lookup[key] = value + needCallback = true + docs.updateDoc(resDoc, false) } if (Array.isArray(value)) { if (this.client.getHierarchy().isDerived(doc._class, core.class.AttachedDoc) && reverseCheck) { @@ -1105,11 +1049,13 @@ export class LiveQuery implements WithTx, Client { value[idx] = doc } needCallback = true + docs.updateDoc(resDoc, false) } } else { if (obj[key] === doc._id) { obj.$lookup[key] = doc needCallback = true + docs.updateDoc(resDoc, false) } } } @@ -1118,12 +1064,17 @@ export class LiveQuery implements WithTx, Client { } protected async txRemoveDoc (tx: TxRemoveDoc, docCache: Map): Promise { - for (const queries of this.queries) { + for (const queries of this.queries.entries()) { const isTx = this.client.getHierarchy().isDerived(queries[0], core.class.Tx) - for (const q of queries[1]) { + for (const q of queries[1].values()) { if (isTx) { // handle add since Txes are immutable - await this.handleDocAdd(q, tx, true, docCache) + // await this.handleDocAdd(q, tx, true, docCache) + if (this.match(q, tx, q.options?.lookup !== undefined)) { + await this.handleDocAdd(q, tx, true, docCache) + } + + await this.handleDocAddLookup(q, tx) continue } await this.handleDocRemove(q, tx) @@ -1145,10 +1096,10 @@ export class LiveQuery implements WithTx, Client { await this.refresh(q) return } - const index = q.result.findIndex((p) => p._id === tx.objectId && h.isDerived(p._class, tx.objectClass)) - if (index > -1) { - const doc = q.result.splice(index, 1) - this.updateDocuments(q, doc, true) + const index = q.result.getDocs().find((p) => p._id === tx.objectId && h.isDerived(p._class, tx.objectClass)) + if (index !== undefined) { + q.result.delete(index._id) + this.refs.updateDocuments(q, [index], true) if (q.options?.total === true) { q.total-- @@ -1169,7 +1120,7 @@ export class LiveQuery implements WithTx, Client { for (const lookupWay of lookupWays) { const [objWay, key] = lookupWay const docs = q.result - for (const doc of docs) { + for (const doc of docs.getDocs()) { const obj = getObjectValue(objWay, doc) if (obj === undefined) continue const value = getObjectValue('$lookup.' + key, obj) @@ -1179,18 +1130,20 @@ export class LiveQuery implements WithTx, Client { if (index !== -1) { value.splice(index, 1) needCallback = true + docs.updateDoc(doc, false) } } else { if (value._id === tx.objectId) { obj.$lookup[key] = undefined needCallback = true + docs.updateDoc(doc, false) } } } } if (needCallback) { if (q.options?.sort !== undefined) { - await resultSort(q.result, q.options?.sort, q._class, this.getHierarchy(), this.getModel()) + q.result.sort(q._class, q.options.sort, this.getHierarchy(), this.client.getModel()) } await this.callback(q, true) } @@ -1269,10 +1222,14 @@ export class LiveQuery implements WithTx, Client { const copy = new Map(this.queriesToUpdate) this.queriesToUpdate.clear() - for (const [q, res] of copy.values()) { - Array.from(q.callbacks.values()).forEach((callback) => { - callback(toFindResult(this.clone(res), q.total)) - }) + for (const q of copy.values()) { + if (q.result instanceof Promise) { + q.result = await q.result + } + const qr = q.result + for (const [id, callback] of q.callbacks.entries()) { + callback(toFindResult(qr.getResult(id), q.total)) + } } } return result @@ -1285,7 +1242,7 @@ export class LiveQuery implements WithTx, Client { } if (evt.event === WorkspaceEvent.IndexingUpdate) { const indexingParam = evt.params as IndexingUpdateEvent - for (const q of [...this.queue]) { + for (const q of [...this.queue.values()]) { if (hasClass(q, indexingParam._class) && q.query.$search !== undefined) { if (!this.removeFromQueue(q)) { try { @@ -1295,19 +1252,12 @@ export class LiveQuery implements WithTx, Client { console.error(err) } } else { - const queries = this.queries.get(q._class) - const pos = queries?.indexOf(q) ?? -1 - if (pos >= 0 && queries !== undefined) { - queries.splice(pos, 1) - if (queries?.length === 0) { - this.queries.delete(q._class) - } - } + this.removeQueue(q) } } } for (const v of this.queries.values()) { - for (const q of v) { + for (const q of v.values()) { if (hasClass(q, indexingParam._class) && q.query.$search !== undefined) { try { await this.refresh(q) @@ -1321,7 +1271,7 @@ export class LiveQuery implements WithTx, Client { } if (evt.event === WorkspaceEvent.BulkUpdate) { const params = evt.params as BulkUpdateEvent - for (const q of [...this.queue]) { + for (const q of [...this.queue.values()]) { if (hasClass(q, params._class)) { if (!this.removeFromQueue(q)) { try { @@ -1334,7 +1284,7 @@ export class LiveQuery implements WithTx, Client { } } for (const v of this.queries.values()) { - for (const q of v) { + for (const q of v.values()) { if (hasClass(q, params._class)) { try { await this.refresh(q) @@ -1350,7 +1300,7 @@ export class LiveQuery implements WithTx, Client { private async changePrivateHandler (evt: TxWorkspaceEvent): Promise { if (evt.event === WorkspaceEvent.SecurityChange) { - for (const q of [...this.queue]) { + for (const q of [...this.queue.values()]) { if (typeof q.query.space !== 'string' || q.query.space === evt.objectSpace) { if (!this.removeFromQueue(q)) { try { @@ -1363,7 +1313,7 @@ export class LiveQuery implements WithTx, Client { } } for (const v of this.queries.values()) { - for (const q of v) { + for (const q of v.values()) { if (typeof q.query.space !== 'string' || q.query.space === evt.objectSpace) { try { await this.refresh(q) @@ -1470,7 +1420,12 @@ export class LiveQuery implements WithTx, Client { let needSort = sort.modifiedBy !== undefined || sort.modifiedOn !== undefined if (!needSort) needSort = this.checkNeedSort(sort, tx) - if (needSort) await resultSort(q.result as Doc[], sort, q._class, this.getHierarchy(), this.client.getModel()) + if (needSort) { + if (q.result instanceof Promise) { + q.result = await q.result + } + q.result.sort(q._class, sort, this.getHierarchy(), this.client.getModel()) + } } private checkNeedSort (sort: SortingQuery, tx: TxUpdateDoc | TxMixin): boolean { @@ -1490,19 +1445,17 @@ export class LiveQuery implements WithTx, Client { return false } - private async updatedDocCallback (updatedDoc: Doc | undefined, q: Query): Promise { - q.result = q.result as Doc[] - - if (q.options?.limit !== undefined && q.result.length > q.options.limit) { + private async updatedDocCallback (q: Query, res: ResultArray, updatedDoc: Doc | undefined): Promise { + if (q.options?.limit !== undefined && res.length > q.options.limit) { if (updatedDoc === undefined) { await this.refresh(q) return } - if (q.result[q.options?.limit]._id === updatedDoc._id) { + if (res.getDocs()[q.options?.limit]._id === updatedDoc._id) { await this.refresh(q) return } - if (q.result.pop()?._id !== updatedDoc._id) { + if (res.pop()?._id !== updatedDoc._id) { await this.callback(q, true) } } else { diff --git a/packages/query/src/refs.ts b/packages/query/src/refs.ts new file mode 100644 index 0000000000..ae22a3f073 --- /dev/null +++ b/packages/query/src/refs.ts @@ -0,0 +1,91 @@ +import { + clone, + Hierarchy, + matchQuery, + toFindResult, + type Class, + type Doc, + type DocumentQuery, + type FindOptions, + type FindResult, + type Ref, + type Timestamp +} from '@hcengineering/core' +import type { Query, QueryId } from './types' + +export interface DocumentRef { + doc: Doc + queries: QueryId[] + lastUsed: Timestamp +} + +export class Refs { + // A map of _class to documents. + private readonly documentRefs = new Map, DocumentRef>>() + + constructor (readonly getHierarchy: () => Hierarchy) {} + + public updateDocuments (q: Query, docs: Doc[], clean: boolean = false): void { + if (q.options?.projection !== undefined) { + return + } + for (const d of docs) { + const classKey = Hierarchy.mixinOrClass(d) + ':' + JSON.stringify(q.options?.lookup ?? {}) + let docMap = this.documentRefs.get(classKey) + if (docMap === undefined) { + if (clean) { + continue + } + docMap = new Map() + this.documentRefs.set(classKey, docMap) + } + const queries = (docMap.get(d._id)?.queries ?? []).filter((it) => it !== q.id) + if (!clean) { + queries.push(q.id) + } + if (queries.length === 0) { + docMap.delete(d._id) + } else { + const q = docMap.get(d._id) + if ((q?.lastUsed ?? 0) < d.modifiedOn) { + docMap.set(d._id, { ...(q ?? {}), doc: d, queries, lastUsed: d.modifiedOn }) + } + } + } + } + + public findFromDocs( + _class: Ref>, + query: DocumentQuery, + options?: FindOptions + ): FindResult | null { + const classKey = _class + ':' + JSON.stringify(options?.lookup ?? {}) + if (typeof query._id === 'string') { + // One document query + const doc = this.documentRefs.get(classKey)?.get(query._id)?.doc + if (doc !== undefined) { + const q = matchQuery([doc], query, _class, this.getHierarchy()) + if (q.length > 0) { + return toFindResult(clone([doc]), 1) + } + } + } + if ( + options?.limit === 1 && + options.total !== true && + options?.sort === undefined && + options?.projection === undefined + ) { + const docs = this.documentRefs.get(classKey) + if (docs !== undefined) { + const _docs = Array.from(docs.values()).map((it) => it.doc) + + const q = matchQuery(_docs, query, _class, this.getHierarchy()) + if (q.length > 0) { + return toFindResult(clone([q[0]]), 1) + } + } + } + return null + } +} diff --git a/packages/query/src/results.ts b/packages/query/src/results.ts new file mode 100644 index 0000000000..f9b2dbfe44 --- /dev/null +++ b/packages/query/src/results.ts @@ -0,0 +1,99 @@ +import { + resultSort, + type Class, + type Doc, + type Hierarchy, + type MemDb, + type Ref, + type SortingQuery +} from '@hcengineering/core' + +export class ResultArray { + private docs: Map, Doc> + + private readonly clones = new Map, Doc>>() + + get length (): number { + return this.docs.size + } + + constructor ( + docs: Doc[], + readonly hierarchy: Hierarchy + ) { + this.docs = new Map(docs.map((it) => [it._id, it])) + } + + clean (): void { + this.clones.clear() + } + + getDocs (): Doc[] { + return Array.from(this.docs.values()) + } + + findDoc (_id: Ref): Doc | undefined { + return this.docs.get(_id) + } + + getClone(): T[] { + return this.hierarchy.clone(this.getDocs()) + } + + getResult (id: string): Doc[] { + // Lets form a new list based on clones we have already. + const info = this.clones.get(id) + if (info === undefined) { + const docs = this.getClone() + this.clones.set(id, new Map(docs.map((it) => [it._id, it]))) + return docs + } else { + return Array.from(info.values()) + } + } + + delete (_id: Ref): Doc | undefined { + const doc = this.docs.get(_id) + this.docs.delete(_id) + for (const [, v] of this.clones.entries()) { + v.delete(_id) + } + return doc + } + + updateDoc (doc: Doc, mainClone = true): void { + this.docs.set(doc._id, mainClone ? this.hierarchy.clone(doc) : doc) + for (const [, v] of this.clones.entries()) { + v.set(doc._id, this.hierarchy.clone(doc)) + } + } + + push (doc: Doc): void { + this.docs.set(doc._id, this.hierarchy.clone(doc)) + for (const [, v] of this.clones.entries()) { + v.set(doc._id, this.hierarchy.clone(doc)) + } + // this.changes.add(doc._id) + } + + pop (): Doc | undefined { + const lastElement = Array.from(this.docs)[this.docs.size - 1] + if (lastElement !== undefined) { + this.docs.delete(lastElement[0]) + for (const [, v] of this.clones.entries()) { + v.delete(lastElement[0]) + } + return lastElement[1] + } + return undefined + } + + sort(_class: Ref>, sort: SortingQuery, hierarchy: Hierarchy, memdb: MemDb): void { + const docs = Array.from(this.docs.values()) + resultSort(docs, sort, _class, hierarchy, memdb) + this.docs = new Map(docs.map((it) => [it._id, it])) + for (const [k, v] of this.clones.entries()) { + this.clones.set(k, new Map(docs.map((it) => [it._id, v.get(it._id) ?? this.hierarchy.clone(it)]))) + } + } +} diff --git a/packages/query/src/types.ts b/packages/query/src/types.ts new file mode 100644 index 0000000000..3628782acd --- /dev/null +++ b/packages/query/src/types.ts @@ -0,0 +1,17 @@ +import type { Class, Doc, DocumentQuery, FindOptions, FindResult, Ref } from '@hcengineering/core' +import type { ResultArray } from './results' + +export type Callback = (result: FindResult) => void + +export type QueryId = number +export interface Query { + id: QueryId // uniq query identifier. + _class: Ref> + query: DocumentQuery + result: ResultArray | Promise + options?: FindOptions + total: number + callbacks: Map + + refresh: () => Promise +} diff --git a/plugins/notification-resources/src/inboxNotificationsClient.ts b/plugins/notification-resources/src/inboxNotificationsClient.ts index 8f41bfbb31..5b4170499c 100644 --- a/plugins/notification-resources/src/inboxNotificationsClient.ts +++ b/plugins/notification-resources/src/inboxNotificationsClient.ts @@ -63,14 +63,14 @@ export class InboxNotificationsClientImpl implements InboxNotificationsClient { ) readonly inboxNotificationsByContext = derived( - [this.contexts, this.inboxNotifications], - ([notifyContexts, inboxNotifications]) => { - if (inboxNotifications.length === 0 || notifyContexts.length === 0) { + [this.contextById, this.inboxNotifications], + ([contextById, inboxNotifications]) => { + if (inboxNotifications.length === 0 || contextById.size === 0) { return new Map, InboxNotification[]>() } return inboxNotifications.reduce((result, notification) => { - const notifyContext = notifyContexts.find(({ _id }) => _id === notification.docNotifyContext) + const notifyContext = contextById.get(notification.docNotifyContext) if (notifyContext === undefined) { return result diff --git a/plugins/view-resources/src/actions.ts b/plugins/view-resources/src/actions.ts index d190b5dbc9..3c0cbc5d1e 100644 --- a/plugins/view-resources/src/actions.ts +++ b/plugins/view-resources/src/actions.ts @@ -27,7 +27,7 @@ import core, { type WithLookup } from '@hcengineering/core' import { getResource } from '@hcengineering/platform' -import { getClient } from '@hcengineering/presentation' +import { addRefreshListener, getClient } from '@hcengineering/presentation' import { getEventPositionElement, showPopup } from '@hcengineering/ui' import { type Action, @@ -54,6 +54,12 @@ export function getSelection (focus: FocusSelection, selection: SelectionStore): return docs } +const allActions = new Map() + +addRefreshListener(() => { + allActions.clear() +}) + /** * @public * @@ -68,9 +74,11 @@ export async function getActions ( derived: Ref> = core.class.Doc, mode: ViewContextType = 'context' ): Promise { - const actions: Action[] = await client.findAll(view.class.Action, { - 'context.mode': mode - }) + let actions: Action[] | undefined = allActions.get(mode) + if (actions === undefined) { + actions = client.getModel().findAllSync(view.class.Action, { 'context.mode': mode }) + allActions.set(mode, actions) + } const filteredActions = await filterAvailableActions(actions, client, doc, derived) diff --git a/plugins/view-resources/src/components/filter/ObjectFilter.svelte b/plugins/view-resources/src/components/filter/ObjectFilter.svelte index bfdaf5c32d..56895ac5dd 100644 --- a/plugins/view-resources/src/components/filter/ObjectFilter.svelte +++ b/plugins/view-resources/src/components/filter/ObjectFilter.svelte @@ -13,8 +13,17 @@ // limitations under the License. -->