UBERF-7796: Rework index creation logic (#6246)

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
Andrey Sobolev 2024-08-05 12:37:13 +07:00 committed by GitHub
parent b50c44f236
commit 576027f98b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
19 changed files with 330 additions and 252 deletions

1
.vscode/launch.json vendored
View File

@ -41,6 +41,7 @@
"METRICS_FILE": "${workspaceRoot}/metrics.txt", // Show metrics in console evert 30 seconds., "METRICS_FILE": "${workspaceRoot}/metrics.txt", // Show metrics in console evert 30 seconds.,
"STORAGE_CONFIG": "minio|localhost?accessKey=minioadmin&secretKey=minioadmin", "STORAGE_CONFIG": "minio|localhost?accessKey=minioadmin&secretKey=minioadmin",
"SERVER_SECRET": "secret", "SERVER_SECRET": "secret",
"ENABLE_CONSOLE": "true",
"COLLABORATOR_URL": "ws://localhost:3078", "COLLABORATOR_URL": "ws://localhost:3078",
"COLLABORATOR_API_URL": "http://localhost:3078", "COLLABORATOR_API_URL": "http://localhost:3078",
"REKONI_URL": "http://localhost:4004", "REKONI_URL": "http://localhost:4004",

View File

@ -1 +1 @@
"0.6.271" "0.6.274"

View File

@ -329,13 +329,13 @@ export class TDocIndexState extends TDoc implements DocIndexState {
attributes!: Record<string, any> attributes!: Record<string, any>
@Prop(TypeBoolean(), getEmbeddedLabel('Removed')) @Prop(TypeBoolean(), getEmbeddedLabel('Removed'))
@Index(IndexKind.Indexed) // @Index(IndexKind.Indexed)
@Hidden() @Hidden()
removed!: boolean removed!: boolean
// States for different stages // States for different stages
@Prop(TypeRecord(), getEmbeddedLabel('Stages')) @Prop(TypeRecord(), getEmbeddedLabel('Stages'))
@Index(IndexKind.Indexed) // @Index(IndexKind.Indexed)
@Hidden() @Hidden()
stages!: Record<string, boolean | string> stages!: Record<string, boolean | string>

View File

@ -27,7 +27,6 @@ import {
type AttachedDoc, type AttachedDoc,
type Class, type Class,
type Doc, type Doc,
type DocIndexState,
type IndexingConfiguration, type IndexingConfiguration,
type TxCollectionCUD type TxCollectionCUD
} from '@hcengineering/core' } from '@hcengineering/core'
@ -284,8 +283,10 @@ export function createModel (builder: Builder): void {
builder.createDoc(core.class.DomainIndexConfiguration, core.space.Model, { builder.createDoc(core.class.DomainIndexConfiguration, core.space.Model, {
domain: DOMAIN_DOC_INDEX_STATE, domain: DOMAIN_DOC_INDEX_STATE,
indexes: [{ keys: { removed: 1 }, filter: { removed: true } }],
disabled: [ disabled: [
{ attachedToClass: 1 }, { attachedToClass: 1 },
{ objectClass: 1 },
{ stages: 1 }, { stages: 1 },
{ generationId: 1 }, { generationId: 1 },
{ space: 1 }, { space: 1 },
@ -298,24 +299,6 @@ export function createModel (builder: Builder): void {
skip: ['stages.'] skip: ['stages.']
}) })
builder.mixin<Class<DocIndexState>, IndexingConfiguration<TxCollectionCUD<Doc, AttachedDoc>>>(
core.class.DocIndexState,
core.class.Class,
core.mixin.IndexConfiguration,
{
indexes: [
{
keys: {
_class: 1,
stages: 1,
_id: 1,
modifiedOn: 1
}
}
]
}
)
builder.mixin(core.class.Space, core.class.Class, core.mixin.FullTextSearchContext, { builder.mixin(core.class.Space, core.class.Class, core.mixin.FullTextSearchContext, {
childProcessingAllowed: false childProcessingAllowed: false
}) })

View File

@ -23,7 +23,6 @@ import {
type FindOptions, type FindOptions,
type FindResult, type FindResult,
type Hierarchy, type Hierarchy,
type IndexingConfiguration,
type MeasureContext, type MeasureContext,
type ModelDb, type ModelDb,
type Ref, type Ref,
@ -38,19 +37,23 @@ import type { ServerFindOptions } from './types'
export interface DomainHelperOperations { export interface DomainHelperOperations {
create: (domain: Domain) => Promise<void> create: (domain: Domain) => Promise<void>
exists: (domain: Domain) => boolean exists: (domain: Domain) => boolean
listDomains: () => Promise<Set<Domain>>
createIndex: (domain: Domain, value: string | FieldIndexConfig<Doc>, options?: { name: string }) => Promise<void> createIndex: (domain: Domain, value: string | FieldIndexConfig<Doc>, options?: { name: string }) => Promise<void>
dropIndex: (domain: Domain, name: string) => Promise<void> dropIndex: (domain: Domain, name: string) => Promise<void>
listIndexes: (domain: Domain) => Promise<{ name: string }[]> listIndexes: (domain: Domain) => Promise<{ name: string }[]>
hasDocuments: (domain: Domain, count: number) => Promise<boolean>
// Could return 0 even if it has documents
estimatedCount: (domain: Domain) => Promise<number>
} }
export interface DomainHelper { export interface DomainHelper {
checkDomain: ( checkDomain: (
ctx: MeasureContext, ctx: MeasureContext,
domain: Domain, domain: Domain,
forceCreate: boolean, documents: number,
operations: DomainHelperOperations operations: DomainHelperOperations
) => Promise<boolean> ) => Promise<void>
} }
export interface RawDBAdapterStream<T extends Doc> { export interface RawDBAdapterStream<T extends Doc> {
@ -87,15 +90,20 @@ export interface RawDBAdapter {
close: () => Promise<void> close: () => Promise<void>
} }
export type DbAdapterHandler = (
domain: Domain,
event: 'add' | 'update' | 'delete' | 'read',
count: number,
time: number,
helper: DomainHelperOperations
) => void
/** /**
* @public * @public
*/ */
export interface DbAdapter { export interface DbAdapter {
init?: () => Promise<void> init?: () => Promise<void>
helper?: () => DomainHelperOperations helper: () => DomainHelperOperations
createIndexes: (domain: Domain, config: Pick<IndexingConfiguration<Doc>, 'indexes'>) => Promise<void>
removeOldIndex: (domain: Domain, deletePattern: RegExp[], keepPattern: RegExp[]) => Promise<void>
close: () => Promise<void> close: () => Promise<void>
findAll: <T extends Doc>( findAll: <T extends Doc>(
@ -116,6 +124,9 @@ export interface DbAdapter {
// Bulk update operations // Bulk update operations
update: (ctx: MeasureContext, domain: Domain, operations: Map<Ref<Doc>, DocumentUpdate<Doc>>) => Promise<void> update: (ctx: MeasureContext, domain: Domain, operations: Map<Ref<Doc>, DocumentUpdate<Doc>>) => Promise<void>
// Allow to register a handler to listen for domain operations
on?: (handler: DbAdapterHandler) => void
} }
/** /**

View File

@ -349,23 +349,36 @@ export class FullTextIndexPipeline implements FullTextPipeline {
keepPattern.push(new RegExp(st.stageId)) keepPattern.push(new RegExp(st.stageId))
} }
} }
const helper = this.storage.helper()
if (deletePattern.length > 0) { if (deletePattern.length > 0) {
await this.storage.removeOldIndex(DOMAIN_DOC_INDEX_STATE, deletePattern, keepPattern) try {
const existingIndexes = await helper.listIndexes(DOMAIN_DOC_INDEX_STATE)
for (const existingIndex of existingIndexes) {
if (existingIndex.name !== undefined) {
const name: string = existingIndex.name
if (deletePattern.some((it) => it.test(name)) && !keepPattern.some((it) => it.test(name))) {
await helper.dropIndex(DOMAIN_DOC_INDEX_STATE, name)
}
}
}
} catch (err: any) {
console.error(err)
}
} }
for (const st of this.stages) { for (const st of this.stages) {
if (this.cancelling) { if (this.cancelling) {
return return
} }
await this.storage.createIndexes(DOMAIN_DOC_INDEX_STATE, { await this.storage.helper().createIndex(
indexes: [ DOMAIN_DOC_INDEX_STATE,
{ {
keys: { keys: {
['stages.' + st.stageId]: 1 ['stages.' + st.stageId]: 1
}
} }
] },
}) { name: 'stages.' + st.stageId + '_1' }
)
} }
} }
@ -481,7 +494,9 @@ export class FullTextIndexPipeline implements FullTextPipeline {
async (ctx) => async (ctx) =>
await this.storage.findAll(ctx, core.class.DocIndexState, q, { await this.storage.findAll(ctx, core.class.DocIndexState, q, {
sort: { modifiedOn: SortingOrder.Descending }, sort: { modifiedOn: SortingOrder.Descending },
limit: globalIndexer.processingSize limit: globalIndexer.processingSize,
skipClass: true,
skipSpace: true
}) })
) )
const toRemove: DocIndexState[] = [] const toRemove: DocIndexState[] = []
@ -594,7 +609,9 @@ export class FullTextIndexPipeline implements FullTextPipeline {
_id: 1, _id: 1,
stages: 1, stages: 1,
objectClass: 1 objectClass: 1
} },
skipSpace: true,
skipClass: true
} }
) )

View File

@ -32,7 +32,7 @@ import core, {
type TxResult, type TxResult,
type WorkspaceId type WorkspaceId
} from '@hcengineering/core' } from '@hcengineering/core'
import { type DbAdapter } from './adapter' import { type DbAdapter, type DomainHelperOperations } from './adapter'
/** /**
* @public * @public
@ -49,6 +49,18 @@ export class DummyDbAdapter implements DbAdapter {
async init (): Promise<void> {} async init (): Promise<void> {}
helper (): DomainHelperOperations {
return {
create: async () => {},
exists: () => true,
listDomains: async () => new Set(),
createIndex: async () => {},
dropIndex: async () => {},
listIndexes: async () => [],
estimatedCount: async () => 0
}
}
async createIndexes (domain: Domain, config: Pick<IndexingConfiguration<Doc>, 'indexes'>): Promise<void> {} async createIndexes (domain: Domain, config: Pick<IndexingConfiguration<Doc>, 'indexes'>): Promise<void> {}
async removeOldIndex (domain: Domain, deletePattern: RegExp[], keepPattern: RegExp[]): Promise<void> {} async removeOldIndex (domain: Domain, deletePattern: RegExp[], keepPattern: RegExp[]): Promise<void> {}

View File

@ -74,41 +74,24 @@ export class DomainIndexHelperImpl implements DomainHelper {
} }
/** /**
* return false if and only if domain underline structures are not required. * Check if some indexes need to be created for domain.
*/ */
async checkDomain ( async checkDomain (
ctx: MeasureContext, ctx: MeasureContext,
domain: Domain, domain: Domain,
forceCreate: boolean, documents: number,
operations: DomainHelperOperations operations: DomainHelperOperations
): Promise<boolean> { ): Promise<void> {
const domainInfo = this.domains.get(domain) const domainInfo = this.domains.get(domain)
const cfg = this.domainConfigurations.find((it) => it.domain === domain) const cfg = this.domainConfigurations.find((it) => it.domain === domain)
let exists = operations.exists(domain)
const hasDocuments = exists && (await operations.hasDocuments(domain, 1))
// Drop collection if it exists and should not exists or doesn't have documents.
if (exists && (cfg?.disableCollection === true || (!hasDocuments && !forceCreate))) {
// We do not need this collection
return false
}
if (forceCreate && !exists) {
await operations.create(domain)
ctx.info('collection will be created', domain)
exists = true
}
if (!exists) {
// Do not need to create, since not force and no documents.
return false
}
const bb: (string | FieldIndexConfig<Doc>)[] = [] const bb: (string | FieldIndexConfig<Doc>)[] = []
const added = new Set<string>() const added = new Set<string>()
try { try {
const has50Documents = await operations.hasDocuments(domain, 50) const has50Documents = documents > 50
const allIndexes = (await operations.listIndexes(domain)).filter((it) => it.name !== '_id_') const allIndexes = (await operations.listIndexes(domain)).filter((it) => it.name !== '_id_')
ctx.info('check indexes', { domain, has50Documents }) ctx.info('check indexes', { domain, has50Documents, documents })
if (has50Documents) { if (has50Documents) {
for (const vv of [...(domainInfo?.values() ?? []), ...(cfg?.indexes ?? [])]) { for (const vv of [...(domainInfo?.values() ?? []), ...(cfg?.indexes ?? [])]) {
try { try {
@ -188,7 +171,5 @@ export class DomainIndexHelperImpl implements DomainHelper {
if (bb.length > 0) { if (bb.length > 0) {
ctx.info('created indexes', { domain, bb }) ctx.info('created indexes', { domain, bb })
} }
return true
} }
} }

View File

@ -167,7 +167,7 @@ export async function createServerStorage (
const domainHelper = new DomainIndexHelperImpl(metrics, hierarchy, modelDb, conf.workspace) const domainHelper = new DomainIndexHelperImpl(metrics, hierarchy, modelDb, conf.workspace)
return new TServerStorage( const serverStorage = new TServerStorage(
conf.domains, conf.domains,
conf.defaultAdapter, conf.defaultAdapter,
adapters, adapters,
@ -184,6 +184,10 @@ export async function createServerStorage (
model, model,
domainHelper domainHelper
) )
await ctx.with('init-domain-info', {}, async () => {
await serverStorage.initDomainInfo()
})
return serverStorage
} }
/** /**

View File

@ -79,6 +79,11 @@ import type {
} from '../types' } from '../types'
import { SessionContextImpl, createBroadcastEvent } from '../utils' import { SessionContextImpl, createBroadcastEvent } from '../utils'
interface DomainInfo {
exists: boolean
documents: number
}
export class TServerStorage implements ServerStorage { export class TServerStorage implements ServerStorage {
private readonly fulltext: FullTextIndex private readonly fulltext: FullTextIndex
hierarchy: Hierarchy hierarchy: Hierarchy
@ -92,14 +97,8 @@ export class TServerStorage implements ServerStorage {
liveQuery: LQ liveQuery: LQ
branding: Branding | null branding: Branding | null
domainInfo = new Map< domainInfo = new Map<Domain, DomainInfo>()
Domain, statsCtx: MeasureContext
{
exists: boolean
checkPromise: Promise<boolean> | undefined
lastCheck: number
}
>()
emptyAdapter = new DummyDbAdapter() emptyAdapter = new DummyDbAdapter()
@ -126,6 +125,71 @@ export class TServerStorage implements ServerStorage {
this.branding = options.branding this.branding = options.branding
this.setModel(model) this.setModel(model)
this.statsCtx = metrics.newChild('stats-' + this.workspaceId.name, {})
}
async initDomainInfo (): Promise<void> {
const adapterDomains = new Map<DbAdapter, Set<Domain>>()
for (const d of this.hierarchy.domains()) {
// We need to init domain info
const info = this.getDomainInfo(d)
const adapter = this.adapters.get(d) ?? this.adapters.get(this.defaultAdapter)
if (adapter !== undefined) {
const h = adapter.helper?.()
if (h !== undefined) {
const dbDomains = adapterDomains.get(adapter) ?? (await h.listDomains())
adapterDomains.set(adapter, dbDomains)
const dbIdIndex = dbDomains.has(d)
info.exists = dbIdIndex !== undefined
if (info.exists) {
info.documents = await h.estimatedCount(d)
}
} else {
info.exists = true
}
} else {
info.exists = false
}
}
for (const adapter of this.adapters.values()) {
adapter.on?.((domain, event, count, time, helper) => {
const info = this.getDomainInfo(domain)
const oldDocuments = info.documents
switch (event) {
case 'add':
info.documents += count
break
case 'update':
break
case 'delete':
info.documents -= count
break
case 'read':
break
}
if (oldDocuments < 50 && info.documents > 50) {
// We now have more than 50 documents, so we need to check for indexes
void this.domainHelper.checkDomain(this.metrics, domain, info.documents, helper)
}
if (oldDocuments > 50 && info.documents < 50) {
// We dropped below 50 documents, so we need to re-check the indexes
void this.domainHelper.checkDomain(this.metrics, domain, info.documents, helper)
}
})
}
}
private getDomainInfo (domain: Domain): DomainInfo {
let info = this.domainInfo.get(domain)
if (info === undefined) {
info = {
documents: -1,
exists: false
}
this.domainInfo.set(domain, info)
}
return info
} }
private newCastClient (hierarchy: Hierarchy, modelDb: ModelDb, metrics: MeasureContext): Client { private newCastClient (hierarchy: Hierarchy, modelDb: ModelDb, metrics: MeasureContext): Client {
@ -172,13 +236,6 @@ export class TServerStorage implements ServerStorage {
async close (): Promise<void> { async close (): Promise<void> {
await this.fulltext.close() await this.fulltext.close()
for (const [domain, info] of this.domainInfo.entries()) {
if (info.checkPromise !== undefined) {
this.metrics.info('wait for check domain', { domain })
// We need to be sure we wait for check to be complete
await info.checkPromise
}
}
for (const o of this.adapters.values()) { for (const o of this.adapters.values()) {
await o.close() await o.close()
} }
@ -193,36 +250,13 @@ export class TServerStorage implements ServerStorage {
throw new Error('adapter not provided: ' + name) throw new Error('adapter not provided: ' + name)
} }
const helper = adapter.helper?.() const info = this.getDomainInfo(domain)
if (helper !== undefined) {
let info = this.domainInfo.get(domain) if (!info.exists && !requireExists) {
if (info == null) { return this.emptyAdapter
// For first time, lets assume all is fine
info = {
exists: true,
lastCheck: Date.now(),
checkPromise: undefined
}
this.domainInfo.set(domain, info)
return adapter
}
if (Date.now() - info.lastCheck > 5 * 60 * 1000) {
// Re-check every 5 minutes
const exists = helper.exists(domain)
// We will create necessary indexes if required, and not touch collection if not required.
info = {
exists,
lastCheck: Date.now(),
checkPromise: this.domainHelper.checkDomain(this.metrics, domain, requireExists, helper)
}
this.domainInfo.set(domain, info)
}
if (!info.exists && !requireExists) {
return this.emptyAdapter
}
// If we require it exists, it will be exists
info.exists = true
} }
// If we require it exists, it will be exists
info.exists = true
return adapter return adapter
} }

View File

@ -51,6 +51,9 @@ import { type StorageAdapter } from './storage'
export interface ServerFindOptions<T extends Doc> extends FindOptions<T> { export interface ServerFindOptions<T extends Doc> extends FindOptions<T> {
domain?: Domain // Allow to find for Doc's in specified domain only. domain?: Domain // Allow to find for Doc's in specified domain only.
prefix?: string prefix?: string
skipClass?: boolean
skipSpace?: boolean
} }
/** /**
* @public * @public

View File

@ -33,7 +33,7 @@ import {
WorkspaceId WorkspaceId
} from '@hcengineering/core' } from '@hcengineering/core'
import { getMetadata } from '@hcengineering/platform' import { getMetadata } from '@hcengineering/platform'
import serverCore, { DbAdapter } from '@hcengineering/server-core' import serverCore, { DbAdapter, type DomainHelperOperations } from '@hcengineering/server-core'
function getIndexName (): string { function getIndexName (): string {
return getMetadata(serverCore.metadata.ElasticIndexName) ?? 'storage_index' return getMetadata(serverCore.metadata.ElasticIndexName) ?? 'storage_index'
@ -61,7 +61,24 @@ class ElasticDataAdapter implements DbAdapter {
this.getDocId = (fulltext) => fulltext.slice(0, -1 * (this.workspaceString.length + 1)) as Ref<Doc> this.getDocId = (fulltext) => fulltext.slice(0, -1 * (this.workspaceString.length + 1)) as Ref<Doc>
} }
async groupBy<T>(ctx: MeasureContext, domain: Domain, field: string): Promise<Set<T>> { helper (): DomainHelperOperations {
return {
create: async () => {},
exists: () => true,
listDomains: async () => new Set(),
createIndex: async () => {},
dropIndex: async () => {},
listIndexes: async () => [],
estimatedCount: async () => 0
}
}
async groupBy<T, D extends Doc = Doc>(
ctx: MeasureContext,
domain: Domain,
field: string,
query?: DocumentQuery<D>
): Promise<Set<T>> {
return new Set() return new Set()
} }

View File

@ -37,7 +37,6 @@ import core, {
type FindResult, type FindResult,
type FullParamsType, type FullParamsType,
type Hierarchy, type Hierarchy,
type IndexingConfiguration,
type Lookup, type Lookup,
type MeasureContext, type MeasureContext,
type Mixin, type Mixin,
@ -65,6 +64,7 @@ import {
estimateDocSize, estimateDocSize,
updateHashForDoc, updateHashForDoc,
type DbAdapter, type DbAdapter,
type DbAdapterHandler,
type DomainHelperOperations, type DomainHelperOperations,
type ServerFindOptions, type ServerFindOptions,
type StorageAdapter, type StorageAdapter,
@ -76,7 +76,6 @@ import {
type AbstractCursor, type AbstractCursor,
type AnyBulkWriteOperation, type AnyBulkWriteOperation,
type Collection, type Collection,
type CreateIndexesOptions,
type Db, type Db,
type Document, type Document,
type Filter, type Filter,
@ -131,6 +130,18 @@ abstract class MongoAdapterBase implements DbAdapter {
findRateLimit = new RateLimiter(parseInt(process.env.FIND_RLIMIT ?? '1000')) findRateLimit = new RateLimiter(parseInt(process.env.FIND_RLIMIT ?? '1000'))
rateLimit = new RateLimiter(parseInt(process.env.TX_RLIMIT ?? '5')) rateLimit = new RateLimiter(parseInt(process.env.TX_RLIMIT ?? '5'))
handlers: DbAdapterHandler[] = []
on (handler: DbAdapterHandler): void {
this.handlers.push(handler)
}
handleEvent (domain: Domain, event: 'add' | 'update' | 'delete' | 'read', count: number, time: number): void {
for (const handler of this.handlers) {
handler(domain, event, count, time, this._db)
}
}
constructor ( constructor (
protected readonly db: Db, protected readonly db: Db,
protected readonly hierarchy: Hierarchy, protected readonly hierarchy: Hierarchy,
@ -151,45 +162,6 @@ abstract class MongoAdapterBase implements DbAdapter {
return this._db return this._db
} }
async createIndexes (domain: Domain, config: Pick<IndexingConfiguration<Doc>, 'indexes'>): Promise<void> {
for (const value of config.indexes) {
try {
if (typeof value === 'string') {
await this.collection(domain).createIndex(value)
} else {
const opt: CreateIndexesOptions = {}
if (value.filter !== undefined) {
opt.partialFilterExpression = value.filter
} else if (value.sparse === true) {
opt.sparse = true
}
await this.collection(domain).createIndex(value.keys, opt)
}
} catch (err: any) {
console.error('failed to create index', domain, value, err)
}
}
}
async removeOldIndex (domain: Domain, deletePattern: RegExp[], keepPattern: RegExp[]): Promise<void> {
try {
const existingIndexes = await this.collection(domain).indexes()
for (const existingIndex of existingIndexes) {
if (existingIndex.name !== undefined) {
const name: string = existingIndex.name
if (
deletePattern.some((it) => it.test(name)) &&
(existingIndex.sparse === true || !keepPattern.some((it) => it.test(name)))
) {
await this.collection(domain).dropIndex(name)
}
}
}
} catch (err: any) {
console.error(err)
}
}
async tx (ctx: MeasureContext, ...tx: Tx[]): Promise<TxResult[]> { async tx (ctx: MeasureContext, ...tx: Tx[]): Promise<TxResult[]> {
return [] return []
} }
@ -198,7 +170,11 @@ abstract class MongoAdapterBase implements DbAdapter {
this.client.close() this.client.close()
} }
private translateQuery<T extends Doc>(clazz: Ref<Class<T>>, query: DocumentQuery<T>): Filter<Document> { private translateQuery<T extends Doc>(
clazz: Ref<Class<T>>,
query: DocumentQuery<T>,
options?: ServerFindOptions<T>
): Filter<Document> {
const translated: any = {} const translated: any = {}
for (const key in query) { for (const key in query) {
const value = (query as any)[key] const value = (query as any)[key]
@ -213,6 +189,13 @@ abstract class MongoAdapterBase implements DbAdapter {
} }
translated[tkey] = value translated[tkey] = value
} }
if (options?.skipSpace === true) {
delete translated.space
}
if (options?.skipClass === true) {
delete translated._class
return translated
}
const baseClass = this.hierarchy.getBaseClass(clazz) const baseClass = this.hierarchy.getBaseClass(clazz)
if (baseClass !== core.class.Doc) { if (baseClass !== core.class.Doc) {
const classes = this.hierarchy.getDescendants(baseClass).filter((it) => !this.hierarchy.isMixin(it)) const classes = this.hierarchy.getDescendants(baseClass).filter((it) => !this.hierarchy.isMixin(it))
@ -473,12 +456,15 @@ abstract class MongoAdapterBase implements DbAdapter {
private async findWithPipeline<T extends Doc>( private async findWithPipeline<T extends Doc>(
ctx: MeasureContext, ctx: MeasureContext,
domain: Domain,
clazz: Ref<Class<T>>, clazz: Ref<Class<T>>,
query: DocumentQuery<T>, query: DocumentQuery<T>,
options?: ServerFindOptions<T> options: ServerFindOptions<T>,
stTime: number
): Promise<FindResult<T>> { ): Promise<FindResult<T>> {
const st = Date.now()
const pipeline: any[] = [] const pipeline: any[] = []
const match = { $match: this.translateQuery(clazz, query) } const match = { $match: this.translateQuery(clazz, query, options) }
const slowPipeline = isLookupQuery(query) || isLookupSort(options?.sort) const slowPipeline = isLookupQuery(query) || isLookupSort(options?.sort)
const steps = await ctx.with('get-lookups', {}, async () => await this.getLookups(clazz, options?.lookup)) const steps = await ctx.with('get-lookups', {}, async () => await this.getLookups(clazz, options?.lookup))
if (slowPipeline) { if (slowPipeline) {
@ -506,9 +492,6 @@ abstract class MongoAdapterBase implements DbAdapter {
pipeline.push({ $project: projection }) pipeline.push({ $project: projection })
} }
// const domain = this.hierarchy.getDomain(clazz)
const domain = options?.domain ?? this.hierarchy.getDomain(clazz)
const cursor = this.collection(domain).aggregate<WithLookup<T>>(pipeline) const cursor = this.collection(domain).aggregate<WithLookup<T>>(pipeline)
let result: WithLookup<T>[] = [] let result: WithLookup<T>[] = []
let total = options?.total === true ? 0 : -1 let total = options?.total === true ? 0 : -1
@ -558,6 +541,17 @@ abstract class MongoAdapterBase implements DbAdapter {
) )
total = arr?.[0]?.total ?? 0 total = arr?.[0]?.total ?? 0
} }
const edTime = Date.now()
if (edTime - stTime > 1000 || st - stTime > 1000) {
ctx.error('aggregate', {
time: edTime - stTime,
clazz,
query: cutObjectArray(query),
options,
queueTime: st - stTime
})
}
this.handleEvent(domain, 'read', result.length, edTime - st)
return toFindResult(this.stripHash(result) as T[], total) return toFindResult(this.stripHash(result) as T[], total)
} }
@ -643,7 +637,12 @@ abstract class MongoAdapterBase implements DbAdapter {
} }
@withContext('groupBy') @withContext('groupBy')
async groupBy<T>(ctx: MeasureContext, domain: Domain, field: string): Promise<Set<T>> { async groupBy<T, D extends Doc = Doc>(
ctx: MeasureContext,
domain: Domain,
field: string,
query?: DocumentQuery<D>
): Promise<Set<T>> {
const result = await ctx.with( const result = await ctx.with(
'groupBy', 'groupBy',
{ domain }, { domain },
@ -651,6 +650,7 @@ abstract class MongoAdapterBase implements DbAdapter {
const coll = this.collection(domain) const coll = this.collection(domain)
const grResult = await coll const grResult = await coll
.aggregate([ .aggregate([
...(query !== undefined ? [{ $match: query }] : []),
{ {
$group: { $group: {
_id: '$' + field _id: '$' + field
@ -716,20 +716,20 @@ abstract class MongoAdapterBase implements DbAdapter {
const stTime = Date.now() const stTime = Date.now()
return await this.findRateLimit.exec(async () => { return await this.findRateLimit.exec(async () => {
const st = Date.now() const st = Date.now()
const domain = options?.domain ?? this.hierarchy.getDomain(_class)
const result = await this.collectOps( const result = await this.collectOps(
ctx, ctx,
this.hierarchy.findDomain(_class), domain,
'find', 'find',
async (ctx) => { async (ctx) => {
const domain = options?.domain ?? this.hierarchy.getDomain(_class)
if ( if (
options != null && options != null &&
(options?.lookup != null || this.isEnumSort(_class, options) || this.isRulesSort(options)) (options?.lookup != null || this.isEnumSort(_class, options) || this.isRulesSort(options))
) { ) {
return await this.findWithPipeline(ctx, _class, query, options) return await this.findWithPipeline(ctx, domain, _class, query, options, stTime)
} }
const coll = this.collection(domain) const coll = this.collection(domain)
const mongoQuery = this.translateQuery(_class, query) const mongoQuery = this.translateQuery(_class, query, options)
if (options?.limit === 1) { if (options?.limit === 1) {
// Skip sort/projection/etc. // Skip sort/projection/etc.
@ -825,6 +825,7 @@ abstract class MongoAdapterBase implements DbAdapter {
queueTime: st - stTime queueTime: st - stTime
}) })
} }
this.handleEvent(domain, 'read', result.length, edTime - st)
return result return result
}) })
} }
@ -1122,7 +1123,6 @@ class MongoAdapter extends MongoAdapterBase {
}) })
await this.rateLimit.exec(async () => { await this.rateLimit.exec(async () => {
const domains: Promise<void>[] = []
for (const [domain, txs] of byDomain) { for (const [domain, txs] of byDomain) {
if (domain === undefined) { if (domain === undefined) {
continue continue
@ -1146,75 +1146,80 @@ class MongoAdapter extends MongoAdapterBase {
) { ) {
continue continue
} }
domains.push( await this.collectOps(
this.collectOps( ctx,
ctx, domain,
domain, 'tx',
'tx', async (ctx) => {
async (ctx) => { const coll = this.db.collection<Doc>(domain)
const coll = this.db.collection<Doc>(domain)
// Minor optimizations // Minor optimizations
// Add Remove optimization // Add Remove optimization
if (domainBulk.add.length > 0) { if (domainBulk.add.length > 0) {
await ctx.with('insertMany', {}, async () => { await ctx.with('insertMany', {}, async () => {
await coll.insertMany(domainBulk.add, { ordered: false }) const st = Date.now()
}) const result = await coll.insertMany(domainBulk.add, { ordered: false })
} this.handleEvent(domain, 'add', result.insertedCount, Date.now() - st)
if (domainBulk.update.size > 0) { })
// Extract similar update to update many if possible
// TODO:
await ctx.with('updateMany-bulk', {}, async () => {
await coll.bulkWrite(
Array.from(domainBulk.update.entries()).map((it) => ({
updateOne: {
filter: { _id: it[0] },
update: {
$set: it[1]
}
}
})),
{
ordered: false
}
)
})
}
if (domainBulk.bulkOperations.length > 0) {
await ctx.with('bulkWrite', {}, async () => {
await coll.bulkWrite(domainBulk.bulkOperations, {
ordered: false
})
})
}
if (domainBulk.findUpdate.size > 0) {
await ctx.with('find-result', {}, async () => {
const docs = await coll.find({ _id: { $in: Array.from(domainBulk.findUpdate) } }).toArray()
result.push(...docs)
})
}
if (domainBulk.raw.length > 0) {
await ctx.with('raw', {}, async () => {
for (const r of domainBulk.raw) {
result.push({ object: await r() })
}
})
}
},
{
domain,
add: domainBulk.add.length,
update: domainBulk.update.size,
bulk: domainBulk.bulkOperations.length,
find: domainBulk.findUpdate.size,
raw: domainBulk.raw.length
} }
) if (domainBulk.update.size > 0) {
// Extract similar update to update many if possible
// TODO:
await ctx.with('updateMany-bulk', {}, async () => {
const st = Date.now()
const result = await coll.bulkWrite(
Array.from(domainBulk.update.entries()).map((it) => ({
updateOne: {
filter: { _id: it[0] },
update: {
$set: it[1]
}
}
})),
{
ordered: false
}
)
this.handleEvent(domain, 'update', result.modifiedCount, Date.now() - st)
})
}
if (domainBulk.bulkOperations.length > 0) {
await ctx.with('bulkWrite', {}, async () => {
const st = Date.now()
const result = await coll.bulkWrite(domainBulk.bulkOperations, {
ordered: false
})
this.handleEvent(domain, 'update', result.modifiedCount, Date.now() - st)
})
}
if (domainBulk.findUpdate.size > 0) {
await ctx.with('find-result', {}, async () => {
const st = Date.now()
const docs = await coll.find({ _id: { $in: Array.from(domainBulk.findUpdate) } }).toArray()
result.push(...docs)
this.handleEvent(domain, 'read', docs.length, Date.now() - st)
})
}
if (domainBulk.raw.length > 0) {
await ctx.with('raw', {}, async () => {
for (const r of domainBulk.raw) {
result.push({ object: await r() })
}
})
}
},
{
domain,
add: domainBulk.add.length,
update: domainBulk.update.size,
bulk: domainBulk.bulkOperations.length,
find: domainBulk.findUpdate.size,
raw: domainBulk.raw.length
}
) )
} }
await Promise.all(domains)
}) })
return result return result
} }
@ -1395,6 +1400,7 @@ class MongoAdapter extends MongoAdapterBase {
if (tx.retrieve === true) { if (tx.retrieve === true) {
bulk.raw.push(async () => { bulk.raw.push(async () => {
const st = Date.now()
const res = await this.collection(domain).findOneAndUpdate( const res = await this.collection(domain).findOneAndUpdate(
{ _id: tx.objectId }, { _id: tx.objectId },
{ {
@ -1407,6 +1413,9 @@ class MongoAdapter extends MongoAdapterBase {
} as unknown as UpdateFilter<Document>, } as unknown as UpdateFilter<Document>,
{ returnDocument: 'after', includeResultMetadata: true } { returnDocument: 'after', includeResultMetadata: true }
) )
const dnow = Date.now() - st
this.handleEvent(domain, 'read', 1, dnow)
this.handleEvent(domain, 'update', 1, dnow)
return res.value as TxResult return res.value as TxResult
}) })
} else { } else {
@ -1459,6 +1468,7 @@ class MongoTxAdapter extends MongoAdapterBase implements TxAdapter {
if (tx.length === 0) { if (tx.length === 0) {
return [] return []
} }
const st = Date.now()
await this.collectOps( await this.collectOps(
ctx, ctx,
DOMAIN_TX, DOMAIN_TX,
@ -1468,6 +1478,7 @@ class MongoTxAdapter extends MongoAdapterBase implements TxAdapter {
}, },
{ tx: tx.length } { tx: tx.length }
) )
this.handleEvent(DOMAIN_TX, 'add', tx.length, Date.now() - st)
return [] return []
} }

View File

@ -158,6 +158,11 @@ export class DBCollectionHelper implements DomainHelperOperations {
collections = new Map<string, Collection<any>>() collections = new Map<string, Collection<any>>()
constructor (readonly db: Db) {} constructor (readonly db: Db) {}
async listDomains (): Promise<Set<Domain>> {
const collections = await this.db.listCollections({}, { nameOnly: true }).toArray()
return new Set(collections.map((it) => it.name as unknown as Domain))
}
async init (domain?: Domain): Promise<void> { async init (domain?: Domain): Promise<void> {
if (domain === undefined) { if (domain === undefined) {
// Init existing collecfions // Init existing collecfions
@ -224,7 +229,8 @@ export class DBCollectionHelper implements DomainHelperOperations {
return await this.collection(domain).listIndexes().toArray() return await this.collection(domain).listIndexes().toArray()
} }
async hasDocuments (domain: Domain, count: number): Promise<boolean> { async estimatedCount (domain: Domain): Promise<number> {
return (await this.collection(domain).countDocuments({}, { limit: count })) >= count const c = this.collection(domain)
return await c.estimatedDocumentCount()
} }
} }

View File

@ -1,6 +1,5 @@
/* eslint-disable @typescript-eslint/unbound-method */ /* eslint-disable @typescript-eslint/unbound-method */
import { type Branding, type MeasureContext, type WorkspaceId } from '@hcengineering/core' import { type Branding, type MeasureContext, type WorkspaceId } from '@hcengineering/core'
import { OpenAIEmbeddingsStage } from '@hcengineering/openai'
import { CollaborativeContentRetrievalStage } from '@hcengineering/server-collaboration' import { CollaborativeContentRetrievalStage } from '@hcengineering/server-collaboration'
import { import {
ContentRetrievalStage, ContentRetrievalStage,
@ -65,14 +64,14 @@ export function createIndexStages (
const pushStage = new FullTextPushStage(storage, adapter, workspace, branding) const pushStage = new FullTextPushStage(storage, adapter, workspace, branding)
stages.push(pushStage) stages.push(pushStage)
// OpenAI prepare stage // // OpenAI prepare stage
const openAIStage = new OpenAIEmbeddingsStage(adapter, workspace) // const openAIStage = new OpenAIEmbeddingsStage(adapter, workspace)
// We depend on all available stages. // // We depend on all available stages.
openAIStage.require = stages.map((it) => it.stageId) // openAIStage.require = stages.map((it) => it.stageId)
openAIStage.updateSummary(summaryStage) // openAIStage.updateSummary(summaryStage)
stages.push(openAIStage) // stages.push(openAIStage)
return stages return stages
} }

View File

@ -34,7 +34,12 @@ import core, {
} from '@hcengineering/core' } from '@hcengineering/core'
import { createMongoAdapter } from '@hcengineering/mongo' import { createMongoAdapter } from '@hcengineering/mongo'
import { PlatformError, unknownError } from '@hcengineering/platform' import { PlatformError, unknownError } from '@hcengineering/platform'
import { DbAdapter, StorageAdapter, type StorageAdapterEx } from '@hcengineering/server-core' import {
DbAdapter,
StorageAdapter,
type DomainHelperOperations,
type StorageAdapterEx
} from '@hcengineering/server-core'
class StorageBlobAdapter implements DbAdapter { class StorageBlobAdapter implements DbAdapter {
constructor ( constructor (
@ -53,6 +58,10 @@ class StorageBlobAdapter implements DbAdapter {
return await this.blobAdapter.findAll(ctx, _class, query, options) return await this.blobAdapter.findAll(ctx, _class, query, options)
} }
helper (): DomainHelperOperations {
return this.blobAdapter.helper()
}
async groupBy<T>(ctx: MeasureContext, domain: Domain, field: string): Promise<Set<T>> { async groupBy<T>(ctx: MeasureContext, domain: Domain, field: string): Promise<Set<T>> {
return await this.blobAdapter.groupBy(ctx, domain, field) return await this.blobAdapter.groupBy(ctx, domain, field)
} }

View File

@ -516,17 +516,7 @@ async function createUpdateIndexes (
if (domain === DOMAIN_MODEL || domain === DOMAIN_TRANSIENT || domain === DOMAIN_BENCHMARK) { if (domain === DOMAIN_MODEL || domain === DOMAIN_TRANSIENT || domain === DOMAIN_BENCHMARK) {
continue continue
} }
const result = await domainHelper.checkDomain(ctx, domain, false, dbHelper) await domainHelper.checkDomain(ctx, domain, await dbHelper.estimatedCount(domain), dbHelper)
if (!result && dbHelper.exists(domain)) {
try {
logger.log('dropping domain', { domain })
if ((await db.collection(domain).countDocuments({})) === 0) {
await db.dropCollection(domain)
}
} catch (err) {
logger.error('error: failed to delete collection', { domain, err })
}
}
completed++ completed++
await progress((100 / allDomains.length) * completed) await progress((100 / allDomains.length) * completed)
} }

View File

@ -9,7 +9,7 @@ export SERVER_SECRET=secret
# Restore workspace contents in mongo/elastic # Restore workspace contents in mongo/elastic
./tool-local.sh backup-restore ./sanity-ws sanity-ws ./tool-local.sh backup-restore ./sanity-ws sanity-ws
./tool-local.sh upgrade-workspace sanity-ws ./tool-local.sh upgrade-workspace sanity-ws --indexes
# Re-assign user to workspace. # Re-assign user to workspace.
./tool-local.sh assign-workspace user1 sanity-ws ./tool-local.sh assign-workspace user1 sanity-ws

View File

@ -3,7 +3,7 @@
# Restore workspace contents in mongo/elastic # Restore workspace contents in mongo/elastic
./tool.sh backup-restore ./sanity-ws sanity-ws ./tool.sh backup-restore ./sanity-ws sanity-ws
./tool.sh upgrade-workspace sanity-ws ./tool.sh upgrade-workspace sanity-ws --indexes
# Re-assign user to workspace. # Re-assign user to workspace.
./tool.sh assign-workspace user1 sanity-ws ./tool.sh assign-workspace user1 sanity-ws