UBERF-4319: Fix performance issues (#4631)

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
Andrey Sobolev 2024-02-14 22:23:22 +07:00 committed by GitHub
parent 22093e7296
commit 9d7c85813c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 246 additions and 182 deletions

View File

@ -875,6 +875,9 @@ dependencies:
browserslist:
specifier: 4.21.5
version: 4.21.5
bson:
specifier: ^6.3.0
version: 6.3.0
bufferutil:
specifier: ^4.0.7
version: 4.0.7
@ -7533,8 +7536,8 @@ packages:
node-int64: 0.4.0
dev: false
/bson@6.2.0:
resolution: {integrity: sha512-ID1cI+7bazPDyL9wYy9GaQ8gEEohWvcUl/Yf0dIdutJxnmInEEyCsb4awy/OiBfall7zBA179Pahi3vCdFze3Q==}
/bson@6.3.0:
resolution: {integrity: sha512-balJfqwwTBddxfnidJZagCBPP/f48zj9Sdp3OJswREOgsJzHiQSaOIAtApSgDQFYgHqAvFkp53AFSqjMDZoTFw==}
engines: {node: '>=16.20.1'}
dev: false
@ -12286,7 +12289,7 @@ packages:
optional: true
dependencies:
'@mongodb-js/saslprep': 1.1.0
bson: 6.2.0
bson: 6.3.0
mongodb-connection-string-url: 3.0.0
dev: false
@ -19517,7 +19520,7 @@ packages:
dev: false
file:projects/mongo.tgz(esbuild@0.16.17)(svelte@4.2.5)(ts-node@10.9.1):
resolution: {integrity: sha512-If093Z1+UW9cKO/z3F2Axqu70SNjzaIR++Bitfh7f8Q5uXfu0gLG2xbXNnX0tXVTgc4ONTEzlWVcDVQTd15YWA==, tarball: file:projects/mongo.tgz}
resolution: {integrity: sha512-btz2YaZZ+BrjtiwUcLbTtQyWwyIJjMaoAVGrJABTfbKJCdQ6C4Har5Bq3fdNXvRJs6yCop9hUCcBbkxNHf2s1w==, tarball: file:projects/mongo.tgz}
id: file:projects/mongo.tgz
name: '@rush-temp/mongo'
version: 0.0.0
@ -19526,6 +19529,7 @@ packages:
'@types/node': 20.11.16
'@typescript-eslint/eslint-plugin': 6.11.0(@typescript-eslint/parser@6.11.0)(eslint@8.54.0)(typescript@5.3.3)
'@typescript-eslint/parser': 6.11.0(eslint@8.54.0)(typescript@5.3.3)
bson: 6.3.0
eslint: 8.54.0
eslint-config-standard-with-typescript: 40.0.0(@typescript-eslint/eslint-plugin@6.11.0)(eslint-plugin-import@2.28.1)(eslint-plugin-n@15.7.0)(eslint-plugin-promise@6.1.1)(eslint@8.54.0)(typescript@5.3.3)
eslint-plugin-import: 2.28.1(eslint@8.54.0)

View File

@ -204,4 +204,8 @@ export function createModel (builder: Builder): void {
]
}
)
builder.mixin(core.class.Space, core.class.Class, core.mixin.FullTextSearchContext, {
childProcessingAllowed: false
})
}

View File

