UBERF-7692: Move FindAll slow print into mongo adapter (#6152)

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
Andrey Sobolev 2024-07-26 21:42:14 +07:00 committed by Andrey Sobolev
parent 2f415a554c
commit 3bec396384
No known key found for this signature in database
GPG Key ID: BD80F68D68D8F7F2
17 changed files with 294 additions and 166 deletions

View File

@ -372,7 +372,8 @@ export function devTool (
.command('upgrade-workspace <name>') .command('upgrade-workspace <name>')
.description('upgrade workspace') .description('upgrade workspace')
.option('-f|--force [force]', 'Force update', true) .option('-f|--force [force]', 'Force update', true)
.action(async (workspace, cmd: { force: boolean }) => { .option('-i|--indexes [indexes]', 'Force indexes rebuild', false)
.action(async (workspace, cmd: { force: boolean, indexes: boolean }) => {
const { mongodbUri, version, txes, migrateOperations } = prepareTools() const { mongodbUri, version, txes, migrateOperations } = prepareTools()
await withDatabase(mongodbUri, async (db) => { await withDatabase(mongodbUri, async (db) => {
const info = await getWorkspaceById(db, productId, workspace) const info = await getWorkspaceById(db, productId, workspace)
@ -391,7 +392,8 @@ export function devTool (
db, db,
info.workspaceUrl ?? info.workspace, info.workspaceUrl ?? info.workspace,
consoleModelLogger, consoleModelLogger,
cmd.force cmd.force,
cmd.indexes
) )
console.log(metricsToString(measureCtx.metrics, 'upgrade', 60), {}) console.log(metricsToString(measureCtx.metrics, 'upgrade', 60), {})
console.log('upgrade done') console.log('upgrade done')

View File

@ -310,8 +310,7 @@ export function createModel (builder: Builder): void {
stages: 1, stages: 1,
_id: 1, _id: 1,
modifiedOn: 1 modifiedOn: 1
}, }
sparse: true
} }
] ]
} }

View File

@ -28,12 +28,12 @@ import {
type Doc, type Doc,
type DocumentQuery, type DocumentQuery,
type Domain, type Domain,
type IndexingConfiguration,
type Markup, type Markup,
type Ref, type Ref,
type Space, type Space,
type Timestamp, type Timestamp,
type Tx, type Tx
type IndexingConfiguration
} from '@hcengineering/core' } from '@hcengineering/core'
import { import {
ArrOf, ArrOf,
@ -69,15 +69,15 @@ import {
type NotificationObjectPresenter, type NotificationObjectPresenter,
type NotificationPreferencesGroup, type NotificationPreferencesGroup,
type NotificationPreview, type NotificationPreview,
type NotificationProvider,
type NotificationProviderDefaults,
type NotificationProviderSetting,
type NotificationStatus, type NotificationStatus,
type NotificationTemplate, type NotificationTemplate,
type NotificationType, type NotificationType,
type PushSubscription,
type PushSubscriptionKeys,
type NotificationProvider,
type NotificationProviderSetting,
type NotificationTypeSetting, type NotificationTypeSetting,
type NotificationProviderDefaults type PushSubscription,
type PushSubscriptionKeys
} from '@hcengineering/notification' } from '@hcengineering/notification'
import { type Asset, type IntlString } from '@hcengineering/platform' import { type Asset, type IntlString } from '@hcengineering/platform'
import setting from '@hcengineering/setting' import setting from '@hcengineering/setting'
@ -91,7 +91,11 @@ export { notification as default }
export const DOMAIN_NOTIFICATION = 'notification' as Domain export const DOMAIN_NOTIFICATION = 'notification' as Domain
@Model(notification.class.BrowserNotification, core.class.Doc, DOMAIN_NOTIFICATION) export const DOMAIN_DOC_NOTIFY = 'notification-dnc' as Domain
export const DOMAIN_USER_NOTIFY = 'notification-user' as Domain
@Model(notification.class.BrowserNotification, core.class.Doc, DOMAIN_USER_NOTIFY)
export class TBrowserNotification extends TDoc implements BrowserNotification { export class TBrowserNotification extends TDoc implements BrowserNotification {
senderId?: Ref<Account> | undefined senderId?: Ref<Account> | undefined
tag!: Ref<Doc<Space>> tag!: Ref<Doc<Space>>
@ -102,7 +106,7 @@ export class TBrowserNotification extends TDoc implements BrowserNotification {
status!: NotificationStatus status!: NotificationStatus
} }
@Model(notification.class.PushSubscription, core.class.Doc, DOMAIN_NOTIFICATION) @Model(notification.class.PushSubscription, core.class.Doc, DOMAIN_USER_NOTIFY)
export class TPushSubscription extends TDoc implements PushSubscription { export class TPushSubscription extends TDoc implements PushSubscription {
user!: Ref<Account> user!: Ref<Account>
endpoint!: string endpoint!: string
@ -185,7 +189,7 @@ export class TNotificationContextPresenter extends TClass implements Notificatio
labelPresenter?: AnyComponent labelPresenter?: AnyComponent
} }
@Model(notification.class.DocNotifyContext, core.class.Doc, DOMAIN_NOTIFICATION) @Model(notification.class.DocNotifyContext, core.class.Doc, DOMAIN_DOC_NOTIFY)
export class TDocNotifyContext extends TDoc implements DocNotifyContext { export class TDocNotifyContext extends TDoc implements DocNotifyContext {
@Prop(TypeRef(core.class.Account), core.string.Account) @Prop(TypeRef(core.class.Account), core.string.Account)
@Index(IndexKind.Indexed) @Index(IndexKind.Indexed)
@ -626,6 +630,49 @@ export function createModel (builder: Builder): void {
disabled: [{ modifiedOn: 1 }, { modifiedBy: 1 }, { createdBy: 1 }, { isViewed: 1 }, { hidden: 1 }] disabled: [{ modifiedOn: 1 }, { modifiedBy: 1 }, { createdBy: 1 }, { isViewed: 1 }, { hidden: 1 }]
}) })
builder.createDoc(core.class.DomainIndexConfiguration, core.space.Model, {
domain: DOMAIN_DOC_NOTIFY,
indexes: [{ keys: { user: 1 } }],
disabled: [
{ _class: 1 },
{ modifiedOn: 1 },
{ modifiedBy: 1 },
{ createdBy: 1 },
{ isViewed: 1 },
{ hidden: 1 },
{ createdOn: -1 },
{ attachedTo: 1 }
]
})
builder.createDoc(core.class.DomainIndexConfiguration, core.space.Model, {
domain: DOMAIN_USER_NOTIFY,
indexes: [{ keys: { user: 1 } }],
disabled: [
{ _class: 1 },
{ modifiedOn: 1 },
{ modifiedBy: 1 },
{ createdBy: 1 },
{ isViewed: 1 },
{ hidden: 1 },
{ createdOn: -1 },
{ attachedTo: 1 }
]
})
builder.createDoc(core.class.DomainIndexConfiguration, core.space.Model, {
domain: DOMAIN_USER_NOTIFY,
indexes: [],
disabled: [
{ _class: 1 },
{ modifiedOn: 1 },
{ modifiedBy: 1 },
{ createdBy: 1 },
{ isViewed: 1 },
{ hidden: 1 },
{ createdOn: -1 },
{ attachedTo: 1 }
]
})
builder.mixin<Class<DocNotifyContext>, IndexingConfiguration<DocNotifyContext>>( builder.mixin<Class<DocNotifyContext>, IndexingConfiguration<DocNotifyContext>>(
notification.class.DocNotifyContext, notification.class.DocNotifyContext,
core.class.Class, core.class.Class,

View File

@ -21,10 +21,15 @@ import {
type MigrationClient, type MigrationClient,
type MigrationUpgradeClient type MigrationUpgradeClient
} from '@hcengineering/model' } from '@hcengineering/model'
import notification, { notificationId, type DocNotifyContext } from '@hcengineering/notification' import notification, {
notificationId,
NotificationStatus,
type BrowserNotification,
type DocNotifyContext
} from '@hcengineering/notification'
import { DOMAIN_PREFERENCE } from '@hcengineering/preference' import { DOMAIN_PREFERENCE } from '@hcengineering/preference'
import { DOMAIN_NOTIFICATION } from './index' import { DOMAIN_DOC_NOTIFY, DOMAIN_NOTIFICATION, DOMAIN_USER_NOTIFY } from './index'
export async function removeNotifications ( export async function removeNotifications (
client: MigrationClient, client: MigrationClient,
@ -127,8 +132,46 @@ export const notificationOperation: MigrateOperation = {
{ {
state: 'migrate-setting', state: 'migrate-setting',
func: migrateSettings func: migrateSettings
},
{
state: 'move-doc-notify',
func: async (client) => {
await client.move(DOMAIN_NOTIFICATION, { _class: notification.class.DocNotifyContext }, DOMAIN_DOC_NOTIFY)
}
},
{
state: 'remove-last-view',
func: async (client) => {
await client.deleteMany(DOMAIN_NOTIFICATION, { _class: 'notification:class:LastView' as any })
}
},
{
state: 'remove-notification',
func: async (client) => {
await client.deleteMany(DOMAIN_NOTIFICATION, { _class: 'notification:class:Notification' as any })
}
},
{
state: 'remove-email-notification',
func: async (client) => {
await client.deleteMany(DOMAIN_NOTIFICATION, { _class: 'notification:class:EmailNotification' as any })
}
},
{
state: 'move-user',
func: async (client) => {
await client.move(
DOMAIN_NOTIFICATION,
{ _class: { $in: [notification.class.BrowserNotification, notification.class.PushSubscription] } },
DOMAIN_USER_NOTIFY
)
}
} }
]) ])
await client.deleteMany<BrowserNotification>(DOMAIN_USER_NOTIFY, {
_class: notification.class.BrowserNotification,
status: NotificationStatus.Notified
})
}, },
async upgrade (state: Map<string, Set<string>>, client: () => Promise<MigrationUpgradeClient>): Promise<void> {} async upgrade (state: Map<string, Set<string>>, client: () => Promise<MigrationUpgradeClient>): Promise<void> {}
} }

View File

@ -83,7 +83,7 @@
let categoryQueryOptions: Partial<FindOptions<Doc>> let categoryQueryOptions: Partial<FindOptions<Doc>>
$: categoryQueryOptions = { $: categoryQueryOptions = {
...noLookupOptions(resultOptions), ...noLookupSortingOptions(resultOptions),
projection: { projection: {
...resultOptions.projection, ...resultOptions.projection,
_id: 1, _id: 1,
@ -151,8 +151,8 @@
return newQuery return newQuery
} }
function noLookupOptions (options: FindOptions<Doc>): FindOptions<Doc> { function noLookupSortingOptions (options: FindOptions<Doc>): FindOptions<Doc> {
const { lookup, ...resultOptions } = options const { lookup, sort, ...resultOptions } = options
return resultOptions return resultOptions
} }

View File

@ -1188,7 +1188,8 @@ export async function upgradeWorkspace (
db: Db, db: Db,
workspaceUrl: string, workspaceUrl: string,
logger: ModelLogger = consoleModelLogger, logger: ModelLogger = consoleModelLogger,
forceUpdate: boolean = true forceUpdate: boolean = true,
forceIndexes: boolean = false
): Promise<string> { ): Promise<string> {
const ws = await getWorkspaceByUrl(db, productId, workspaceUrl) const ws = await getWorkspaceByUrl(db, productId, workspaceUrl)
if (ws === null) { if (ws === null) {
@ -1218,7 +1219,8 @@ export async function upgradeWorkspace (
migrationOperation, migrationOperation,
logger, logger,
false, false,
async (value) => {} async (value) => {},
forceIndexes
) )
await db.collection(WORKSPACE_COLLECTION).updateOne( await db.collection(WORKSPACE_COLLECTION).updateOne(

View File

@ -33,6 +33,7 @@ import {
type WorkspaceId type WorkspaceId
} from '@hcengineering/core' } from '@hcengineering/core'
import { type StorageAdapter } from './storage' import { type StorageAdapter } from './storage'
import type { ServerFindOptions } from './types'
export interface DomainHelperOperations { export interface DomainHelperOperations {
create: (domain: Domain) => Promise<void> create: (domain: Domain) => Promise<void>
@ -101,9 +102,7 @@ export interface DbAdapter {
ctx: MeasureContext, ctx: MeasureContext,
_class: Ref<Class<T>>, _class: Ref<Class<T>>,
query: DocumentQuery<T>, query: DocumentQuery<T>,
options?: FindOptions<T> & { options?: ServerFindOptions<T>
domain?: Domain // Allow to find for Doc's in specified domain only.
}
) => Promise<FindResult<T>> ) => Promise<FindResult<T>>
tx: (ctx: MeasureContext, ...tx: Tx[]) => Promise<TxResult[]> tx: (ctx: MeasureContext, ...tx: Tx[]) => Promise<TxResult[]>

View File

@ -362,8 +362,7 @@ export class FullTextIndexPipeline implements FullTextPipeline {
{ {
keys: { keys: {
['stages.' + st.stageId]: 1 ['stages.' + st.stageId]: 1
}, }
sparse: true
} }
] ]
}) })

View File

@ -49,7 +49,7 @@ export class DomainIndexHelperImpl implements DomainHelper {
keys: { keys: {
[a.name]: a.index === IndexKind.Indexed ? IndexOrder.Ascending : IndexOrder.Descending [a.name]: a.index === IndexKind.Indexed ? IndexOrder.Ascending : IndexOrder.Descending
}, },
sparse: true // Default to sparse indexes sparse: false // Default to non sparse indexes
}) })
} }
} }
@ -59,7 +59,7 @@ export class DomainIndexHelperImpl implements DomainHelper {
const config = hierarchy.as(c, core.mixin.IndexConfiguration) const config = hierarchy.as(c, core.mixin.IndexConfiguration)
for (const attr of config.indexes) { for (const attr of config.indexes) {
if (typeof attr === 'string') { if (typeof attr === 'string') {
domainAttrs.add({ keys: { [attr]: IndexOrder.Ascending }, sparse: true }) domainAttrs.add({ keys: { [attr]: IndexOrder.Ascending }, sparse: false })
} else { } else {
domainAttrs.add(attr) domainAttrs.add(attr)
} }

View File

@ -22,7 +22,6 @@ import core, {
DOMAIN_TX, DOMAIN_TX,
TxFactory, TxFactory,
TxProcessor, TxProcessor,
cutObjectArray,
toFindResult, toFindResult,
type Account, type Account,
type AttachedDoc, type AttachedDoc,
@ -72,6 +71,7 @@ import { type Triggers } from '../triggers'
import type { import type {
FullTextAdapter, FullTextAdapter,
ObjectDDParticipant, ObjectDDParticipant,
ServerFindOptions,
ServerStorage, ServerStorage,
ServerStorageOptions, ServerStorageOptions,
SessionContext, SessionContext,
@ -96,7 +96,7 @@ export class TServerStorage implements ServerStorage {
Domain, Domain,
{ {
exists: boolean exists: boolean
checkPromise: Promise<boolean> checkPromise: Promise<boolean> | undefined
lastCheck: number lastCheck: number
} }
>() >()
@ -196,7 +196,17 @@ export class TServerStorage implements ServerStorage {
const helper = adapter.helper?.() const helper = adapter.helper?.()
if (helper !== undefined) { if (helper !== undefined) {
let info = this.domainInfo.get(domain) let info = this.domainInfo.get(domain)
if (info == null || Date.now() - info.lastCheck > 5 * 60 * 1000) { if (info == null) {
// For first time, lets assume all is fine
info = {
exists: true,
lastCheck: Date.now(),
checkPromise: undefined
}
this.domainInfo.set(domain, info)
return adapter
}
if (Date.now() - info.lastCheck > 5 * 60 * 1000) {
// Re-check every 5 minutes // Re-check every 5 minutes
const exists = helper.exists(domain) const exists = helper.exists(domain)
// We will create necessary indexes if required, and not touch collection if not required. // We will create necessary indexes if required, and not touch collection if not required.
@ -434,10 +444,7 @@ export class TServerStorage implements ServerStorage {
ctx: MeasureContext, ctx: MeasureContext,
clazz: Ref<Class<T>>, clazz: Ref<Class<T>>,
query: DocumentQuery<T>, query: DocumentQuery<T>,
options?: FindOptions<T> & { options?: ServerFindOptions<T>
domain?: Domain // Allow to find for Doc's in specified domain only.
prefix?: string
}
): Promise<FindResult<T>> { ): Promise<FindResult<T>> {
const p = options?.prefix ?? 'client' const p = options?.prefix ?? 'client'
const domain = options?.domain ?? this.hierarchy.getDomain(clazz) const domain = options?.domain ?? this.hierarchy.getDomain(clazz)
@ -447,7 +454,6 @@ export class TServerStorage implements ServerStorage {
if (domain === DOMAIN_MODEL) { if (domain === DOMAIN_MODEL) {
return this.modelDb.findAllSync(clazz, query, options) return this.modelDb.findAllSync(clazz, query, options)
} }
const st = Date.now()
const result = await ctx.with( const result = await ctx.with(
p + '-find-all', p + '-find-all',
{ _class: clazz }, { _class: clazz },
@ -456,9 +462,6 @@ export class TServerStorage implements ServerStorage {
}, },
{ clazz, query, options } { clazz, query, options }
) )
if (Date.now() - st > 1000) {
ctx.error('FindAll', { time: Date.now() - st, clazz, query: cutObjectArray(query), options })
}
return result return result
} }

View File

@ -48,6 +48,10 @@ import { type Readable } from 'stream'
import { type ServiceAdaptersManager } from './service' import { type ServiceAdaptersManager } from './service'
import { type StorageAdapter } from './storage' import { type StorageAdapter } from './storage'
export interface ServerFindOptions<T extends Doc> extends FindOptions<T> {
domain?: Domain // Allow to find for Doc's in specified domain only.
prefix?: string
}
/** /**
* @public * @public
*/ */
@ -58,10 +62,7 @@ export interface ServerStorage extends LowLevelStorage {
ctx: MeasureContext, ctx: MeasureContext,
_class: Ref<Class<T>>, _class: Ref<Class<T>>,
query: DocumentQuery<T>, query: DocumentQuery<T>,
options?: FindOptions<T> & { options?: ServerFindOptions<T>
domain?: Domain // Allow to find for Doc's in specified domain only.
prefix?: string
}
) => Promise<FindResult<T>> ) => Promise<FindResult<T>>
searchFulltext: (ctx: MeasureContext, query: SearchQuery, options: SearchOptions) => Promise<SearchResult> searchFulltext: (ctx: MeasureContext, query: SearchQuery, options: SearchOptions) => Promise<SearchResult>
tx: (ctx: SessionOperationContext, tx: Tx) => Promise<TxResult> tx: (ctx: SessionOperationContext, tx: Tx) => Promise<TxResult>

View File

@ -27,6 +27,7 @@ describe('minio operations', () => {
} }
const toolCtx = new MeasureMetricsContext('test', {}) const toolCtx = new MeasureMetricsContext('test', {})
it('check root bucket', async () => { it('check root bucket', async () => {
jest.setTimeout(50000)
const minioService = new MinioService({ ...(config.storages[0] as MinioConfig), rootBucket: 'test-bucket' }) const minioService = new MinioService({ ...(config.storages[0] as MinioConfig), rootBucket: 'test-bucket' })
let existingTestBuckets = await minioService.listBuckets(toolCtx, '') let existingTestBuckets = await minioService.listBuckets(toolCtx, '')

View File

@ -97,6 +97,7 @@ describe('mongo operations', () => {
}) })
afterAll(async () => { afterAll(async () => {
mongoClient.close()
await shutdown() await shutdown()
}) })
@ -136,6 +137,8 @@ describe('mongo operations', () => {
await txStorage.tx(mctx, t) await txStorage.tx(mctx, t)
} }
await txStorage.close()
const conf: DbConfiguration = { const conf: DbConfiguration = {
domains: { domains: {
[DOMAIN_TX]: 'MongoTx', [DOMAIN_TX]: 'MongoTx',
@ -223,14 +226,19 @@ describe('mongo operations', () => {
it('check add', async () => { it('check add', async () => {
jest.setTimeout(500000) jest.setTimeout(500000)
const times: number[] = []
for (let i = 0; i < 50; i++) { for (let i = 0; i < 50; i++) {
const t = Date.now()
await operations.createDoc(taskPlugin.class.Task, '' as Ref<Space>, { await operations.createDoc(taskPlugin.class.Task, '' as Ref<Space>, {
name: `my-task-${i}`, name: `my-task-${i}`,
description: `${i * i}`, description: `${i * i}`,
rate: 20 + i rate: 20 + i
}) })
times.push(Date.now() - t)
} }
console.log('createDoc times', times)
const r = await client.findAll<Task>(taskPlugin.class.Task, {}) const r = await client.findAll<Task>(taskPlugin.class.Task, {})
expect(r.length).toEqual(50) expect(r.length).toEqual(50)

View File

@ -66,6 +66,7 @@ import {
updateHashForDoc, updateHashForDoc,
type DbAdapter, type DbAdapter,
type DomainHelperOperations, type DomainHelperOperations,
type ServerFindOptions,
type StorageAdapter, type StorageAdapter,
type TxAdapter type TxAdapter
} from '@hcengineering/server-core' } from '@hcengineering/server-core'
@ -75,6 +76,7 @@ import {
type AbstractCursor, type AbstractCursor,
type AnyBulkWriteOperation, type AnyBulkWriteOperation,
type Collection, type Collection,
type CreateIndexesOptions,
type Db, type Db,
type Document, type Document,
type Filter, type Filter,
@ -153,9 +155,15 @@ abstract class MongoAdapterBase implements DbAdapter {
for (const value of config.indexes) { for (const value of config.indexes) {
try { try {
if (typeof value === 'string') { if (typeof value === 'string') {
await this.collection(domain).createIndex(value, { sparse: true }) await this.collection(domain).createIndex(value)
} else { } else {
await this.collection(domain).createIndex(value.keys, { sparse: value.sparse ?? true }) const opt: CreateIndexesOptions = {}
if (value.filter !== undefined) {
opt.partialFilterExpression = value.filter
} else if (value.sparse === true) {
opt.sparse = true
}
await this.collection(domain).createIndex(value.keys, opt)
} }
} catch (err: any) { } catch (err: any) {
console.error('failed to create index', domain, value, err) console.error('failed to create index', domain, value, err)
@ -171,7 +179,7 @@ abstract class MongoAdapterBase implements DbAdapter {
const name: string = existingIndex.name const name: string = existingIndex.name
if ( if (
deletePattern.some((it) => it.test(name)) && deletePattern.some((it) => it.test(name)) &&
(existingIndex.sparse !== true || !keepPattern.some((it) => it.test(name))) (existingIndex.sparse === true || !keepPattern.some((it) => it.test(name)))
) { ) {
await this.collection(domain).dropIndex(name) await this.collection(domain).dropIndex(name)
} }
@ -469,9 +477,7 @@ abstract class MongoAdapterBase implements DbAdapter {
ctx: MeasureContext, ctx: MeasureContext,
clazz: Ref<Class<T>>, clazz: Ref<Class<T>>,
query: DocumentQuery<T>, query: DocumentQuery<T>,
options?: FindOptions<T> & { options?: ServerFindOptions<T>
domain?: Domain // Allow to find for Doc's in specified domain only.
}
): Promise<FindResult<T>> { ): Promise<FindResult<T>> {
const pipeline: any[] = [] const pipeline: any[] = []
const match = { $match: this.translateQuery(clazz, query) } const match = { $match: this.translateQuery(clazz, query) }
@ -506,10 +512,8 @@ abstract class MongoAdapterBase implements DbAdapter {
// const domain = this.hierarchy.getDomain(clazz) // const domain = this.hierarchy.getDomain(clazz)
const domain = options?.domain ?? this.hierarchy.getDomain(clazz) const domain = options?.domain ?? this.hierarchy.getDomain(clazz)
const cursor = this.collection(domain).aggregate<WithLookup<T>>(pipeline, {
checkKeys: false, const cursor = this.collection(domain).aggregate<WithLookup<T>>(pipeline)
enableUtf8Validation: false
})
let result: WithLookup<T>[] = [] let result: WithLookup<T>[] = []
let total = options?.total === true ? 0 : -1 let total = options?.total === true ? 0 : -1
try { try {
@ -662,12 +666,12 @@ abstract class MongoAdapterBase implements DbAdapter {
ctx: MeasureContext, ctx: MeasureContext,
_class: Ref<Class<T>>, _class: Ref<Class<T>>,
query: DocumentQuery<T>, query: DocumentQuery<T>,
options?: FindOptions<T> & { options?: ServerFindOptions<T>
domain?: Domain // Allow to find for Doc's in specified domain only.
}
): Promise<FindResult<T>> { ): Promise<FindResult<T>> {
const stTime = Date.now()
return await this.findRateLimit.exec(async () => { return await this.findRateLimit.exec(async () => {
return await this.collectOps( const st = Date.now()
const result = await this.collectOps(
this.globalCtx, this.globalCtx,
this.hierarchy.findDomain(_class), this.hierarchy.findDomain(_class),
'find', 'find',
@ -737,6 +741,17 @@ abstract class MongoAdapterBase implements DbAdapter {
options options
} }
) )
const edTime = Date.now()
if (edTime - st > 1000 || st - stTime > 1000) {
ctx.error('FindAll', {
time: edTime - st,
_class,
query: cutObjectArray(query),
options,
queueTime: st - stTime
})
}
return result
}) })
} }
@ -1027,97 +1042,101 @@ class MongoAdapter extends MongoAdapterBase {
return undefined return undefined
}) })
for (const [domain, txs] of byDomain) { await this.rateLimit.exec(async () => {
if (domain === undefined) { const domains: Promise<void>[] = []
continue for (const [domain, txs] of byDomain) {
} if (domain === undefined) {
const domainBulk: OperationBulk = { continue
add: [], }
update: new Map(), const domainBulk: OperationBulk = {
bulkOperations: [], add: [],
findUpdate: new Set(), update: new Map(),
raw: [] bulkOperations: [],
} findUpdate: new Set(),
for (const t of txs) { raw: []
this.updateBulk(domainBulk, t) }
} for (const t of txs) {
if ( this.updateBulk(domainBulk, t)
domainBulk.add.length === 0 && }
domainBulk.update.size === 0 && if (
domainBulk.bulkOperations.length === 0 && domainBulk.add.length === 0 &&
domainBulk.findUpdate.size === 0 && domainBulk.update.size === 0 &&
domainBulk.raw.length === 0 domainBulk.bulkOperations.length === 0 &&
) { domainBulk.findUpdate.size === 0 &&
continue domainBulk.raw.length === 0
} ) {
await this.rateLimit.exec(async () => { continue
await this.collectOps( }
this.globalCtx, domains.push(
domain, this.collectOps(
'tx', this.globalCtx,
async (ctx) => {
const coll = this.db.collection<Doc>(domain)
// Minir optimizations
// Add Remove optimization
if (domainBulk.add.length > 0) {
await ctx.with('insertMany', {}, async () => {
await coll.insertMany(domainBulk.add, { ordered: false })
})
}
if (domainBulk.update.size > 0) {
// Extract similar update to update many if possible
// TODO:
await ctx.with('updateMany-bulk', {}, async () => {
await coll.bulkWrite(
Array.from(domainBulk.update.entries()).map((it) => ({
updateOne: {
filter: { _id: it[0] },
update: {
$set: it[1]
}
}
})),
{
ordered: false
}
)
})
}
if (domainBulk.bulkOperations.length > 0) {
await ctx.with('bulkWrite', {}, async () => {
await coll.bulkWrite(domainBulk.bulkOperations, {
ordered: false
})
})
}
if (domainBulk.findUpdate.size > 0) {
await ctx.with('find-result', {}, async () => {
const docs = await coll.find({ _id: { $in: Array.from(domainBulk.findUpdate) } }).toArray()
result.push(...docs)
})
}
if (domainBulk.raw.length > 0) {
await ctx.with('raw', {}, async () => {
for (const r of domainBulk.raw) {
result.push({ object: await r() })
}
})
}
},
{
domain, domain,
add: domainBulk.add.length, 'tx',
update: domainBulk.update.size, async (ctx) => {
bulk: domainBulk.bulkOperations.length, const coll = this.db.collection<Doc>(domain)
find: domainBulk.findUpdate.size,
raw: domainBulk.raw.length // Minir optimizations
} // Add Remove optimization
if (domainBulk.add.length > 0) {
await ctx.with('insertMany', {}, async () => {
await coll.insertMany(domainBulk.add, { ordered: false })
})
}
if (domainBulk.update.size > 0) {
// Extract similar update to update many if possible
// TODO:
await ctx.with('updateMany-bulk', {}, async () => {
await coll.bulkWrite(
Array.from(domainBulk.update.entries()).map((it) => ({
updateOne: {
filter: { _id: it[0] },
update: {
$set: it[1]
}
}
})),
{
ordered: false
}
)
})
}
if (domainBulk.bulkOperations.length > 0) {
await ctx.with('bulkWrite', {}, async () => {
await coll.bulkWrite(domainBulk.bulkOperations, {
ordered: false
})
})
}
if (domainBulk.findUpdate.size > 0) {
await ctx.with('find-result', {}, async () => {
const docs = await coll.find({ _id: { $in: Array.from(domainBulk.findUpdate) } }).toArray()
result.push(...docs)
})
}
if (domainBulk.raw.length > 0) {
await ctx.with('raw', {}, async () => {
for (const r of domainBulk.raw) {
result.push({ object: await r() })
}
})
}
},
{
domain,
add: domainBulk.add.length,
update: domainBulk.update.size,
bulk: domainBulk.bulkOperations.length,
find: domainBulk.findUpdate.size,
raw: domainBulk.raw.length
}
)
) )
}) }
} await Promise.all(domains)
})
return result return result
} }

View File

@ -131,7 +131,6 @@ export function getMongoClient (uri: string, options?: MongoClientOptions): Mong
MongoClient.connect(uri, { MongoClient.connect(uri, {
appName: 'transactor', appName: 'transactor',
...options, ...options,
enableUtf8Validation: false,
...extraOptions ...extraOptions
}), }),
() => { () => {
@ -208,11 +207,10 @@ export class DBCollectionHelper implements DomainHelperOperations {
if (value.filter !== undefined) { if (value.filter !== undefined) {
await this.collection(domain).createIndex(value.keys, { await this.collection(domain).createIndex(value.keys, {
...options, ...options,
sparse: false,
partialFilterExpression: value.filter partialFilterExpression: value.filter
}) })
} else { } else {
await this.collection(domain).createIndex(value.keys, { ...options, sparse: value.sparse ?? true }) await this.collection(domain).createIndex(value.keys, { ...options, sparse: value.sparse ?? false })
} }
} }
} }

View File

@ -27,6 +27,7 @@ describe('s3 operations', () => {
} }
const toolCtx = new MeasureMetricsContext('test', {}) const toolCtx = new MeasureMetricsContext('test', {})
it('check root bucket', async () => { it('check root bucket', async () => {
jest.setTimeout(50000)
const minioService = new S3Service({ ...(config.storages[0] as S3Config), rootBucket: 'haiodo-test-bucket' }) const minioService = new S3Service({ ...(config.storages[0] as S3Config), rootBucket: 'haiodo-test-bucket' })
let existingTestBuckets = await minioService.listBuckets(toolCtx, '') let existingTestBuckets = await minioService.listBuckets(toolCtx, '')

View File

@ -249,7 +249,8 @@ export async function upgradeModel (
migrateOperations: [string, MigrateOperation][], migrateOperations: [string, MigrateOperation][],
logger: ModelLogger = consoleModelLogger, logger: ModelLogger = consoleModelLogger,
skipTxUpdate: boolean = false, skipTxUpdate: boolean = false,
progress: (value: number) => Promise<void> progress: (value: number) => Promise<void>,
forceIndexes: boolean = false
): Promise<Tx[]> { ): Promise<Tx[]> {
const { mongodbUri, txes } = prepareTools(rawTxes) const { mongodbUri, txes } = prepareTools(rawTxes)
@ -347,6 +348,25 @@ export async function upgradeModel (
workspaceId workspaceId
) )
const upgradeIndexes = async (): Promise<void> => {
ctx.info('Migrate to sparse indexes')
// Create update indexes
await createUpdateIndexes(
ctx,
hierarchy,
modelDb,
db,
logger,
async (value) => {
await progress(90 + (Math.min(value, 100) / 100) * 10)
},
workspaceId
)
}
if (forceIndexes) {
await upgradeIndexes()
}
await ctx.with('migrate', {}, async (ctx) => { await ctx.with('migrate', {}, async (ctx) => {
let i = 0 let i = 0
for (const op of migrateOperations) { for (const op of migrateOperations) {
@ -366,22 +386,8 @@ export async function upgradeModel (
await tryMigrate(migrateClient, coreId, [ await tryMigrate(migrateClient, coreId, [
{ {
state: '#sparse', state: 'sparse',
func: async () => { func: upgradeIndexes
ctx.info('Migrate to sparse indexes')
// Create update indexes
await createUpdateIndexes(
ctx,
hierarchy,
modelDb,
db,
logger,
async (value) => {
await progress(90 + (Math.min(value, 100) / 100) * 10)
},
workspaceId
)
}
} }
]) ])
}) })