@ -1655,12 +1655,14 @@ export function createModel (builder: Builder): void {
// Allow to use fuzzy search for mixins
builder.mixin(recruit.class.Vacancy, core.class.Class, core.mixin.FullTextSearchContext, {
fullTextSummary: true,
childProcessingAllowed: true,
propagate: []
})
builder.mixin(recruit.mixin.Candidate, core.class.Class, core.mixin.FullTextSearchContext, {
fullTextSummary: true,
propagate: [recruit.class.Applicant],
childProcessingAllowed: true,
propagateClasses: [
tags.class.TagReference,
chunter.class.ChatMessage,
@ -1673,6 +1675,7 @@ export function createModel (builder: Builder): void {
builder.mixin(recruit.class.Applicant, core.class.Class, core.mixin.FullTextSearchContext, {
fullTextSummary: true,
forceIndex: true,
childProcessingAllowed: true,
propagate: []
})

View File

@ -251,6 +251,7 @@ export function createModel (builder: Builder): void {
)
builder.mixin(telegram.class.Message, core.class.Class, core.mixin.FullTextSearchContext, {
parentPropagate: false
parentPropagate: false,
childProcessingAllowed: true
})
}

View File

@ -471,6 +471,8 @@ export interface FullTextSearchContext extends Class<Doc> {
// Do we need to propagate child value to parent one. Default(true)
parentPropagate?: boolean
childProcessingAllowed?: boolean
}
/**

View File

@ -54,6 +54,7 @@ import core, {
resultSort,
toFindResult
} from '@hcengineering/core'
import { PlatformError } from '@hcengineering/platform'
import { deepEqual } from 'fast-equals'
const CACHE_SIZE = 100
@ -105,7 +106,12 @@ export class LiveQuery extends TxProcessor implements Client {
if (!this.removeFromQueue(q)) {
try {
await this.refresh(q)
} catch (err) {
} catch (err: any) {
if (err instanceof PlatformError) {
if (err.message === 'connection closed') {
continue
}
}
console.error(err)
}
}
@ -114,7 +120,12 @@ export class LiveQuery extends TxProcessor implements Client {
for (const q of v) {
try {
await this.refresh(q)
} catch (err) {
} catch (err: any) {
if (err instanceof PlatformError) {
if (err.message === 'connection closed') {
continue
}
}
console.error(err)
}
}

View File

@ -19,33 +19,33 @@ import core, {
Class,
Doc,
DocIndexState,
docKey,
DocumentQuery,
FindOptions,
FindResult,
Hierarchy,
isFullTextAttribute,
isIndexedAttribute,
MeasureContext,
ObjQueryType,
Ref,
SearchOptions,
SearchQuery,
SearchResult,
ServerStorage,
toFindResult,
Tx,
TxCollectionCUD,
TxCUD,
TxCollectionCUD,
TxFactory,
TxResult,
WorkspaceId,
SearchQuery,
SearchOptions,
SearchResult
docKey,
isFullTextAttribute,
isIndexedAttribute,
toFindResult
} from '@hcengineering/core'
import { MinioService } from '@hcengineering/minio'
import { FullTextIndexPipeline } from './indexer'
import { createStateDoc, isClassIndexable } from './indexer/utils'
import { mapSearchResultDoc, getScoringConfig } from './mapper'
import type { FullTextAdapter, WithFind, IndexedDoc } from './types'
import { getScoringConfig, mapSearchResultDoc } from './mapper'
import type { FullTextAdapter, IndexedDoc, WithFind } from './types'
/**
* @public
@ -79,44 +79,58 @@ export class FullTextIndex implements WithFind {
await this.consistency
}
async tx (ctx: MeasureContext, tx: Tx): Promise<TxResult> {
let attachedTo: Ref<DocIndexState> | undefined
let attachedToClass: Ref<Class<Doc>> | undefined
if (tx._class === core.class.TxCollectionCUD) {
const txcol = tx as TxCollectionCUD<Doc, AttachedDoc>
attachedTo = txcol.objectId as Ref<DocIndexState>
attachedToClass = txcol.objectClass
tx = txcol.tx
}
if (this.hierarchy.isDerived(tx._class, core.class.TxCUD)) {
const cud = tx as TxCUD<Doc>
if (!isClassIndexable(this.hierarchy, cud.objectClass)) {
// No need, since no indixable fields or attachments.
return {}
async tx (ctx: MeasureContext, txes: Tx[]): Promise<TxResult> {
const stDocs = new Map<Ref<DocIndexState>, { create?: DocIndexState, updated: boolean, removed: boolean }>()
for (let tx of txes) {
let attachedTo: Ref<DocIndexState> | undefined
let attachedToClass: Ref<Class<Doc>> | undefined
if (tx._class === core.class.TxCollectionCUD) {
const txcol = tx as TxCollectionCUD<Doc, AttachedDoc>
attachedTo = txcol.objectId as Ref<DocIndexState>
attachedToClass = txcol.objectClass
tx = txcol.tx
}
if (this.hierarchy.isDerived(tx._class, core.class.TxCUD)) {
const cud = tx as TxCUD<Doc>
let stDoc: DocIndexState | undefined
if (cud._class === core.class.TxCreateDoc) {
// Add doc for indexing
stDoc = createStateDoc(cud.objectId, cud.objectClass, {
attributes: {},
stages: {},
attachedTo,
attachedToClass,
space: tx.objectSpace,
removed: false
})
if (!isClassIndexable(this.hierarchy, cud.objectClass)) {
// No need, since no indixable fields or attachments.
continue
}
let stDoc: DocIndexState | undefined
if (cud._class === core.class.TxCreateDoc) {
// Add doc for indexing
stDoc = createStateDoc(cud.objectId, cud.objectClass, {
attributes: {},
stages: {},
attachedTo,
attachedToClass,
space: tx.objectSpace,
removed: false
})
stDocs.set(cud.objectId as Ref<DocIndexState>, { create: stDoc, updated: false, removed: false })
} else {
const old = stDocs.get(cud.objectId as Ref<DocIndexState>)
if (cud._class === core.class.TxRemoveDoc && old?.create !== undefined) {
// Object created and deleted, skip index
continue
} else if (old !== undefined) {
// Create and update
// Skip update
continue
}
stDocs.set(cud.objectId as Ref<DocIndexState>, {
updated: cud._class !== core.class.TxRemoveDoc,
removed: cud._class === core.class.TxRemoveDoc
})
}
}
await this.indexer.queue(
cud.objectId as Ref<DocIndexState>,
cud._class === core.class.TxCreateDoc,
cud._class === core.class.TxRemoveDoc,
stDoc
)
this.indexer.triggerIndexing()
}
await ctx.with('queue', {}, async (ctx) => {
await this.indexer.queue(ctx, stDocs)
})
this.indexer.triggerIndexing()
return {}
}

View File

@ -117,9 +117,15 @@ export class IndexedFieldStage implements FullTextPipelineStage {
// Obtain real documents
const valueIds = new Map(values.map((it) => [it._id, it]))
const objClass = v as Ref<Class<Doc>>
const docs = await this.dbStorage.findAll(metrics, objClass, {
_id: { $in: Array.from(valueIds.keys()) }
})
const kids = Array.from(valueIds.keys())
const docs = await this.dbStorage.findAll(
metrics,
objClass,
{
_id: kids.length === 1 ? kids[0] : { $in: kids }
},
{ limit: kids.length }
)
const attributes = getFullTextIndexableAttributes(pipeline.hierarchy, objClass)
// Child docs.
@ -197,11 +203,21 @@ export class IndexedFieldStage implements FullTextPipelineStage {
if (propagate.length > 0) {
// We need to propagate all changes to all child's of following classes.
if (allChildDocs === undefined) {
const pc = metrics.newChild('propagate', {})
allChildDocs = await this.dbStorage.findAll(pc, core.class.DocIndexState, {
attachedTo: { $in: docs.map((it) => it._id) }
})
pc.end()
const ids = docs.map((it) => it._id)
allChildDocs = await metrics.with(
'propagate',
{},
async (ctx) =>
await this.dbStorage.findAll(
ctx,
core.class.DocIndexState,
{
attachedTo: ids.length === 1 ? ids[0] : { $in: ids }
},
{ limit: ids.length }
)
)
}
const childs = allChildDocs.filter((it) => it.attachedTo === docState._id)
for (const u of childs) {

View File

@ -125,38 +125,54 @@ export class FullTextPushStage implements FullTextPipelineStage {
) {
const attrStringValue = doc.attributes[attribute]
if (attrStringValue !== undefined && attrStringValue !== null && attrStringValue !== '') {
const refs = attrStringValue.split(',')
const refDocs = await metrics.with(
'ref-docs',
{},
async (ctx) =>
await this.dbStorage.findAll(ctx, core.class.DocIndexState, {
_id: { $in: refs }
})
)
if (refDocs.length > 0) {
refDocs.forEach((c) => {
updateDoc2Elastic(c.attributes, elasticDoc, c._id, attribute)
})
const refs: Ref<DocIndexState>[] = attrStringValue.split(',')
if (refs.length > 0) {
const refDocs = await metrics.with(
'ref-docs',
{},
async (ctx) =>
await this.dbStorage.findAll(
ctx,
core.class.DocIndexState,
{
_id: refs.length === 1 ? refs[0] : { $in: refs }
},
{ limit: refs.length }
)
)
if (refDocs.length > 0) {
for (const ref of refDocs) {
await metrics.with('updateDoc2Elastic', {}, async (ctx) => {
updateDoc2Elastic(ref.attributes, elasticDoc, ref._id, attribute)
})
}
}
}
}
}
}
}
async collect (toIndex: DocIndexState[], pipeline: FullTextPipeline, metrics: MeasureContext): Promise<void> {
async collect (toIndex: DocIndexState[], pipeline: FullTextPipeline, ctx: MeasureContext): Promise<void> {
const bulk: IndexedDoc[] = []
const part = [...toIndex]
while (part.length > 0) {
const toIndexPart = part.splice(0, 1000)
const toIndexPart = part.splice(0, 50)
const allChildDocs = await metrics.with(
const childIds = toIndexPart
.filter((it) => {
const fctx = getFullTextContext(pipeline.hierarchy, it.objectClass)
return fctx.childProcessingAllowed ?? true
})
.map((it) => it._id)
const allChildDocs = await ctx.with(
'find-child',
{},
async (ctx) =>
await this.dbStorage.findAll(ctx, core.class.DocIndexState, {
attachedTo: { $in: toIndexPart.map((it) => it._id) }
attachedTo: childIds.length === 1 ? childIds[0] : { $in: childIds }
})
)
@ -166,15 +182,19 @@ export class FullTextPushStage implements FullTextPipelineStage {
}
const elasticDoc = createElasticDoc(doc)
try {
updateDoc2Elastic(doc.attributes, elasticDoc)
await ctx.with('updateDoc2Elastic', {}, async () => {
updateDoc2Elastic(doc.attributes, elasticDoc)
})
// Include all child attributes
const childDocs = allChildDocs.filter((it) => it.attachedTo === doc._id)
if (childDocs.length > 0) {
for (const c of childDocs) {
const ctx = getFullTextContext(pipeline.hierarchy, c.objectClass)
if (ctx.parentPropagate ?? true) {
updateDoc2Elastic(c.attributes, elasticDoc, c._id)
const fctx = getFullTextContext(pipeline.hierarchy, c.objectClass)
if (fctx.parentPropagate ?? true) {
await ctx.with('updateDoc2Elastic', {}, async () => {
updateDoc2Elastic(c.attributes, elasticDoc, c._id)
})
}
}
}
@ -183,7 +203,7 @@ export class FullTextPushStage implements FullTextPipelineStage {
const propagate: Ref<Class<Doc>>[] = collectPropagate(pipeline, doc.attachedToClass)
if (propagate.some((it) => pipeline.hierarchy.isDerived(doc.objectClass, it))) {
// We need to include all parent content into this one.
;[parentDoc] = await metrics.with(
;[parentDoc] = await ctx.with(
'find-parent',
{},
async (ctx) =>
@ -192,25 +212,28 @@ export class FullTextPushStage implements FullTextPipelineStage {
})
)
if (parentDoc !== undefined) {
updateDoc2Elastic(parentDoc.attributes, elasticDoc, parentDoc._id)
const ppdoc = parentDoc
await ctx.with('updateDoc2Elastic', {}, async () => {
updateDoc2Elastic(ppdoc.attributes, elasticDoc, ppdoc._id)
})
const ctx = collectPropagateClasses(pipeline, parentDoc.objectClass)
if (ctx.length > 0) {
for (const p of ctx) {
const collections = await this.dbStorage.findAll(
metrics.newChild('propagate', {}),
core.class.DocIndexState,
{ attachedTo: parentDoc._id, objectClass: p }
)
for (const c of collections) {
const collectClasses = collectPropagateClasses(pipeline, parentDoc.objectClass)
if (collectClasses.length > 0) {
const collections = await this.dbStorage.findAll<DocIndexState>(
ctx.newChild('propagate', {}),
core.class.DocIndexState,
{ attachedTo: parentDoc._id, objectClass: { $in: collectClasses } }
)
for (const c of collections) {
await ctx.with('updateDoc2Elastic', {}, async () => {
updateDoc2Elastic(c.attributes, elasticDoc, c._id)
}
})
}
}
}
}
}
const [spaceDoc] = await metrics.with(
const [spaceDoc] = await ctx.with(
'find-space',
{},
async (ctx) =>
@ -222,7 +245,7 @@ export class FullTextPushStage implements FullTextPipelineStage {
const allAttributes = pipeline.hierarchy.getAllAttributes(elasticDoc._class)
// Include child ref attributes
await this.indexRefAttributes(allAttributes, doc, elasticDoc, metrics)
await this.indexRefAttributes(allAttributes, doc, elasticDoc, ctx)
await updateDocWithPresenter(pipeline.hierarchy, doc, elasticDoc, { parentDoc, spaceDoc })

View File

@ -30,11 +30,11 @@ import core, {
TxFactory,
WorkspaceId,
_getOperator,
docKey,
generateId,
setObjectValue,
toFindResult,
versionToString,
docKey,
generateId
versionToString
} from '@hcengineering/core'
import { DbAdapter } from '../adapter'
import { RateLimitter } from '../limitter'
@ -54,7 +54,7 @@ export * from './utils'
*/
export const globalIndexer = {
allowParallel: 2,
processingSize: 1000
processingSize: 25
}
const rateLimitter = new RateLimitter(() => ({ rate: globalIndexer.allowParallel }))
@ -171,19 +171,32 @@ export class FullTextIndexPipeline implements FullTextPipeline {
return doc
}
async queue (docId: Ref<DocIndexState>, create: boolean, removed: boolean, doc?: DocIndexState): Promise<void> {
if (doc !== undefined) {
await this.storage.upload(DOMAIN_DOC_INDEX_STATE, [doc])
async queue (
ctx: MeasureContext,
updates: Map<Ref<DocIndexState>, { create?: DocIndexState, updated: boolean, removed: boolean }>
): Promise<void> {
const entries = Array.from(updates.entries())
const uploads = entries.filter((it) => it[1].create !== undefined).map((it) => it[1].create) as DocIndexState[]
if (uploads.length > 0) {
await ctx.with('upload', {}, async () => {
await this.storage.upload(DOMAIN_DOC_INDEX_STATE, uploads)
})
}
if (!create) {
const onlyUpdates = entries.filter((it) => it[1].create === undefined)
if (onlyUpdates.length > 0) {
const ops = new Map<Ref<DocIndexState>, DocumentUpdate<DocIndexState>>()
const upd: DocumentUpdate<DocIndexState> = { removed }
for (const st of this.stages) {
;(upd as any)['stages.' + st.stageId] = false
for (const u of onlyUpdates) {
const upd: DocumentUpdate<DocIndexState> = { removed: u[1].removed }
// We need to clear only first state, to prevent multiple index operations to happen.
;(upd as any)['stages.' + this.stages[0].stageId] = false
ops.set(u[0], upd)
}
ops.set(docId, upd)
await this.storage.update(DOMAIN_DOC_INDEX_STATE, ops)
await ctx.with('upload', {}, async () => {
await this.storage.update(DOMAIN_DOC_INDEX_STATE, ops)
})
}
this.triggerIndexing()
}
@ -466,7 +479,7 @@ export class FullTextIndexPipeline implements FullTextPipeline {
// Do Indexing
this.currentStage = st
await ctx.with('collect', { collector: st.stageId }, async (ctx) => {
await ctx.with('collect-' + st.stageId, {}, async (ctx) => {
await st.collect(toIndex, this, ctx)
})
if (this.cancelling) {
@ -480,7 +493,7 @@ export class FullTextIndexPipeline implements FullTextPipeline {
const toIndex2 = this.matchStates(nst)
if (toIndex2.length > 0) {
this.currentStage = nst
await ctx.with('collect', { collector: nst.stageId }, async (ctx) => {
await ctx.with('collect-' + nst.stageId, {}, async (ctx) => {
await nst.collect(toIndex2, this, ctx)
})
}

View File

@ -108,12 +108,13 @@ export class FullSummaryStage implements FullTextPipelineStage {
while (part.length > 0) {
const toIndexPart = part.splice(0, 1000)
const kids = toIndexPart.map((it) => it._id)
const allChildDocs = await metrics.with(
'find-child',
{},
async (ctx) =>
await this.dbStorage.findAll(ctx, core.class.DocIndexState, {
attachedTo: { $in: toIndexPart.map((it) => it._id) }
attachedTo: kids.length === 1 ? kids[0] : { $in: kids }
})
)
@ -163,25 +164,24 @@ export class FullSummaryStage implements FullTextPipelineStage {
const [parentDoc] = await this.dbStorage.findAll(
metrics.newChild('propagate', {}),
core.class.DocIndexState,
{ _id: doc.attachedTo as Ref<DocIndexState> }
{ _id: doc.attachedTo as Ref<DocIndexState> },
{ limit: 1 }
)
if (parentDoc !== undefined) {
const ctx = collectPropagateClasses(pipeline, parentDoc.objectClass)
if (ctx.length > 0) {
for (const p of ctx) {
const collections = await this.dbStorage.findAll(
metrics.newChild('propagate', {}),
core.class.DocIndexState,
{ attachedTo: parentDoc._id, objectClass: p }
)
for (const c of collections) {
embeddingText +=
'\n' +
(await extractIndexedValues(c, pipeline.hierarchy, {
matchExtra: this.matchExtra,
fieldFilter: this.fieldFilter
}))
}
const collections = await this.dbStorage.findAll(
metrics.newChild('propagate', {}),
core.class.DocIndexState,
{ attachedTo: parentDoc._id, objectClass: ctx.length === 1 ? ctx[0] : { $in: ctx } }
)
for (const c of collections) {
embeddingText +=
'\n' +
(await extractIndexedValues(c, pipeline.hierarchy, {
matchExtra: this.matchExtra,
fieldFilter: this.fieldFilter
}))
}
}

View File

@ -249,7 +249,8 @@ export function getFullTextContext (
return {
fullTextSummary: false,
forceIndex: false,
propagate: []
propagate: [],
childProcessingAllowed: true
}
}

View File

@ -416,7 +416,8 @@ class TServerStorage implements ServerStorage {
if (query?.$search !== undefined) {
return await ctx.with(p + '-fulltext-find-all', {}, (ctx) => this.fulltext.findAll(ctx, clazz, query, options))
}
return await ctx.with(
const st = Date.now()
const result = await ctx.with(
p + '-find-all',
{ _class: clazz },
(ctx) => {
@ -424,6 +425,10 @@ class TServerStorage implements ServerStorage {
},
{ clazz, query, options }
)
if (Date.now() - st > 1000) {
console.error('FindAll', Date.now() - st, clazz, query, options)
}
return result
}
async searchFulltext (ctx: MeasureContext, query: SearchQuery, options: SearchOptions): Promise<SearchResult> {
@ -800,14 +805,9 @@ class TServerStorage implements ServerStorage {
derived = derived.concat(await this.processDerived(ctx, txToProcess, triggerFx, _findAll, removedMap))
// index object
for (const _tx of txToProcess) {
await ctx.with('fulltext-tx', {}, (ctx) => this.fulltext.tx(ctx, _tx))
}
// index derived objects
for (const tx of derived) {
await ctx.with('derived-processor', { _class: txClass(tx) }, (ctx) => this.fulltext.tx(ctx, tx))
}
await ctx.with('fulltext-tx', {}, async (ctx) => {
await this.fulltext.tx(ctx, [...txToProcess, ...derived])
})
for (const fx of triggerFx.effects) {
await fx()
@ -859,11 +859,6 @@ class Effects {
return [...this._effects]
}
}
function txClass (tx: Tx): Ref<Class<Tx>> {
return tx._class === core.class.TxCollectionCUD ? (tx as TxCollectionCUD<Doc, AttachedDoc>).tx._class : tx._class
}
/**
* @public
*/

View File

@ -35,6 +35,7 @@
"@hcengineering/core": "^0.6.28",
"@hcengineering/platform": "^0.6.9",
"@hcengineering/server-core": "^0.6.1",
"mongodb": "^6.3.0"
"mongodb": "^6.3.0",
"bson": "^6.3.0"
}
}

View File

@ -112,12 +112,13 @@ abstract class MongoAdapterBase implements DbAdapter {
async init (): Promise<void> {}
async toArray<T>(ctx: MeasureContext, cursor: AbstractCursor<T>, limit?: number): Promise<T[]> {
const data: T[] = []
for await (const r of cursor.stream()) {
data.push(r)
async toArray<T>(ctx: MeasureContext, cursor: AbstractCursor<T>): Promise<T[]> {
const st = Date.now()
const data = await cursor.toArray()
await cursor.close()
if (Date.now() - st > 1000) {
console.error('toArray', Date.now() - st, data.length)
}
void cursor.close()
return data
}
@ -446,8 +447,8 @@ abstract class MongoAdapterBase implements DbAdapter {
pipeline.push(match)
const resultPipeline: any[] = []
await this.fillSortPipeline(clazz, options, pipeline)
if (options?.limit !== undefined) {
resultPipeline.push({ $limit: options.limit })
if (options?.limit !== undefined || typeof query._id === 'string') {
resultPipeline.push({ $limit: options?.limit ?? 1 })
}
if (!slowPipeline) {
for (const step of steps) {
@ -479,7 +480,7 @@ abstract class MongoAdapterBase implements DbAdapter {
const result: WithLookup<T>[] = []
let total = options?.total === true ? 0 : -1
try {
const rres = await ctx.with('toArray', {}, async (ctx) => await this.toArray(ctx, cursor, options?.limit), {
const rres = await ctx.with('toArray', {}, async (ctx) => await this.toArray(ctx, cursor), {
domain,
pipeline
})
@ -596,30 +597,6 @@ abstract class MongoAdapterBase implements DbAdapter {
const coll = this.db.collection(domain)
const mongoQuery = this.translateQuery(_class, query)
// We have limit 1 or _id === exact id
if (options?.limit === 1 || typeof query._id === 'string') {
const data = await ctx.with(
'find-one',
{ _class },
async () =>
await coll.findOne<T>(mongoQuery, {
checkKeys: false,
enableUtf8Validation: false,
projection: this.calcProjection(options, _class),
sort: this.collectSort<T>(options, _class)
}),
{
_class,
mongoQuery,
domain
}
)
if (data != null) {
return toFindResult(this.stripHash([data]), 1)
}
return toFindResult([], 0)
}
let cursor = coll.find<T>(mongoQuery, {
checkKeys: false,
enableUtf8Validation: false
@ -632,24 +609,24 @@ abstract class MongoAdapterBase implements DbAdapter {
}
}
let total: number = -1
if (options !== null && options !== undefined) {
if (options != null) {
if (options.sort !== undefined) {
const sort = this.collectSort<T>(options, _class)
if (sort !== undefined) {
cursor = cursor.sort(sort)
}
}
if (options.limit !== undefined) {
if (options.limit !== undefined || typeof query._id === 'string') {
if (options.total === true) {
total = await coll.countDocuments(mongoQuery)
}
cursor = cursor.limit(options.limit)
cursor = cursor.limit(options.limit ?? 1)
}
}
// Error in case of timeout
try {
const res: T[] = await ctx.with('toArray', {}, async (ctx) => await this.toArray(ctx, cursor, options?.limit), {
const res: T[] = await ctx.with('toArray', {}, async (ctx) => await this.toArray(ctx, cursor), {
mongoQuery,
options,
domain
@ -817,8 +794,8 @@ abstract class MongoAdapterBase implements DbAdapter {
if (docs.length === 0) {
return []
}
const cursor = this.db.collection<Doc>(domain).find<Doc>({ _id: { $in: docs } })
const result = await this.toArray(new MeasureMetricsContext('', {}), cursor, docs.length)
const cursor = this.db.collection<Doc>(domain).find<Doc>({ _id: { $in: docs } }, { limit: docs.length })
const result = await this.toArray(new MeasureMetricsContext('', {}), cursor)
return this.stripHash(this.stripHash(result))
}

View File

@ -39,13 +39,12 @@ export async function shutdown (): Promise<void> {
* @public
*/
export async function getMongoClient (uri: string, options?: MongoClientOptions): Promise<MongoClient> {
const extraOptions = JSON.parse(process.env.MONGO_OPTIONS ?? '{}')
const client = await MongoClient.connect(uri, {
...options,
enableUtf8Validation: false,
maxConnecting: 1024,
minPoolSize: 128,
maxPoolSize: 512,
zlibCompressionLevel: 0
...extraOptions
})
connections.push(client)
return client